diff options
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r-- | bbot/agent/agent.cxx | 3276 |
1 files changed, 2824 insertions, 452 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index 60f7271..75f7228 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -1,15 +1,16 @@ // file : bbot/agent/agent.cxx -*- C++ -*- -// license : TBC; see accompanying LICENSE file +// license : MIT; see accompanying LICENSE file #include <bbot/agent/agent.hxx> #include <pwd.h> // getpwuid() #include <limits.h> // PATH_MAX -#include <signal.h> // signal() -#include <stdlib.h> // rand_r() -#include <unistd.h> // sleep(), getuid(), fsync(), [f]stat() +#include <signal.h> // signal(), kill() +#include <stdlib.h> // rand_r(), strto[u]ll() +#include <string.h> // strchr() +#include <unistd.h> // sleep(), getpid(), getuid(), fsync(), [f]stat() #include <ifaddrs.h> // getifaddrs(), freeifaddrs() -#include <sys/types.h> // stat +#include <sys/types.h> // stat, pid_t #include <sys/stat.h> // [f]stat() #include <sys/file.h> // flock() @@ -19,15 +20,22 @@ #include <sys/ioctl.h> #include <sys/socket.h> +#include <map> +#include <atomic> #include <chrono> +#include <thread> // thread::hardware_concurrency() #include <random> +#include <iomanip> // setw() +#include <numeric> // iota() #include <iostream> #include <system_error> // generic_category() -#include <libbutl/pager.mxx> -#include <libbutl/sha256.mxx> -#include <libbutl/openssl.mxx> -#include <libbutl/filesystem.mxx> // dir_iterator, try_rmfile(), readsymlink() +#include <libbutl/pager.hxx> +#include <libbutl/base64.hxx> +#include <libbutl/sha256.hxx> +#include <libbutl/openssl.hxx> +#include <libbutl/filesystem.hxx> // dir_iterator, try_rmfile(), readsymlink() +#include <libbutl/semantic-version.hxx> #include <libbbot/manifest.hxx> @@ -40,6 +48,7 @@ #include <bbot/agent/tftp.hxx> #include <bbot/agent/machine.hxx> +#include <bbot/agent/http-service.hxx> using namespace butl; using namespace bbot; @@ -47,6 +56,57 @@ using namespace bbot; using std::cout; using std::endl; +// If RAM minimum is not specified for a machine, then let's assume something +// 
plausible like 256MiB. This way we won't end up with degenerate cases where +// we attempt to start a machine with some absurd amount of RAM. +// +const std::uint64_t default_ram_minimum = 262144; + +static inline std::uint64_t +effective_ram_minimum (const machine_header_manifest& m) +{ + // Note: neither ram_minimum nor ram_maximum should be 0. + // + assert ((!m.ram_minimum || *m.ram_minimum != 0) && + (!m.ram_maximum || *m.ram_maximum != 0)); + + return (m.ram_minimum + ? *m.ram_minimum + : (m.ram_maximum && *m.ram_maximum < default_ram_minimum + ? *m.ram_maximum + : default_ram_minimum)); +} + +static std::mt19937 rand_gen (std::random_device {} ()); + +// According to the standard, atomic's use in the signal handler is only safe +// if it's lock-free. +// +#if !defined(ATOMIC_INT_LOCK_FREE) || ATOMIC_INT_LOCK_FREE != 2 +#error int is not lock-free on this architecture +#endif + +// While we can use memory_order_relaxed in a single-threaded program, let's +// use consume/release in case this process becomes multi-threaded in the +// future. +// +static std::atomic<unsigned int> sigurs1; + +using std::memory_order_consume; +using std::memory_order_release; + +extern "C" void +handle_signal (int sig) +{ + switch (sig) + { + case SIGHUP: exit (3); // Unimplemented feature. + case SIGTERM: exit (0); + case SIGUSR1: sigurs1.fetch_add (1, std::memory_order_release); break; + default: assert (false); + } +} + namespace bbot { agent_options ops; @@ -55,10 +115,12 @@ namespace bbot string tc_name; uint16_t tc_num; + path tc_lock; // Empty if no locking. standard_version tc_ver; string tc_id; - uint16_t inst; + uint16_t inst; // 1-based. + uint16_t inst_max; // 0 if priority monitoring is disabled. uint16_t offset; @@ -112,16 +174,16 @@ btrfs_exit (tracer& t, A&&... a) "btrfs", forward<A> (a)...); } -// Bootstrap the machine. 
Return the bootstrapped machine manifest if -// successful and nullopt otherwise (in which case the machine directory -// should be cleaned and the machine ignored for now). +// Bootstrap a build machine. Return the bootstrapped machine manifest if +// successful and nullopt otherwise (in which case the caller should clean up +// the machine directory and ignore the machine for now). // static optional<bootstrapped_machine_manifest> -bootstrap_machine (const dir_path& md, - const machine_manifest& mm, - optional<bootstrapped_machine_manifest> obmm) +bootstrap_build_machine (const dir_path& md, + const machine_manifest& mm, + optional<bootstrapped_machine_manifest> obmm) { - tracer trace ("bootstrap_machine", md.string ().c_str ()); + tracer trace ("bootstrap_build_machine", md.string ().c_str ()); bootstrapped_machine_manifest r { mm, @@ -143,10 +205,12 @@ bootstrap_machine (const dir_path& md, else try { + // Note: similar code in bootstrap_auxiliary_machine(). + // Start the TFTP server (server chroot is --tftp). Map: // - // GET requests to .../toolchains/<name>/* - // PUT requests to .../bootstrap/<name>-<instance>/* + // GET requests to .../toolchains/<toolchain>/* + // PUT requests to .../bootstrap/<toolchain>-<instance>/* // const string in_name (tc_name + '-' + to_string (inst)); auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); @@ -170,7 +234,7 @@ bootstrap_machine (const dir_path& md, { tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", - ops.tftp_port () + offset); + ops.tftp_port () + offset + 0 /* build machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); @@ -179,6 +243,9 @@ bootstrap_machine (const dir_path& md, unique_ptr<machine> m ( start_machine (md, mm, + 0 /* machine_num (build) */, + ops.cpu (), + ops.build_ram (), obmm ? 
obmm->machine.mac : nullopt, ops.bridge (), tftpd.port (), @@ -193,8 +260,11 @@ bootstrap_machine (const dir_path& md, make_exception_guard ( [&m, &md] () { - info << "trying to force machine " << md << " down"; - try {m->forcedown (false);} catch (const failed&) {} + if (m != nullptr) + { + info << "trying to force machine " << md << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } })); // What happens if the bootstrap process hangs? The simple thing would @@ -218,6 +288,8 @@ bootstrap_machine (const dir_path& md, m->wait (false); m->cleanup (); info << "resuming after machine suspension"; + + // Note: snapshot cleaned up by the caller. } catch (const failed&) {} @@ -250,13 +322,14 @@ bootstrap_machine (const dir_path& md, }; // The first request should be the toolchain download. Wait for up to - // 5 minutes for that to arrive. In a sense we use it as an indication - // that the machine has booted and the bootstrap process has started. - // Why wait so long you may wonder? Well, we may be using a new MAC - // address and operating systems like Windows may need to digest that. + // 5 minutes (by default) for that to arrive. In a sense we use it as + // an indication that the machine has booted and the bootstrap process + // has started. Why wait so long you may wonder? Well, we may be using + // a new MAC address and operating systems like Windows may need to + // digest that. // size_t to; - const size_t startup_to (5 * 60); + const size_t startup_to (ops.bootstrap_startup ()); const size_t bootstrap_to (ops.bootstrap_timeout ()); const size_t shutdown_to (5 * 60); @@ -268,7 +341,9 @@ bootstrap_machine (const dir_path& md, break; if (!check_machine ()) - return nullopt; + { + return nullopt; // Note: snapshot cleaned up by the caller. 
+ } } // This can mean two things: machine mis-configuration or what we @@ -289,6 +364,7 @@ bootstrap_machine (const dir_path& md, m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. continue; } @@ -311,7 +387,9 @@ bootstrap_machine (const dir_path& md, // The exit/upload is racy so we re-check. // if (!(file_not_empty (mf) || file_not_empty (mfo))) - return nullopt; + { + return nullopt; // Note: snapshot cleaned up by the caller. + } } bool old (false); @@ -361,7 +439,329 @@ bootstrap_machine (const dir_path& md, return r; } -// Machine locking. +// Bootstrap an auxiliary machine. Return the bootstrapped machine manifest if +// successful and nullopt otherwise (in which case the caller should clean up +// the machine directory and ignore the machine for now). +// +static vector<size_t> +divide_auxiliary_ram (const vector<const machine_header_manifest*>&); + +static optional<bootstrapped_machine_manifest> +bootstrap_auxiliary_machine (const dir_path& md, + const machine_manifest& mm, + optional<bootstrapped_machine_manifest> obmm) +{ + tracer trace ("bootstrap_auxiliary_machine", md.string ().c_str ()); + + bootstrapped_machine_manifest r { + mm, + toolchain_manifest {}, // Unused for auxiliary, + bootstrap_manifest {} // Unused for auxiliary. + }; + + if (ops.fake_bootstrap ()) + { + r.machine.mac = "de:ad:be:ef:de:ad"; + } + else + try + { + // Similar to bootstrap_build_machine() except here we just wait for the + // upload of the environment. + + // Start the TFTP server (server chroot is --tftp). Map: + // + // GET requests to /dev/null + // PUT requests to .../bootstrap/<toolchain>-<instance>/* + // + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); + try_mkdir_p (arm.path); + + // Environment upload. 
+ // + path ef (arm.path / "environment"); + try_rmfile (ef); + + // Note that unlike build, here we use the same VM snapshot for retries, + // which is not ideal. + // + for (size_t retry (0);; ++retry) + { + tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' + + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", + ops.tftp_port () + offset + 1 /* auxiliary machine */); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // If the machine specified RAM minimum, use that to make sure the + // machine can actually function with this amount of RAM. Otherwise, use + // the minium of RAM maximum (if specified) and the available auxiliary + // RAM (so we know this machine will at least work alone). For the + // latter case use divide_auxiliary_ram() to be consistent with the + // build case (see that function implementation for nuances). + // + size_t ram; + if (mm.ram_minimum) + ram = *mm.ram_minimum; + else + { + vector<size_t> rams (divide_auxiliary_ram ({&mm})); + assert (!rams.empty ()); // We should have skipped such a machine. + ram = rams.front (); + } + + // Start the machine. + // + unique_ptr<machine> m ( + start_machine (md, + mm, + 1 /* machine_num (first auxiliary) */, + ops.cpu (), + ram, + obmm ? obmm->machine.mac : nullopt, + ops.bridge (), + tftpd.port (), + false /* pub_vnc */)); + + { + // NOTE: see bootstrap_build_machine() for comments. + + auto mg ( + make_exception_guard ( + [&m, &md] () + { + if (m != nullptr) + { + info << "trying to force machine " << md << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } + })); + + auto soft_fail = [&md, &m] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << md << ", suspending"; + m->print_info (dr); + } + + try + { + m->suspend (false); + m->wait (false); + m->cleanup (); + info << "resuming after machine suspension"; + + // Note: snapshot cleaned up by the caller. 
+ } + catch (const failed&) {} + + return nullopt; + }; + + auto check_machine = [&md, &m] () + { + try + { + size_t t (0); + if (!m->wait (t /* seconds */, false /* fail_hard */)) + return true; // Still running. + + // Exited successfully. + } + catch (const failed&) + { + // Failed, exit code diagnostics has already been issued. + } + + diag_record dr (error); + dr << "machine " << md << " exited unexpectedly"; + m->print_info (dr); + + return false; + }; + + // Wait up to the specified timeout for the auxiliary machine to + // bootstrap. Note that such a machine may do extra setup work on the + // first boot (such as install some packages, etc) which may take some + // time. + // + size_t to; + const size_t bootstrap_to (ops.bootstrap_auxiliary ()); + const size_t shutdown_to (5 * 60); + + // Serve TFTP requests while periodically checking for the environment + // file. + // + for (to = bootstrap_to; to != 0; ) + { + if (tftpd.serve (to, 2)) + continue; + + if (!check_machine ()) + { + if (!file_not_empty (ef)) + { + return nullopt; // Note: snapshot cleaned up by the caller. + } + } + + if (file_not_empty (ef)) + { + if (!tftpd.serve (to, 5)) + break; + } + } + + if (to == 0) + { + if (retry > ops.bootstrap_retries ()) + return soft_fail ("bootstrap timeout"); + + // Note: keeping the logs behind (no cleanup). + + diag_record dr (warn); + dr << "machine " << mm.name << " mis-booted, retrying"; + m->print_info (dr); + + try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. + continue; + } + + l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); + + // Shut the machine down cleanly. + // + if (!m->shutdown ((to = shutdown_to))) + return soft_fail ("bootstrap shutdown timeout"); + + l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); + + m->cleanup (); + } + + r.machine.mac = m->mac; // Save the MAC address. 
+ + break; + } + } + catch (const system_error& e) + { + fail << "bootstrap error: " << e; + } + + serialize_manifest (r, md / "manifest", "bootstrapped machine"); + return r; +} + +// Global toolchain lock. +// +// The overall locking protocol is as follows: +// +// 1. Before enumerating the machines each agent instance acquires the global +// toolchain lock. +// +// 2. As the agent enumerates over the machines, it tries to acquire the lock +// for each machine. +// +// 3. If the agent encounters a machine that it needs to bootstrap, it +// releases all the other machine locks followed by the global lock, +// proceeds to bootstrap the machine, releases its lock, and restarts the +// process from scratch. +// +// 4. Otherwise, upon receiving a task response for one of the machines (plus, +// potentially, a number of auxiliary machines), the agent releases all the +// other machine locks followed by the global lock, proceeds to perform the +// task on the selected machine(s), releases their locks, and restarts the +// process from scratch. +// +// One notable implication of this protocol is that the machine locks are +// only acquired while holding the global toolchain lock but can be released +// while not holding this lock. +// +// (Note that because of this implication it can theoretically be possible +// to omit acquiring all the machine locks during the enumeration process, +// instead only acquiring the lock of the machine we need to bootstrap or +// build. However, the current approach is simpler since we still need +// to detect machines that are already locked, which entails acquiring +// the lock anyway.) +// +// Note that unlike the machine lock below, here we don't bother with removing +// the lock file. +// +class toolchain_lock +{ +public: + toolchain_lock () = default; // Empty lock. + + // Note: returns true if locking is disabled. 
+ // + bool + locked () const + { + return tc_lock.empty () || fl_; + } + + void + unlock (bool ignore_errors = false) + { + if (fl_) + { + fl_ = false; // We have tried. + + if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) + throw_generic_error (errno); + } + } + + ~toolchain_lock () + { + unlock (true /* ignore_errors */); + } + + toolchain_lock (toolchain_lock&&) = default; + toolchain_lock& operator= (toolchain_lock&&) = default; + + toolchain_lock (const toolchain_lock&) = delete; + toolchain_lock& operator= (const toolchain_lock&) = delete; + + // Implementation details. + // +public: + explicit + toolchain_lock (auto_fd&& fd) + : fd_ (move (fd)), fl_ (true) {} + +private: + auto_fd fd_; + bool fl_ = false; +}; + +// Note: returns empty lock if toolchain locking is disabled. +// +static optional<toolchain_lock> +lock_toolchain (unsigned int timeout) +{ + if (tc_lock.empty ()) + return toolchain_lock (); + + auto_fd fd (fdopen (tc_lock, fdopen_mode::out | fdopen_mode::create)); + + for (; flock (fd.get (), LOCK_EX | LOCK_NB) != 0; sleep (1), --timeout) + { + if (errno != EWOULDBLOCK) + throw_generic_error (errno); + + if (timeout == 0) + return nullopt; + } + + return toolchain_lock (move (fd)); +} + +// Per-toolchain machine lock. // // We use flock(2) which is straightforward. The tricky part is cleaning the // file up. Here we may have a race when two processes are trying to open & @@ -374,14 +774,29 @@ bootstrap_machine (const dir_path& md, // guaranteed to be atomic (in case later we want to support exclusive // bootstrap and shared build). // +// Note also that we per-toolchain lock auxiliary machines even though they +// are not toolchain-specific. Doing it this way allows us to handle both +// types of machines consistently with regards to priorities, interrupts, etc. +// It also means we will have each auxiliary machine available per-toolchain +// rather than a single machine shared between all the toolchains, which is +// a good thing. 
+// class machine_lock { public: - machine_lock () = default; // Empty lock. + // A lock is either locked by this process or it contains information about + // the process holding the lock. + // + pid_t pid; // Process using the machine. + optional<uint64_t> prio; // Task priority (absent means being bootstrapped + // or have been suspended). - ~machine_lock () + machine_lock () = default; // Uninitialized lock. + + bool + locked () const { - unlock (true /* ignore_errors */); + return fl_; } void @@ -391,13 +806,69 @@ public: { fl_ = false; // We have tried. - try_rmfile (fp_, ignore_errors); + if (fd_ != nullfd) + { + try_rmfile (fp_, ignore_errors); - if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) - throw_generic_error (errno); + if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) + throw_generic_error (errno); + } } } + // Write the holding process information to the lock file. + // + // Must be called while holding the toolchain lock (see the lock_machine() + // implementation for rationale). + // + void + bootstrap (const toolchain_lock& tl) + { + assert (tl.locked () && fl_); + + if (fd_ != nullfd) + write (nullopt); + } + + void + perform_task (const toolchain_lock& tl, uint64_t prio) + { + assert (tl.locked () && fl_); + + if (fd_ != nullfd) + write (prio); + } + + // Truncate the holding process information after the call to perform_task() + // so that it doesn't contain the priority, marking the machine as being + // suspended. + // + // Note that this one can be called without holding the toolchain lock. + // + void + suspend_task () + { + assert (fl_); + + if (fd_ != nullfd) + { + assert (tp_ != 0); // Must be called after perform_task(). + + // While there is no direct statement to this effect in POSIX, the + // consensus on the internet is that truncation is atomic, in a sense + // that the reader shouldn't see a partially truncated content. 
Feels + // like should be doubly so when actually truncating as opposed to + // extending the size, which is what we do. + // + fdtruncate (fd_.get (), tp_); + } + } + + ~machine_lock () + { + unlock (true /* ignore_errors */); + } + machine_lock (machine_lock&&) = default; machine_lock& operator= (machine_lock&&) = default; @@ -407,30 +878,119 @@ public: // Implementation details. // public: + // If fd is nullfd, treat it as a fake lock (used for fake machines). + // machine_lock (path&& fp, auto_fd&& fd) : fp_ (move (fp)), fd_ (move (fd)), fl_ (true) {} + machine_lock (pid_t pi, optional<uint64_t> pr) + : pid (pi), prio (pr), fl_ (false) {} + private: - path fp_; - auto_fd fd_; - bool fl_ = false; + void + write (optional<uint64_t> prio) + { + pid_t pid (getpid ()); + + string l (to_string (pid)); + + if (prio) + { + tp_ = l.size (); // Truncate position. + + l += ' '; + l += to_string (*prio); + } + + auto n (fdwrite (fd_.get (), l.c_str (), l.size ())); + + if (n == -1) + throw_generic_ios_failure (errno); + + if (static_cast<size_t> (n) != l.size ()) + throw_generic_ios_failure (EFBIG); + } + +private: + path fp_; + auto_fd fd_; + bool fl_ = false; + uint64_t tp_ = 0; // Truncate position. }; -// Try to lock the machine given its -<toolchain> directory. +// Try to lock the machine given its -<toolchain> directory. Return unlocked +// lock with pid/prio if already in use. Must be called while holding the +// toolchain lock. // -static optional<machine_lock> -lock_machine (const dir_path& tp) +static machine_lock +lock_machine (const toolchain_lock& tl, const dir_path& tp) { + assert (tl.locked ()); + path fp (tp + ".lock"); // The -<toolchain>.lock file. 
for (;;) { - auto_fd fd (fdopen (fp, fdopen_mode::out | fdopen_mode::create)); + auto_fd fd (fdopen (fp, (fdopen_mode::in | + fdopen_mode::out | + fdopen_mode::create))); if (flock (fd.get (), LOCK_EX | LOCK_NB) != 0) { if (errno == EWOULDBLOCK) - return nullopt; + { + // The file should contain a line in the following format: + // + // <pid>[ <prio>] + // + char buf[64]; // Sufficient for 2 64-bit numbers (20 decimals max). + + auto sn (fdread (fd.get (), buf, sizeof (buf))); + + if (sn == -1) + throw_generic_ios_failure (errno); + + size_t n (static_cast<size_t> (sn)); + + // While there would be a race between locking the file then writing + // to it in one process and reading from it in another process, we are + // protected by the global toolchain lock, which must be held by both + // sides during this dance. + // + assert (n > 0 && n < sizeof (buf)); + buf[n] = '\0'; + + // Note also that it's possible that by the time we read the pid/prio + // the lock has already been released. But this case is no different + // from the lock being released after we have read pid/prio but before + // acting on this information (e.g., trying to interrupt the other + // process), which we have to deal with anyway. + // + pid_t pid; + optional<uint64_t> prio; + { + char* p (strchr (buf, ' ')); + char* e; + + { + errno = 0; + pid = strtoll (buf, &e, 10); // Note: pid_t is signed. + assert (errno != ERANGE && + e != buf && + (p != nullptr ? e == p : *e == '\0')); + } + + if (p != nullptr) + { + ++p; + errno = 0; + prio = strtoull (p, &e, 10); + assert (errno != ERANGE && e != p && *e == '\0'); + } + } + + return machine_lock (pid, prio); + } throw_generic_error (errno); } @@ -444,7 +1004,7 @@ lock_machine (const dir_path& tp) if (st1.st_ino == st2.st_ino) return machine_lock (move (fp), move (fd)); - // Note: unlocked by close(). + // Retry (note: lock is unlocked by auto_fd::close()). 
} } @@ -461,375 +1021,1131 @@ snapshot_path (const dir_path& tp) to_string (inst)); } -// Return available machines, (re-)bootstrapping them if necessary. +// Compare bbot and library versions returning -1 if older, 0 if the same, +// and +1 if newer. +// +static int +compare_bbot (const bootstrap_manifest& m) +{ + auto cmp = [&m] (const string& n, const char* v) -> int + { + standard_version sv (v); + auto i = m.versions.find (n); + + return (i == m.versions.end () || i->second < sv + ? -1 + : i->second > sv ? 1 : 0); + }; + + // Start from the top assuming a new dependency cannot be added without + // changing the dependent's version. + // + int r; + return ( + (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : + (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : + (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0); +}; + +// Return the global toolchain lock and the list of available machines, +// (re-)bootstrapping them if necessary. +// +// Note that this function returns both machines that this process managed to +// lock as well as the machines locked by other processes (including those +// that are being bootstrapped or that have been suspended), in case the +// caller needs to interrupt one of them for a higher-priority task. In the +// latter case, the manifest is empty if the machine is bootstrapping or +// suspended and only has the machine_manifest information otherwise. (The +// bootstrapping/suspended machines have to be returned to get the correct +// count of currently active instances for the inst_max comparison.) +// +// Note that both build and auxiliary machines are returned. For auxiliary, +// toolchain and bootstrap manifests are unused and therefore always empty. 
// struct bootstrapped_machine { - dir_path path; - bootstrapped_machine_manifest manifest; machine_lock lock; + const dir_path path; + bootstrapped_machine_manifest manifest; }; using bootstrapped_machines = vector<bootstrapped_machine>; -static bootstrapped_machines +static pair<toolchain_lock, bootstrapped_machines> enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines", machines.string ().c_str ()); - bootstrapped_machines r; - - if (ops.fake_machine_specified ()) + for (;;) // From-scratch retry loop for after bootstrap (see below). { - auto mh ( - parse_manifest<machine_header_manifest> ( - ops.fake_machine (), "machine header")); - - r.push_back ( - bootstrapped_machine { - dir_path (ops.machines ()) /= mh.name, // For diagnostics. - bootstrapped_machine_manifest { - machine_manifest { - move (mh.id), - move (mh.name), - move (mh.summary), - machine_type::kvm, - string ("de:ad:be:ef:de:ad"), - nullopt, - strings ()}, - toolchain_manifest {tc_id}, - bootstrap_manifest {}}, - machine_lock ()}); - - return r; - } - - // Notice and warn if there are no machines (as opposed to all of them being - // locked). - // - bool none (true); + pair<toolchain_lock, bootstrapped_machines> pr; - // The first level are machine volumes. - // - for (const dir_entry& ve: dir_iterator (machines, - false /* ignore_dangling */)) - { - const string vn (ve.path ().string ()); + { + optional<toolchain_lock> l; + while (!(l = lock_toolchain (60 /* seconds */))) + { + // One typical situation where this can happen is when another agent + // takes a while to request a task (e.g., due to network issues). So + // this is an info as opposed to a warning. + // + info << "unable to acquire global toolchain lock " << tc_lock + << " for 60s"; + } + pr.first = move (*l); + } - // Ignore hidden directories. 
- // - if (ve.type () != entry_type::directory || vn[0] == '.') - continue; + toolchain_lock& tl (pr.first); + bootstrapped_machines& r (pr.second); - const dir_path vd (dir_path (machines) /= vn); + if (ops.fake_machine_specified ()) + { + auto mh ( + parse_manifest<machine_header_manifest> ( + ops.fake_machine (), "machine header")); + + r.push_back ( + bootstrapped_machine { + machine_lock (path (), nullfd), // Fake lock. + dir_path (ops.machines ()) /= mh.name, // For diagnostics. + bootstrapped_machine_manifest { + machine_manifest { + move (mh.id), + move (mh.name), + move (mh.summary), + machine_type::kvm, + string ("de:ad:be:ef:de:ad"), + nullopt, + strings ()}, + toolchain_manifest {tc_id}, + bootstrap_manifest {}}}); + + return pr; + } - // Inside we have machines. + // Notice and warn if there are no build machines (as opposed to all of + // them being busy). // - try + bool none (true); + + // We used to (re)-bootstrap machines as we are iterating. But with the + // introduction of the priority monitoring functionality we need to + // respect the --instance-max value. Which means we first need to try to + // lock all the machines in order to determine how many of them are busy + // then check this count against --instance-max, and only bootstrap if we + // are not over the limit. Which means we have to store all the + // information about a (first) machine that needs bootstrapping until + // after we have enumerated all of them. + // + struct pending_bootstrap { - for (const dir_entry& me: dir_iterator (vd, false /* ignore_dangling */)) - { - const string mn (me.path ().string ()); + machine_lock ml; + dir_path tp; // -<toolchain> + dir_path xp; // -<toolchain>-<xxx> + machine_manifest mm; + optional<bootstrapped_machine_manifest> bmm; + }; + optional<pending_bootstrap> pboot; - if (me.type () != entry_type::directory || mn[0] == '.') - continue; + // The first level are machine volumes. 
+ // + for (const dir_entry& ve: dir_iterator (machines, dir_iterator::no_follow)) + { + const string vn (ve.path ().string ()); - const dir_path md (dir_path (vd) /= mn); + // Ignore hidden directories. + // + if (ve.type () != entry_type::directory || vn[0] == '.') + continue; - // Our endgoal here is to obtain a bootstrapped snapshot of this - // machine while watching out for potential race conditions (other - // instances as well as machines being added/upgraded/removed; see the - // manual for details). - // - // So here is our overall plan: - // - // 1. Resolve current subvolume link for our bootstrap protocol. - // - // 2. Lock the machine. This excludes any other instance from trying - // to perform the following steps. - // - // 3. If there is no link, cleanup old bootstrap (if any) and ignore - // this machine. - // - // 4. Try to create a snapshot of current subvolume (this operation is - // atomic). If failed (e.g., someone changed the link and removed - // the subvolume in the meantime), retry from #1. - // - // 5. Compare the snapshot to the already bootstrapped version (if - // any) and see if we need to re-bootstrap. If so, use the snapshot - // as a starting point. Rename to bootstrapped at the end (atomic). - // - dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> - dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain> + const dir_path vd (dir_path (machines) /= vn); - auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>. + // Inside we have machines. + // + try + { + for (const dir_entry& me: dir_iterator (vd, dir_iterator::no_follow)) { - run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); - run_btrfs (trace, "subvolume", "delete", tp); - }; + const string mn (me.path ().string ()); - for (size_t retry (0);; ++retry) - { - if (retry != 0) - sleep (1); + if (me.type () != entry_type::directory || mn[0] == '.') + continue; - // Resolve the link to subvolume path. 
+ const dir_path md (dir_path (vd) /= mn); + + // Our endgoal here is to obtain a bootstrapped snapshot of this + // machine while watching out for potential race conditions (other + // instances as well as machines being added/upgraded/removed; see + // the manual for details). + // + // So here is our overall plan: // - dir_path sp; // <name>-<P>.<R> + // 1. Resolve current subvolume link for our bootstrap protocol. + // + // 2. Lock the machine. This excludes any other instance from trying + // to perform the following steps. + // + // 3. If there is no link, cleanup old bootstrap (if any) and ignore + // this machine. + // + // 4. Try to create a snapshot of current subvolume (this operation + // is atomic). If failed (e.g., someone changed the link and + // removed the subvolume in the meantime), retry from #1. + // + // 5. Compare the snapshot to the already bootstrapped version (if + // any) and see if we need to re-bootstrap. If so, use the + // snapshot as a starting point. Rename to bootstrapped at the + // end (atomic). + // + dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> + dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain> - try + auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>. { - sp = path_cast<dir_path> (readsymlink (lp)); + run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); + run_btrfs (trace, "subvolume", "delete", tp); + }; - if (sp.relative ()) - sp = md / sp; - } - catch (const system_error& e) + for (size_t retry (0);; ++retry) { - // Leave the subvolume path empty if the subvolume link doesn't - // exist and fail on any other error. + if (retry != 0) + sleep (1); + + // Resolve the link to subvolume path. 
// - if (e.code ().category () != std::generic_category () || - e.code ().value () != ENOENT) - fail << "unable to read subvolume link " << lp << ": " << e; - } + dir_path sp; // <name>-<P>.<R> - none = none && sp.empty (); + try + { + sp = path_cast<dir_path> (readsymlink (lp)); - // Try to lock the machine, skipping it if already locked. - // - optional<machine_lock> ml (lock_machine (tp)); + if (sp.relative ()) + sp = md / sp; + } + catch (const system_error& e) + { + // Leave the subvolume path empty if the subvolume link doesn't + // exist and fail on any other error. + // + if (e.code ().category () != std::generic_category () || + e.code ().value () != ENOENT) + fail << "unable to read subvolume link " << lp << ": " << e; + } - if (!ml) - { - l4 ([&]{trace << "skipping " << md << ": locked";}); - break; - } + // Try to lock the machine. + // + machine_lock ml (lock_machine (tl, tp)); - bool te (dir_exists (tp)); + if (!ml.locked ()) + { + machine_manifest mm; + if (ml.prio) + { + // Get the machine manifest (subset of the steps performed for + // the locked case below). + // + // Note that it's possible the machine we get is not what was + // originally locked by the other process (e.g., it has been + // upgraded since). It's also possible that if and when we + // interrupt and lock this machine, it will be a different + // machine (e.g., it has been upgraded since we read this + // machine manifest). To deal with all of that we will be + // reloading this information if/when we acquire the lock to + // this machine. + // + if (sp.empty ()) + { + l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); + break; + } + + l3 ([&]{trace << "keeping " << md << ": locked by " << ml.pid + << " with priority " << *ml.prio;}); + + mm = parse_manifest<machine_manifest> ( + sp / "manifest", "machine"); + + none = none && mm.effective_role () == machine_role::auxiliary; + } + else // Bootstrapping/suspended. 
+ { + l3 ([&]{trace << "keeping " << md << ": being bootstrapped " + << "or suspened by " << ml.pid;}); - // If the resolution fails, then this means there is no current - // machine subvolume (for this bootstrap protocol). In this case we - // clean up our toolchain subvolume (-<toolchain>, if any) and - // ignore this machine. - // - if (sp.empty ()) - { - if (te) - delete_bootstrapped (); + // Assume it is a build machine (we cannot determine whether + // it is build or auxiliary without loading its manifest). + // + none = false; + } - l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); - break; - } + // Add the machine to the lists and bail out. + // + r.push_back (bootstrapped_machine { + move (ml), + move (tp), + bootstrapped_machine_manifest {move (mm), {}, {}}}); - // <name>-<toolchain>-<xxx> - // - const dir_path xp (snapshot_path (tp)); + break; + } - if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) - { - if (retry >= 10) - fail << "unable to snapshot subvolume " << sp; + bool te (dir_exists (tp)); - continue; - } + // If the resolution fails, then this means there is no current + // machine subvolume (for this bootstrap protocol). In this case + // we clean up our toolchain subvolume (-<toolchain>, if any) and + // ignore this machine. + // + if (sp.empty ()) + { + if (te) + delete_bootstrapped (); - // Load the (original) machine manifest. - // - auto mm ( - parse_manifest<machine_manifest> (sp / "manifest", "machine")); + l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); + break; + } - // If we already have <name>-<toolchain>, see if it needs to be re- - // bootstrapped. Things that render it obsolete: - // - // 1. New machine revision (compare machine ids). - // 2. New toolchain (compare toolchain ids). - // 3. New bbot/libbbot (compare versions). - // - // The last case has a complication: what should we do if we have - // bootstrapped a newer version of bbot? 
This would mean that we are - // about to be stopped and upgraded (and the upgraded version will - // probably be able to use the result). So we simply ignore this - // machine for this run. + // <name>-<toolchain>-<xxx> + // + dir_path xp (snapshot_path (tp)); - // Return -1 if older, 0 if the same, and +1 if newer. - // - auto compare_bbot = [] (const bootstrap_manifest& m) -> int - { - auto cmp = [&m] (const string& n, const char* v) -> int + if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { - standard_version sv (v); - auto i = m.versions.find (n); + if (retry >= 10) + fail << "unable to snapshot subvolume " << sp; - return (i == m.versions.end () || i->second < sv - ? -1 - : i->second > sv ? 1 : 0); - }; + continue; + } - // Start from the top assuming a new dependency cannot be added - // without changing the dependent's version. + // Load the (original) machine manifest. // - int r; - return - (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : - (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : - (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0; - }; + machine_manifest mm ( + parse_manifest<machine_manifest> (sp / "manifest", "machine")); - optional<bootstrapped_machine_manifest> bmm; - if (te) - { - bmm = parse_manifest<bootstrapped_machine_manifest> ( - tp / "manifest", "bootstrapped machine"); + bool aux (mm.effective_role () == machine_role::auxiliary); - if (bmm->machine.id != mm.id) + // Skip machines for which we don't have sufficient RAM. + // + if (effective_ram_minimum (mm) > + (aux ? 
ops.auxiliary_ram () : ops.build_ram ())) { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); - te = false; + l3 ([&]{trace << "skipping " << md << ": insufficient RAM";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; } - if (!tc_id.empty () && bmm->toolchain.id != tc_id) - { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); - te = false; - } + none = none && aux; - if (int i = compare_bbot (bmm->bootstrap)) + // If we already have <name>-<toolchain>, see if it needs to be + // re-bootstrapped. Things that render it obsolete: + // + // 1. New machine revision (compare machine ids). + // 2. New toolchain (compare toolchain ids, not auxiliary). + // 3. New bbot/libbbot (compare versions, not auxiliary). + // + // The last case has a complication: what should we do if we have + // bootstrapped a newer version of bbot? This would mean that we + // are about to be stopped and upgraded (and the upgraded version + // will probably be able to use the result). So we simply ignore + // this machine for this run. + // + // Note: see similar code in the machine interruption logic. 
+ // + optional<bootstrapped_machine_manifest> bmm; + if (te) { - if (i < 0) + bmm = parse_manifest<bootstrapped_machine_manifest> ( + tp / "manifest", "bootstrapped machine"); + + if (bmm->machine.id != mm.id) { - l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); + l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); te = false; } - else + + if (!aux) { - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - run_btrfs (trace, "subvolume", "delete", xp); - break; + if (!tc_id.empty () && bmm->toolchain.id != tc_id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); + te = false; + } + + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i < 0) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); + te = false; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + run_btrfs (trace, "subvolume", "delete", xp); + break; + } + } } + + if (!te) + delete_bootstrapped (); } + else + l3 ([&]{trace << "bootstrap " << tp;}); if (!te) - delete_bootstrapped (); - } - else - l3 ([&]{trace << "bootstrapping " << tp;}); + { + // Ignore any other machines that need bootstrapping. + // + if (!pboot) + { + pboot = pending_bootstrap { + move (ml), move (tp), move (xp), move (mm), move (bmm)}; + } + else + run_btrfs (trace, "subvolume", "delete", xp); - if (!te) - { - // Use the <name>-<toolchain>-<xxx> snapshot that we have made to - // bootstrap the new machine. Then atomically rename it to - // <name>-<toolchain>. - // - // Also release all the machine locks that we have acquired so far - // since the bootstrap will take a while and other instances might - // be able to use them. + break; + } + else + run_btrfs (trace, "subvolume", "delete", xp); + + // Add the machine to the lists. // - r.clear (); + r.push_back ( + bootstrapped_machine {move (ml), move (tp), move (*bmm)}); - bmm = bootstrap_machine (xp, mm, move (bmm)); + break; + } // Retry loop. + } // Inner dir_iterator loop. 
+ } + catch (const system_error& e) + { + fail << "unable to iterate over " << vd << ": " << e; + } + } // Outer dir_iterator loop. - if (!bmm) - { - l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); - run_btrfs (trace, "subvolume", "delete", xp); - break; - } + // See if there is a pending bootstrap and whether we can perform it. + // + // What should we do if we can't (i.e., we are in the priority monitor + // mode)? Well, we could have found some machines that are already + // bootstrapped (busy or not) and there may be a higher-priority task for + // one of them, so it feels natural to return whatever we've got. + // + if (pboot) + { + dir_path& tp (pboot->tp); + dir_path& xp (pboot->xp); - try - { - mvdir (xp, tp); - } - catch (const system_error& e) - { - fail << "unable to rename " << xp << " to " << tp; - } + // Determine how many machines are busy (locked by other processes) and + // make sure it's below the --instance-max limit, if specified. + // + // We should only count build machines unless being bootstrapped (see + // above). + // + if (inst_max != 0) + { + size_t busy (0); + for (const bootstrapped_machine& m: r) + { + if (!m.lock.locked () && + (!m.lock.prio || + m.manifest.machine.effective_role () != machine_role::auxiliary)) + ++busy; + } - l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + assert (busy <= inst_max); - // Check the bootstrapped bbot version as above and ignore this - // machine if it's newer than us. 
- // - if (int i = compare_bbot (bmm->bootstrap)) - { - if (i > 0) - { - l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); - break; - } - else - warn << "bootstrapped " << tp << " bbot worker is older " - << "than agent; assuming test setup"; - } - } + if (busy == inst_max) + { + l3 ([&]{trace << "instance max reached attempting to bootstrap " + << tp;}); + run_btrfs (trace, "subvolume", "delete", xp); + return pr; + } + } + + machine_lock& ml (pboot->ml); + + l3 ([&]{trace << "bootstrapping " << tp;}); + + // Use the -<toolchain>-<xxx> snapshot that we have made to bootstrap + // the new machine. Then atomically rename it to -<toolchain>. + // + // Also release all the machine locks that we have acquired so far as + // well as the global toolchain lock, since the bootstrap will take a + // while and other instances might be able to use them. Because we are + // releasing the global lock, we have to restart the enumeration process + // from scratch. + // + r.clear (); + ml.bootstrap (tl); + tl.unlock (); + + bool aux (pboot->mm.effective_role () == machine_role::auxiliary); + + optional<bootstrapped_machine_manifest> bmm ( + aux + ? bootstrap_auxiliary_machine (xp, pboot->mm, move (pboot->bmm)) + : bootstrap_build_machine (xp, pboot->mm, move (pboot->bmm))); + + if (!bmm) + { + l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); + run_btrfs (trace, "subvolume", "delete", xp); + continue; + } + + try + { + mvdir (xp, tp); + } + catch (const system_error& e) + { + fail << "unable to rename " << xp << " to " << tp; + } + + l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + + // Check the bootstrapped bbot version as above and ignore this build + // machine if it's newer than us. 
+ // + if (!aux) + { + if (int i = compare_bbot (bmm->bootstrap)) + { + if (i > 0) + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); else - run_btrfs (trace, "subvolume", "delete", xp); + warn << "bootstrapped " << tp << " bbot worker is older " + << "than agent; assuming test setup"; + } + } - // Add the machine to the lists. - // - r.push_back ( - bootstrapped_machine {move (tp), move (*bmm), move (*ml)}); + continue; // Re-enumerate from scratch. + } + + if (none) + warn << "no build machines for toolchain " << tc_name; + + return pr; + + } // From-scratch retry loop. + + // Unreachable. +} +catch (const system_error& e) +{ + fail << "unable to iterate over " << machines << ": " << e << endf; +} + +// Perform the build task throwing interrupt if it has been interrupted. +// +struct interrupt {}; + +// Start an auxiliary machine (steps 1-3 described in perfrom_task() below). +// +// Note that if the returned machine is NULL, then it means it has failed to +// start up (in which case the diagnostics has already been issued and +// snapshot cleaned up). +// +// Note: can throw interrupt. +// +struct auxiliary_machine_result +{ + dir_path snapshot; + unique_ptr<bbot::machine> machine; +}; + +using auxiliary_machine_results = vector<auxiliary_machine_result>; + +static pair<auxiliary_machine_result, string /* environment */> +start_auxiliary_machine (bootstrapped_machine& am, + const string& env_name, + uint16_t machine_num, + size_t ram, + const string& in_name, // <toolchain>-<instance> + const dir_path& tftp_put_dir, + optional<size_t> boost_cpus) +try +{ + tracer trace ("start_auxiliary_machine", am.path.string ().c_str ()); + // NOTE: a simplified version of perform_task() below. + + machine_lock& ml (am.lock); + const dir_path& md (am.path); + const bootstrapped_machine_manifest& mm (am.manifest); + + path ef (tftp_put_dir / "environment"); // Environment upload file. + path efm (ef + '-' + mm.machine.name); // Environment upload saved file. 
+ try_rmfile (ef); + try_rmfile (efm); + + // <name>-<toolchain>-<xxx> + // + const dir_path xp (snapshot_path (md)); + + for (size_t retry (0);; ++retry) + { + if (retry != 0) + run_btrfs (trace, "subvolume", "delete", xp); + + run_btrfs (trace, "subvolume", "snapshot", md, xp); + + // Start the TFTP server. Map: + // + // GET requests to /dev/null + // PUT requests to .../build/<toolchain>-<instance>/put/* + // + // Note that we only need to run the TFTP server until we get the + // environment upload. Which means we could have reused the same port as + // the build machine. But let's keep things parallel to the VNC ports and + // use a seperate TFTP port for each auxiliary machine. + // + tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' + + "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n", + ops.tftp_port () + offset + machine_num); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // Note: the machine handling logic is similar to bootstrap. Except here + // we have to cleanup the snapshot ourselves in case of suspension or + // unexpected exit. + + // Start the machine. + // + // Note that for now we don't support logging in into auxiliary machines + // in the interactive mode. Maybe one day. + // + unique_ptr<machine> m ( + start_machine (xp, + mm.machine, + machine_num, + boost_cpus ? *boost_cpus : ops.cpu (), + ram, + mm.machine.mac, + ops.bridge (), + tftpd.port (), + false /* public_vnc */)); + + auto mg ( + make_exception_guard ( + [&m, &xp] () + { + if (m != nullptr) + { + info << "trying to force machine " << xp << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } + })); + + auto soft_fail = [&trace, &ml, &xp, &m] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << xp << ", suspending"; + m->print_info (dr); + } + + try + { + // Update the information in the machine lock to signal that the + // machine is suspended and cannot be interrupted. 
+ // + ml.suspend_task (); + + m->suspend (false); + m->wait (false); + m->cleanup (); + run_btrfs (trace, "subvolume", "delete", xp); + info << "resuming after machine suspension"; + } + catch (const failed&) {} + + return make_pair (auxiliary_machine_result {move (xp), nullptr}, + string ()); + }; + + auto check_machine = [&xp, &m] () + { + try + { + size_t t (0); + if (!m->wait (t /* seconds */, false /* fail_hard */)) + return true; + } + catch (const failed&) {} + + diag_record dr (warn); + dr << "machine " << xp << " exited unexpectedly"; + m->print_info (dr); + + return false; + }; + + auto check_interrupt = [&trace, &xp, &m] () + { + if (sigurs1.load (std::memory_order_consume) == 0) + return; + + l2 ([&]{trace << "machine " << xp << " interruped";}); + + try {m->forcedown (false);} catch (const failed&) {} + m->cleanup (); + m = nullptr; // Disable exceptions guard above. + run_btrfs (trace, "subvolume", "delete", xp); + + throw interrupt (); + }; + + // Wait for up to 4 minutes (by default) for the environment upload (the + // same logic as in bootstrap_auxiliary_machine() except here the machine + // cannot just exit). + // + size_t to; + const size_t startup_to (ops.build_startup ()); + + for (to = startup_to; to != 0; ) + { + check_interrupt (); + + if (tftpd.serve (to, 2)) + continue; + + if (!check_machine ()) + { + // An auxiliary machine should not just exit. + // + return make_pair (auxiliary_machine_result {move (xp), nullptr}, + string ()); + } + + if (file_not_empty (ef)) + { + if (!tftpd.serve (to, 5)) break; + } + } + + if (to == 0) + { + if (retry > ops.build_retries ()) + return soft_fail ("build startup timeout"); + + // Note: keeping the logs behind (no cleanup). + + diag_record dr (warn); + dr << "machine " << mm.machine.name << " mis-booted, retrying"; + m->print_info (dr); + + try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. 
+ continue; + } + + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + + // Read the uploaded environment and, if necessary, append the name prefix + // (which we first make a valid C identifier and uppercase). + // + // Note that it may seem like a good idea to validate the format here. + // But that means we will essentially need to parse it twice (here and in + // worker). Plus, in worker we can comminucate some diagnostics by writing + // it to the build log (here all we can easily do is abort the task). So + // here we just append the name prefix to trimmed non-blank/comment lines. + // + string env_pfx (env_name.empty () + ? string () + : ucase (sanitize_identifier (env_name)) + '_'); + string env; + try + { + ifdstream is (ef, ifdstream::badbit); + for (string l; !eof (getline (is, l)); ) + { + trim (l); + + if (!env_pfx.empty () && !l.empty () && l.front () != '#') + l.insert (0, env_pfx); + + env += l; env += '\n'; + } + } + catch (const io_error& e) + { + fail << "unable to read from " << ef << ": " << e; + } + + // Rename and keep the environment file for debugging (it will be removed + // at the end as part of the tftp_put_dir cleanup). + // + mvfile (ef, efm); + + return make_pair (auxiliary_machine_result {move (xp), move (m)}, + move (env)); + } + + // Unreachable. +} +catch (const system_error& e) +{ + fail << "auxiliary machine startup error: " << e << endf; +} + +// Divide the auxiliary RAM among the specified machines. +// +// Issue diagnostics and return empty vector if the auxiliary RAM is +// insufficient. +// +static vector<size_t> // Parallel to mms. +divide_auxiliary_ram (const vector<const machine_header_manifest*>& mms) +{ + size_t ram (ops.auxiliary_ram ()); + + vector<size_t> rams; + vector<size_t> rnds; // Allocation rounds (see below). + + // First pass: allocate the minimums. 
+ // + for (const machine_header_manifest* mm: mms) + { + size_t v (effective_ram_minimum (*mm)); + + assert (!mm->ram_maximum || v <= *mm->ram_maximum); // Sanity check. + + rams.push_back (v); + rnds.push_back (0); + + if (ram >= v) + ram -= v; + else + { + diag_record dr (error); + dr << "insufficient auxiliary RAM " << ops.auxiliary_ram () << "KiB"; + + for (size_t i (0); i != rams.size (); ++i) + dr << info << mms[i]->name << " requires minimum " << rams[i] << "KiB"; + + return {}; + } + } + + // Second pass: distribute the remaining RAM. + // + // We are going to do it in the ram_minimum increments to avoid ending up + // with odd amounts (while Linux can probably grok anything, who knows about + // Windows). + // + // To make the distribution fair we are going to count how many times we + // have increased each machine's allocation (the rnds vector). + // + for (size_t a (1); ram != 0; ) // Allocation round. + { + // Find a machine that would be satisfied with the least amount of RAM but + // which hasn't yet been given anything on this allocation round. + // + size_t min_i; // Min index. + size_t min_v (0); // Min value. + + // We are done if we couldn't give out any RAM and haven't seen any + // machines that have already been given something on this allocation + // round. + // + bool done (true); + + for (size_t i (0); i != rams.size (); ++i) + { + if (rnds[i] != a) + { + const machine_header_manifest& mm (*mms[i]); + + size_t o (rams[i]); + size_t v (effective_ram_minimum (mm)); + + // Don't allocate past maximum. 
+ // + if (mm.ram_maximum && *mm.ram_maximum < o + v) + { + v = *mm.ram_maximum - o; + + if (v == 0) + continue; + } + + if (v <= ram && (min_v == 0 || min_v > v)) + { + min_i = i; + min_v = v; } } + else + done = false; } - catch (const system_error& e) + + if (min_v != 0) { - fail << "unable to iterate over " << vd << ": " << e; + rnds[min_i] = a; + rams[min_i] += min_v; + ram -= min_v; + } + else + { + if (done) + break; + + ++a; // Next allocation round. } } - if (none) - warn << "no build machines for toolchain " << tc_name; + return rams; +} - return r; +// Stop all the auxiliary machines and clear the passed list. +// +static void +stop_auxiliary_machines (auxiliary_machine_results& amrs) +{ + tracer trace ("stop_auxiliary_machines"); + + if (!amrs.empty ()) + { + // Do it in two passes to make sure all the machines are at least down. + // + for (const auxiliary_machine_result& amr: amrs) + { + if (amr.machine != nullptr) + { + try {amr.machine->forcedown (false);} catch (const failed&) {} + } + } + + // Make sure we don't retry the above even if the below fails. + // + auxiliary_machine_results tmp; + tmp.swap (amrs); + + for (const auxiliary_machine_result& amr: tmp) + { + if (amr.machine != nullptr) + { + amr.machine->cleanup (); + run_btrfs (trace, "subvolume", "delete", amr.snapshot); + } + } + } } -catch (const system_error& e) + +// Start all the auxiliary machines and patch in their combined environment +// into tm.auxiliary_environment. +// +// Return the started machines or empty list if any of them failed to start up +// (which means this function should only be called for non-empty ams). +// +// Note that the order of auxiliary machines in ams may not match that in +// tm.auxiliary_machines. 
+// +static auxiliary_machine_results +start_auxiliary_machines (const vector<bootstrapped_machine*>& ams, + task_manifest& tm, + const string& in_name, // <toolchain>-<instance> + const dir_path& tftp_put_dir, + optional<size_t> boost_cpus) { - fail << "unable to iterate over " << machines << ": " << e << endf; + tracer trace ("start_auxiliary_machines"); + + size_t n (tm.auxiliary_machines.size ()); + + assert (n != 0 && ams.size () == n); + + auxiliary_machine_results amrs; + + // Divide the auxiliary RAM among the machines. + // + vector<size_t> rams; + { + vector<const machine_header_manifest*> mms; + mms.reserve (n); + for (bootstrapped_machine* am: ams) + mms.push_back (&am->manifest.machine); + + rams = divide_auxiliary_ram (mms); + if (rams.empty ()) + return amrs; + + if (verb > 3) // l3 + for (size_t i (0); i != n; ++i) + trace << mms[i]->name << " allocated " << rams[i] << "KiB"; + } + + // Start the machines. + // + // Let's use the order in which they were specified in the task manifest + // (which will naturally be the order in which they are specified in the + // package manifest). This way amrs and tm.auxiliary_machines will be + // parallel. + // + string envs; // Combined environments. + + auto amg ( + make_exception_guard ( + [&amrs] () + { + if (!amrs.empty ()) + { + info << "trying to force auxiliary machines down"; + stop_auxiliary_machines (amrs); + } + })); + + for (size_t i (0); i != n; ++i) + { + const auxiliary_machine& tam (tm.auxiliary_machines[i]); + + auto b (ams.begin ()), e (ams.end ()); + auto j (find_if (b, e, + [&tam] (const bootstrapped_machine* m) + { + return m->manifest.machine.name == tam.name; + })); + assert (j != e); + + // Note: can throw interrupt. + // + pair<auxiliary_machine_result, string> p ( + start_auxiliary_machine (**j, + tam.environment_name, + i + 1, + rams[j - b], // Parallel to ams. 
+ in_name, + tftp_put_dir, + boost_cpus)); + + if (p.first.machine == nullptr) + { + if (!amrs.empty ()) + { + info << "trying to force auxiliary machines down"; + stop_auxiliary_machines (amrs); // amrs is now empty. + } + + return amrs; + } + + amrs.push_back (move (p.first)); + + // Add the machine name as a header before its environment. + // + if (i != 0) envs += '\n'; + envs += "# "; envs += tam.name; envs += '\n'; + envs += "#\n"; + envs += p.second; // Always includes trailing newline. + } + + tm.auxiliary_environment = move (envs); + + return amrs; } -static result_manifest -perform_task (const dir_path& md, - const bootstrapped_machine_manifest& mm, - const task_manifest& tm) +struct perform_task_result +{ + auto_rmdir work_dir; // <tftp>/build/<toolchain>-<instance>/ + result_manifest manifest; + + // Uploaded archive, if any (somewhere inside work_dir). + // + optional<path> upload_archive; + + // Create the special empty result. + // + perform_task_result () = default; + + // Create task result without build artifacts. + // + explicit + perform_task_result (auto_rmdir&& d, result_manifest&& m) + : work_dir (move (d)), + manifest (move (m)) {} + + // Create task result with build artifacts. + // + perform_task_result (auto_rmdir&& d, result_manifest&& m, path&& a) + : work_dir (move (d)), + manifest (move (m)), + upload_archive (move (a)) {} +}; + +// Note that the task manifest is not const since we may need to patch in the +// auxiliary_environment value. +// +static perform_task_result +perform_task (toolchain_lock tl, // Note: assumes ownership. + bootstrapped_machine& bm, // Build machine. + const vector<bootstrapped_machine*>& ams, // Auxiliary machines. + task_manifest& tm, + optional<size_t> boost_cpus) try { - tracer trace ("perform_task", md.string ().c_str ()); + tracer trace ("perform_task", bm.path.string ().c_str ()); + + // Arm the interrupt handler and release the global toolchain lock. 
+ // + // Note that there can be no interrupt while we are holding the global lock. + // + sigurs1.store (0, std::memory_order_release); + tl.unlock (); + + machine_lock& ml (bm.lock); + const dir_path& md (bm.path); + const bootstrapped_machine_manifest& mm (bm.manifest); + + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); + + try_mkdir_p (arm.path); result_manifest r { tm.name, tm.version, result_status::abort, - operation_results {}}; + operation_results {}, + nullopt /* worker_checksum */, + nullopt /* dependency_checksum */}; if (ops.fake_build ()) - return r; + return perform_task_result (move (arm), move (r)); // The overall plan is as follows: // - // 1. Snapshot the (bootstrapped) machine. + // 1. Snapshot the (bootstrapped) build machine. // // 2. Save the task manifest to the TFTP directory (to be accessed by the // worker). // // 3. Start the TFTP server and the machine. // - // 4. Serve TFTP requests while watching out for the result manifest. + // 4. Serve TFTP requests while watching out for the result manifest and + // interrupts. // // 5. Clean up (force the machine down and delete the snapshot). // + // If the task requires any auxiliary machines, then for each such machine + // perform the following steps 1-3 before step 1 above, and step 4 after + // step 5 above (that is, start all the auxiliary machines before the build + // machine and clean them up after): + // + // 1. Snapshot the (bootstrapped) auxiliary machine. + // + // 2. Start the TFTP server and the machine. + // + // 3. Handle TFTP upload requests until received the environment upload. + // + // 4. Clean up (force the machine down and delete the snapshot). 
// TFTP server mapping (server chroot is --tftp): // - // GET requests to .../build/<name>-<instance>/get/* - // PUT requests to .../build/<name>-<instance>/put/* + // GET requests to .../build/<toolchain>-<instance>/get/* + // PUT requests to .../build/<toolchain>-<instance>/put/* // - const string in_name (tc_name + '-' + to_string (inst)); - auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); - dir_path gd (dir_path (arm.path) /= "get"); dir_path pd (dir_path (arm.path) /= "put"); @@ -838,11 +2154,15 @@ try path tf (gd / "task.manifest"); // Task manifest file. path rf (pd / "result.manifest.lz4"); // Result manifest file. - - serialize_manifest (tm, tf, "task"); + path af (pd / "upload.tar"); // Archive of build artifacts to upload. if (ops.fake_machine_specified ()) { + // Note: not handling interrupts here. Nor starting any auxiliary + // machines, naturally. + + serialize_manifest (tm, tf, "task"); + // Simply wait for the file to appear. // for (size_t i (0);; sleep (1)) @@ -863,7 +2183,40 @@ try } else { + // Start the auxiliary machines if any. + // + // If anything goes wrong, force them all down (failed that, the machine + // destructor will block waiting for their exit). + // + auxiliary_machine_results amrs; + + auto amg ( + make_exception_guard ( + [&amrs] () + { + if (!amrs.empty ()) + { + info << "trying to force auxiliary machines down"; + stop_auxiliary_machines (amrs); + } + })); + + if (!ams.empty ()) + { + amrs = start_auxiliary_machines (ams, tm, in_name, pd, boost_cpus); + + if (amrs.empty ()) + return perform_task_result (move (arm), move (r)); // Abort. + } + + // Note: tm.auxiliary_environment patched in by start_auxiliary_machines(). + // + serialize_manifest (tm, tf, "task"); + + // Start the build machine and perform the build. 
+ // try_rmfile (rf); + try_rmfile (af); // <name>-<toolchain>-<xxx> // @@ -880,32 +2233,45 @@ try // tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" + "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n", - ops.tftp_port () + offset); + ops.tftp_port () + offset + 0 /* build machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); - // Start the machine. + // Note: the machine handling logic is similar to bootstrap. Except here + // we have to cleanup the snapshot ourselves in case of suspension or + // unexpected exit. // - unique_ptr<machine> m ( - start_machine (xp, - mm.machine, - mm.machine.mac, - ops.bridge (), - tftpd.port (), - tm.interactive.has_value ())); - - // Note: the machine handling logic is similar to bootstrap. + // NOTE: see similar code in start_auxiliary_machine() above. // { + // Start the machine. + // + unique_ptr<machine> m ( + start_machine (xp, + mm.machine, + 0 /* machine_num (build) */, + boost_cpus ? *boost_cpus : ops.cpu (), + ops.build_ram (), + mm.machine.mac, + ops.bridge (), + tftpd.port (), + tm.interactive.has_value () /* public_vnc */)); + auto mg ( make_exception_guard ( [&m, &xp] () { - info << "trying to force machine " << xp << " down"; - try {m->forcedown (false);} catch (const failed&) {} + if (m != nullptr) + { + info << "trying to force machine " << xp << " down"; + try {m->forcedown (false);} catch (const failed&) {} + } })); - auto soft_fail = [&xp, &m, &r] (const char* msg) + auto soft_fail = [&trace, + &amrs, + &ml, &xp, &m, + &arm, &r] (const char* msg) { { diag_record dr (error); @@ -913,16 +2279,36 @@ try m->print_info (dr); } + // What should we do about auxiliary machines? We could force them + // all down before suspending (and thus freeing them for use). That + // is the easy option. We could suspend them as well, but that feels + // like it will be a pain (will need to resume all of them when done + // investigating). 
Theoretically we could just let them run, but
+          // that won't play well with our interrupt logic since someone may
+          // attempt to interrupt us via one of them. So let's do easy for
+          // now.
+          //
+          // Note: always stop/suspend the build machine before the auxiliary
+          // machines to avoid any errors due to the auxiliary machines being
+          // unavailable.
           try
           {
+            // Update the information in the machine lock to signal that the
+            // machine is suspended and cannot be interrupted.
+            //
+            ml.suspend_task ();
+
             m->suspend (false);
+            stop_auxiliary_machines (amrs);
             m->wait (false);
             m->cleanup ();
+            m = nullptr; // Disable exceptions guard above.
+
             run_btrfs (trace, "subvolume", "delete", xp);
             info << "resuming after machine suspension";
           }
           catch (const failed&) {}
 
-          return r;
+          return perform_task_result (move (arm), move (r));
         };
 
         auto check_machine = [&xp, &m] ()
@@ -933,9 +2319,7 @@ try
             if (!m->wait (t /* seconds */, false /* fail_hard */))
               return true;
           }
