From 4ec9106cdcce82d2f560f201b7f40691f5455da8 Mon Sep 17 00:00:00 2001 From: Boris Kolpackov Date: Tue, 19 Mar 2024 08:16:17 +0200 Subject: Implement auxiliary machine support in bbot-agent --- bbot/agent/agent.cli | 25 +- bbot/agent/agent.cxx | 1516 +++++++++++++++++++++++++++++++++++++++------- bbot/agent/agent.hxx | 4 +- bbot/agent/machine.cxx | 86 +-- bbot/agent/machine.hxx | 14 +- bbot/agent/tftp.hxx | 2 +- bbot/bbot-agent@.service | 2 + bbot/buildfile | 2 +- bbot/utility.hxx | 1 + 9 files changed, 1362 insertions(+), 290 deletions(-) (limited to 'bbot') diff --git a/bbot/agent/agent.cli b/bbot/agent/agent.cli index 060cba0..23765cf 100644 --- a/bbot/agent/agent.cli +++ b/bbot/agent/agent.cli @@ -119,7 +119,7 @@ namespace bbot "", "Toolchain number, 1 by default. If agents are running for several toolchains, then each of them should have a unique toolchain number - between 1 and 99. This number is used as an offset for network ports, + between 1 and 9. This number is used as an offset for network ports, interfaces, etc." } @@ -195,7 +195,7 @@ namespace bbot "Amount of RAM (in KiB) to use for the build machine, 4GiB by default." } - size_t --auxiliary-ram + size_t --auxiliary-ram = 0 { "", "Amount of RAM (in KiB) to use for auxiliary machines. To disable @@ -251,28 +251,35 @@ namespace bbot } // Low 23401+, 23501+, 23601+, etc., all look good collision-wise with - // with anything useful. + // anything useful. // uint16_t --tftp-port = 23400 { "", "TFTP server port base, 23400 by default. The actual port is calculated - by adding an offset calculated based on the toolchain and instance - numbers." + by adding an offset calculated based on the toolchain, instance, and + machine numbers." } size_t --bootstrap-startup = 300 { "", - "Maximum number of seconds to wait for machine bootstrap startup, + "Maximum number of seconds to wait for build machine bootstrap startup, 300 (5 minutes) by default." } size_t --bootstrap-timeout = 3600 { "", - "Maximum number of seconds to wait for machine bootstrap completion, - 3600 (60 minutes) by default." + "Maximum number of seconds to wait for build machine bootstrap + completion, 3600 (60 minutes) by default." + } + + size_t --bootstrap-auxiliary = 900 + { + "", + "Maximum number of seconds to wait for auxiliary machine bootstrap + completion, 900 (15 minutes) by default." } size_t --bootstrap-retries = 2 @@ -286,7 +293,7 @@ namespace bbot { "", "Maximum number of seconds to wait for build startup, 240 (4 minutes) by - default." + default. This value is used for both build and auxiliary machines." } size_t --build-timeout = 5400 diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index 8f860a2..5278eba 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -56,6 +56,27 @@ using namespace bbot; using std::cout; using std::endl; +// If RAM minimum is not specified for a machine, then let's assume something +// plausible like 256MiB. This way we won't end up with degenerate cases where +// we attempt to start a machine with some absurd amount of RAM. +// +const std::uint64_t default_ram_minimum = 252144; + +static inline std::uint64_t +effective_ram_minimum (const machine_header_manifest& m) +{ + // Note: neither ram_minimum nor ram_maximum should be 0. + // + assert ((!m.ram_minimum || *m.ram_minimum != 0) && + (!m.ram_maximum || *m.ram_maximum != 0)); + + return (m.ram_minimum + ? *m.ram_minimum + : (m.ram_maximum && *m.ram_maximum < default_ram_minimum + ? *m.ram_maximum + : default_ram_minimum)); +} + static std::mt19937 rand_gen (std::random_device {} ()); // According to the standard, atomic's use in the signal handler is only safe @@ -153,16 +174,16 @@ btrfs_exit (tracer& t, A&&... a) "btrfs", forward (a)...); } -// Bootstrap the machine. Return the bootstrapped machine manifest if -// successful and nullopt otherwise (in which case the machine directory -// should be cleaned and the machine ignored for now). +// Bootstrap a build machine. Return the bootstrapped machine manifest if +// successful and nullopt otherwise (in which case the caller should clean up +// the machine directory and ignore the machine for now). // static optional -bootstrap_machine (const dir_path& md, - const machine_manifest& mm, - optional obmm) +bootstrap_build_machine (const dir_path& md, + const machine_manifest& mm, + optional obmm) { - tracer trace ("bootstrap_machine", md.string ().c_str ()); + tracer trace ("bootstrap_build_machine", md.string ().c_str ()); bootstrapped_machine_manifest r { mm, @@ -184,10 +205,12 @@ bootstrap_machine (const dir_path& md, else try { + // Note: similar code in bootstrap_auxiliary_machine(). + // Start the TFTP server (server chroot is --tftp). Map: // - // GET requests to .../toolchains//* - // PUT requests to .../bootstrap/-/* + // GET requests to .../toolchains//* + // PUT requests to .../bootstrap/-/* // const string in_name (tc_name + '-' + to_string (inst)); auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); @@ -211,7 +234,7 @@ bootstrap_machine (const dir_path& md, { tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", - ops.tftp_port () + offset); + ops.tftp_port () + offset + 0 /* build machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); @@ -220,6 +243,9 @@ bootstrap_machine (const dir_path& md, unique_ptr m ( start_machine (md, mm, + 0 /* machine_num (build) */, + ops.cpu (), + ops.build_ram (), obmm ? obmm->machine.mac : nullopt, ops.bridge (), tftpd.port (), @@ -234,8 +260,11 @@ bootstrap_machine (const dir_path& md, make_exception_guard ( [&m, &md] () { - info << "trying to force machine " << md << " down"; - try {m->forcedown (false);} catch (const failed&) {} + if (m != nullptr) + { + info << "trying to force machine " << md << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } })); // What happens if the bootstrap process hangs? The simple thing would @@ -260,7 +289,7 @@ bootstrap_machine (const dir_path& md, m->cleanup (); info << "resuming after machine suspension"; - // Note: snapshot cleaned up by the caller of bootstrap_machine(). + // Note: snapshot cleaned up by the caller. } catch (const failed&) {} @@ -313,8 +342,7 @@ bootstrap_machine (const dir_path& md, if (!check_machine ()) { - // Note: snapshot cleaned up by the caller of bootstrap_machine(). - return nullopt; + return nullopt; // Note: snapshot cleaned up by the caller. } } @@ -336,6 +364,7 @@ bootstrap_machine (const dir_path& md, m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. continue; } @@ -359,8 +388,7 @@ bootstrap_machine (const dir_path& md, // if (!(file_not_empty (mf) || file_not_empty (mfo))) { - // Note: snapshot cleaned up by the caller of bootstrap_machine(). - return nullopt; + return nullopt; // Note: snapshot cleaned up by the caller. } } @@ -411,6 +439,223 @@ bootstrap_machine (const dir_path& md, return r; } +// Bootstrap an auxiliary machine. Return the bootstrapped machine manifest if +// successful and nullopt otherwise (in which case the caller should clean up +// the machine directory and ignore the machine for now). +// +static vector +divide_auxiliary_ram (const vector&); + +static optional +bootstrap_auxiliary_machine (const dir_path& md, + const machine_manifest& mm, + optional obmm) +{ + tracer trace ("bootstrap_auxiliary_machine", md.string ().c_str ()); + + bootstrapped_machine_manifest r { + mm, + toolchain_manifest {}, // Unused for auxiliary, + bootstrap_manifest {} // Unused for auxiliary. + }; + + if (ops.fake_bootstrap ()) + { + r.machine.mac = "de:ad:be:ef:de:ad"; + } + else + try + { + // Similar to bootstrap_build_machine() except here we just wait for the + // upload of the environment. + + // Start the TFTP server (server chroot is --tftp). Map: + // + // GET requests to /dev/null + // PUT requests to .../bootstrap/-/* + // + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); + try_mkdir_p (arm.path); + + // Environment upload. + // + path ef (arm.path / "environment"); + try_rmfile (ef); + + // Note that unlike build, here we use the same VM snapshot for retries, + // which is not ideal. + // + for (size_t retry (0);; ++retry) + { + tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' + + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", + ops.tftp_port () + offset + 1 /* auxiliary machine */); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // If the machine specified RAM minimum, use that to make sure the + // machine can actually function with this amount of RAM. Otherwise, use + // the minium of RAM maximum (if specified) and the available auxiliary + // RAM (so we know this machine will at least work alone). For the + // latter case use divide_auxiliary_ram() to be consistent with the + // build case (see that function implementation for nuances). + // + size_t ram; + if (mm.ram_minimum) + ram = *mm.ram_minimum; + else + { + vector rams (divide_auxiliary_ram ({&mm})); + assert (!rams.empty ()); // We should have skipped such a machine. + ram = rams.front (); + } + + // Start the machine. + // + unique_ptr m ( + start_machine (md, + mm, + 1 /* machine_num (first auxiliary) */, + ops.cpu (), + ram, + obmm ? obmm->machine.mac : nullopt, + ops.bridge (), + tftpd.port (), + false /* pub_vnc */)); + + { + // NOTE: see bootstrap_build_machine() for comments. + + auto mg ( + make_exception_guard ( + [&m, &md] () + { + if (m != nullptr) + { + info << "trying to force machine " << md << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } + })); + + auto soft_fail = [&md, &m] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << md << ", suspending"; + m->print_info (dr); + } + + try + { + m->suspend (false); + m->wait (false); + m->cleanup (); + info << "resuming after machine suspension"; + + // Note: snapshot cleaned up by the caller. + } + catch (const failed&) {} + + return nullopt; + }; + + auto check_machine = [&md, &m] () + { + try + { + size_t t (0); + if (!m->wait (t /* seconds */, false /* fail_hard */)) + return true; // Still running. + + // Exited successfully. + } + catch (const failed&) + { + // Failed, exit code diagnostics has already been issued. + } + + diag_record dr (error); + dr << "machine " << md << " exited unexpectedly"; + m->print_info (dr); + + return false; + }; + + // Wait up to the specified timeout for the auxiliary machine to + // bootstrap. Note that such a machine may do extra setup work on the + // first boot (such as install some packages, etc) which may take some + // time. + // + size_t to; + const size_t bootstrap_to (ops.bootstrap_auxiliary ()); + const size_t shutdown_to (5 * 60); + + // Serve TFTP requests while periodically checking for the environment + // file. + // + for (to = bootstrap_to; to != 0; ) + { + if (tftpd.serve (to, 2)) + continue; + + if (!check_machine ()) + { + if (!file_not_empty (ef)) + { + return nullopt; // Note: snapshot cleaned up by the caller. + } + } + + if (file_not_empty (ef)) + { + if (!tftpd.serve (to, 5)) + break; + } + } + + if (to == 0) + { + if (retry > ops.bootstrap_retries ()) + return soft_fail ("bootstrap timeout"); + + // Note: keeping the logs behind (no cleanup). + + diag_record dr (warn); + dr << "machine " << mm.name << " mis-booted, retrying"; + m->print_info (dr); + + try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. + continue; + } + + l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); + + // Shut the machine down cleanly. + // + if (!m->shutdown ((to = shutdown_to))) + return soft_fail ("bootstrap shutdown timeout"); + + l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); + + m->cleanup (); + } + + r.machine.mac = m->mac; // Save the MAC address. + + break; + } + } + catch (const system_error& e) + { + fail << "bootstrap error: " << e; + } + + serialize_manifest (r, md / "manifest", "bootstrapped machine"); + return r; +} + // Global toolchain lock. // // The overall locking protocol is as follows: @@ -426,10 +671,11 @@ bootstrap_machine (const dir_path& md, // proceeds to bootstrap the machine, releases its lock, and restarts the // process from scratch. // -// 4. Otherwise, upon receiving a task response for one of the machines, the -// agent releases all the other machine locks followed by the global lock, -// proceeds to perform the task on the selected machine, releases its lock, -// and restarts the process from scratch. +// 4. Otherwise, upon receiving a task response for one of the machines (plus, +// potentially, a number of auxiliary machines), the agent releases all the +// other machine locks followed by the global lock, proceeds to perform the +// task on the selected machine(s), releases their locks, and restarts the +// process from scratch. // // One notable implication of this protocol is that the machine locks are // only acquired while holding the global toolchain lock but can be released @@ -528,6 +774,13 @@ lock_toolchain (unsigned int timeout) // guaranteed to be atomic (in case later we want to support exclusive // bootstrap and shared build). // +// Note also that we per-toolchain lock auxiliary machines even though they +// are not toolchain-specific. Doing it this way allows us to handle both +// types of machines consistently with regards to priorities, interrupts, etc. +// It also means we will have each auxiliary machine available per-toolchain +// rather than a single machine shared between all the toolchains, which is +// a good thing. +// class machine_lock { public: @@ -807,10 +1060,13 @@ compare_bbot (const bootstrap_manifest& m) // bootstrapping/suspended machines have to be returned to get the correct // count of currently active instances for the inst_max comparison.) // +// Note that both build and auxiliary machines are returned. For auxiliary, +// toolchain and bootstrap manifests are unused and therefore always empty. +// struct bootstrapped_machine { - dir_path path; machine_lock lock; + const dir_path path; bootstrapped_machine_manifest manifest; }; using bootstrapped_machines = vector; @@ -850,8 +1106,8 @@ try r.push_back ( bootstrapped_machine { - dir_path (ops.machines ()) /= mh.name, // For diagnostics. machine_lock (path (), nullfd), // Fake lock. + dir_path (ops.machines ()) /= mh.name, // For diagnostics. bootstrapped_machine_manifest { machine_manifest { move (mh.id), @@ -867,8 +1123,8 @@ try return pr; } - // Notice and warn if there are no machines (as opposed to all of them - // being busy). + // Notice and warn if there are no build machines (as opposed to all of + // them being busy). // bool none (true); @@ -976,8 +1232,6 @@ try fail << "unable to read subvolume link " << lp << ": " << e; } - none = none && sp.empty (); - // Try to lock the machine. // machine_lock ml (lock_machine (tl, tp)); @@ -1010,18 +1264,25 @@ try mm = parse_manifest ( sp / "manifest", "machine"); + + none = none && mm.effective_role () == machine_role::auxiliary; } else // Bootstrapping/suspended. { l3 ([&]{trace << "keeping " << md << ": being bootstrapped " << "or suspened by " << ml.pid;}); + + // Assume it is a build machine (we cannot determine whether + // it is build or auxiliary without loading its manifest). + // + none = false; } // Add the machine to the lists and bail out. // r.push_back (bootstrapped_machine { - move (tp), move (ml), + move (tp), bootstrapped_machine_manifest {move (mm), {}, {}}}); break; @@ -1057,15 +1318,29 @@ try // Load the (original) machine manifest. // - auto mm ( + machine_manifest mm ( parse_manifest (sp / "manifest", "machine")); + bool aux (mm.effective_role () == machine_role::auxiliary); + + // Skip machines for which we don't have sufficient RAM. + // + if (effective_ram_minimum (mm) < + (aux ? ops.auxiliary_ram () : ops.build_ram ())) + { + l3 ([&]{trace << "skipping " << md << ": insufficient RAM";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + + none = none && aux; + // If we already have -, see if it needs to be // re-bootstrapped. Things that render it obsolete: // // 1. New machine revision (compare machine ids). - // 2. New toolchain (compare toolchain ids). - // 3. New bbot/libbbot (compare versions). + // 2. New toolchain (compare toolchain ids, not auxiliary). + // 3. New bbot/libbbot (compare versions, not auxiliary). // // The last case has a complication: what should we do if we have // bootstrapped a newer version of bbot? This would mean that we @@ -1087,24 +1362,27 @@ try te = false; } - if (!tc_id.empty () && bmm->toolchain.id != tc_id) + if (!aux) { - l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); - te = false; - } - - if (int i = compare_bbot (bmm->bootstrap)) - { - if (i < 0) + if (!tc_id.empty () && bmm->toolchain.id != tc_id) { - l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); + l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); te = false; } - else + + if (int i = compare_bbot (bmm->bootstrap)) { - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - run_btrfs (trace, "subvolume", "delete", xp); - break; + if (i < 0) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); + te = false; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } } } @@ -1134,121 +1412,624 @@ try // Add the machine to the lists. // r.push_back ( - bootstrapped_machine {move (tp), move (ml), move (*bmm)}); + bootstrapped_machine {move (ml), move (tp), move (*bmm)}); + + break; + } // Retry loop. + } // Inner dir_iterator loop. + } + catch (const system_error& e) + { + fail << "unable to iterate over " << vd << ": " << e; + } + } // Outer dir_iterator loop. + + // See if there is a pending bootstrap and whether we can perform it. + // + // What should we do if we can't (i.e., we are in the priority monitor + // mode)? Well, we could have found some machines that are already + // bootstrapped (busy or not) and there may be a higher-priority task for + // one of them, so it feels natural to return whatever we've got. + // + if (pboot) + { + dir_path& tp (pboot->tp); + dir_path& xp (pboot->xp); + + // Determine how many machines are busy (locked by other processes) and + // make sure it's below the --instance-max limit, if specified. + // + // We should only count build machines unless being bootstrapped (see + // above). + // + if (inst_max != 0) + { + size_t busy (0); + for (const bootstrapped_machine& m: r) + { + if (!m.lock.locked () && + (!m.lock.prio || + m.manifest.machine.effective_role () != machine_role::auxiliary)) + ++busy; + } + + assert (busy <= inst_max); + + if (busy == inst_max) + { + l3 ([&]{trace << "instance max reached attempting to bootstrap " + << tp;}); + run_btrfs (trace, "subvolume", "delete", xp); + return pr; + } + } + + machine_lock& ml (pboot->ml); + + l3 ([&]{trace << "bootstrapping " << tp;}); + + // Use the -- snapshot that we have made to bootstrap + // the new machine. Then atomically rename it to -. + // + // Also release all the machine locks that we have acquired so far as + // well as the global toolchain lock, since the bootstrap will take a + // while and other instances might be able to use them. Because we are + // releasing the global lock, we have to restart the enumeration process + // from scratch. + // + r.clear (); + ml.bootstrap (tl); + tl.unlock (); + + bool aux (pboot->mm.effective_role () == machine_role::auxiliary); + + optional bmm ( + aux + ? bootstrap_auxiliary_machine (xp, pboot->mm, move (pboot->bmm)) + : bootstrap_build_machine (xp, pboot->mm, move (pboot->bmm))); + + if (!bmm) + { + l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); + run_btrfs (trace, "subvolume", "delete", xp); + continue; + } + + try + { + mvdir (xp, tp); + } + catch (const system_error& e) + { + fail << "unable to rename " << xp << " to " << tp; + } + + l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + + // Check the bootstrapped bbot version as above and ignore this build + // machine if it's newer than us. + // + if (!aux) + { + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i > 0) + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + else + warn << "bootstrapped " << tp << " bbot worker is older " + << "than agent; assuming test setup"; + } + } + + continue; // Re-enumerate from scratch. + } + + if (none) + warn << "no build machines for toolchain " << tc_name; + + return pr; + + } // From-scratch retry loop. + + // Unreachable. +} +catch (const system_error& e) +{ + fail << "unable to iterate over " << machines << ": " << e << endf; +} + +// Perform the build task throwing interrupt if it has been interrupted. +// +struct interrupt {}; + +// Start an auxiliary machine (steps 1-3 described in perfrom_task() below). +// +// Note that if the returned machine is NULL, then it means it has failed to +// start up (in which case the diagnostics has already been issued and +// snapshot cleaned up). +// +// Note: can throw interrupt. +// +struct auxiliary_machine_result +{ + dir_path snapshot; + unique_ptr machine; +}; + +using auxiliary_machine_results = vector; + +static pair +start_auxiliary_machine (bootstrapped_machine& am, + const string& env_name, + uint16_t machine_num, + size_t ram, + const string& in_name, // - + const dir_path& tftp_put_dir, + optional boost_cpus) +try +{ + tracer trace ("start_auxiliary_machine", am.path.string ().c_str ()); + + // NOTE: a simplified version of perform_task() below. + + machine_lock& ml (am.lock); + const dir_path& md (am.path); + const bootstrapped_machine_manifest& mm (am.manifest); + + path ef (tftp_put_dir / "environment"); // Environment upload file. + try_rmfile (ef); + + // -- + // + const dir_path xp (snapshot_path (md)); + + for (size_t retry (0);; ++retry) + { + if (retry != 0) + run_btrfs (trace, "subvolume", "delete", xp); + + run_btrfs (trace, "subvolume", "snapshot", md, xp); + + // Start the TFTP server. Map: + // + // GET requests to /dev/null + // PUT requests to .../build/-/put/* + // + // Note that we only need to run the TFTP server until we get the + // environment upload. Which means we could have reused the same port as + // the build machine. But let's keep things parallel to the VNC ports and + // use a seperate TFTP port for each auxiliary machine. + // + tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' + + "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n", + ops.tftp_port () + offset + machine_num); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // Note: the machine handling logic is similar to bootstrap. Except here + // we have to cleanup the snapshot ourselves in case of suspension or + // unexpected exit. + + // Start the machine. + // + // Note that for now we don't support logging in into auxiliary machines + // in the interactive mode. Maybe one day. + // + unique_ptr m ( + start_machine (xp, + mm.machine, + machine_num, + boost_cpus ? *boost_cpus : ops.cpu (), + ram, + mm.machine.mac, + ops.bridge (), + tftpd.port (), + false /* public_vnc */)); + + auto mg ( + make_exception_guard ( + [&m, &xp] () + { + if (m != nullptr) + { + info << "trying to force machine " << xp << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } + })); + + auto soft_fail = [&trace, &ml, &xp, &m] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << xp << ", suspending"; + m->print_info (dr); + } + + try + { + // Update the information in the machine lock to signal that the + // machine is suspended and cannot be interrupted. + // + ml.suspend_task (); + + m->suspend (false); + m->wait (false); + m->cleanup (); + run_btrfs (trace, "subvolume", "delete", xp); + info << "resuming after machine suspension"; + } + catch (const failed&) {} + + return make_pair (auxiliary_machine_result {move (xp), nullptr}, + string ()); + }; + + auto check_machine = [&xp, &m] () + { + try + { + size_t t (0); + if (!m->wait (t /* seconds */, false /* fail_hard */)) + return true; + } + catch (const failed&) {} + + diag_record dr (warn); + dr << "machine " << xp << " exited unexpectedly"; + m->print_info (dr); + + return false; + }; + + auto check_interrupt = [&trace, &xp, &m] () + { + if (sigurs1.load (std::memory_order_consume) == 0) + return; + + l2 ([&]{trace << "machine " << xp << " interruped";}); + + try {m->forcedown (false);} catch (const failed&) {} + m->cleanup (); + m = nullptr; // Disable exceptions guard above. + run_btrfs (trace, "subvolume", "delete", xp); + + throw interrupt (); + }; + + // Wait for up to 4 minutes (by default) for the environment upload (the + // same logic as in bootstrap_auxiliary_machine() except here the machine + // cannot just exit). + // + size_t to; + const size_t startup_to (ops.build_startup ()); + + for (to = startup_to; to != 0; ) + { + check_interrupt (); + + if (tftpd.serve (to, 2)) + continue; + + if (!check_machine ()) + { + // An auxiliary machine should not just exit. + // + return make_pair (auxiliary_machine_result {move (xp), nullptr}, + string ()); + } + + if (file_not_empty (ef)) + { + if (!tftpd.serve (to, 5)) + break; + } + } + + if (to == 0) + { + if (retry > ops.build_retries ()) + return soft_fail ("build startup timeout"); + + // Note: keeping the logs behind (no cleanup). + + diag_record dr (warn); + dr << "machine " << mm.machine.name << " mis-booted, retrying"; + m->print_info (dr); + + try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. + continue; + } + + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + + // Read the uploaded environment and, if necessary, append the name prefix + // (which we first make a valid C identifier). + // + // Note that it may seem like a good idea to validate the format here. + // But that means we will essentially need to parse it twice (here and in + // worker). Plus, in worker we can comminucate some diagnostics by writing + // it to the build log (here all we can easily do is abort the task). So + // here we just append the name prefix to trimmed non-blank/comment lines. + // + string env_pfx (env_name.empty () + ? string () + : sanitize_identifier (env_name) + '_'); + string env; + try + { + ifdstream is (ef, ifdstream::badbit); + for (string l; !eof (getline (is, l)); ) + { + trim (l); + + if (!env_pfx.empty () && !l.empty () && l.front () != '#') + l.insert (0, env_pfx); + + env += l; env += '\n'; + } + } + catch (const io_error& e) + { + fail << "unable to read from " << ef << ": " << e; + } + + // Rename and keep the environment file for debugging (it will be removed + // at the end as part of the tftp_put_dir cleanup). + // + mvfile (ef, ef + '-' + mm.machine.name); + + return make_pair (auxiliary_machine_result {move (xp), move (m)}, + move (env)); + } + + // Unreachable. +} +catch (const system_error& e) +{ + fail << "auxiliary machine startup error: " << e << endf; +} + +// Divide the auxiliary RAM among the specified machines. +// +// Issue diagnostics and return empty vector if the auxiliary RAM is +// insufficient. +// +static vector // Parallel to mms. +divide_auxiliary_ram (const vector& mms) +{ + size_t ram (ops.auxiliary_ram ()); + + vector rams; + vector rnds; // Allocation rounds (see below). + + // First pass: allocate the minimums. + // + for (const machine_manifest* mm: mms) + { + size_t v (effective_ram_minimum (*mm)); + + assert (!mm->ram_maximum || v <= *mm->ram_maximum); // Sanity check. + + rams.push_back (v); + rnds.push_back (0); + + if (ram >= v) + ram -= v; + else + { + diag_record dr (error); + dr << "insufficient auxiliary RAM " << ops.auxiliary_ram () << "KiB"; - break; - } // Retry loop. - } // Inner dir_iterator loop. - } - catch (const system_error& e) - { - fail << "unable to iterate over " << vd << ": " << e; - } - } // Outer dir_iterator loop. + for (size_t i (0); i != rams.size (); ++i) + dr << info << mms[i]->name << " requires minimum " << rams[i] << "KiB"; - // See if there is a pending bootstrap and whether we can perform it. + return {}; + } + } + + // Second pass: distribute the remaining RAM. + // + // We are going to do it in the ram_minimum increments to avoid ending up + // with odd amounts (while Linux can probably grok anything, who knows about + // Windows). + // + // To make the distribution fair we are going to count how many times we + // have increased each machine's allocation (the rnds vector). + // + for (size_t a (1); ram != 0; ) // Allocation round. + { + // Find a machine that would be satisfied with the least amount of RAM but + // which hasn't yet been given anything on this allocation round. // - // What should we do if we can't (i.e., we are in the priority minitor - // mode)? Well, we could have found some machines that are already - // bootstrapped (busy or not) and there may be a higher-priority task for - // one of them, so it feels natural to return whatever we've got. + size_t min_i; // Min index. + size_t min_v (0); // Min value. + + // We are done if we couldn't give out any RAM and haven't seen any + // machines that have already been given something on this allocation + // round. // - if (pboot) - { - dir_path& tp (pboot->tp); - dir_path& xp (pboot->xp); + bool done (true); - // Determine how many machines are busy (locked by other processes) and - // make sure it's below the --instance-max limit, if specified. - // - if (inst_max != 0) + for (size_t i (0); i != rams.size (); ++i) + { + if (rnds[i] != a) { - size_t busy (0); - for (const bootstrapped_machine& m: r) - if (!m.lock.locked ()) - ++busy; + const machine_manifest& mm (*mms[i]); - assert (busy <= inst_max); + size_t o (rams[i]); + size_t v (effective_ram_minimum (mm)); - if (busy == inst_max) + // Don't allocate past maximum. + // + if (mm.ram_maximum && *mm.ram_maximum < o + v) { - l3 ([&]{trace << "instance max reached attempting to bootstrap " - << tp;}); - run_btrfs (trace, "subvolume", "delete", xp); - return pr; + v = *mm.ram_maximum - o; + + if (v == 0) + continue; + } + + if (v <= ram && (min_v == 0 || min_v > v)) + { + min_i = i; + min_v = v; } } + else + done = false; + } - machine_lock& ml (pboot->ml); + if (min_v != 0) + { + rnds[min_i] = a; + rams[min_i] += min_v; + ram -= min_v; + } + else + { + if (done) + break; - l3 ([&]{trace << "bootstrapping " << tp;}); + ++a; // Next allocation round. + } + } - // Use the -- snapshot that we have made to bootstrap - // the new machine. Then atomically rename it to -. - // - // Also release all the machine locks that we have acquired so far as - // well as the global toolchain lock, since the bootstrap will take a - // while and other instances might be able to use them. Because we are - // releasing the global lock, we have to restart the enumeration process - // from scratch. - // - r.clear (); - ml.bootstrap (tl); - tl.unlock (); + return rams; +} - optional bmm ( - bootstrap_machine (xp, pboot->mm, move (pboot->bmm))); +// Stop all the auxiliary machines and clear the passed list. +// +static void +stop_auxiliary_machines (auxiliary_machine_results& amrs) +{ + tracer trace ("stop_auxiliary_machines"); - if (!bmm) + if (!amrs.empty ()) + { + // Do it in two passes to make sure all the machines are at least down. + // + for (const auxiliary_machine_result& amr: amrs) + { + if (amr.machine != nullptr) { - l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); - run_btrfs (trace, "subvolume", "delete", xp); - continue; + try {amr.machine->forcedown (false);} catch (const failed&) {} } + } - try - { - mvdir (xp, tp); - } - catch (const system_error& e) + // Make sure we don't retry the above even if the below fails. + // + auxiliary_machine_results tmp; + tmp.swap (amrs); + + for (const auxiliary_machine_result& amr: tmp) + { + if (amr.machine != nullptr) { - fail << "unable to rename " << xp << " to " << tp; + amr.machine->cleanup (); + run_btrfs (trace, "subvolume", "delete", amr.snapshot); } + } + } +} - l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); +// Start all the auxiliary machines and patch in their combined environment +// into tm.auxiliary_environment. +// +// Return the started machines or empty list if any of them failed to start up +// (which means this function should only be called for non-empty ams). +// +// Note that the order of auxiliary machines in ams may not match that in +// tm.auxiliary_machines. +// +static auxiliary_machine_results +start_auxiliary_machines (const vector& ams, + task_manifest& tm, + const string& in_name, // - + const dir_path& tftp_put_dir, + optional boost_cpus) +{ + tracer trace ("start_auxiliary_machines"); - // Check the bootstrapped bbot version as above and ignore this machine - // if it's newer than us. - // - if (int i = compare_bbot (bmm->bootstrap)) + size_t n (tm.auxiliary_machines.size ()); + + assert (n != 0 && ams.size () == n); + + auxiliary_machine_results amrs; + + // Divide the auxiliary RAM among the machines. + // + vector rams; + { + vector mms; + mms.reserve (n); + for (bootstrapped_machine* am: ams) + mms.push_back (&am->manifest.machine); + + rams = divide_auxiliary_ram (mms); + if (rams.empty ()) + return amrs; + + if (verb > 3) // l3 + for (size_t i (0); i != n; ++i) + trace << mms[i]->name << " allocated " << rams[i] << "KiB"; + } + + // Start the machines. + // + // Let's use the order in which they were specified in the task manifest + // (which will naturally be the order in which they are specified in the + // package manifest). This way amrs and tm.auxiliary_machines will be + // parallel. + // + string envs; // Combined environments. + + for (size_t i (0); i != n; ++i) + { + const auxiliary_machine& tam (tm.auxiliary_machines[i]); + + auto b (ams.begin ()), e (ams.end ()); + auto j (find_if (b, e, + [&tam] (const bootstrapped_machine* m) + { + return m->manifest.machine.name == tam.name; + })); + assert (j != e); + + pair p ( + start_auxiliary_machine (**j, + tam.environment_name, + i + 1, + rams[j - b], // Parallel to ams. + in_name, + tftp_put_dir, + boost_cpus)); + + if (p.first.machine == nullptr) + { + if (!amrs.empty ()) { - if (i > 0) - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - else - warn << "bootstrapped " << tp << " bbot worker is older " - << "than agent; assuming test setup"; + info << "trying to force auxiliary machines down"; + stop_auxiliary_machines (amrs); // amrs is now empty. } - continue; // Re-enumerate from scratch. + return amrs; } - if (none) - warn << "no build machines for toolchain " << tc_name; + amrs.push_back (move (p.first)); - return pr; + // Add the machine name as a header before its environment. + // + envs += "# "; envs += tam.name; envs += '\n'; + envs += "#\n"; + envs += p.second; + envs += '\n'; + } - } // From-scratch retry loop. + tm.auxiliary_environment = move (envs); - // Unreachable. + return amrs; } -catch (const system_error& e) -{ - fail << "unable to iterate over " << machines << ": " << e << endf; -} - -// Perform the build task throwing interrupt if it has been interrupted. -// -struct interrupt {}; struct perform_task_result { @@ -1278,16 +2059,18 @@ struct perform_task_result upload_archive (move (a)) {} }; +// Note that the task manifest is not const since we may need to patch in the +// auxiliary_environment value. +// static perform_task_result perform_task (toolchain_lock tl, // Note: assumes ownership. - machine_lock& ml, - const dir_path& md, - const bootstrapped_machine_manifest& mm, - const task_manifest& tm, + bootstrapped_machine& bm, // Build machine. + const vector& ams, // Auxiliary machines. + task_manifest& tm, optional boost_cpus) try { - tracer trace ("perform_task", md.string ().c_str ()); + tracer trace ("perform_task", bm.path.string ().c_str ()); // Arm the interrupt handler and release the global toolchain lock. // @@ -1296,6 +2079,10 @@ try sigurs1.store (0, std::memory_order_release); tl.unlock (); + machine_lock& ml (bm.lock); + const dir_path& md (bm.path); + const bootstrapped_machine_manifest& mm (bm.manifest); + const string in_name (tc_name + '-' + to_string (inst)); auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); @@ -1314,7 +2101,7 @@ try // The overall plan is as follows: // - // 1. Snapshot the (bootstrapped) machine. + // 1. Snapshot the (bootstrapped) build machine. // // 2. Save the task manifest to the TFTP directory (to be accessed by the // worker). @@ -1326,6 +2113,18 @@ try // // 5. Clean up (force the machine down and delete the snapshot). // + // If the task requires any auxiliary machines, then for each such machine + // perform the following steps 1-3 before step 1 above, and step 4 after + // step 5 above (that is, start all the auxiliary machines before the build + // machine and clean them up after): + // + // 1. Snapshot the (bootstrapped) auxiliary machine. + // + // 2. Start the TFTP server and the machine. + // + // 3. Handle TFTP upload requests until received the environment upload. + // + // 4. Clean up (force the machine down and delete the snapshot). // TFTP server mapping (server chroot is --tftp): // @@ -1342,11 +2141,12 @@ try path rf (pd / "result.manifest.lz4"); // Result manifest file. path af (pd / "upload.tar"); // Archive of build artifacts to upload. - serialize_manifest (tm, tf, "task"); - if (ops.fake_machine_specified ()) { - // Note: not handling interrupts here. + // Note: not handling interrupts here. Nor starting any auxiliary + // machines, naturally. + + serialize_manifest (tm, tf, "task"); // Simply wait for the file to appear. // @@ -1368,6 +2168,38 @@ try } else { + // Start the auxiliary machines if any. + // + // If anything goes wrong, force them all down (failed that, the machine + // destructor will block waiting for their exit). + // + auxiliary_machine_results amrs; + + auto amg ( + make_exception_guard ( + [&amrs] () + { + if (!amrs.empty ()) + { + info << "trying to force auxiliary machines down"; + stop_auxiliary_machines (amrs); + } + })); + + if (!ams.empty ()) + { + amrs = start_auxiliary_machines (ams, tm, in_name, pd, boost_cpus); + + if (amrs.empty ()) + return perform_task_result (move (arm), move (r)); // Abort. + } + + // Note: tm.auxiliary_environment patched in by start_auxiliary_machines(). + // + serialize_manifest (tm, tf, "task"); + + // Start the build machine and perform the build. + // try_rmfile (rf); try_rmfile (af); @@ -1386,7 +2218,7 @@ try // tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" + "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n", - ops.tftp_port () + offset); + ops.tftp_port () + offset + 0 /* build machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); @@ -1394,17 +2226,21 @@ try // we have to cleanup the snapshot ourselves in case of suspension or // unexpected exit. // + // NOTE: see similar code in start_auxiliary_machine() above. + // { // Start the machine. // unique_ptr m ( start_machine (xp, mm.machine, + 0 /* machine_num (build) */, + boost_cpus ? *boost_cpus : ops.cpu (), + ops.build_ram (), mm.machine.mac, ops.bridge (), tftpd.port (), - tm.interactive.has_value (), - boost_cpus)); + tm.interactive.has_value () /* public_vnc */)); auto mg ( make_exception_guard ( @@ -1417,7 +2253,10 @@ try } })); - auto soft_fail = [&trace, &ml, &xp, &m, &arm, &r] (const char* msg) + auto soft_fail = [&trace, + &amrs, + &ml, &xp, &m, + &arm, &r] (const char* msg) { { diag_record dr (error); @@ -1425,6 +2264,18 @@ try m->print_info (dr); } + // What should we do about auxiliary machines? We could force them + // all down before suspending (and thus freeing them for use). That + // is the easy option. We could suspend them as well, but that feels + // like it will be a pain (will need to resume all of them when done + // investigating). Theoretically we could just let them run, but + // that won't play well with our interrupt logic since someone may + // attempt to interrupt us via one of them. So let's do easy for + // now. + // + // Note: always stop/suspend the build machine before the auxiliary + // machines to avoid any errors due the auxiliary machines being + // unavailable. try { // Update the information in the machine lock to signal that the @@ -1433,8 +2284,10 @@ try ml.suspend_task (); m->suspend (false); + stop_auxiliary_machines (amrs); m->wait (false); m->cleanup (); + m = nullptr; // Disable exceptions guard above. run_btrfs (trace, "subvolume", "delete", xp); info << "resuming after machine suspension"; } @@ -1460,7 +2313,29 @@ try return false; }; - auto check_interrupt = [&trace, &xp, &m] () + auto check_auxiliary_machines = [&amrs] () + { + for (auxiliary_machine_result& amr: amrs) + { + try + { + size_t t (0); + if (!amr.machine->wait (t /* seconds */, false /* fail_hard */)) + continue; + } + catch (const failed&) {} + + diag_record dr (warn); + dr << "machine " << amr.snapshot << " exited unexpectedly"; + amr.machine->print_info (dr); + + return false; + } + + return true; + }; + + auto check_interrupt = [&trace, &amrs, &xp, &m] () { if (sigurs1.load (std::memory_order_consume) == 0) return; @@ -1468,6 +2343,7 @@ try l2 ([&]{trace << "machine " << xp << " interruped";}); try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); m->cleanup (); m = nullptr; // Disable exceptions guard above. run_btrfs (trace, "subvolume", "delete", xp); @@ -1497,8 +2373,13 @@ try if (tftpd.serve (to, 2)) break; - if (!check_machine ()) + bool bm; // Build machine still running. + if (!(bm = check_machine ()) || !check_auxiliary_machines ()) { + if (bm) + try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); + m = nullptr; // Disable exceptions guard above. run_btrfs (trace, "subvolume", "delete", xp); return perform_task_result (move (arm), move (r)); } @@ -1516,6 +2397,7 @@ try m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. continue; } @@ -1535,10 +2417,15 @@ try if (tftpd.serve (to, 2)) continue; - if (!check_machine ()) + bool bm; // Build machine still running. + if (!(bm = check_machine ()) || !check_auxiliary_machines ()) { - if (!file_not_empty (rf)) + if (bm || !file_not_empty (rf)) { + if (bm) + try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); + m = nullptr; // Disable exceptions guard above. run_btrfs (trace, "subvolume", "delete", xp); return perform_task_result (move (arm), move (r)); } @@ -1594,7 +2481,9 @@ try // lease instead of a new one. // try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); m->cleanup (); + m = nullptr; // Disable exceptions guard above. } } @@ -1755,17 +2644,20 @@ try : standard_version (BBOT_VERSION_STR)); tc_id = ops.toolchain_id (); - if (tc_num == 0 || tc_num > 99) - fail << "invalid --toolchain-num value " << tc_num; + if (tc_num == 0 || tc_num > 9) + fail << "--toolchain-num value " << tc_num << " out of range"; inst = ops.instance (); if (inst == 0 || inst > 99) - fail << "invalid --instance value " << inst; + fail << "--instance value " << inst << " out of range"; inst_max = ops.instance_max (); - offset = (tc_num - 1) * 100 + inst; + // The last decimal position is used for machine number, 0 for the build + // machine, non-0 for auxiliary machines (of which we can have maximum 9). + // + offset = (tc_num - 1) * 1000 + inst * 10; // Controller priority to URLs map. // @@ -1917,7 +2809,7 @@ try if (ops.interactive () != interactive_mode::false_) { imode = ops.interactive (); - ilogin = machine_vnc (true /* public */); + ilogin = machine_vnc (0 /* machine_num */, true /* public */); } // Use the pkeyutl openssl command for signing the task response challenge @@ -1959,25 +2851,29 @@ try uint64_t prio_max (0); bool prio_mon (false); { - uint16_t busy (0); // Number of machines locked by other processes. - bool task (false); // There is a machine performing a task. + uint16_t busy (0); // Number of build machines locked by other processes. + bool task (false); // There is a build machine performing a task. for (const bootstrapped_machine& m: ms) { if (!m.lock.locked ()) { - ++busy; - if (m.lock.prio) // Not bootstrapping/suspended. { - task = true; + if (m.manifest.machine.effective_role () != machine_role::auxiliary) + { + ++busy; + task = true; - if (prio_min > *m.lock.prio) - prio_min = *m.lock.prio; + if (prio_min > *m.lock.prio) + prio_min = *m.lock.prio; - if (prio_max < *m.lock.prio) - prio_max = *m.lock.prio; + if (prio_max < *m.lock.prio) + prio_max = *m.lock.prio; + } } + else + ++busy; // Assume build machine (see enumerate_machines()). } } @@ -2067,15 +2963,19 @@ try imode, ilogin, fingerprint, - nullopt /* auxiliary_ram */, // @@ TMP AUXILIARY + ops.auxiliary_ram (), machine_header_manifests {}}; // Determine which machines we need to offer for this priority. // + bool aux_only (true); // Only auxiliary machines are available. { - bool interruptable (false); + bool interruptable (false); // There is build machine we can interrupt. for (const bootstrapped_machine& m: ms) { + const machine_manifest& mm (m.manifest.machine); + machine_role role (mm.effective_role ()); + if (!m.lock.locked ()) { if (!m.lock.prio) // Skip bootstrapping/suspended. @@ -2093,18 +2993,18 @@ try if ((prio / 100) <= (eprio / 100)) continue; - interruptable = true; + if (role != machine_role::auxiliary) + interruptable = true; } - tq.machines.emplace_back (m.manifest.machine.id, - m.manifest.machine.name, - m.manifest.machine.summary, - // - // @@ TMP AUXILIARY - // - nullopt /* role */, - nullopt /* ram_minimum */, - nullopt /* ram_maximum */); + tq.machines.emplace_back (mm.id, + mm.name, + mm.summary, + role, + effective_ram_minimum (mm), + mm.ram_maximum); + + aux_only = aux_only && role == machine_role::auxiliary; } // Sanity check: in the priority monitor mode we should only ask for a @@ -2122,10 +3022,13 @@ try return 0; } + if (aux_only) + tq.machines.clear (); + if (tq.machines.empty ()) { - // If we have no machines for this priority then we won't have any - // for any lower priority so bail out. + // If we have no build machines for this priority then we won't have + // any for any lower priority so bail out. // break; } @@ -2133,7 +3036,7 @@ try // Send task requests. // // Note that we have to do it while holding the lock on all the machines - // since we don't know which machine we will need. + // since we don't know which machine(s) we will need. // vector rurls (urls.size ()); std::iota (rurls.begin (), rurls.end (), urls.begin ()); @@ -2174,9 +3077,8 @@ try "--max-time", ops.request_timeout (), "--connect-timeout", ops.connect_timeout ()); - // This is tricky/hairy: we may fail hard parsing the output - // before seeing that curl exited with an error and failing - // softly. + // This is tricky/hairy: we may fail hard parsing the output before + // seeing that curl exited with an error and failing softly. // bool f (false); @@ -2259,7 +3161,7 @@ try } // prio loop. - if (tq.machines.empty ()) // No machines. + if (tq.machines.empty ()) // No machines (auxiliary-only already handled). { // Normally this means all the machines are busy so sleep a bit less. // @@ -2279,15 +3181,67 @@ try // task_manifest& t (*tr.task); - // First verify the requested machine is one of those we sent in tq. + // First verify the requested machines are from those we sent in tq and + // their roles match. + // + // Also verify the same machine is not picked multiple times by blanking + // out the corresponding entry in tq.machines. (Currently we are only + // capable of running one instance of each machine though we may want to + // relax that in the future, at which point we should send as many entries + // for the same machine in the task request as we are capable of running, + // applying the priority logic for each entry, etc). + // + { + auto check = [&tq, &url] (const string& name, machine_role r) + { + auto i (find_if (tq.machines.begin (), tq.machines.end (), + [&name] (const machine_header_manifest& mh) + { + return mh.name == name; // Yes, names, not ids. + })); + + if (i == tq.machines.end ()) + { + error << "task from " << url << " for unknown machine " << name; + return false; + } + + if (i->effective_role () != r) + { + error << "task from " << url << " with mismatched role " + << " for machine " << name; + return false; + } + + i->name.clear (); // Blank out. + + return true; + }; + + auto check_aux = [&check] (const vector& ams) + { + for (const auxiliary_machine& am: ams) + if (!check (am.name, machine_role::auxiliary)) + return false; + return true; + }; + + if (!check (t.machine, machine_role::build) || + !check_aux (t.auxiliary_machines)) + { + if (ops.dump_task ()) + return 0; + + continue; + } + } + + // Also verify there are no more than 9 auxiliary machines (see the offset + // global variable for details). // - if (find_if (tq.machines.begin (), tq.machines.end (), - [&t] (const machine_header_manifest& mh) - { - return mh.name == t.machine; // Yes, names, not ids. - }) == tq.machines.end ()) + if (t.auxiliary_machines.size () > 9) { - error << "task from " << url << " for unknown machine " << t.machine; + error << "task from " << url << " with more than 9 auxiliary machines"; if (ops.dump_task ()) return 0; @@ -2325,21 +3279,42 @@ try // feels like the most sensible option). // perform_task_result r; - bootstrapped_machine* pm (nullptr); + bootstrapped_machine* pm (nullptr); // Build machine. + vector ams; // Auxiliary machines. try { - // Next find the corresponding bootstrapped_machine instance in ms. Also - // unlock all the other machines. + // First find the bootstrapped_machine instance in ms corresponding to + // the requested build machine. Also unlock all the other machines. // // While at it also see if we need to interrupt the selected machine (if // busy), one of the existing (if we are at the max allowed instances, // that is in the priority monitor mode), or all existing (if this is a // priority level 4 task). // - vector ims; + // Auxiliary machines complicate the matter a bit: we may now need to + // interrupt some subset of {build machine, auxiliary machines} that are + // necessary to perform this task. Note, however, that auxiliary + // machines are always subordinate to build machines, meaning that if + // there is a busy auxiliary machine, then there will be a busy build + // machine with the same pid/priority (and so if we interrup one + // auxiliary, then we will also interrupt the corresponding build plus + // any other auxiliaries it may be using). Based on that let's try to + // divide and conquer this by first dealing with build machines and then + // adding any auxiliary ones. + // + vector ims; // Machines to be interrupted. + size_t imt (0); // Number of "target" machines to interrupt (see below). + + // First pass: build machines. + // for (bootstrapped_machine& m: ms) { - if (m.manifest.machine.name == t.machine) + const machine_manifest& mm (m.manifest.machine); + + if (mm.effective_role () == machine_role::auxiliary) + continue; + + if (mm.name == t.machine) { assert (pm == nullptr); // Sanity check. pm = &m; @@ -2367,16 +3342,71 @@ try } } - assert (pm != nullptr); + assert (pm != nullptr); // Sanity check. if (!pm->lock.locked ()) { + assert (pm->lock.prio); // Sanity check (not bootstrapping/suspended). + if (prio >= 1000) ims.insert (ims.begin (), pm); // Interrupt first (see below). else ims = {pm}; + + imt++; + } + + // Second pass: auxiliary machines. + // + for (bootstrapped_machine& m: ms) + { + const machine_manifest& mm (m.manifest.machine); + + if (mm.effective_role () != machine_role::auxiliary) + continue; + + if (find_if (t.auxiliary_machines.begin (), t.auxiliary_machines.end (), + [&mm] (const auxiliary_machine& am) + { + return am.name == mm.name; + }) != t.auxiliary_machines.end ()) + { + if (!m.lock.locked ()) + { + assert (m.lock.prio); // Sanity check (not bootstrapping/suspended). + + if (ims.empty ()) + { + ims.push_back (&m); + } + else if (ims.front () == pm) + { + ims.insert (ims.begin () + 1, &m); // Interrupt early (see below). + } + else if (prio < 1000 && prio_mon && ams.empty () /* first */) + { + // Tricky: replace the lowest priority task we have picked on + // the first pass with this one. + // + assert (ims.size () == 1); // Sanity check. + ims.back () = &m; + } + else + ims.insert (ims.begin (), &m); // Interrupt first (see below). + + imt++; + } + + ams.push_back (&m); + } + else if (m.lock.locked ()) + m.lock.unlock (); } + // Note: the order of machines may not match. + // + assert (ams.size () == t.auxiliary_machines.size ()); // Sanity check. + assert (!prio_mon || !ims.empty ()); // We should have at least one. // Move the toolchain lock into this scope so that it's automatically @@ -2389,23 +3419,26 @@ try // Interrupt the machines, if necessary. // // Note that if we are interrupting multiple machines, then the target - // machine, if needs to be interrupted, must be first. This way if we - // are unable to successfully interrupt it, we don't interrupt the rest. + // build machine, if needs to be interrupted, must be first, followed + // but all the target auxiliary machines. This way if we are unable to + // successfully interrupt them, we don't interrupt the rest. // - for (bootstrapped_machine* im: ims) + vector pids; // Avoid re-interrupting the same pid. + for (size_t i (0); i != ims.size (); ++i) { - bool first (im == ims.front ()); + bootstrapped_machine* im (ims[i]); // Sanity checks. // assert (!im->lock.locked () && im->lock.prio); - assert (im != pm || first); + assert (im != pm || i == 0); const dir_path& tp (im->path); // - path. + pid_t pid (im->lock.pid); l2 ([&]{trace << "interrupting " - << (im == pm ? "target" : "lower priority") - << " machine " << tp << ", pid " << im->lock.pid;}); + << (i < imt ? "target" : "lower priority") + << " machine " << tp << ", pid " << pid;}); // The plan is to send the interrupt and then wait for the lock. // @@ -2417,21 +3450,26 @@ try // But what can happen is the other task becomes suspended, which we // will not be able to interrupt. // - if (kill (im->lock.pid, SIGUSR1) == -1) + if (find (pids.begin (), pids.end (), pid) == pids.end ()) { - // Ignore the case where there is no such process (e.g., the other - // process has terminated in which case the lock should be released - // automatically). - // - if (errno != ESRCH) - throw_generic_error (errno); + if (kill (pid, SIGUSR1) == -1) + { + // Ignore the case where there is no such process (e.g., the other + // process has terminated in which case the lock should be + // released automatically). + // + if (errno != ESRCH) + throw_generic_error (errno); + } + + pids.push_back (pid); } - // If we are interrupting multiple machine, there is no use acquiring - // the lock (or failing if unable to) for subsequent machines since - // this is merely a performance optimization. + // If we are interrupting additional machine in order to free up + // resources, there is no use acquiring their lock (or failing if + // unable to) since this is merely a performance optimization. // - if (!first) + if (i >= imt) continue; // Try to lock the machine. @@ -2453,7 +3491,7 @@ try if (ml.locked ()) break; - if (ml.pid != im->lock.pid) + if (ml.pid != pid) { error << "interrupted machine " << tp << " changed pid"; throw interrupt (); @@ -2473,26 +3511,27 @@ try throw interrupt (); } - // If the interrupted machine is what we will use, see if it needs a - // re-bootstrap, the same as in enumerate_machines(). If not, then - // transfer the bootstrap manifest and lock. + // This is an interrupted machine (build or auxiliary) that we will be + // using. See if it needs a re-bootstrap, the same as in + // enumerate_machines(). If not, then transfer the bootstrap manifest + // and lock. // - if (im == pm) - { - const machine_manifest& mm (im->manifest.machine); + const machine_manifest& mm (im->manifest.machine); - bootstrapped_machine_manifest bmm ( - parse_manifest ( - tp / "manifest", "bootstrapped machine")); + bootstrapped_machine_manifest bmm ( + parse_manifest ( + tp / "manifest", "bootstrapped machine")); - bool rb (false); + bool rb (false); - if (bmm.machine.id != mm.id) - { - l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); - rb = true; - } + if (bmm.machine.id != mm.id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); + rb = true; + } + if (im == pm) // Only for build machine. + { if (!tc_id.empty () && bmm.toolchain.id != tc_id) { l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); @@ -2512,15 +3551,15 @@ try rb = true; } } + } - // We are not going to try to re-bootstrap this machine "inline". - // - if (rb) - throw interrupt (); + // We are not going to try to re-bootstrap this machine "inline". + // + if (rb) + throw interrupt (); - im->manifest = move (bmm); - im->lock = move (ml); - } + im->manifest = move (bmm); + im->lock = move (ml); } // Check if we need to boost the number of CPUs to the full hardware @@ -2530,8 +3569,11 @@ try if (prio >= 10000) bcpus = std::thread::hardware_concurrency (); - pm->lock.perform_task (tl, prio); - r = perform_task (move (tl), pm->lock, pm->path, pm->manifest, t, bcpus); + pm->lock.perform_task (tl, prio); // Build machine. + for (bootstrapped_machine* am: ams) // Auxiliary machines. + am->lock.perform_task (tl, prio); + + r = perform_task (move (tl), *pm, ams, t, bcpus); } catch (const interrupt&) { @@ -2548,8 +3590,14 @@ try nullopt /* dependency_checksum */}); } + // No need to hold the locks any longer. + // if (pm != nullptr && pm->lock.locked ()) - pm->lock.unlock (); // No need to hold the lock any longer. + pm->lock.unlock (); + + for (bootstrapped_machine* am: ams) + if (am->lock.locked ()) + am->lock.unlock (); result_manifest& rm (r.manifest); diff --git a/bbot/agent/agent.hxx b/bbot/agent/agent.hxx index 72b819b..9c8400f 100644 --- a/bbot/agent/agent.hxx +++ b/bbot/agent/agent.hxx @@ -22,14 +22,14 @@ namespace bbot extern standard_version tc_ver; // Toolchain version. extern string tc_id; // Toolchain id. - extern uint16_t inst; // Instance number. + extern uint16_t inst; // Instance number (1-based). extern string hname; // Our host name. extern string hip; // Our IP address. extern uid_t uid; // Our effective user id. extern string uname; // Our effective user name. - extern uint16_t offset; // Agent offset. + extern uint16_t offset; // Agent offset (10-9990; used for ports). // Random number generator (currently not MT-safe and limited to RAND_MAX). // diff --git a/bbot/agent/machine.cxx b/bbot/agent/machine.cxx index 2d9ad4f..74c9b93 100644 --- a/bbot/agent/machine.cxx +++ b/bbot/agent/machine.cxx @@ -83,9 +83,9 @@ namespace bbot } static string - create_tap (const string& br, uint16_t port) + create_tap (const string& br, uint16_t machine_num, uint16_t port) { - string t ("tap" + to_string (offset)); + string t ("tap" + to_string (offset + machine_num)); tracer trace ("create_tap", t.c_str ()); @@ -126,8 +126,10 @@ namespace bbot string bridge; // Bridge interface to which this tap belongs uint16_t port; // UDP port to forward TFTP traffic to. - tap (string b, uint16_t p) - : iface (create_tap (b, p)), bridge (move (b)), port (p) {} + tap (string b, uint16_t machine_num, uint16_t p) + : iface (create_tap (b, machine_num, p)), + bridge (move (b)), + port (p) {} ~tap () { @@ -169,11 +171,13 @@ namespace bbot public: kvm_machine (const dir_path&, const machine_manifest&, + uint16_t machine_num, + size_t cpus, + size_t ram, const optional& mac, const string& br_iface, uint16_t tftp_port, - bool pub_vnc, - optional boost_cpus = nullopt); + bool pub_vnc); virtual bool shutdown (size_t& seconds) override; @@ -214,31 +218,47 @@ namespace bbot kvm_machine:: kvm_machine (const dir_path& md, const machine_manifest& mm, + uint16_t m_num, + size_t cpus, + size_t ram, const optional& omac, const string& br, - uint16_t port, - bool pub_vnc, - optional bcpus) + uint16_t tftp_port, + bool pub_vnc) : machine (mm.mac ? *mm.mac : // Fixed mac from machine manifest. omac ? *omac : // Generated mac from previous bootstrap. generate_mac ()), kvm ("kvm"), - net (br, port), - vnc (machine_vnc (pub_vnc)), + net (br, m_num, tftp_port), + vnc (machine_vnc (m_num, pub_vnc)), monitor ("/tmp/monitor-" + tc_name + '-' + to_string (inst)) { tracer trace ("kvm_machine", md.string ().c_str ()); + // Monitor path. + // + if (m_num != 0) + { + monitor += '-'; + monitor += to_string (m_num); + } + if (sizeof (sockaddr_un::sun_path) <= monitor.size ()) throw invalid_argument ("monitor unix socket path too long"); // Machine name. // // While we currently can only have one running machine per toolchain, add - // the instance number for debuggability. + // the instance number and non-0 machine number for debuggability. // string name (mm.name + '-' + tc_name + '-' + to_string (inst)); + if (m_num != 0) + { + name += '-'; + name += to_string (m_num); + } + // Machine log. Note that it is only removed with an explicit cleanup() // call. // @@ -252,7 +272,7 @@ namespace bbot // Note that for best results you may want to adjust (e.g., by over- // committing) the number of CPUs to be power of 2. // - size_t cpus (bcpus ? *bcpus : ops.cpu ()), cores (cpus); + size_t cores (cpus); size_t sockets (cores >= 256 && cores % 8 == 0 ? 4 : cores >= 128 && cores % 4 == 0 ? 2 : 1); @@ -261,26 +281,6 @@ namespace bbot size_t threads (cores >= 16 && cores % 4 == 0 ? 2 : 1); cores /= threads; - // We probably don't want to commit all the available RAM to the VM since - // some of it could be used on the host side for caching, etc. So the - // heuristics that we will use is 4G or 1G per CPU, whichever is greater - // and the rest divide equally between the host and the VM. - // - // But the experience showed that we actually want to be able to precisely - // control the amount of RAM assigned to VMs (e.g., for tmpfs size) and - // without back-fudging for this heuristics. - // -#if 0 - size_t ram ((cpus < 4 ? 4 : cpus) * 1024 * 1024); // Kb. - - if (ram > ops.ram ()) - ram = ops.ram (); - else - ram += (ops.ram () - ram) / 2; -#else - size_t ram (ops.build_ram ()); -#endif - // If we have options, use that instead of the default network and // disk configuration. // @@ -434,7 +434,7 @@ namespace bbot // collision-wise with anything useful. // "-vnc", - (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset), // 5900 + offset + (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset + m_num), // 5900-base // QMP. // @@ -672,31 +672,37 @@ namespace bbot unique_ptr start_machine (const dir_path& md, const machine_manifest& mm, + uint16_t machine_num, + size_t cpus, + size_t ram, const optional& mac, const string& br_iface, uint16_t tftp_port, - bool pub_vnc, - optional bcpus) + bool pub_vnc) { + assert (machine_num < 10); + switch (mm.type) { case machine_type::kvm: return make_unique ( - md, mm, mac, br_iface, tftp_port, pub_vnc, bcpus); + md, mm, machine_num, cpus, ram, mac, br_iface, tftp_port, pub_vnc); case machine_type::nspawn: - assert (false); //@@ TODO + assert (false); // @@ TODO } return nullptr; } string - machine_vnc (bool pub) + machine_vnc (uint16_t num, bool pub) { + assert (num < 10); + string r (pub ? hip : "127.0.0.1"); r += ':'; - r += to_string (5900 + offset); + r += to_string (5900 + offset + num); return r; } } diff --git a/bbot/agent/machine.hxx b/bbot/agent/machine.hxx index 0bb74b9..13646db 100644 --- a/bbot/agent/machine.hxx +++ b/bbot/agent/machine.hxx @@ -78,20 +78,28 @@ namespace bbot class machine_manifest; + // The machine number should be between 0-9 with 0 for the build machine and + // 1-9 for the auxiliary machines. + // + // Note that tftp_port is not a base (in other words, it is expected to + // already be appropriately offset). + // unique_ptr start_machine (const dir_path&, const machine_manifest&, + uint16_t machine_num, + size_t cpus, + size_t ram, // In KiB. const optional& mac, const string& br_iface, uint16_t tftp_port, - bool pub_vnc, - optional boost_cpus = nullopt); + bool public_vnc); // Return the machine's public or private VNC session endpoint in the // ':' form. // string - machine_vnc (bool pub_vnc); + machine_vnc (uint16_t machine_num, bool public_vnc); } #endif // BBOT_AGENT_MACHINE_HXX diff --git a/bbot/agent/tftp.hxx b/bbot/agent/tftp.hxx index 103c9d6..5306dd1 100644 --- a/bbot/agent/tftp.hxx +++ b/bbot/agent/tftp.hxx @@ -29,7 +29,7 @@ namespace bbot port () const; // Wait for a TFTP request for up to the specified number of seconds. If - // increment is not 0, then wait in the specified incremenets (i.e., wait + // increment is not 0, then wait in the specified increments (i.e., wait // for up to that number of seconds; useful when one needs to also // periodically check for something else). Update the timeout value as // well as return true if a request was served and false otherwise. diff --git a/bbot/bbot-agent@.service b/bbot/bbot-agent@.service index d379b3c..253cc61 100644 --- a/bbot/bbot-agent@.service +++ b/bbot/bbot-agent@.service @@ -23,6 +23,7 @@ Environment=AUTH_KEY= Environment=INTERACTIVE=false Environment=BOOTSTRAP_TIMEOUT=3600 +Environment=BOOTSTRAP_AUXILIARY=900 Environment=BOOTSTRAP_RETRIES=2 Environment=BUILD_TIMEOUT=5400 @@ -56,6 +57,7 @@ ExecStart=/build/bots/default/bin/bbot-agent \ --auth-key ${AUTH_KEY} \ --interactive ${INTERACTIVE} \ --bootstrap-timeout ${BOOTSTRAP_TIMEOUT} \ + --bootstrap-auxiliary ${BOOTSTRAP_AUXILIARY} \ --bootstrap-retries ${BOOTSTRAP_RETRIES} \ --build-timeout ${BUILD_TIMEOUT} \ --build-retries ${BUILD_RETRIES} \ diff --git a/bbot/buildfile b/bbot/buildfile index cb7b576..bbca810 100644 --- a/bbot/buildfile +++ b/bbot/buildfile @@ -99,7 +99,7 @@ if $cli.configured # Usage options. # cli.options += --suppress-undocumented --long-usage --ansi-color \ ---page-usage 'bbot::print_$name$_' --option-length 23 +--page-usage 'bbot::print_$name$_' --option-length 25 agent/cli.cxx{agent-options}: cli.options += --include-prefix bbot/agent \ --guard-prefix BBOT_AGENT diff --git a/bbot/utility.hxx b/bbot/utility.hxx index 9bc517c..7758db4 100644 --- a/bbot/utility.hxx +++ b/bbot/utility.hxx @@ -37,6 +37,7 @@ namespace bbot // // using butl::icasecmp; + using butl::sanitize_identifier; using butl::reverse_iterate; using butl::make_guard; -- cgit v1.1