diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2023-05-15 09:54:57 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2023-05-15 12:40:02 +0200 |
commit | 58f2faa41b9e8002584f84c600bb0080cdfa9f99 (patch) | |
tree | 947f2138ed26287249f5a50ec85d0dffec6c40bb | |
parent | 91f15d09253e4ae369dafc1c11d4c72c7a93cb0c (diff) |
Implement priority lower bound calculation and machine interruption
-rw-r--r-- | bbot/agent/agent.cxx | 299 |
1 files changed, 240 insertions, 59 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index edd6422..5c91025 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -5,7 +5,7 @@ #include <pwd.h> // getpwuid() #include <limits.h> // PATH_MAX -#include <signal.h> // signal() +#include <signal.h> // signal(), kill() #include <stdlib.h> // rand_r(), strto[u]ll() #include <string.h> // strchr() #include <unistd.h> // sleep(), getpid(), getuid(), fsync(), [f]stat() @@ -761,6 +761,33 @@ snapshot_path (const dir_path& tp) to_string (inst)); } +// Compare bbot and library versions returning -1 if older, 0 if the same, +// and +1 if newer. +// +static int +compare_bbot (const bootstrap_manifest& m) +{ + auto cmp = [&m] (const string& n, const char* v) -> int + { + standard_version sv (v); + auto i = m.versions.find (n); + + return (i == m.versions.end () || i->second < sv + ? -1 + : i->second > sv ? 1 : 0); + }; + + // Start from the top assuming a new dependency cannot be added without + // changing the dependent's version. + // + int r; + return ( + (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : + (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0); +}; + // Return the global toolchain lock and the list of available machines, // (re-)bootstrapping them if necessary. // @@ -829,32 +856,6 @@ try return pr; } - // Compare bbot and library versions returning -1 if older, 0 if the same, - // and +1 if newer. - // - auto compare_bbot = [] (const bootstrap_manifest& m) -> int - { - auto cmp = [&m] (const string& n, const char* v) -> int - { - standard_version sv (v); - auto i = m.versions.find (n); - - return (i == m.versions.end () || i->second < sv - ? -1 - : i->second > sv ? 1 : 0); - }; - - // Start from the top assuming a new dependency cannot be added without - // changing the dependent's version. - // - int r; - return ( - (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : - (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0); - }; - // Notice and warn if there are no machines (as opposed to all of them // being busy). // @@ -1063,6 +1064,8 @@ try // will probably be able to use the result). So we simply ignore // this machine for this run. // + // Note: see similar code in the machine interruption logic. + // optional<bootstrapped_machine_manifest> bmm; if (te) { @@ -2007,12 +2010,12 @@ try // none) as well as whether we we should operate in the priority monitor // mode. // - uint64_t prio_min (0); + uint64_t prio_min (~uint64_t (0)); uint64_t prio_max (0); bool prio_mon (false); { uint16_t busy (0); // Number of machines locked by other processes. - bool task (false); // There is a machine performaing a task. + bool task (false); // There is a machine performing a task. for (const bootstrapped_machine& m: ms) { @@ -2033,6 +2036,9 @@ try } } + if (prio_min > prio_max) // No tasks. + prio_min = prio_max; + if (inst_max != 0) { assert (busy <= inst_max); @@ -2053,14 +2059,6 @@ try } } - // @@ For now bail out if in the priority monitor mode. - // - if (prio_mon) - { - sleep = rand_sleep () / 2; - continue; - } - // If we get a task, these contain all the corresponding information. // task_request_manifest tq; @@ -2069,13 +2067,44 @@ try string url; // Iterate over controller priorities in reverse, that is, from highest to - // lowest. + // lowest (see the agent(1) man page for background on the priority + // levels). + // + // The following factors determine the lower bound of priorities we should + // consider: // - // @@ Note: doing it in terms of direct iterators in anticipation for - // lower_bound(). + // 1. If in the priority monitor mode, then we should only consider + // priorities that can interrupt the existing task with the lowest + // priority. + // + // Here is a representative sample of existing/interrupt priorities + // from which we derive the below formulae (remember that we only start + // interrupting from priority level 3): + // + // existing interrupt + // -------- --------- + // 5 >= 100 + // 55 >= 100 + // 555 >= 600 + // 999 >= 1000 + // 5055 >= 5100 + // 5555 >= 5600 + // 9999 >= 10000 + // + // Essentially, what we need to do is discard the lowest 2 levels and + // add 100, moving the priority to the next 3rd level. + // + // 2. Otherwise, we should factor in the "don't ask for lower-priority + // tasks" semantics that applies from the second priority level. + // + // Note also that the other half of this logic is below where we determine + // which machines we offer for each priority. // - auto cb (controllers.begin ()); auto ce (controllers.end ()); + auto cb (controllers.lower_bound ( + prio_mon ? ((prio_min / 100) * 100) + 100 : + prio_max >= 10 ? prio_max - 1 : // Including this priority. + 0)); // Any priority. for (; cb != ce; ) { @@ -2095,18 +2124,31 @@ try fingerprint, machine_header_manifests {}}; - // Note: do not assume tq.machines.size () == ms.size (). + // Determine which machines we need to offer for this priority. // for (const bootstrapped_machine& m: ms) { - // @@ For now skip machines locked by other processes. - // - // @@ Note: skip machines bootstrapping/suspended. - // - if (m.lock.locked ()) - tq.machines.emplace_back (m.manifest.machine.id, - m.manifest.machine.name, - m.manifest.machine.summary); + if (!m.lock.locked ()) + { + if (!m.lock.prio) // Skip bootstrapping/suspended. + continue; + + uint64_t eprio (*m.lock.prio); + + // Determine if our priority can interrupt the existing task. + // + // Based on the above discussion of the priority lower bound + // determination (and some menditation) it's clear that we can only + // interrupt the existing task if our priority is (at least) on a + // higher 3rd level. + // + if ((prio / 100) <= (eprio / 100)) + continue; + } + + tq.machines.emplace_back (m.manifest.machine.id, + m.manifest.machine.name, + m.manifest.machine.summary); } if (ops.dump_machines ()) @@ -2321,26 +2363,165 @@ try // Next find the corresponding bootstrapped_machine instance in ms. Also // unlock all the other machines. // - // @@ TODO: looks like this is also where we will interrupt the machines - // (thus inside the try block). Note that we have to do this - // while holding the toolchain lock. Would be good to - // unlock all the machines as well as the toolchain lock on - // failure. + // While at it also find the lowest priority candidate to interrupt if + // necessary. // + bootstrapped_machine* im (nullptr); for (bootstrapped_machine& m: ms) { if (m.manifest.machine.name == t.machine) { assert (pm == nullptr); // Sanity check. - - m.lock.perform_task (tl, prio); pm = &m; } - else + else if (m.lock.locked ()) m.lock.unlock (); + else if (m.lock.prio) // Not bootstrapping/suspended. + { + if (im == nullptr || *m.lock.prio < *im->lock.prio) + im = &m; + } } assert (pm != nullptr); + // Move the toolchain lock into this scope so that it's automatically + // released on any failure (on the happy path it is released by + // perform_task()). + // + toolchain_lock& rtl (tl); + toolchain_lock tl (move (rtl)); + + // See if we need to interrupt the selected machine (if busy) or one of + // the existing (if we are at the max allowed instances, that is in the + // priority monitor mode). + // + if (!pm->lock.locked ()) + im = pm; + else if (prio_mon) + assert (im != nullptr); // We should have at least one. + else + im = nullptr; // No interrupt necessary. + + if (im != nullptr) + { + assert (!im->lock.locked () && im->lock.prio); // Sanity checks. + + const dir_path& tp (im->path); // -<toolchain> path. + + l1 ([&]{trace << "interrupting " + << (im == pm ? "target" : "lowest priority") + << " machine " << tp << ", pid " << im->lock.pid;}); + + // The plan is to send the interrupt and then wait for the lock. + // + // Note that the interrupt cannot be "lost" (or attributed to a + // different task) since we are sending it while holding the global + // lock and the other process arms it also while holding the global + // lock. + // + // But what can happen is the other task becomes suspended, which we + // will not be able to interrupt. + // + if (kill (im->lock.pid, SIGUSR1) == -1) + { + // Ignore the case where there is no such process (e.g., the other + // process has terminated in which case the lock should be released + // automatically). + // + if (errno != ESRCH) + throw_generic_error (errno); + } + + // Try to lock the machine. + // + // While this normally shouldn't take long, there could be parts of + // the perform_task() logic that we do not interrupt and that may take + // some time. + // + machine_lock ml; + + size_t retry (0); + for (; retry != 31; ++retry) + { + if (retry != 0) + ::sleep (1); + + ml = lock_machine (tl, tp); + + if (ml.locked ()) + break; + + if (ml.pid != im->lock.pid) + { + error << "interrupted machine " << tp << " changed pid"; + throw interrupt (); + } + + if (!ml.prio) // Got suspended. + { + l2 ([&]{trace << "interrupted machine " << tp << " suspended";}); + throw interrupt (); + } + } + + if (!ml.locked ()) + { + warn << "unable to lock interrupted machine " << tp << " within " + << (retry - 1) << "s"; + throw interrupt (); + } + + // If the interrupted machine is what we will use, see if it needs a + // re-bootstrap, the same as in enumerate_machines(). If not, then + // transfer the bootstrap manifest and lock. + // + if (im == pm) + { + const machine_manifest& mm (im->manifest.machine); + + bootstrapped_machine_manifest bmm ( + parse_manifest<bootstrapped_machine_manifest> ( + tp / "manifest", "bootstrapped machine")); + + bool rb (false); + + if (bmm.machine.id != mm.id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); + rb = true; + } + + if (!tc_id.empty () && bmm.toolchain.id != tc_id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); + rb = true; + } + + if (int i = compare_bbot (bmm.bootstrap)) + { + if (i < 0) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); + rb = true; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + rb = true; + } + } + + // We are not going to try to re-bootstrap this machine "inline". + // + if (rb) + throw interrupt (); + + im->manifest = move (bmm); + im->lock = move (ml); + } + } + + pm->lock.perform_task (tl, prio); r = perform_task (move (tl), pm->lock, pm->path, pm->manifest, t); } catch (const interrupt&) @@ -2354,7 +2535,7 @@ try nullopt /* dependency_checksum */}; } - if (pm != nullptr) // Let's not assume. + if (pm != nullptr && pm->lock.locked ()) pm->lock.unlock (); // No need to hold the lock any longer. if (ops.dump_result ()) |