-          catch (const failed&)
-          {
-          }
+          catch (const failed&) {}
 
           diag_record dr (warn);
           dr << "machine " << xp << " exited unexpectedly";
@@ -944,26 +2328,76 @@ try
           return false;
         };
 
+        auto check_auxiliary_machines = [&amrs] ()
+        {
+          for (auxiliary_machine_result& amr: amrs)
+          {
+            try
+            {
+              size_t t (0);
+              if (!amr.machine->wait (t /* seconds */, false /* fail_hard */))
+                continue;
+            }
+            catch (const failed&) {}
+
+            diag_record dr (warn);
+            dr << "machine " << amr.snapshot << " exited unexpectedly";
+            amr.machine->print_info (dr);
+
+            return false;
+          }
+
+          return true;
+        };
+
+        auto check_interrupt = [&trace, &amrs, &xp, &m] ()
+        {
+          if (sigurs1.load (std::memory_order_consume) == 0)
+            return;
+
+          l2 ([&]{trace << "machine " << xp << " interruped";});
+
+          try {m->forcedown (false);} catch (const failed&) {}
+          stop_auxiliary_machines (amrs);
+          m->cleanup ();
+          m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp); + + throw interrupt (); + }; + // The first request should be the task manifest download. Wait for up - // to 2 minutes for that to arrive (again, that long to deal with - // flaky Windows networking). In a sense we use it as an indication - // that the machine has booted and the worker process has started. + // to 4 minutes (by default) for that to arrive (again, that long to + // deal with flaky Windows networking, etc). In a sense we use it as + // an indication that the machine has booted and the worker process + // has started. // size_t to; - const size_t startup_to (120); + const size_t startup_to (ops.build_startup ()); const size_t build_to (tm.interactive ? ops.intactive_timeout () : ops.build_timeout ()); - // Wait periodically making sure the machine is still alive. + // Wait periodically making sure the machine is still alive and + // checking for interrupts. // for (to = startup_to; to != 0; ) { + check_interrupt (); + if (tftpd.serve (to, 2)) break; - if (!check_machine ()) - return r; + bool bm; // Build machine still running. + if (!(bm = check_machine ()) || !check_auxiliary_machines ()) + { + if (bm) + try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); + m = nullptr; // Disable exceptions guard above. + run_btrfs (trace, "subvolume", "delete", xp); + return perform_task_result (move (arm), move (r)); + } } if (to == 0) @@ -978,26 +2412,38 @@ try m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} + m = nullptr; // Disable exceptions guard above. continue; } l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); - // Next the worker builds things and then uploads the result manifest. - // So on our side we serve TFTP requests while checking for the - // manifest file. To workaround some obscure filesystem races (the - // file's mtime/size is updated several seconds later; maybe tmpfs - // issue?), we periodically re-check. 
+ // Next the worker builds things and then uploads optional archive of + // build artifacts and the result manifest afterwards. So on our side + // we serve TFTP requests while checking for the manifest file. To + // workaround some obscure filesystem races (the file's mtime/size is + // updated several seconds later; maybe tmpfs issue?), we periodically + // re-check. // for (to = build_to; to != 0; ) { + check_interrupt (); + if (tftpd.serve (to, 2)) continue; - if (!check_machine ()) + bool bm; // Build machine still running. + if (!(bm = check_machine ()) || !check_auxiliary_machines ()) { - if (!file_not_empty (rf)) - return r; + if (bm || !file_not_empty (rf)) + { + if (bm) + try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); + m = nullptr; // Disable exceptions guard above. + run_btrfs (trace, "subvolume", "delete", xp); + return perform_task_result (move (arm), move (r)); + } } if (file_not_empty (rf)) @@ -1050,7 +2496,9 @@ try // lease instead of a new one. // try {m->forcedown (false);} catch (const failed&) {} + stop_auxiliary_machines (amrs); m->cleanup (); + m = nullptr; // Disable exceptions guard above. } } @@ -1059,7 +2507,7 @@ try } } - // Update package name/version if the returned value as "unknown". + // Update package name/version if the returned value is "unknown". // if (r.version == bpkg::version ("0")) { @@ -1069,23 +2517,16 @@ try r.version = tm.version; } - return r; + return (!r.status || !file_exists (af) + ? perform_task_result (move (arm), move (r)) + : perform_task_result (move (arm), move (r), move (af))); } catch (const system_error& e) { fail << "build error: " << e << endf; } -extern "C" void -handle_signal (int sig) -{ - switch (sig) - { - case SIGHUP: exit (3); // Unimplemented feature. - case SIGTERM: exit (0); - default: assert (false); - } -} +static const string agent_checksum ("2"); // Logic version. 
int main (int argc, char* argv[]) @@ -1096,6 +2537,21 @@ try verb = ops.verbose (); +#if 0 + // ./bbot-agent --auxiliary-ram 4194304 + // + machine_header_manifest m1 { + "m1", "m1", "m1", machine_role::auxiliary, 512*1024, nullopt}; + machine_header_manifest m2 { + "m2", "m2", "m2", machine_role::auxiliary, 1024*1024, 3*512*1024}; + vector<const machine_header_manifest*> mms {&m1, &m2}; + vector<size_t> rams (divide_auxiliary_ram (mms)); + for (size_t i (0); i != rams.size (); ++i) + text << mms[i]->name << ' ' << rams[i] / 1024; + + return 0; +#endif + // @@ systemd 231 added JOURNAL_STREAM environment variable which allows // detecting if stderr is connected to the journal. // @@ -1175,7 +2631,7 @@ try << "libbpkg " << LIBBPKG_VERSION_ID << endl << "libbutl " << LIBBUTL_VERSION_ID << endl << "Copyright (c) " << BBOT_COPYRIGHT << "." << endl - << "TBC; All rights reserved" << endl; + << "This is free software released under the MIT license." << endl; return 0; } @@ -1194,40 +2650,102 @@ try tc_name = ops.toolchain_name (); tc_num = ops.toolchain_num (); + + if (ops.toolchain_lock_specified ()) + { + const string& l (ops.toolchain_lock ()); + + if (!l.empty ()) + { + tc_lock = path (l); + + if (!tc_lock.absolute ()) + fail << "--toolchain-lock value '" << l << "' should be absolute path"; + } + } + else if (!(ops.fake_bootstrap () || + ops.fake_build () || + ops.fake_machine_specified () || + ops.fake_request_specified ())) + tc_lock = path ("/var/lock/bbot-agent-" + tc_name + ".lock"); + tc_ver = (ops.toolchain_ver_specified () ? 
ops.toolchain_ver () : standard_version (BBOT_VERSION_STR)); tc_id = ops.toolchain_id (); - if (tc_num == 0 || tc_num > 99) - fail << "invalid --toolchain-num value " << tc_num; + if (tc_num == 0 || tc_num > 9) + fail << "--toolchain-num value " << tc_num << " out of range"; inst = ops.instance (); if (inst == 0 || inst > 99) - fail << "invalid --instance value " << inst; + fail << "--instance value " << inst << " out of range"; - offset = (tc_num - 1) * 100 + inst; + inst_max = ops.instance_max (); - // Controller URLs. + // The last decimal position is used for machine number, 0 for the build + // machine, non-0 for auxiliary machines (of which we can have maximum 9). // - if (argc < 2 && - !ops.dump_machines () && - !ops.fake_request_specified ()) - { - fail << "controller url expected" << - info << "run " << argv[0] << " --help for details"; - } + offset = (tc_num - 1) * 1000 + inst * 10; - strings controllers; + // Controller priority to URLs map. + // + std::map<uint64_t, strings> controllers; for (int i (1); i != argc; ++i) - controllers.push_back (argv[i]); + { + // [<prio>=]<url> + // + string a (argv[i]); + + // See if we have priority, falling back to priority 0 if absent. + // + uint64_t prio (0); + + // Note that we can also have `=` in <url> (e.g., parameters) so we will + // only consider `=` as ours if prior to it we only have digits. + // + size_t p (a.find ('=')); + if (p != string::npos && a.find_first_not_of ("0123456789") == p) + { + // Require exactly four or five digits in case we later need to extend + // the priority levels beyond the 10 possible values (e.g., DDCCBBAA). 
+ // + if (p != 4 && p != 5) + fail << "four or five-digit controller url priority expected in '" + << a << "'"; + + char* e; + errno = 0; + prio = strtoull (a.c_str (), &e, 10); + assert (errno != ERANGE && e == a.c_str () + p); + + if (prio > 19999) + fail << "out of bounds controller url priority in '" << a << "'"; + + a.erase (0, p + 1); + } + + controllers[prio].push_back (move (a)); + } + + if (controllers.empty ()) + { + if (ops.dump_machines () || ops.fake_request_specified ()) + { + controllers[0].push_back ("https://example.org"); + } + else + fail << "controller url expected" << + info << "run " << argv[0] << " --help for details"; + } // Handle SIGHUP and SIGTERM. // if (signal (SIGHUP, &handle_signal) == SIG_ERR || - signal (SIGTERM, &handle_signal) == SIG_ERR) + signal (SIGTERM, &handle_signal) == SIG_ERR || + signal (SIGUSR1, &handle_signal) == SIG_ERR) fail << "unable to set signal handler: " << system_error (errno, std::generic_category ()); // Sanitize. @@ -1267,7 +2785,8 @@ try dr << info << "cpu(s) " << ops.cpu () << - info << "ram(kB) " << ops.ram () << + info << "build ram(KiB) " << ops.build_ram () << + info << "auxil ram(KiB) " << ops.auxiliary_ram () << info << "bridge " << ops.bridge (); if (fingerprint) @@ -1281,8 +2800,19 @@ try info << "toolchain id " << tc_id << info << "instance num " << inst; - for (const string& u: controllers) - dr << info << "controller url " << u; + if (inst_max != 0) + dr << info << "instance max " << inst_max; + + // Note: keep last since don't restore fill/setw. + // + for (const pair<const uint64_t, strings>& p: controllers) + { + for (const string& u: p.second) + { + dr.os.fill ('0'); + dr << info << "controller url " << std::setw (4) << p.first << '=' << u; + } + } } // The work loop. The steps we go through are: @@ -1296,9 +2826,11 @@ try // 4. If a build task is returned, do it, upload the result, and go to #1 // (immediately). 
// - auto rand_sleep = [g = std::mt19937 (std::random_device {} ())] () mutable + // NOTE: consider updating agent_checksum if making any logic changes. + // + auto rand_sleep = [] () { - return std::uniform_int_distribution<unsigned int> (50, 60) (g); + return std::uniform_int_distribution<unsigned int> (50, 60) (rand_gen); }; optional<interactive_mode> imode; @@ -1307,78 +2839,259 @@ try if (ops.interactive () != interactive_mode::false_) { imode = ops.interactive (); - ilogin = machine_vnc (true /* public */); + ilogin = machine_vnc (0 /* machine_num */, true /* public */); + } + + // Use the pkeyutl openssl command for signing the task response challenge + // if openssl version is greater or equal to 3.0.0 and the rsautl command + // otherwise. + // + // Note that openssl 3.0.0 deprecates rsautl in favor of pkeyutl. + // + const char* sign_cmd; + + try + { + optional<openssl_info> oi (openssl::info (trace, 2, ops.openssl ())); + + sign_cmd = oi && + oi->name == "OpenSSL" && + oi->version >= semantic_version {3, 0, 0} + ? "pkeyutl" + : "rsautl"; + } + catch (const system_error& e) + { + fail << "unable to obtain openssl version: " << e << endf; } for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0) { - bootstrapped_machines ms (enumerate_machines (ops.machines ())); - - // Prepare task request. - // - task_request_manifest tq { - hname, - tc_name, - tc_ver, - imode, - ilogin, - fingerprint, - machine_header_manifests {} - }; + pair<toolchain_lock, bootstrapped_machines> er ( + enumerate_machines (ops.machines ())); - // Note: below we assume tq.size () == ms.size (). 
- //
-    for (const bootstrapped_machine& m: ms)
-      tq.machines.emplace_back (m.manifest.machine.id,
-                                m.manifest.machine.name,
-                                m.manifest.machine.summary);
+    toolchain_lock& tl (er.first);
+    bootstrapped_machines& ms (er.second);
 
