aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bbot/agent/agent.cxx299
1 files changed, 240 insertions, 59 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index edd6422..5c91025 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -5,7 +5,7 @@
#include <pwd.h> // getpwuid()
#include <limits.h> // PATH_MAX
-#include <signal.h> // signal()
+#include <signal.h> // signal(), kill()
#include <stdlib.h> // rand_r(), strto[u]ll()
#include <string.h> // strchr()
#include <unistd.h> // sleep(), getpid(), getuid(), fsync(), [f]stat()
@@ -761,6 +761,33 @@ snapshot_path (const dir_path& tp)
to_string (inst));
}
+// Compare bbot and library versions returning -1 if older, 0 if the same,
+// and +1 if newer.
+//
+static int
+compare_bbot (const bootstrap_manifest& m)
+{
+ auto cmp = [&m] (const string& n, const char* v) -> int
+ {
+ standard_version sv (v);
+ auto i = m.versions.find (n);
+
+ return (i == m.versions.end () || i->second < sv
+ ? -1
+ : i->second > sv ? 1 : 0);
+ };
+
+ // Start from the top assuming a new dependency cannot be added without
+ // changing the dependent's version.
+ //
+ int r;
+ return (
+ (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0);
+};
+
// Return the global toolchain lock and the list of available machines,
// (re-)bootstrapping them if necessary.
//
@@ -829,32 +856,6 @@ try
return pr;
}
- // Compare bbot and library versions returning -1 if older, 0 if the same,
- // and +1 if newer.
- //
- auto compare_bbot = [] (const bootstrap_manifest& m) -> int
- {
- auto cmp = [&m] (const string& n, const char* v) -> int
- {
- standard_version sv (v);
- auto i = m.versions.find (n);
-
- return (i == m.versions.end () || i->second < sv
- ? -1
- : i->second > sv ? 1 : 0);
- };
-
- // Start from the top assuming a new dependency cannot be added without
- // changing the dependent's version.
- //
- int r;
- return (
- (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0);
- };
-
// Notice and warn if there are no machines (as opposed to all of them
// being busy).
//
@@ -1063,6 +1064,8 @@ try
// will probably be able to use the result). So we simply ignore
// this machine for this run.
//
+ // Note: see similar code in the machine interruption logic.
+ //
optional<bootstrapped_machine_manifest> bmm;
if (te)
{
@@ -2007,12 +2010,12 @@ try
// none) as well as whether we we should operate in the priority monitor
// mode.
//
- uint64_t prio_min (0);
+ uint64_t prio_min (~uint64_t (0));
uint64_t prio_max (0);
bool prio_mon (false);
{
uint16_t busy (0); // Number of machines locked by other processes.
- bool task (false); // There is a machine performaing a task.
+ bool task (false); // There is a machine performing a task.
for (const bootstrapped_machine& m: ms)
{
@@ -2033,6 +2036,9 @@ try
}
}
+ if (prio_min > prio_max) // No tasks.
+ prio_min = prio_max;
+
if (inst_max != 0)
{
assert (busy <= inst_max);
@@ -2053,14 +2059,6 @@ try
}
}
- // @@ For now bail out if in the priority monitor mode.
- //
- if (prio_mon)
- {
- sleep = rand_sleep () / 2;
- continue;
- }
-
// If we get a task, these contain all the corresponding information.
//
task_request_manifest tq;
@@ -2069,13 +2067,44 @@ try
string url;
// Iterate over controller priorities in reverse, that is, from highest to
- // lowest.
+ // lowest (see the agent(1) man page for background on the priority
+ // levels).
+ //
+ // The following factors determine the lower bound of priorities we should
+ // consider:
//
- // @@ Note: doing it in terms of direct iterators in anticipation for
- // lower_bound().
+ // 1. If in the priority monitor mode, then we should only consider
+ // priorities that can interrupt the existing task with the lowest
+ // priority.
+ //
+ // Here is a representative sample of existing/interrupt priorities
+ // from which we derive the below formulae (remember that we only start
+ // interrupting from priority level 3):
+ //
+ // existing interrupt
+ // -------- ---------
+ // 5 >= 100
+ // 55 >= 100
+ // 555 >= 600
+ // 999 >= 1000
+ // 5055 >= 5100
+ // 5555 >= 5600
+ // 9999 >= 10000
+ //
+ // Essentially, what we need to do is discard the lowest 2 levels and
+ // add 100, moving the priority to the next 3rd level.
+ //
+ // 2. Otherwise, we should factor in the "don't ask for lower-priority
+ // tasks" semantics that applies from the second priority level.
+ //
+ // Note also that the other half of this logic is below where we determine
+ // which machines we offer for each priority.
//
- auto cb (controllers.begin ());
auto ce (controllers.end ());
+ auto cb (controllers.lower_bound (
+ prio_mon ? ((prio_min / 100) * 100) + 100 :
+ prio_max >= 10 ? prio_max - 1 : // Including this priority.
+ 0)); // Any priority.
for (; cb != ce; )
{
@@ -2095,18 +2124,31 @@ try
fingerprint,
machine_header_manifests {}};
- // Note: do not assume tq.machines.size () == ms.size ().
+ // Determine which machines we need to offer for this priority.
//
for (const bootstrapped_machine& m: ms)
{
- // @@ For now skip machines locked by other processes.
- //
- // @@ Note: skip machines bootstrapping/suspended.
- //
- if (m.lock.locked ())
- tq.machines.emplace_back (m.manifest.machine.id,
- m.manifest.machine.name,
- m.manifest.machine.summary);
+ if (!m.lock.locked ())
+ {
+ if (!m.lock.prio) // Skip bootstrapping/suspended.
+ continue;
+
+ uint64_t eprio (*m.lock.prio);
+
+ // Determine if our priority can interrupt the existing task.
+ //
+ // Based on the above discussion of the priority lower bound
+ // determination (and some menditation) it's clear that we can only
+ // interrupt the existing task if our priority is (at least) on a
+ // higher 3rd level.
+ //
+ if ((prio / 100) <= (eprio / 100))
+ continue;
+ }
+
+ tq.machines.emplace_back (m.manifest.machine.id,
+ m.manifest.machine.name,
+ m.manifest.machine.summary);
}
if (ops.dump_machines ())
@@ -2321,26 +2363,165 @@ try
// Next find the corresponding bootstrapped_machine instance in ms. Also
// unlock all the other machines.
//
- // @@ TODO: looks like this is also where we will interrupt the machines
- // (thus inside the try block). Note that we have to do this
- // while holding the toolchain lock. Would be good to
- // unlock all the machines as well as the toolchain lock on
- // failure.
+ // While at it also find the lowest priority candidate to interrupt if
+ // necessary.
//
+ bootstrapped_machine* im (nullptr);
for (bootstrapped_machine& m: ms)
{
if (m.manifest.machine.name == t.machine)
{
assert (pm == nullptr); // Sanity check.
-
- m.lock.perform_task (tl, prio);
pm = &m;
}
- else
+ else if (m.lock.locked ())
m.lock.unlock ();
+ else if (m.lock.prio) // Not bootstrapping/suspended.
+ {
+ if (im == nullptr || *m.lock.prio < *im->lock.prio)
+ im = &m;
+ }
}
assert (pm != nullptr);
+ // Move the toolchain lock into this scope so that it's automatically
+ // released on any failure (on the happy path it is released by
+ // perform_task()).
+ //
+ toolchain_lock& rtl (tl);
+ toolchain_lock tl (move (rtl));
+
+ // See if we need to interrupt the selected machine (if busy) or one of
+ // the existing (if we are at the max allowed instances, that is in the
+ // priority monitor mode).
+ //
+ if (!pm->lock.locked ())
+ im = pm;
+ else if (prio_mon)
+ assert (im != nullptr); // We should have at least one.
+ else
+ im = nullptr; // No interrupt necessary.
+
+ if (im != nullptr)
+ {
+ assert (!im->lock.locked () && im->lock.prio); // Sanity checks.
+
+ const dir_path& tp (im->path); // -<toolchain> path.
+
+ l1 ([&]{trace << "interrupting "
+ << (im == pm ? "target" : "lowest priority")
+ << " machine " << tp << ", pid " << im->lock.pid;});
+
+ // The plan is to send the interrupt and then wait for the lock.
+ //
+ // Note that the interrupt cannot be "lost" (or attributed to a
+ // different task) since we are sending it while holding the global
+ // lock and the other process arms it also while holding the global
+ // lock.
+ //
+ // But what can happen is the other task becomes suspended, which we
+ // will not be able to interrupt.
+ //
+ if (kill (im->lock.pid, SIGUSR1) == -1)
+ {
+ // Ignore the case where there is no such process (e.g., the other
+ // process has terminated in which case the lock should be released
+ // automatically).
+ //
+ if (errno != ESRCH)
+ throw_generic_error (errno);
+ }
+
+ // Try to lock the machine.
+ //
+ // While this normally shouldn't take long, there could be parts of
+ // the perform_task() logic that we do not interrupt and that may take
+ // some time.
+ //
+ machine_lock ml;
+
+ size_t retry (0);
+ for (; retry != 31; ++retry)
+ {
+ if (retry != 0)
+ ::sleep (1);
+
+ ml = lock_machine (tl, tp);
+
+ if (ml.locked ())
+ break;
+
+ if (ml.pid != im->lock.pid)
+ {
+ error << "interrupted machine " << tp << " changed pid";
+ throw interrupt ();
+ }
+
+ if (!ml.prio) // Got suspended.
+ {
+ l2 ([&]{trace << "interrupted machine " << tp << " suspended";});
+ throw interrupt ();
+ }
+ }
+
+ if (!ml.locked ())
+ {
+ warn << "unable to lock interrupted machine " << tp << " within "
+ << (retry - 1) << "s";
+ throw interrupt ();
+ }
+
+ // If the interrupted machine is what we will use, see if it needs a
+ // re-bootstrap, the same as in enumerate_machines(). If not, then
+ // transfer the bootstrap manifest and lock.
+ //
+ if (im == pm)
+ {
+ const machine_manifest& mm (im->manifest.machine);
+
+ bootstrapped_machine_manifest bmm (
+ parse_manifest<bootstrapped_machine_manifest> (
+ tp / "manifest", "bootstrapped machine"));
+
+ bool rb (false);
+
+ if (bmm.machine.id != mm.id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";});
+ rb = true;
+ }
+
+ if (!tc_id.empty () && bmm.toolchain.id != tc_id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
+ rb = true;
+ }
+
+ if (int i = compare_bbot (bmm.bootstrap))
+ {
+ if (i < 0)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";});
+ rb = true;
+ }
+ else
+ {
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ rb = true;
+ }
+ }
+
+ // We are not going to try to re-bootstrap this machine "inline".
+ //
+ if (rb)
+ throw interrupt ();
+
+ im->manifest = move (bmm);
+ im->lock = move (ml);
+ }
+ }
+
+ pm->lock.perform_task (tl, prio);
r = perform_task (move (tl), pm->lock, pm->path, pm->manifest, t);
}
catch (const interrupt&)
@@ -2354,7 +2535,7 @@ try
nullopt /* dependency_checksum */};
}
- if (pm != nullptr) // Let's not assume.
+ if (pm != nullptr && pm->lock.locked ())
pm->lock.unlock (); // No need to hold the lock any longer.
if (ops.dump_result ())