From 086567572e4a4172f68b6a5a246598fe9d84e132 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Wed, 10 May 2023 09:06:38 +0200 Subject: Add global toolchain locking to agent This is the ground work for the task priority/interrupt support. --- bbot/agent/agent.cli | 10 + bbot/agent/agent.cxx | 656 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 416 insertions(+), 250 deletions(-) diff --git a/bbot/agent/agent.cli b/bbot/agent/agent.cli index 22dc088..aa7eb59 100644 --- a/bbot/agent/agent.cli +++ b/bbot/agent/agent.cli @@ -65,6 +65,16 @@ namespace bbot interfaces, etc." } + string --toolchain-lock // Note: string to allow empty path. + { + "", + "Absolute path to the global toolchain lock file. If unspecified, then + \c{\b{/var/lock/bbot-agent-}\i{toolchain-name}\b{.lock}} is used by + default. If empty path is specified then no global locking is + performed. If one of the \cb{--fake-*} options is specified, then no + locking is performed by default." + } + standard_version --toolchain-ver { "", diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index ed90da0..17e3ac5 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -56,6 +56,7 @@ namespace bbot string tc_name; uint16_t tc_num; + path tc_lock; // Empty if no locking. standard_version tc_ver; string tc_id; @@ -362,7 +363,103 @@ bootstrap_machine (const dir_path& md, return r; } -// Machine locking. +// Global toolchain lock. +// +// The overall locking protocol is as follows: +// +// 1. Before enumerating the machines each agent instance acquires the global +// toolchain lock. +// +// 2. As the agent enumerates over the machines, it tries to acquire the lock +// for each machine. +// +// 3. If the agent encounters a machine that it needs to bootstrap, it +// releases all the other machine locks followed by the global lock, +// proceeds to bootstrap the machine, releases its lock, and restarts the +// process from scratch. +// +// 4. Otherwise, upon receiving a task response for one of the machines, the +// agent releases all the other machine locks followed by the global lock, +// proceeds to perform the task on the selected machine, releases its lock, +// and restarts the process from scratch. +// +// One notable implication of this protocol is that the machine locks are +// only acquired while holding the global toolchain lock but can be released +// while not holding this lock. +// +// (Note that because of this implication it can theoretically be possible +// to omit acquiring all the machine locks during the enumeration process, +// instead only acquiring the lock of the machine we need to bootstrap or +// build. However, the current approach is simpler since we still need +// to detect machines that are already locked, which entails acquiring +// the lock anyway.) +// +// Note that unlike the machine lock below, here we don't bother with removing +// the lock file. +// +class toolchain_lock +{ +public: + toolchain_lock () = default; // Empty lock. + + ~toolchain_lock () + { + unlock (true /* ignore_errors */); + } + + void + unlock (bool ignore_errors = false) + { + if (fl_) + { + fl_ = false; // We have tried. + + if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) + throw_generic_error (errno); + } + } + + toolchain_lock (toolchain_lock&&) = default; + toolchain_lock& operator= (toolchain_lock&&) = default; + + toolchain_lock (const toolchain_lock&) = delete; + toolchain_lock& operator= (const toolchain_lock&) = delete; + + // Implementation details. + // +public: + explicit + toolchain_lock (auto_fd&& fd) + : fd_ (move (fd)), fl_ (true) {} + +private: + auto_fd fd_; + bool fl_ = false; +}; + +// Note: returns empty lock if toolchain locking is disabled. +// +static optional +lock_toolchain (unsigned int timeout) +{ + if (tc_lock.empty ()) + return toolchain_lock (); + + auto_fd fd (fdopen (tc_lock, fdopen_mode::out | fdopen_mode::create)); + + for (; flock (fd.get (), LOCK_EX | LOCK_NB) != 0; sleep (1), --timeout) + { + if (errno != EWOULDBLOCK) + throw_generic_error (errno); + + if (timeout == 0) + return nullopt; + } + + return toolchain_lock (move (fd)); +} + +// Per-toolchain machine lock. // // We use flock(2) which is straightforward. The tricky part is cleaning the // file up. Here we may have a race when two processes are trying to open & @@ -445,7 +542,7 @@ lock_machine (const dir_path& tp) if (st1.st_ino == st2.st_ino) return machine_lock (move (fp), move (fd)); - // Note: unlocked by close(). + // Retry (note: lock is unlocked by auto_fd::close()). } } @@ -472,319 +569,352 @@ struct bootstrapped_machine }; using bootstrapped_machines = vector; -static bootstrapped_machines +static pair enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines", machines.string ().c_str ()); - bootstrapped_machines r; - - if (ops.fake_machine_specified ()) + for (;;) // From-scratch retry loop for after bootstrap (see below). { - auto mh ( - parse_manifest ( - ops.fake_machine (), "machine header")); - - r.push_back ( - bootstrapped_machine { - dir_path (ops.machines ()) /= mh.name, // For diagnostics. - bootstrapped_machine_manifest { - machine_manifest { - move (mh.id), - move (mh.name), - move (mh.summary), - machine_type::kvm, - string ("de:ad:be:ef:de:ad"), - nullopt, - strings ()}, - toolchain_manifest {tc_id}, - bootstrap_manifest {}}, - machine_lock ()}); + pair pr; - return r; - } + { + optional l; + while (!(l = lock_toolchain (60 /* seconds */))) + { + warn << "unable to acquire global toolchain lock " << tc_lock + << " for 60s"; + } + pr.first = move (*l); + } - // Notice and warn if there are no machines (as opposed to all of them being - // locked). - // - bool none (true); + bootstrapped_machines& r (pr.second); - // The first level are machine volumes. - // - for (const dir_entry& ve: dir_iterator (machines, dir_iterator::no_follow)) - { - const string vn (ve.path ().string ()); + if (ops.fake_machine_specified ()) + { + auto mh ( + parse_manifest ( + ops.fake_machine (), "machine header")); + + r.push_back ( + bootstrapped_machine { + dir_path (ops.machines ()) /= mh.name, // For diagnostics. + bootstrapped_machine_manifest { + machine_manifest { + move (mh.id), + move (mh.name), + move (mh.summary), + machine_type::kvm, + string ("de:ad:be:ef:de:ad"), + nullopt, + strings ()}, + toolchain_manifest {tc_id}, + bootstrap_manifest {}}, + machine_lock ()}); + + return pr; + } - // Ignore hidden directories. + // Notice and warn if there are no machines (as opposed to all of them + // being locked). // - if (ve.type () != entry_type::directory || vn[0] == '.') - continue; - - const dir_path vd (dir_path (machines) /= vn); + bool none (true); - // Inside we have machines. + // The first level are machine volumes. // - try + bool scratch (false); + for (const dir_entry& ve: dir_iterator (machines, dir_iterator::no_follow)) { - for (const dir_entry& me: dir_iterator (vd, dir_iterator::no_follow)) - { - const string mn (me.path ().string ()); + const string vn (ve.path ().string ()); - if (me.type () != entry_type::directory || mn[0] == '.') - continue; + // Ignore hidden directories. + // + if (ve.type () != entry_type::directory || vn[0] == '.') + continue; - const dir_path md (dir_path (vd) /= mn); + const dir_path vd (dir_path (machines) /= vn); - // Our endgoal here is to obtain a bootstrapped snapshot of this - // machine while watching out for potential race conditions (other - // instances as well as machines being added/upgraded/removed; see the - // manual for details). - // - // So here is our overall plan: - // - // 1. Resolve current subvolume link for our bootstrap protocol. - // - // 2. Lock the machine. This excludes any other instance from trying - // to perform the following steps. - // - // 3. If there is no link, cleanup old bootstrap (if any) and ignore - // this machine. - // - // 4. Try to create a snapshot of current subvolume (this operation is - // atomic). If failed (e.g., someone changed the link and removed - // the subvolume in the meantime), retry from #1. - // - // 5. Compare the snapshot to the already bootstrapped version (if - // any) and see if we need to re-bootstrap. If so, use the snapshot - // as a starting point. Rename to bootstrapped at the end (atomic). - // - dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

- dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - - - auto delete_bootstrapped = [&tp, &trace] () // Delete -. + // Inside we have machines. + // + try + { + for (const dir_entry& me: dir_iterator (vd, dir_iterator::no_follow)) { - run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); - run_btrfs (trace, "subvolume", "delete", tp); - }; + const string mn (me.path ().string ()); - for (size_t retry (0);; ++retry) - { - if (retry != 0) - sleep (1); + if (me.type () != entry_type::directory || mn[0] == '.') + continue; + + const dir_path md (dir_path (vd) /= mn); - // Resolve the link to subvolume path. + // Our endgoal here is to obtain a bootstrapped snapshot of this + // machine while watching out for potential race conditions (other + // instances as well as machines being added/upgraded/removed; see + // the manual for details). + // + // So here is our overall plan: + // + // 1. Resolve current subvolume link for our bootstrap protocol. + // + // 2. Lock the machine. This excludes any other instance from trying + // to perform the following steps. // - dir_path sp; // -

. + // 3. If there is no link, cleanup old bootstrap (if any) and ignore + // this machine. + // + // 4. Try to create a snapshot of current subvolume (this operation + // is atomic). If failed (e.g., someone changed the link and + // removed the subvolume in the meantime), retry from #1. + // + // 5. Compare the snapshot to the already bootstrapped version (if + // any) and see if we need to re-bootstrap. If so, use the + // snapshot as a starting point. Rename to bootstrapped at the + // end (atomic). + // + dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

+ dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - - try + auto delete_bootstrapped = [&tp, &trace] () // Delete -. { - sp = path_cast (readsymlink (lp)); + run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); + run_btrfs (trace, "subvolume", "delete", tp); + }; - if (sp.relative ()) - sp = md / sp; - } - catch (const system_error& e) + for (size_t retry (0);; ++retry) { - // Leave the subvolume path empty if the subvolume link doesn't - // exist and fail on any other error. - // - if (e.code ().category () != std::generic_category () || - e.code ().value () != ENOENT) - fail << "unable to read subvolume link " << lp << ": " << e; - } + if (retry != 0) + sleep (1); - none = none && sp.empty (); + // Resolve the link to subvolume path. + // + dir_path sp; // -

. - // Try to lock the machine, skipping it if already locked. - // - optional ml (lock_machine (tp)); + try + { + sp = path_cast (readsymlink (lp)); - if (!ml) - { - l4 ([&]{trace << "skipping " << md << ": locked";}); - break; - } + if (sp.relative ()) + sp = md / sp; + } + catch (const system_error& e) + { + // Leave the subvolume path empty if the subvolume link doesn't + // exist and fail on any other error. + // + if (e.code ().category () != std::generic_category () || + e.code ().value () != ENOENT) + fail << "unable to read subvolume link " << lp << ": " << e; + } - bool te (dir_exists (tp)); + none = none && sp.empty (); - // If the resolution fails, then this means there is no current - // machine subvolume (for this bootstrap protocol). In this case we - // clean up our toolchain subvolume (-, if any) and - // ignore this machine. - // - if (sp.empty ()) - { - if (te) - delete_bootstrapped (); - - l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); - break; - } + // Try to lock the machine, skipping it if already locked. + // + optional ml (lock_machine (tp)); - // -- - // - const dir_path xp (snapshot_path (tp)); + if (!ml) + { + l4 ([&]{trace << "skipping " << md << ": locked";}); + break; + } - if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) - { - if (retry >= 10) - fail << "unable to snapshot subvolume " << sp; + bool te (dir_exists (tp)); - continue; - } + // If the resolution fails, then this means there is no current + // machine subvolume (for this bootstrap protocol). In this case + // we clean up our toolchain subvolume (-, if any) and + // ignore this machine. + // + if (sp.empty ()) + { + if (te) + delete_bootstrapped (); - // Load the (original) machine manifest. - // - auto mm ( - parse_manifest (sp / "manifest", "machine")); + l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); + break; + } - // If we already have -, see if it needs to be re- - // bootstrapped. Things that render it obsolete: - // - // 1. New machine revision (compare machine ids). - // 2. New toolchain (compare toolchain ids). - // 3. New bbot/libbbot (compare versions). - // - // The last case has a complication: what should we do if we have - // bootstrapped a newer version of bbot? This would mean that we are - // about to be stopped and upgraded (and the upgraded version will - // probably be able to use the result). So we simply ignore this - // machine for this run. + // -- + // + const dir_path xp (snapshot_path (tp)); - // Return -1 if older, 0 if the same, and +1 if newer. - // - auto compare_bbot = [] (const bootstrap_manifest& m) -> int - { - auto cmp = [&m] (const string& n, const char* v) -> int + if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { - standard_version sv (v); - auto i = m.versions.find (n); + if (retry >= 10) + fail << "unable to snapshot subvolume " << sp; - return (i == m.versions.end () || i->second < sv - ? -1 - : i->second > sv ? 1 : 0); - }; + continue; + } - // Start from the top assuming a new dependency cannot be added - // without changing the dependent's version. + // Load the (original) machine manifest. // - int r; - return - (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : - (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0; - }; + auto mm ( + parse_manifest (sp / "manifest", "machine")); - optional bmm; - if (te) - { - bmm = parse_manifest ( - tp / "manifest", "bootstrapped machine"); + // If we already have -, see if it needs to be + // re-bootstrapped. Things that render it obsolete: + // + // 1. New machine revision (compare machine ids). + // 2. New toolchain (compare toolchain ids). + // 3. New bbot/libbbot (compare versions). + // + // The last case has a complication: what should we do if we have + // bootstrapped a newer version of bbot? This would mean that we + // are about to be stopped and upgraded (and the upgraded version + // will probably be able to use the result). So we simply ignore + // this machine for this run. - if (bmm->machine.id != mm.id) + // Return -1 if older, 0 if the same, and +1 if newer. + // + auto compare_bbot = [] (const bootstrap_manifest& m) -> int { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); - te = false; - } + auto cmp = [&m] (const string& n, const char* v) -> int + { + standard_version sv (v); + auto i = m.versions.find (n); + + return (i == m.versions.end () || i->second < sv + ? -1 + : i->second > sv ? 1 : 0); + }; + + // Start from the top assuming a new dependency cannot be added + // without changing the dependent's version. + // + int r; + return ( + (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : + (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0); + }; - if (!tc_id.empty () && bmm->toolchain.id != tc_id) + optional bmm; + if (te) { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); - te = false; - } + bmm = parse_manifest ( + tp / "manifest", "bootstrapped machine"); - if (int i = compare_bbot (bmm->bootstrap)) - { - if (i < 0) + if (bmm->machine.id != mm.id) { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); + l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); te = false; } - else + + if (!tc_id.empty () && bmm->toolchain.id != tc_id) { - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - run_btrfs (trace, "subvolume", "delete", xp); - break; + l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); + te = false; } - } - if (!te) - delete_bootstrapped (); - } - else - l3 ([&]{trace << "bootstrapping " << tp;}); - - if (!te) - { - // Use the -- snapshot that we have made to - // bootstrap the new machine. Then atomically rename it to - // -. - // - // Also release all the machine locks that we have acquired so far - // since the bootstrap will take a while and other instances might - // be able to use them. - // - r.clear (); - - bmm = bootstrap_machine (xp, mm, move (bmm)); + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i < 0) + { + l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); + te = false; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + } - if (!bmm) - { - l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); - run_btrfs (trace, "subvolume", "delete", xp); - break; + if (!te) + delete_bootstrapped (); } + else + l3 ([&]{trace << "bootstrapping " << tp;}); - try - { - mvdir (xp, tp); - } - catch (const system_error& e) + if (!te) { - fail << "unable to rename " << xp << " to " << tp; - } + // Use the -- snapshot that we have made + // to bootstrap the new machine. Then atomically rename it to + // -. + // + // Also release all the machine locks that we have acquired so + // far as well as the global toolchain lock, since the bootstrap + // will take a while and other instances might be able to use + // them. Because we are releasing the global lock, we have to + // restart the enumeration process from scratch. + // + r.clear (); + pr.first.unlock (); + scratch = true; + + bmm = bootstrap_machine (xp, mm, move (bmm)); + + if (!bmm) + { + l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } - l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + try + { + mvdir (xp, tp); + } + catch (const system_error& e) + { + fail << "unable to rename " << xp << " to " << tp; + } - // Check the bootstrapped bbot version as above and ignore this - // machine if it's newer than us. - // - if (int i = compare_bbot (bmm->bootstrap)) - { - if (i > 0) + l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + + // Check the bootstrapped bbot version as above and ignore this + // machine if it's newer than us. + // + if (int i = compare_bbot (bmm->bootstrap)) { - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - break; + if (i > 0) + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + else + warn << "bootstrapped " << tp << " bbot worker is older " + << "than agent; assuming test setup"; } - else - warn << "bootstrapped " << tp << " bbot worker is older " - << "than agent; assuming test setup"; + + break; // Restart from scratch. } - } - else - run_btrfs (trace, "subvolume", "delete", xp); + else + run_btrfs (trace, "subvolume", "delete", xp); - // Add the machine to the lists. - // - r.push_back ( - bootstrapped_machine {move (tp), move (*bmm), move (*ml)}); + // Add the machine to the lists. + // + r.push_back ( + bootstrapped_machine {move (tp), move (*bmm), move (*ml)}); + break; + } // Retry loop. + + if (scratch) + break; + + } // Inner dir_iterator loop. + + if (scratch) break; - } } - } - catch (const system_error& e) - { - fail << "unable to iterate over " << vd << ": " << e; - } - } + catch (const system_error& e) + { + fail << "unable to iterate over " << vd << ": " << e; + } + } // Outer dir_iterator loop. - if (none) - warn << "no build machines for toolchain " << tc_name; + if (scratch) + continue; - return r; + if (none) + warn << "no build machines for toolchain " << tc_name; + + return pr; + + } // From-scratch retry loop. + + // Unreachable. } catch (const system_error& e) { @@ -1296,6 +1426,25 @@ try tc_name = ops.toolchain_name (); tc_num = ops.toolchain_num (); + + if (ops.toolchain_lock_specified ()) + { + const string& l (ops.toolchain_lock ()); + + if (!l.empty ()) + { + tc_lock = path (l); + + if (!tc_lock.absolute ()) + fail << "--toolchain-lock value '" << l << "' should be absolute path"; + } + } + else if (!(ops.fake_bootstrap () || + ops.fake_build () || + ops.fake_machine_specified () || + ops.fake_request_specified ())) + tc_lock = path ("/var/lock/bbot-agent-" + tc_name + ".lock"); + tc_ver = (ops.toolchain_ver_specified () ? ops.toolchain_ver () : standard_version (BBOT_VERSION_STR)); @@ -1439,7 +1588,11 @@ try for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0) { - bootstrapped_machines ms (enumerate_machines (ops.machines ())); + pair er ( + enumerate_machines (ops.machines ())); + + toolchain_lock& tl (er.first); + bootstrapped_machines& ms (er.second); // Prepare task request. // @@ -1611,7 +1764,8 @@ try // We have a build task. // // First find the index of the machine we were asked to use (and verify it - // is one of those we sent). Also unlock all the other machines. + // is one of those we sent). Also unlock all the other machines as well as + // the global toolchain lock. // size_t i (ms.size ()); for (size_t j (0); j != ms.size (); ++j) @@ -1622,6 +1776,8 @@ try ms[j].lock.unlock (); } + tl.unlock (); + if (i == ms.size ()) { error << "task from " << url << " for unknown machine " -- cgit v1.1