-    if (ops.dump_machines ())
+    // Determine the existing task priority range (using [0,0] if there are
+    // none) as well as whether we should operate in the priority monitor
+    // mode.
+    //
+    uint64_t prio_min (~uint64_t (0));
+    uint64_t prio_max (0);
+    bool prio_mon (false);
     {
-      for (const machine_header_manifest& m: tq.machines)
-        serialize_manifest (m, cout, "stdout", "machine");
+      uint16_t busy (0); // Number of build machines locked by other processes.
+      bool task (false); // There is a build machine performing a task.
 
-      return 0;
-    }
+      for (const bootstrapped_machine& m: ms)
+      {
+        if (!m.lock.locked ())
+        {
+          if (m.lock.prio) // Not bootstrapping/suspended.
+          {
+            if (m.manifest.machine.effective_role () != machine_role::auxiliary)
+            {
+              ++busy;
+              task = true;
 
-    if (tq.machines.empty ())
-    {
-      // Normally this means all the machines are locked so sleep a bit less.
-      //
-      sleep = rand_sleep () / 2;
-      continue;
+              if (prio_min > *m.lock.prio)
+                prio_min = *m.lock.prio;
+
+              if (prio_max < *m.lock.prio)
+                prio_max = *m.lock.prio;
+            }
+          }
+          else
+            ++busy; // Assume build machine (see enumerate_machines()).
+        }
+      }
+
+      if (prio_min > prio_max) // No tasks.
+        prio_min = prio_max;
+
+      if (inst_max != 0)
+      {
+        assert (busy <= inst_max);
+
+        if (busy == inst_max)
+        {
+          if (!task) // All bootstrapping/suspended.
+          {
+            sleep = rand_sleep ();
+            continue;
+          }
+
+          l2 ([&]{trace << "priority monitor, range [" << prio_min << ", "
+                        << prio_max << "]";});
+
+          prio_mon = true;
+        }
+      }
     }
 
-    // Send task requests.
-    //
-    // Note that we have to do it while holding the lock on all the machines
-    // since we don't know which machine we will need.
+    // If we get a task, these contain all the corresponding information.
// - string url; + task_request_manifest tq; task_response_manifest tr; + uint64_t prio; + string url; - if (ops.fake_request_specified ()) + // Iterate over controller priorities in reverse, that is, from highest to + // lowest (see the agent(1) man page for background on the priority + // levels). + // + // The following factors determine the lower bound of priorities we should + // consider: + // + // 1. If in the priority monitor mode, then we should only consider + // priorities that can interrupt the existing task with the lowest + // priority. + // + // Here is a representative sample of existing/interrupt priorities + // from which we derive the below formulae (remember that we only start + // interrupting from priority level 3): + // + // existing interrupt + // -------- --------- + // 5 >= 100 + // 55 >= 100 + // 555 >= 600 + // 999 >= 1000 + // 5055 >= 5100 + // 5555 >= 5600 + // 9999 >= 10000 + // + // Essentially, what we need to do is discard the lowest 2 levels and + // add 100, moving the priority to the next 3rd level. + // + // 2. Otherwise, we should factor in the "don't ask for lower-priority + // tasks" semantics that applies from the second priority level. + // + // Note also that the other half of this logic is below where we determine + // which machines we offer for each priority. + // + auto ce (controllers.end ()); + auto cb (controllers.lower_bound ( + prio_mon ? ((prio_min / 100) * 100) + 100 : + prio_max >= 10 ? prio_max - 1 : // Including this priority. + 0)); // Any priority. + + for (; cb != ce; ) { - auto t (parse_manifest<task_manifest> (ops.fake_request (), "task")); + const pair<const uint64_t, strings>& pu (*--ce); - tr = task_response_manifest { - "fake-session", // Dummy session. - nullopt, // No challenge. - url, // Empty result URL. 
- move (t)}; + prio = pu.first; + const strings& urls (pu.second); - url = "http://example.org"; - } - else - { - // Note that after completing each task we always start from the - // beginning of the list. This fact can be used to implement a poor - // man's priority system where we will continue serving the first listed - // controller for as long as it has tasks (and maybe in the future we - // will implement a proper priority system). + // Prepare task request (it will be the same within a given priority). + // + tq = task_request_manifest { + hname, + tc_name, + tc_ver, + imode, + ilogin, + fingerprint, + ops.auxiliary_ram (), + machine_header_manifests {}}; + + // Determine which machines we need to offer for this priority. + // + bool aux_only (true); // Only auxiliary machines are available. + { + bool interruptable (false); // There is build machine we can interrupt. + for (const bootstrapped_machine& m: ms) + { + const machine_manifest& mm (m.manifest.machine); + machine_role role (mm.effective_role ()); + + if (!m.lock.locked ()) + { + if (!m.lock.prio) // Skip bootstrapping/suspended. + continue; + + uint64_t eprio (*m.lock.prio); + + // Determine if our priority can interrupt the existing task. + // + // Based on the above discussion of the priority lower bound + // determination (and some menditation) it's clear that we can + // only interrupt the existing task if our priority is (at least) + // on a higher 3rd level. + // + if ((prio / 100) <= (eprio / 100)) + continue; + + if (role != machine_role::auxiliary) + interruptable = true; + } + + tq.machines.emplace_back (mm.id, + mm.name, + mm.summary, + role, + effective_ram_minimum (mm), + mm.ram_maximum); + + aux_only = aux_only && role == machine_role::auxiliary; + } + + // Sanity check: in the priority monitor mode we should only ask for a + // task if we can interrupt one (this should be taken care of by the + // priority lower bound calculation above). 
+ // + assert (!prio_mon || interruptable); + } + + if (ops.dump_machines ()) + { + for (const machine_header_manifest& m: tq.machines) + serialize_manifest (m, cout, "stdout", "machine"); + + return 0; + } + + if (aux_only) + tq.machines.clear (); + + if (tq.machines.empty ()) + { + // If we have no build machines for this priority then we won't have + // any for any lower priority so bail out. + // + break; + } + + // Send task requests. + // + // Note that we have to do it while holding the lock on all the machines + // since we don't know which machine(s) we will need. // - for (const string& u: controllers) + vector<strings::const_iterator> rurls (urls.size ()); + std::iota (rurls.begin (), rurls.end (), urls.begin ()); + std::shuffle (rurls.begin (), rurls.end (), rand_gen); + + for (strings::const_iterator i: rurls) { + const string& u (*i); + + if (ops.fake_request_specified ()) + { + auto t (parse_manifest<task_manifest> (ops.fake_request (), "task")); + + tr = task_response_manifest { + "fake-session", // Dummy session. + nullopt, // No challenge. + string (), // Empty result URL. + vector<upload_url> (), + agent_checksum, + move (t)}; + + url = u; + break; + } + task_response_manifest r; try @@ -1401,7 +3114,11 @@ try try { - serialize_manifest (tq, c.out, u, "task request", false); + serialize_manifest (tq, + c.out, + u, + "task request", + false /* fail_hard */); } catch (const failed&) {f = true;} @@ -1436,8 +3153,8 @@ try { const task_manifest& t (*r.task); - // For security reasons let's require the repository location to be - // remote. + // For security reasons let's require the repository location to + // be remote. // if (t.repository.local ()) { @@ -1460,13 +3177,27 @@ try l2 ([&]{trace << "task for " << t.name << '/' << t.version << " " << "on " << t.machine << " " - << "from " << u;}); + << "from " << u << " " + << "priority " << prio;}); tr = move (r); url = u; break; } - } + } // url loop. + + if (!tr.session.empty ()) // Got a task. 
+ break; + + } // prio loop. + + if (tq.machines.empty ()) // No machines (auxiliary-only already handled). + { + // Normally this means all the machines are busy so sleep a bit less. + // + l2 ([&]{trace << "all machines are busy, sleeping";}); + sleep = rand_sleep () / 2; + continue; } if (tr.session.empty ()) // No task from any of the controllers. @@ -1478,22 +3209,69 @@ try // We have a build task. // - // First find the index of the machine we were asked to use (and verify it - // is one of those we sent). Also unlock all the other machines. + task_manifest& t (*tr.task); + + // First verify the requested machines are from those we sent in tq and + // their roles match. + // + // Also verify the same machine is not picked multiple times by blanking + // out the corresponding entry in tq.machines. (Currently we are only + // capable of running one instance of each machine though we may want to + // relax that in the future, at which point we should send as many entries + // for the same machine in the task request as we are capable of running, + // applying the priority logic for each entry, etc). // - size_t i (ms.size ()); - for (size_t j (0); j != ms.size (); ++j) { - if (tq.machines[j].name == tr.task->machine) - i = j; - else - ms[j].lock.unlock (); + auto check = [&tq, &url] (const string& name, machine_role r) + { + auto i (find_if (tq.machines.begin (), tq.machines.end (), + [&name] (const machine_header_manifest& mh) + { + return mh.name == name; // Yes, names, not ids. + })); + + if (i == tq.machines.end ()) + { + error << "task from " << url << " for unknown machine " << name; + return false; + } + + if (i->effective_role () != r) + { + error << "task from " << url << " with mismatched role " + << " for machine " << name; + return false; + } + + i->name.clear (); // Blank out. 
+ + return true; + }; + + auto check_aux = [&check] (const vector<auxiliary_machine>& ams) + { + for (const auxiliary_machine& am: ams) + if (!check (am.name, machine_role::auxiliary)) + return false; + return true; + }; + + if (!check (t.machine, machine_role::build) || + !check_aux (t.auxiliary_machines)) + { + if (ops.dump_task ()) + return 0; + + continue; + } } - if (i == ms.size ()) + // Also verify there are no more than 9 auxiliary machines (see the offset + // global variable for details). + // + if (t.auxiliary_machines.size () > 9) { - error << "task from " << url << " for unknown machine " - << tr.task->machine; + error << "task from " << url << " with more than 9 auxiliary machines"; if (ops.dump_task ()) return 0; @@ -1501,8 +3279,6 @@ try continue; } - task_manifest& t (*tr.task); - if (ops.dump_task ()) { serialize_manifest (t, cout, "stdout", "task"); @@ -1515,16 +3291,349 @@ try if (!ops.trust ().empty ()) t.trust = ops.trust (); - const dir_path& d (); // The -<toolchain> directory. - const bootstrapped_machine_manifest& m (); + // Reset the worker checksum if the task's agent checksum doesn't match + // the current one. + // + // Note that since the checksums are hierarchical, such reset will trigger + // resets of the "subordinate" checksums (dependency checksum, etc). + // + if (!tr.agent_checksum || *tr.agent_checksum != agent_checksum) + t.worker_checksum = nullopt; + + // Handle interrupts. + // + // Note that the interrupt can be triggered both by another process (the + // interrupt exception is thrown from perform_task()) as well as by this + // process in case we were unable to interrupt the other process (seeing + // that we have already received a task, responding with an interrupt + // feels like the most sensible option). + // + perform_task_result r; + bootstrapped_machine* pm (nullptr); // Build machine. + vector<bootstrapped_machine*> ams; // Auxiliary machines. 
+ try + { + // First find the bootstrapped_machine instance in ms corresponding to + // the requested build machine. Also unlock all the other machines. + // + // While at it also see if we need to interrupt the selected machine (if + // busy), one of the existing (if we are at the max allowed instances, + // that is in the priority monitor mode), or all existing (if this is a + // priority level 4 task). + // + // Auxiliary machines complicate the matter a bit: we may now need to + // interrupt some subset of {build machine, auxiliary machines} that are + // necessary to perform this task. Note, however, that auxiliary + // machines are always subordinate to build machines, meaning that if + // there is a busy auxiliary machine, then there will be a busy build + // machine with the same pid/priority (and so if we interrup one + // auxiliary, then we will also interrupt the corresponding build plus + // any other auxiliaries it may be using). Based on that let's try to + // divide and conquer this by first dealing with build machines and then + // adding any auxiliary ones. + // + vector<bootstrapped_machine*> ims; // Machines to be interrupted. + size_t imt (0); // Number of "target" machines to interrupt (see below). + + // First pass: build machines. + // + for (bootstrapped_machine& m: ms) + { + const machine_manifest& mm (m.manifest.machine); - result_manifest r (perform_task (ms[i].path, ms[i].manifest, t)); + if (mm.effective_role () == machine_role::auxiliary) + continue; - ms[i].lock.unlock (); // No need to hold the lock any longer. + if (mm.name == t.machine) + { + assert (pm == nullptr); // Sanity check. + pm = &m; + } + else if (m.lock.locked ()) + m.lock.unlock (); + else if (m.lock.prio) // Not bootstrapping/suspended. + { + // Only consider machines that we can interrupt (see above). + // + if ((prio / 100) > (*m.lock.prio / 100)) + { + if (prio >= 1000) // Priority level 4 (interrupt all). 
+ ims.push_back (&m); + else if (prio_mon) + { + // Find the lowest priority task to interrupt. + // + if (ims.empty ()) + ims.push_back (&m); + else if (*m.lock.prio < *ims.back ()->lock.prio) + ims.back () = &m; + } + } + } + } + + assert (pm != nullptr); // Sanity check. + + if (!pm->lock.locked ()) + { + assert (pm->lock.prio); // Sanity check (not bootstrapping/suspended). + + if (prio >= 1000) + ims.insert (ims.begin (), pm); // Interrupt first (see below). + else + ims = {pm}; + + imt++; + } + + // Second pass: auxiliary machines. + // + for (bootstrapped_machine& m: ms) + { + const machine_manifest& mm (m.manifest.machine); + + if (mm.effective_role () != machine_role::auxiliary) + continue; + + if (find_if (t.auxiliary_machines.begin (), t.auxiliary_machines.end (), + [&mm] (const auxiliary_machine& am) + { + return am.name == mm.name; + }) != t.auxiliary_machines.end ()) + { + if (!m.lock.locked ()) + { + assert (m.lock.prio); // Sanity check (not bootstrapping/suspended). + + if (ims.empty ()) + { + ims.push_back (&m); + } + else if (ims.front () == pm) + { + ims.insert (ims.begin () + 1, &m); // Interrupt early (see below). + } + else if (prio < 1000 && prio_mon && ams.empty () /* first */) + { + // Tricky: replace the lowest priority task we have picked on + // the first pass with this one. + // + assert (ims.size () == 1); // Sanity check. + ims.back () = &m; + } + else + ims.insert (ims.begin (), &m); // Interrupt first (see below). + + imt++; + } + + ams.push_back (&m); + } + else if (m.lock.locked ()) + m.lock.unlock (); + } + + // Note: the order of machines may not match. + // + assert (ams.size () == t.auxiliary_machines.size ()); // Sanity check. + + assert (!prio_mon || !ims.empty ()); // We should have at least one. + + // Move the toolchain lock into this scope so that it's automatically + // released on any failure (on the happy path it is released by + // perform_task()). 
+      //
+      toolchain_lock& rtl (tl);
+      toolchain_lock tl (move (rtl));
+
+      // Interrupt the machines, if necessary.
+      //
+      // Note that if we are interrupting multiple machines, then the target
+      // build machine, if needs to be interrupted, must be first, followed
+      // by all the target auxiliary machines. This way if we are unable to
+      // successfully interrupt them, we don't interrupt the rest.
+      //
+      vector<pid_t> pids; // Avoid re-interrupting the same pid.
+      for (size_t i (0); i != ims.size (); ++i)
+      {
+        bootstrapped_machine* im (ims[i]);
+
+        // Sanity checks.
+        //
+        assert (!im->lock.locked () && im->lock.prio);
+        assert (im != pm || i == 0);
+
+        const dir_path& tp (im->path); // -<toolchain> path.
+        pid_t pid (im->lock.pid);
+
+        l2 ([&]{trace << "interrupting "
+                      << (i < imt ? "target" : "lower priority")
+                      << " machine " << tp << ", pid " << pid;});
+
+        // The plan is to send the interrupt and then wait for the lock.
+        //
+        // Note that the interrupt cannot be "lost" (or attributed to a
+        // different task) since we are sending it while holding the global
+        // lock and the other process arms it also while holding the global
+        // lock.
+        //
+        // But what can happen is the other task becomes suspended, which we
+        // will not be able to interrupt.
+        //
+        if (find (pids.begin (), pids.end (), pid) == pids.end ())
+        {
+          if (kill (pid, SIGUSR1) == -1)
+          {
+            // Ignore the case where there is no such process (e.g., the other
+            // process has terminated in which case the lock should be
+            // released automatically).
+            //
+            if (errno != ESRCH)
+              throw_generic_error (errno);
+          }
+
+          pids.push_back (pid);
+        }
+
+        // If we are interrupting additional machine in order to free up
+        // resources, there is no use acquiring their lock (or failing if
+        // unable to) since this is merely a performance optimization.
+        //
+        if (i >= imt)
+          continue;
+
+        // Try to lock the machine.
+ // + // While this normally shouldn't take long, there could be parts of + // the perform_task() logic that we do not interrupt and that may take + // some time. + // + machine_lock ml; + + size_t retry (0); + for (; retry != 31; ++retry) + { + if (retry != 0) + ::sleep (1); + + ml = lock_machine (tl, tp); + + if (ml.locked ()) + break; + + if (ml.pid != pid) + { + error << "interrupted machine " << tp << " changed pid"; + throw interrupt (); + } + + if (!ml.prio) // Got suspended. + { + l2 ([&]{trace << "interrupted machine " << tp << " suspended";}); + throw interrupt (); + } + } + + if (!ml.locked ()) + { + warn << "unable to lock interrupted machine " << tp << " within " + << (retry - 1) << "s"; + throw interrupt (); + } + + // This is an interrupted machine (build or auxiliary) that we will be + // using. See if it needs a re-bootstrap, the same as in + // enumerate_machines(). If not, then transfer the bootstrap manifest + // and lock. + // + const machine_manifest& mm (im->manifest.machine); + + bootstrapped_machine_manifest bmm ( + parse_manifest<bootstrapped_machine_manifest> ( + tp / "manifest", "bootstrapped machine")); + + bool rb (false); + + if (bmm.machine.id != mm.id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); + rb = true; + } + + if (im == pm) // Only for build machine. + { + if (!tc_id.empty () && bmm.toolchain.id != tc_id) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); + rb = true; + } + + if (int i = compare_bbot (bmm.bootstrap)) + { + if (i < 0) + { + l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); + rb = true; + } + else + { + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + rb = true; + } + } + } + + // We are not going to try to re-bootstrap this machine "inline". + // + if (rb) + throw interrupt (); + + im->manifest = move (bmm); + im->lock = move (ml); + } + + // Check if we need to boost the number of CPUs to the full hardware + // concurrency. 
+ // + optional<size_t> bcpus; + if (prio >= 10000) + bcpus = std::thread::hardware_concurrency (); + + pm->lock.perform_task (tl, prio); // Build machine. + for (bootstrapped_machine* am: ams) // Auxiliary machines. + am->lock.perform_task (tl, prio); + + r = perform_task (move (tl), *pm, ams, t, bcpus); + } + catch (const interrupt&) + { + // Note: no work_dir. + // + r = perform_task_result ( + auto_rmdir (), + result_manifest { + t.name, + t.version, + result_status::interrupt, + operation_results {}, + nullopt /* worker_checksum */, + nullopt /* dependency_checksum */}); + } + + // No need to hold the locks any longer. + // + if (pm != nullptr && pm->lock.locked ()) + pm->lock.unlock (); + + for (bootstrapped_machine* am: ams) + if (am->lock.locked ()) + am->lock.unlock (); + + result_manifest& rm (r.manifest); if (ops.dump_result ()) { - serialize_manifest (r, cout, "stdout", "result"); + serialize_manifest (rm, cout, "stdout", "result"); return 0; } @@ -1539,7 +3648,7 @@ try openssl os (trace, fdstream_mode::text, path ("-"), 2, - ops.openssl (), "rsautl", + ops.openssl (), sign_cmd, ops.openssl_option (), "-sign", "-inkey", ops.auth_key ()); os.out << *tr.challenge; @@ -1560,9 +3669,258 @@ try fail << "unable to sign task response challenge: " << e; } + // Re-package the build artifacts, if present, into the type/instance- + // specific archives and upload them to the type-specific URLs, if + // provided. + // + // Note that the initial upload archive content is organized as a bunch of + // upload/<type>/<instance>/*, where the second level directories are the + // upload types and the third level sub-directories are their instances. + // The resulting <instance>.tar archives content (which is what we submit + // to the type-specific handler) are organized as <instance>/*. + // + if (r.upload_archive && !tr.upload_urls.empty ()) + { + const path& ua (*r.upload_archive); + + // Extract the archive content into the parent directory of the archive + // file. 
But first, make sure the resulting directory doesn't exist. + // + // Note that while we don't assume where inside the working directory + // the archive is, we do assume that there is nothing clashing/precious + // in the upload/ directory which we are going to cleanup. + // + dir_path d (ua.directory ()); + + const dir_path ud (d / dir_path ("upload")); + try_rmdir_r (ud); + + try + { + process_exit pe ( + process_run_callback ( + trace, + fdopen_null (), // Don't expect to read from stdin. + 2, // Redirect stdout to stderr. + 2, + "tar", + "-xf", ua, + "-C", d)); + + if (!pe) + fail << "tar " << pe; + } + catch (const system_error& e) + { + // There must be something wrong with the setup or there is no space + // left on the disk, thus the failure is fatal. + // + fail << "unable to extract build artifacts from archive: " << e; + } + + try_rmfile (ua); // Let's free up the disk space. + + // To decrease nesting a bit, let's collect the type-specific upload + // directories and the corresponding URLs first. This way we can also + // create the archive files as the upload/ directory sub-entries without + // interfering with iterating over this directory. + // + vector<pair<dir_path, string>> urls; + + try + { + for (const dir_entry& te: dir_iterator (ud, dir_iterator::no_follow)) + { + const string& t (te.path ().string ()); + + // Can only be a result of the worker malfunction, thus the failure + // is fatal. + // + if (te.type () != entry_type::directory) + fail << "unexpected filesystem entry '" << t << "' in " << ud; + + auto i (find_if (tr.upload_urls.begin (), tr.upload_urls.end (), + [&t] (const upload_url& u) {return u.type == t;})); + + if (i == tr.upload_urls.end ()) + continue; + + urls.emplace_back (ud / path_cast<dir_path> (te.path ()), i->url); + } + } + catch (const system_error& e) + { + fail << "unable to iterate over " << ud << ": " << e; + } + + // Now create archives and upload. 
+ // + for (const pair<dir_path, string>& p: urls) + { + const dir_path& td (p.first); // <type>/ + const string& url (p.second); + + try + { + for (const dir_entry& ie: dir_iterator (td, dir_iterator::no_follow)) + { + const string& i (ie.path ().string ()); // <instance> + + // Can only be a result of the worker malfunction, thus the + // failure is fatal. + // + if (ie.type () != entry_type::directory) + fail << "unexpected filesystem entry '" << i << "' in " << td; + + // Archive the upload instance files and, while at it, calculate + // the resulting archive checksum. + // + sha256 sha; + auto_rmfile ari (ud / (i + ".tar")); + + try + { + // Instruct tar to print the archive to stdout. + // + fdpipe in_pipe (fdopen_pipe (fdopen_mode::binary)); + + process pr ( + process_start_callback ( + trace, + fdopen_null (), // Don't expect to read from stdin. + in_pipe, + 2 /* stderr */, + "tar", + "--format", "ustar", + "-c", + "-C", td, + i)); + + // Shouldn't throw, unless something is severely damaged. + // + in_pipe.out.close (); + + ifdstream is ( + move (in_pipe.in), fdstream_mode::skip, ifdstream::badbit); + + ofdstream os (ari.path, fdopen_mode::binary); + + char buf[8192]; + while (!eof (is)) + { + is.read (buf, sizeof (buf)); + + if (size_t n = static_cast<size_t> (is.gcount ())) + { + sha.append (buf, n); + os.write (buf, n); + } + } + + os.close (); + + if (!pr.wait ()) + fail << "tar " << *pr.exit; + } + catch (const system_error& e) + { + // There must be something wrong with the setup or there is no + // space left on the disk, thus the failure is fatal. + // + fail << "unable to archive " << td << i << "/: " << e; + } + + // Post the upload instance archive. 
+ // + using namespace http_service; + + parameters params ({ + {parameter::text, "session", tr.session}, + {parameter::text, "instance", i}, + {parameter::file, "archive", ari.path.string ()}, + {parameter::text, "sha256sum", sha.string ()}}); + + if (challenge) + params.push_back ({ + parameter::text, "challenge", base64_encode (*challenge)}); + + result pr (post (ops, url, params)); + + // Turn the potential upload failure into the "upload" operation + // error, amending the task result manifest. + // + if (pr.error) + { + // The "upload" operation result must be present (otherwise + // there would be nothing to upload). We can assume it is last. + // + assert (!rm.results.empty ()); + + operation_result& r (rm.results.back ()); + + // The "upload" operation result must be the last, if present. + // + assert (r.operation == "upload"); + + auto log = [&r, indent = false] (const string& t, + const string& l) mutable + { + if (indent) + r.log += " "; + else + indent = true; + + r.log += t; + r.log += ": "; + r.log += l; + r.log += '\n'; + }; + + log ("error", + "unable to upload " + td.leaf ().string () + '/' + i + + " build artifacts"); + + log ("error", *pr.error); + + if (!pr.message.empty ()) + log ("reason", pr.message); + + if (pr.reference) + log ("reference", *pr.reference); + + for (const manifest_name_value& nv: pr.body) + { + if (!nv.name.empty ()) + log (nv.name, nv.value); + } + + r.status |= result_status::error; + rm.status |= r.status; + + break; + } + } + + // Bail out on the instance archive upload failure. + // + if (!rm.status) + break; + } + catch (const system_error& e) + { + fail << "unable to iterate over " << td << ": " << e; + } + } + } + + result_status rs (rm.status); + // Upload the result. 
// - result_request_manifest rq {tr.session, move (challenge), move (r)}; + result_request_manifest rq {tr.session, + move (challenge), + agent_checksum, + move (rm)}; { const string& u (*tr.result_url); @@ -1586,7 +3944,20 @@ try try { - serialize_manifest (rq, c.out, u, "task request"); + // Don't break lines in the manifest values not to further increase + // the size of the result request manifest encoded representation. + // Note that this manifest can contain quite a few lines in the + // operation logs, potentially truncated to fit the upload limit + // (see worker/worker.cxx for details). Breaking these lines can + // increase the request size beyond this limit and so we can end up + // with the request failure. + // + serialize_manifest (rq, + c.out, + u, + "result request", + true /* fail_hard */, + true /* long_lines */); } catch (const failed&) {f = true;} @@ -1602,8 +3973,9 @@ try } } - l2 ([&]{trace << "built " << t.name << '/' << t.version << " " - << "on " << t.machine << " " + l2 ([&]{trace << "built " << t.name << '/' << t.version << ' ' + << "status " << rs << ' ' + << "on " << t.machine << ' ' << "for " << url;}); } } |