// file : bbot/agent/agent.cxx -*- C++ -*- // license : MIT; see accompanying LICENSE file #include #include // getpwuid() #include // PATH_MAX #include // signal(), kill() #include // rand_r(), strto[u]ll() #include // strchr() #include // sleep(), getpid(), getuid(), fsync(), [f]stat() #include // getifaddrs(), freeifaddrs() #include // stat, pid_t #include // [f]stat() #include // flock() #include // ifreq #include // sockaddr_in #include // inet_ntop() #include #include #include #include #include #include // thread::hardware_concurrency() #include #include // setw() #include // iota() #include #include // generic_category() #include #include #include #include #include // dir_iterator, try_rmfile(), readsymlink() #include #include #include #include #include #include #include #include #include #include using namespace butl; using namespace bbot; using std::cout; using std::endl; // If RAM minimum is not specified for a machine, then let's assume something // plausible like 256MiB. This way we won't end up with degenerate cases where // we attempt to start a machine with some absurd amount of RAM. // const std::uint64_t default_ram_minimum = 262144; static inline std::uint64_t effective_ram_minimum (const machine_header_manifest& m) { // Note: neither ram_minimum nor ram_maximum should be 0. // assert ((!m.ram_minimum || *m.ram_minimum != 0) && (!m.ram_maximum || *m.ram_maximum != 0)); return (m.ram_minimum ? *m.ram_minimum : (m.ram_maximum && *m.ram_maximum < default_ram_minimum ? *m.ram_maximum : default_ram_minimum)); } static std::mt19937 rand_gen (std::random_device {} ()); // According to the standard, atomic's use in the signal handler is only safe // if it's lock-free. // #if !defined(ATOMIC_INT_LOCK_FREE) || ATOMIC_INT_LOCK_FREE != 2 #error int is not lock-free on this architecture #endif // While we can use memory_order_relaxed in a single-threaded program, let's // use consume/release in case this process becomes multi-threaded in the // future. 
// static std::atomic sigurs1; using std::memory_order_consume; using std::memory_order_release; extern "C" void handle_signal (int sig) { switch (sig) { case SIGHUP: exit (3); // Unimplemented feature. case SIGTERM: exit (0); case SIGUSR1: sigurs1.fetch_add (1, std::memory_order_release); break; default: assert (false); } } namespace bbot { agent_options ops; const string bs_prot ("1"); string tc_name; uint16_t tc_num; path tc_lock; // Empty if no locking. standard_version tc_ver; string tc_id; uint16_t inst; // 1-based. uint16_t inst_max; // 0 if priority monitoring is disabled. uint16_t offset; string hname; string hip; uid_t uid; string uname; } static void file_sync (const path& f) { auto_fd fd (fdopen (f, fdopen_mode::in)); if (fsync (fd.get ()) != 0) throw_system_error (errno); } static bool file_not_empty (const path& f) { if (file_exists (f)) { file_sync (f); return !file_empty (f); } return false; } // The btrfs tool likes to print informational messages, like "Created // snapshot such and such". Luckily, it writes them to stdout while proper // diagnostics goes to stderr. // template inline void run_btrfs (tracer& t, A&&... a) { if (verb >= 4) run_io (t, fdopen_null (), 2, 2, "btrfs", forward (a)...); else run_io (t, fdopen_null (), fdopen_null (), 2, "btrfs", forward (a)...); } template inline butl::process_exit::code_type btrfs_exit (tracer& t, A&&... a) { return verb >= 4 ? run_io_exit (t, fdopen_null (), 2, 2, "btrfs", forward (a)...) : run_io_exit (t, fdopen_null (), fdopen_null (), 2, "btrfs", forward (a)...); } // Bootstrap a build machine. Return the bootstrapped machine manifest if // successful and nullopt otherwise (in which case the caller should clean up // the machine directory and ignore the machine for now). 
// static optional bootstrap_build_machine (const dir_path& md, const machine_manifest& mm, optional obmm) { tracer trace ("bootstrap_build_machine", md.string ().c_str ()); bootstrapped_machine_manifest r { mm, toolchain_manifest {tc_id.empty () ? "bogus" : tc_id}, bootstrap_manifest { bootstrap_manifest::versions_type { {"bbot", standard_version (BBOT_VERSION_STR)}, {"libbbot", standard_version (LIBBBOT_VERSION_STR)}, {"libbpkg", standard_version (LIBBPKG_VERSION_STR)}, {"libbutl", standard_version (LIBBUTL_VERSION_STR)} } } }; if (ops.fake_bootstrap ()) { r.machine.mac = "de:ad:be:ef:de:ad"; } else try { // Note: similar code in bootstrap_auxiliary_machine(). // Start the TFTP server (server chroot is --tftp). Map: // // GET requests to .../toolchains//* // PUT requests to .../bootstrap/-/* // const string in_name (tc_name + '-' + to_string (inst)); auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); try_mkdir_p (arm.path); // Bootstrap result manifest. // path mf (arm.path / "bootstrap.manifest"); try_rmfile (mf); // @@ TMP BC: also check for the old manifest name until we migrate all // the machines. // path mfo (arm.path / "manifest"); try_rmfile (mfo); // Note that unlike build, here we use the same VM snapshot for retries, // which is not ideal. // for (size_t retry (0);; ++retry) { tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", ops.tftp_port () + offset + 0 /* build machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // unique_ptr m ( start_machine (md, mm, 0 /* machine_num (build) */, ops.cpu (), ops.build_ram (), obmm ? obmm->machine.mac : nullopt, ops.bridge (), tftpd.port (), false /* pub_vnc */)); { // If we are terminating with an exception then force the machine down. // Failed that, the machine's destructor will block waiting for its // completion. 
// auto mg ( make_exception_guard ( [&m, &md] () { if (m != nullptr) { info << "trying to force machine " << md << " down"; try {m->forcedown (false);} catch (const failed&) {} } })); // What happens if the bootstrap process hangs? The simple thing would // be to force the machine down after some timeout and then fail. But // that won't be very helpful for investigating the cause. So instead // the plan is to suspend it after some timeout, issue diagnostics // (without failing and which Build OS monitor will relay to the // operator), and wait for the external intervention. // auto soft_fail = [&md, &m] (const char* msg) { { diag_record dr (error); dr << msg << " for machine " << md << ", suspending"; m->print_info (dr); } try { m->suspend (false); m->wait (false); m->cleanup (); info << "resuming after machine suspension"; // Note: snapshot cleaned up by the caller. } catch (const failed&) {} return nullopt; }; // Check whether the machine is still running issuing diagnostics and // returning false if it unexpectedly terminated. // auto check_machine = [&md, &m] () { try { size_t t (0); if (!m->wait (t /* seconds */, false /* fail_hard */)) return true; // Still running. // Exited successfully. } catch (const failed&) { // Failed, exit code diagnostics has already been issued. } diag_record dr (error); dr << "machine " << md << " exited unexpectedly"; m->print_info (dr); return false; }; // The first request should be the toolchain download. Wait for up to // 5 minutes (by default) for that to arrive. In a sense we use it as // an indication that the machine has booted and the bootstrap process // has started. Why wait so long you may wonder? Well, we may be using // a new MAC address and operating systems like Windows may need to // digest that. // size_t to; const size_t startup_to (ops.bootstrap_startup ()); const size_t bootstrap_to (ops.bootstrap_timeout ()); const size_t shutdown_to (5 * 60); // Wait periodically making sure the machine is still alive. 
// for (to = startup_to; to != 0; ) { if (tftpd.serve (to, 2)) break; if (!check_machine ()) { return nullopt; // Note: snapshot cleaned up by the caller. } } // This can mean two things: machine mis-configuration or what we // euphemistically call a "mis-boot": the VM failed to boot for some // unknown/random reason. Mac OS is particularly know for suffering // from this. So the strategy is to retry it a couple of times and // then suspend for investigation. // if (to == 0) { if (retry > ops.bootstrap_retries ()) return soft_fail ("bootstrap startup timeout"); // Note: keeping the logs behind (no cleanup). diag_record dr (warn); dr << "machine " << mm.name << " mis-booted, retrying"; m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} m = nullptr; // Disable exceptions guard above. continue; } l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); // Next the bootstrap process may download additional toolchain // archives, build things, and then upload the result manifest. So on // our side we serve TFTP requests while periodically checking for the // manifest file. To workaround some obscure filesystem races (the // file's mtime/size is updated several seconds later; maybe tmpfs // issue?), we periodically re-check. // for (to = bootstrap_to; to != 0; ) { if (tftpd.serve (to, 2)) continue; if (!check_machine ()) { // The exit/upload is racy so we re-check. // if (!(file_not_empty (mf) || file_not_empty (mfo))) { return nullopt; // Note: snapshot cleaned up by the caller. } } bool old (false); if (file_not_empty (mf) || (old = file_not_empty (mfo))) { if (old) mf = move (mfo); // Wait for 5 seconds of inactivity. This is our desperate attempt // at handling interrupted uploads. // if (!tftpd.serve (to, 5)) break; } } if (to == 0) return soft_fail ("bootstrap timeout"); l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); // Shut the machine down cleanly. 
// if (!m->shutdown ((to = shutdown_to))) return soft_fail ("bootstrap shutdown timeout"); l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); m->cleanup (); } // Parse the result manifest. // r.bootstrap = parse_manifest (mf, "bootstrap"); r.machine.mac = m->mac; // Save the MAC address. break; } } catch (const system_error& e) { fail << "bootstrap error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } // Bootstrap an auxiliary machine. Return the bootstrapped machine manifest if // successful and nullopt otherwise (in which case the caller should clean up // the machine directory and ignore the machine for now). // static vector divide_auxiliary_ram (const vector&); static optional bootstrap_auxiliary_machine (const dir_path& md, const machine_manifest& mm, optional obmm) { tracer trace ("bootstrap_auxiliary_machine", md.string ().c_str ()); bootstrapped_machine_manifest r { mm, toolchain_manifest {}, // Unused for auxiliary, bootstrap_manifest {} // Unused for auxiliary. }; if (ops.fake_bootstrap ()) { r.machine.mac = "de:ad:be:ef:de:ad"; } else try { // Similar to bootstrap_build_machine() except here we just wait for the // upload of the environment. // Start the TFTP server (server chroot is --tftp). Map: // // GET requests to /dev/null // PUT requests to .../bootstrap/-/* // const string in_name (tc_name + '-' + to_string (inst)); auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); try_mkdir_p (arm.path); // Environment upload. // path ef (arm.path / "environment"); try_rmfile (ef); // Note that unlike build, here we use the same VM snapshot for retries, // which is not ideal. 
// for (size_t retry (0);; ++retry) { tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", ops.tftp_port () + offset + 1 /* auxiliary machine */); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); // If the machine specified RAM minimum, use that to make sure the // machine can actually function with this amount of RAM. Otherwise, use // the minium of RAM maximum (if specified) and the available auxiliary // RAM (so we know this machine will at least work alone). For the // latter case use divide_auxiliary_ram() to be consistent with the // build case (see that function implementation for nuances). // size_t ram; if (mm.ram_minimum) ram = *mm.ram_minimum; else { vector rams (divide_auxiliary_ram ({&mm})); assert (!rams.empty ()); // We should have skipped such a machine. ram = rams.front (); } // Start the machine. // unique_ptr m ( start_machine (md, mm, 1 /* machine_num (first auxiliary) */, ops.cpu (), ram, obmm ? obmm->machine.mac : nullopt, ops.bridge (), tftpd.port (), false /* pub_vnc */)); { // NOTE: see bootstrap_build_machine() for comments. auto mg ( make_exception_guard ( [&m, &md] () { if (m != nullptr) { info << "trying to force machine " << md << " down"; try {m->forcedown (false);} catch (const failed&) {} } })); auto soft_fail = [&md, &m] (const char* msg) { { diag_record dr (error); dr << msg << " for machine " << md << ", suspending"; m->print_info (dr); } try { m->suspend (false); m->wait (false); m->cleanup (); info << "resuming after machine suspension"; // Note: snapshot cleaned up by the caller. } catch (const failed&) {} return nullopt; }; auto check_machine = [&md, &m] () { try { size_t t (0); if (!m->wait (t /* seconds */, false /* fail_hard */)) return true; // Still running. // Exited successfully. } catch (const failed&) { // Failed, exit code diagnostics has already been issued. 
} diag_record dr (error); dr << "machine " << md << " exited unexpectedly"; m->print_info (dr); return false; }; // Wait up to the specified timeout for the auxiliary machine to // bootstrap. Note that such a machine may do extra setup work on the // first boot (such as install some packages, etc) which may take some // time. // size_t to; const size_t bootstrap_to (ops.bootstrap_auxiliary ()); const size_t shutdown_to (5 * 60); // Serve TFTP requests while periodically checking for the environment // file. // for (to = bootstrap_to; to != 0; ) { if (tftpd.serve (to, 2)) continue; if (!check_machine ()) { if (!file_not_empty (ef)) { return nullopt; // Note: snapshot cleaned up by the caller. } } if (file_not_empty (ef)) { if (!tftpd.serve (to, 5)) break; } } if (to == 0) { if (retry > ops.bootstrap_retries ()) return soft_fail ("bootstrap timeout"); // Note: keeping the logs behind (no cleanup). diag_record dr (warn); dr << "machine " << mm.name << " mis-booted, retrying"; m->print_info (dr); try {m->forcedown (false);} catch (const failed&) {} m = nullptr; // Disable exceptions guard above. continue; } l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); // Shut the machine down cleanly. // if (!m->shutdown ((to = shutdown_to))) return soft_fail ("bootstrap shutdown timeout"); l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); m->cleanup (); } r.machine.mac = m->mac; // Save the MAC address. break; } } catch (const system_error& e) { fail << "bootstrap error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } // Global toolchain lock. // // The overall locking protocol is as follows: // // 1. Before enumerating the machines each agent instance acquires the global // toolchain lock. // // 2. As the agent enumerates over the machines, it tries to acquire the lock // for each machine. // // 3. 
If the agent encounters a machine that it needs to bootstrap, it // releases all the other machine locks followed by the global lock, // proceeds to bootstrap the machine, releases its lock, and restarts the // process from scratch. // // 4. Otherwise, upon receiving a task response for one of the machines (plus, // potentially, a number of auxiliary machines), the agent releases all the // other machine locks followed by the global lock, proceeds to perform the // task on the selected machine(s), releases their locks, and restarts the // process from scratch. // // One notable implication of this protocol is that the machine locks are // only acquired while holding the global toolchain lock but can be released // while not holding this lock. // // (Note that because of this implication it can theoretically be possible // to omit acquiring all the machine locks during the enumeration process, // instead only acquiring the lock of the machine we need to bootstrap or // build. However, the current approach is simpler since we still need // to detect machines that are already locked, which entails acquiring // the lock anyway.) // // Note that unlike the machine lock below, here we don't bother with removing // the lock file. // class toolchain_lock { public: toolchain_lock () = default; // Empty lock. // Note: returns true if locking is disabled. // bool locked () const { return tc_lock.empty () || fl_; } void unlock (bool ignore_errors = false) { if (fl_) { fl_ = false; // We have tried. if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) throw_generic_error (errno); } } ~toolchain_lock () { unlock (true /* ignore_errors */); } toolchain_lock (toolchain_lock&&) = default; toolchain_lock& operator= (toolchain_lock&&) = default; toolchain_lock (const toolchain_lock&) = delete; toolchain_lock& operator= (const toolchain_lock&) = delete; // Implementation details. 
// public: explicit toolchain_lock (auto_fd&& fd) : fd_ (move (fd)), fl_ (true) {} private: auto_fd fd_; bool fl_ = false; }; // Note: returns empty lock if toolchain locking is disabled. // static optional lock_toolchain (unsigned int timeout) { if (tc_lock.empty ()) return toolchain_lock (); auto_fd fd (fdopen (tc_lock, fdopen_mode::out | fdopen_mode::create)); for (; flock (fd.get (), LOCK_EX | LOCK_NB) != 0; sleep (1), --timeout) { if (errno != EWOULDBLOCK) throw_generic_error (errno); if (timeout == 0) return nullopt; } return toolchain_lock (move (fd)); } // Per-toolchain machine lock. // // We use flock(2) which is straightforward. The tricky part is cleaning the // file up. Here we may have a race when two processes are trying to open & // lock the file that is being unlocked & removed by a third process. In this // case one of these processes may still open the old file. To resolve this, // after opening and locking the file, we verify that a new file hasn't // appeared by stat'ing the path and file descriptor and comparing the inodes. // // Note that converting a lock (shared to exclusive or vice versa) is not // guaranteed to be atomic (in case later we want to support exclusive // bootstrap and shared build). // // Note also that we per-toolchain lock auxiliary machines even though they // are not toolchain-specific. Doing it this way allows us to handle both // types of machines consistently with regards to priorities, interrupts, etc. // It also means we will have each auxiliary machine available per-toolchain // rather than a single machine shared between all the toolchains, which is // a good thing. // class machine_lock { public: // A lock is either locked by this process or it contains information about // the process holding the lock. // pid_t pid; // Process using the machine. optional prio; // Task priority (absent means being bootstrapped // or have been suspended). machine_lock () = default; // Uninitialized lock. 
bool locked () const { return fl_; } void unlock (bool ignore_errors = false) { if (fl_) { fl_ = false; // We have tried. if (fd_ != nullfd) { try_rmfile (fp_, ignore_errors); if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) throw_generic_error (errno); } } } // Write the holding process information to the lock file. // // Must be called while holding the toolchain lock (see the lock_machine() // implementation for rationale). // void bootstrap (const toolchain_lock& tl) { assert (tl.locked () && fl_); if (fd_ != nullfd) write (nullopt); } void perform_task (const toolchain_lock& tl, uint64_t prio) { assert (tl.locked () && fl_); if (fd_ != nullfd) write (prio); } // Truncate the holding process information after the call to perform_task() // so that it doesn't contain the priority, marking the machine as being // suspended. // // Note that this one can be called without holding the toolchain lock. // void suspend_task () { assert (fl_); if (fd_ != nullfd) { assert (tp_ != 0); // Must be called after perform_task(). // While there is no direct statement to this effect in POSIX, the // consensus on the internet is that truncation is atomic, in a sense // that the reader shouldn't see a partially truncated content. Feels // like should be doubly so when actually truncating as opposed to // extending the size, which is what we do. // fdtruncate (fd_.get (), tp_); } } ~machine_lock () { unlock (true /* ignore_errors */); } machine_lock (machine_lock&&) = default; machine_lock& operator= (machine_lock&&) = default; machine_lock (const machine_lock&) = delete; machine_lock& operator= (const machine_lock&) = delete; // Implementation details. // public: // If fd is nullfd, treat it as a fake lock (used for fake machines). 
// machine_lock (path&& fp, auto_fd&& fd) : fp_ (move (fp)), fd_ (move (fd)), fl_ (true) {} machine_lock (pid_t pi, optional pr) : pid (pi), prio (pr), fl_ (false) {} private: void write (optional prio) { pid_t pid (getpid ()); string l (to_string (pid)); if (prio) { tp_ = l.size (); // Truncate position. l += ' '; l += to_string (*prio); } auto n (fdwrite (fd_.get (), l.c_str (), l.size ())); if (n == -1) throw_generic_ios_failure (errno); if (static_cast (n) != l.size ()) throw_generic_ios_failure (EFBIG); } private: path fp_; auto_fd fd_; bool fl_ = false; uint64_t tp_ = 0; // Truncate position. }; // Try to lock the machine given its - directory. Return unlocked // lock with pid/prio if already in use. Must be called while holding the // toolchain lock. // static machine_lock lock_machine (const toolchain_lock& tl, const dir_path& tp) { assert (tl.locked ()); path fp (tp + ".lock"); // The -.lock file. for (;;) { auto_fd fd (fdopen (fp, (fdopen_mode::in | fdopen_mode::out | fdopen_mode::create))); if (flock (fd.get (), LOCK_EX | LOCK_NB) != 0) { if (errno == EWOULDBLOCK) { // The file should contain a line in the following format: // // [ ] // char buf[64]; // Sufficient for 2 64-bit numbers (20 decimals max). auto sn (fdread (fd.get (), buf, sizeof (buf))); if (sn == -1) throw_generic_ios_failure (errno); size_t n (static_cast (sn)); // While there would be a race between locking the file then writing // to it in one process and reading from it in another process, we are // protected by the global toolchain lock, which must be held by both // sides during this dance. // assert (n > 0 && n < sizeof (buf)); buf[n] = '\0'; // Note also that it's possible that by the time we read the pid/prio // the lock has already been released. But this case is no different // from the lock being released after we have read pid/prio but before // acting on this information (e.g., trying to interrupt the other // process), which we have to deal with anyway. 
// pid_t pid; optional prio; { char* p (strchr (buf, ' ')); char* e; { errno = 0; pid = strtoll (buf, &e, 10); // Note: pid_t is signed. assert (errno != ERANGE && e != buf && (p != nullptr ? e == p : *e == '\0')); } if (p != nullptr) { ++p; errno = 0; prio = strtoull (p, &e, 10); assert (errno != ERANGE && e != p && *e == '\0'); } } return machine_lock (pid, prio); } throw_generic_error (errno); } struct stat st1, st2; if (fstat (fd.get (), &st1) != 0 || stat (fp.string ().c_str (), &st2) != 0 ) // Both should succeed. throw_generic_error (errno); if (st1.st_ino == st2.st_ino) return machine_lock (move (fp), move (fd)); // Retry (note: lock is unlocked by auto_fd::close()). } } // Given the toolchain directory (-) return the snapshot path in // the -- form. // // We include the instance number into for debuggability. // static inline dir_path snapshot_path (const dir_path& tp) { return tp.directory () /= path::traits_type::temp_name (tp.leaf ().string () + '-' + to_string (inst)); } // Compare bbot and library versions returning -1 if older, 0 if the same, // and +1 if newer. // static int compare_bbot (const bootstrap_manifest& m) { auto cmp = [&m] (const string& n, const char* v) -> int { standard_version sv (v); auto i = m.versions.find (n); return (i == m.versions.end () || i->second < sv ? -1 : i->second > sv ? 1 : 0); }; // Start from the top assuming a new dependency cannot be added without // changing the dependent's version. // int r; return ( (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r : (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r : (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r : (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0); }; // Return the global toolchain lock and the list of available machines, // (re-)bootstrapping them if necessary. 
// // Note that this function returns both machines that this process managed to // lock as well as the machines locked by other processes (including those // that are being bootstrapped or that have been suspended), in case the // caller needs to interrupt one of them for a higher-priority task. In the // latter case, the manifest is empty if the machine is bootstrapping or // suspended and only has the machine_manifest information otherwise. (The // bootstrapping/suspended machines have to be returned to get the correct // count of currently active instances for the inst_max comparison.) // // Note that both build and auxiliary machines are returned. For auxiliary, // toolchain and bootstrap manifests are unused and therefore always empty. // struct bootstrapped_machine { machine_lock lock; const dir_path path; bootstrapped_machine_manifest manifest; }; using bootstrapped_machines = vector; static pair enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines", machines.string ().c_str ()); for (;;) // From-scratch retry loop for after bootstrap (see below). { pair pr; { optional l; while (!(l = lock_toolchain (60 /* seconds */))) { // One typical situation where this can happen is when another agent // takes a while to request a task (e.g., due to network issues). So // this is an info as opposed to a warning. // info << "unable to acquire global toolchain lock " << tc_lock << " for 60s"; } pr.first = move (*l); } toolchain_lock& tl (pr.first); bootstrapped_machines& r (pr.second); if (ops.fake_machine_specified ()) { auto mh ( parse_manifest ( ops.fake_machine (), "machine header")); r.push_back ( bootstrapped_machine { machine_lock (path (), nullfd), // Fake lock. dir_path (ops.machines ()) /= mh.name, // For diagnostics. 
bootstrapped_machine_manifest { machine_manifest { move (mh.id), move (mh.name), move (mh.summary), machine_type::kvm, string ("de:ad:be:ef:de:ad"), nullopt, strings ()}, toolchain_manifest {tc_id}, bootstrap_manifest {}}}); return pr; } // Notice and warn if there are no build machines (as opposed to all of // them being busy). // bool none (true); // We used to (re)-bootstrap machines as we are iterating. But with the // introduction of the priority monitoring functionality we need to // respect the --instance-max value. Which means we first need to try to // lock all the machines in order to determine how many of them are busy // then check this count against --instance-max, and only bootstrap if we // are not over the limit. Which means we have to store all the // information about a (first) machine that needs bootstrapping until // after we have enumerated all of them. // struct pending_bootstrap { machine_lock ml; dir_path tp; // - dir_path xp; // -- machine_manifest mm; optional bmm; }; optional pboot; // The first level are machine volumes. // for (const dir_entry& ve: dir_iterator (machines, dir_iterator::no_follow)) { const string vn (ve.path ().string ()); // Ignore hidden directories. // if (ve.type () != entry_type::directory || vn[0] == '.') continue; const dir_path vd (dir_path (machines) /= vn); // Inside we have machines. // try { for (const dir_entry& me: dir_iterator (vd, dir_iterator::no_follow)) { const string mn (me.path ().string ()); if (me.type () != entry_type::directory || mn[0] == '.') continue; const dir_path md (dir_path (vd) /= mn); // Our endgoal here is to obtain a bootstrapped snapshot of this // machine while watching out for potential race conditions (other // instances as well as machines being added/upgraded/removed; see // the manual for details). // // So here is our overall plan: // // 1. Resolve current subvolume link for our bootstrap protocol. // // 2. Lock the machine. 
This excludes any other instance from trying // to perform the following steps. // // 3. If there is no link, cleanup old bootstrap (if any) and ignore // this machine. // // 4. Try to create a snapshot of current subvolume (this operation // is atomic). If failed (e.g., someone changed the link and // removed the subvolume in the meantime), retry from #1. // // 5. Compare the snapshot to the already bootstrapped version (if // any) and see if we need to re-bootstrap. If so, use the // snapshot as a starting point. Rename to bootstrapped at the // end (atomic). // dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - auto delete_bootstrapped = [&tp, &trace] () // Delete -. { run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); run_btrfs (trace, "subvolume", "delete", tp); }; for (size_t retry (0);; ++retry) { if (retry != 0) sleep (1); // Resolve the link to subvolume path. // dir_path sp; // -

. try { sp = path_cast (readsymlink (lp)); if (sp.relative ()) sp = md / sp; } catch (const system_error& e) { // Leave the subvolume path empty if the subvolume link doesn't // exist and fail on any other error. // if (e.code ().category () != std::generic_category () || e.code ().value () != ENOENT) fail << "unable to read subvolume link " << lp << ": " << e; } // Try to lock the machine. // machine_lock ml (lock_machine (tl, tp)); if (!ml.locked ()) { machine_manifest mm; if (ml.prio) { // Get the machine manifest (subset of the steps performed for // the locked case below). // // Note that it's possible the machine we get is not what was // originally locked by the other process (e.g., it has been // upgraded since). It's also possible that if and when we // interrupt and lock this machine, it will be a different // machine (e.g., it has been upgraded since we read this // machine manifest). To deal with all of that we will be // reloading this information if/when we acquire the lock to // this machine. // if (sp.empty ()) { l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } l3 ([&]{trace << "keeping " << md << ": locked by " << ml.pid << " with priority " << *ml.prio;}); mm = parse_manifest ( sp / "manifest", "machine"); none = none && mm.effective_role () == machine_role::auxiliary; } else // Bootstrapping/suspended. { l3 ([&]{trace << "keeping " << md << ": being bootstrapped " << "or suspened by " << ml.pid;}); // Assume it is a build machine (we cannot determine whether // it is build or auxiliary without loading its manifest). // none = false; } // Add the machine to the lists and bail out. // r.push_back (bootstrapped_machine { move (ml), move (tp), bootstrapped_machine_manifest {move (mm), {}, {}}}); break; } bool te (dir_exists (tp)); // If the resolution fails, then this means there is no current // machine subvolume (for this bootstrap protocol). 
In this case // we clean up our toolchain subvolume (-, if any) and // ignore this machine. // if (sp.empty ()) { if (te) delete_bootstrapped (); l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } // -- // dir_path xp (snapshot_path (tp)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { if (retry >= 10) fail << "unable to snapshot subvolume " << sp; continue; } // Load the (original) machine manifest. // machine_manifest mm ( parse_manifest (sp / "manifest", "machine")); bool aux (mm.effective_role () == machine_role::auxiliary); // Skip machines for which we don't have sufficient RAM. // if (effective_ram_minimum (mm) > (aux ? ops.auxiliary_ram () : ops.build_ram ())) { l3 ([&]{trace << "skipping " << md << ": insufficient RAM";}); run_btrfs (trace, "subvolume", "delete", xp); break; } none = none && aux; // If we already have -, see if it needs to be // re-bootstrapped. Things that render it obsolete: // // 1. New machine revision (compare machine ids). // 2. New toolchain (compare toolchain ids, not auxiliary). // 3. New bbot/libbbot (compare versions, not auxiliary). // // The last case has a complication: what should we do if we have // bootstrapped a newer version of bbot? This would mean that we // are about to be stopped and upgraded (and the upgraded version // will probably be able to use the result). So we simply ignore // this machine for this run. // // Note: see similar code in the machine interruption logic. 
// optional bmm; if (te) { bmm = parse_manifest ( tp / "manifest", "bootstrapped machine"); if (bmm->machine.id != mm.id) { l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); te = false; } if (!aux) { if (!tc_id.empty () && bmm->toolchain.id != tc_id) { l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); te = false; } if (int i = compare_bbot (bmm->bootstrap)) { if (i < 0) { l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); te = false; } else { l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); run_btrfs (trace, "subvolume", "delete", xp); break; } } } if (!te) delete_bootstrapped (); } else l3 ([&]{trace << "bootstrap " << tp;}); if (!te) { // Ignore any other machines that need bootstrapping. // if (!pboot) { pboot = pending_bootstrap { move (ml), move (tp), move (xp), move (mm), move (bmm)}; } else run_btrfs (trace, "subvolume", "delete", xp); break; } else run_btrfs (trace, "subvolume", "delete", xp); // Add the machine to the lists. // r.push_back ( bootstrapped_machine {move (ml), move (tp), move (*bmm)}); break; } // Retry loop. } // Inner dir_iterator loop. } catch (const system_error& e) { fail << "unable to iterate over " << vd << ": " << e; } } // Outer dir_iterator loop. // See if there is a pending bootstrap and whether we can perform it. // // What should we do if we can't (i.e., we are in the priority monitor // mode)? Well, we could have found some machines that are already // bootstrapped (busy or not) and there may be a higher-priority task for // one of them, so it feels natural to return whatever we've got. // if (pboot) { dir_path& tp (pboot->tp); dir_path& xp (pboot->xp); // Determine how many machines are busy (locked by other processes) and // make sure it's below the --instance-max limit, if specified. // // We should only count build machines unless being bootstrapped (see // above). 
// if (inst_max != 0) { size_t busy (0); for (const bootstrapped_machine& m: r) { if (!m.lock.locked () && (!m.lock.prio || m.manifest.machine.effective_role () != machine_role::auxiliary)) ++busy; } assert (busy <= inst_max); if (busy == inst_max) { l3 ([&]{trace << "instance max reached attempting to bootstrap " << tp;}); run_btrfs (trace, "subvolume", "delete", xp); return pr; } } machine_lock& ml (pboot->ml); l3 ([&]{trace << "bootstrapping " << tp;}); // Use the -- snapshot that we have made to bootstrap // the new machine. Then atomically rename it to -. // // Also release all the machine locks that we have acquired so far as // well as the global toolchain lock, since the bootstrap will take a // while and other instances might be able to use them. Because we are // releasing the global lock, we have to restart the enumeration process // from scratch. // r.clear (); ml.bootstrap (tl); tl.unlock (); bool aux (pboot->mm.effective_role () == machine_role::auxiliary); optional bmm ( aux ? bootstrap_auxiliary_machine (xp, pboot->mm, move (pboot->bmm)) : bootstrap_build_machine (xp, pboot->mm, move (pboot->bmm))); if (!bmm) { l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); run_btrfs (trace, "subvolume", "delete", xp); continue; } try { mvdir (xp, tp); } catch (const system_error& e) { fail << "unable to rename " << xp << " to " << tp; } l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); // Check the bootstrapped bbot version as above and ignore this build // machine if it's newer than us. // if (!aux) { if (int i = compare_bbot (bmm->bootstrap)) { if (i > 0) l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); else warn << "bootstrapped " << tp << " bbot worker is older " << "than agent; assuming test setup"; } } continue; // Re-enumerate from scratch. } if (none) warn << "no build machines for toolchain " << tc_name; return pr; } // From-scratch retry loop. // Unreachable. 
} // enumerate_machines() (begins above).
catch (const system_error& e)
{
  fail << "unable to iterate over " << machines << ": " << e << endf;
}

// Perform the build task throwing interrupt if it has been interrupted.
//
struct interrupt {};

// Start an auxiliary machine (steps 1-3 described in perform_task() below).
//
// Note that if the returned machine is NULL, then it means it has failed to
// start up (in which case the diagnostics has already been issued and the
// snapshot cleaned up).
//
// Note: can throw interrupt.
//
struct auxiliary_machine_result
{
  dir_path snapshot;  // Snapshot subvolume the machine is running on.
  unique_ptr machine; // NULL if the machine failed to start up.
};

using auxiliary_machine_results = vector;

// Start one auxiliary machine: snapshot its subvolume, boot it, and serve
// TFTP until it uploads its environment file. Return the started machine
// together with its (prefixed) environment text. On soft failure (startup
// timeout after retries, suspension) return a NULL machine and empty string.
//
static pair
start_auxiliary_machine (bootstrapped_machine& am,
                         const string& env_name,    // Environment prefix name.
                         uint16_t machine_num,      // 1-based auxiliary number.
                         size_t ram,                // RAM to give, KiB.
                         const string& in_name,     // Build dir name; built as
                                                    // toolchain-instance by the
                                                    // caller (see perform_task()).
                         const dir_path& tftp_put_dir,
                         optional boost_cpus)
try
{
  tracer trace ("start_auxiliary_machine", am.path.string ().c_str ());

  // NOTE: a simplified version of perform_task() below.

  machine_lock& ml (am.lock);
  const dir_path& md (am.path);
  const bootstrapped_machine_manifest& mm (am.manifest);

  path ef (tftp_put_dir / "environment"); // Environment upload file.
  path efm (ef + '-' + mm.machine.name);  // Environment upload saved file.
  try_rmfile (ef);
  try_rmfile (efm);

  // Path of the snapshot subvolume we will run the machine on.
  //
  const dir_path xp (snapshot_path (md));

  for (size_t retry (0);; ++retry)
  {
    // Drop the stale snapshot from the previous (failed) attempt.
    //
    if (retry != 0)
      run_btrfs (trace, "subvolume", "delete", xp);

    run_btrfs (trace, "subvolume", "snapshot", md, xp);

    // Start the TFTP server. Map:
    //
    //   GET requests to /dev/null
    //   PUT requests to .../build/{in_name}/put/*
    //
    // Note that we only need to run the TFTP server until we get the
    // environment upload. Which means we could have reused the same port as
    // the build machine. But let's keep things parallel to the VNC ports and
    // use a separate TFTP port for each auxiliary machine.
    //
    tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' +
                       "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
                       ops.tftp_port () + offset + machine_num);

    l3 ([&]{trace << "tftp server on port " << tftpd.port ();});

    // Note: the machine handling logic is similar to bootstrap. Except here
    // we have to cleanup the snapshot ourselves in case of suspension or
    // unexpected exit.

    // Start the machine.
    //
    // Note that for now we don't support logging in into auxiliary machines
    // in the interactive mode. Maybe one day.
    //
    unique_ptr m (
      start_machine (xp,
                     mm.machine,
                     machine_num,
                     boost_cpus ? *boost_cpus : ops.cpu (),
                     ram,
                     mm.machine.mac,
                     ops.bridge (),
                     tftpd.port (),
                     false /* public_vnc */));

    // If anything below throws, force the machine down (unless already
    // disposed of by setting m to NULL).
    //
    auto mg (
      make_exception_guard (
        [&m, &xp] ()
        {
          if (m != nullptr)
          {
            info << "trying to force machine " << xp << " down";
            try {m->forcedown (false);} catch (const failed&) {}
          }
        }));

    // Suspend the machine for investigation and return the "failed to
    // start" result (NULL machine).
    //
    auto soft_fail = [&trace, &ml, &xp, &m] (const char* msg)
    {
      {
        diag_record dr (error);
        dr << msg << " for machine " << xp << ", suspending";
        m->print_info (dr);
      }

      try
      {
        // Update the information in the machine lock to signal that the
        // machine is suspended and cannot be interrupted.
        //
        ml.suspend_task ();

        m->suspend (false);
        m->wait (false);
        m->cleanup ();
        run_btrfs (trace, "subvolume", "delete", xp);
        info << "resuming after machine suspension";
      }
      catch (const failed&) {}

      return make_pair (auxiliary_machine_result {move (xp), nullptr},
                        string ());
    };

    // Return true if the machine is still running, false (with diagnostics)
    // if it exited unexpectedly.
    //
    auto check_machine = [&xp, &m] ()
    {
      try
      {
        size_t t (0);
        if (!m->wait (t /* seconds */, false /* fail_hard */))
          return true; // Still running.
      }
      catch (const failed&) {}

      diag_record dr (warn);
      dr << "machine " << xp << " exited unexpectedly";
      m->print_info (dr);

      return false;
    };

    // Check for a SIGUSR1-signalled interrupt and, if received, take the
    // machine down, clean up, and throw interrupt.
    //
    auto check_interrupt = [&trace, &xp, &m] ()
    {
      if (sigurs1.load (std::memory_order_consume) == 0)
        return;

      l2 ([&]{trace << "machine " << xp << " interruped";});

      try {m->forcedown (false);} catch (const failed&) {}
      m->cleanup ();
      m = nullptr; // Disable exceptions guard above.
      run_btrfs (trace, "subvolume", "delete", xp);

      throw interrupt ();
    };

    // Wait for up to 4 minutes (by default) for the environment upload (the
    // same logic as in bootstrap_auxiliary_machine() except here the machine
    // cannot just exit).
    //
    size_t to;
    const size_t startup_to (ops.build_startup ());

    for (to = startup_to; to != 0; )
    {
      check_interrupt ();

      if (tftpd.serve (to, 2))
        continue;

      if (!check_machine ())
      {
        // An auxiliary machine should not just exit.
        //
        return make_pair (auxiliary_machine_result {move (xp), nullptr},
                          string ());
      }

      // Once the environment file appears (and is synced), serve a bit
      // longer to make sure the upload has fully completed.
      //
      if (file_not_empty (ef))
      {
        if (!tftpd.serve (to, 5))
          break;
      }
    }

    if (to == 0)
    {
      if (retry > ops.build_retries ())
        return soft_fail ("build startup timeout");

      // Note: keeping the logs behind (no cleanup).

      diag_record dr (warn);
      dr << "machine " << mm.machine.name << " mis-booted, retrying";
      m->print_info (dr);

      try {m->forcedown (false);} catch (const failed&) {}
      m = nullptr; // Disable exceptions guard above.
      continue;
    }

    l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});

    // Read the uploaded environment and, if necessary, append the name
    // prefix (which we first make a valid C identifier and uppercase).
    //
    // Note that it may seem like a good idea to validate the format here.
    // But that means we will essentially need to parse it twice (here and in
    // worker). Plus, in worker we can communicate some diagnostics by
    // writing it to the build log (here all we can easily do is abort the
    // task). So here we just append the name prefix to trimmed
    // non-blank/comment lines.
    //
    string env_pfx (env_name.empty ()
                    ? string ()
                    : ucase (sanitize_identifier (env_name)) + '_');
    string env;
    try
    {
      ifdstream is (ef, ifdstream::badbit);

      for (string l; !eof (getline (is, l)); )
      {
        trim (l);

        if (!env_pfx.empty () && !l.empty () && l.front () != '#')
          l.insert (0, env_pfx);

        env += l;
        env += '\n';
      }
    }
    catch (const io_error& e)
    {
      fail << "unable to read from " << ef << ": " << e;
    }

    // Rename and keep the environment file for debugging (it will be
    // removed at the end as part of the tftp_put_dir cleanup).
    //
    mvfile (ef, efm);

    return make_pair (auxiliary_machine_result {move (xp), move (m)},
                      move (env));
  } // Unreachable.
}
catch (const system_error& e)
{
  fail << "auxiliary machine startup error: " << e << endf;
}

// Divide the auxiliary RAM among the specified machines.
//
// Issue diagnostics and return empty vector if the auxiliary RAM is
// insufficient.
//
static vector // Parallel to mms.
divide_auxiliary_ram (const vector& mms)
{
  size_t ram (ops.auxiliary_ram ()); // Remaining RAM to distribute, KiB.

  vector rams; // Per-machine allocations, parallel to mms.
  vector rnds; // Allocation rounds (see below).

  // First pass: allocate the minimums.
  //
  for (const machine_header_manifest* mm: mms)
  {
    size_t v (effective_ram_minimum (*mm));

    assert (!mm->ram_maximum || v <= *mm->ram_maximum); // Sanity check.

    rams.push_back (v);
    rnds.push_back (0);

    if (ram >= v)
      ram -= v;
    else
    {
      diag_record dr (error);
      dr << "insufficient auxiliary RAM " << ops.auxiliary_ram () << "KiB";

      for (size_t i (0); i != rams.size (); ++i)
        dr << info << mms[i]->name << " requires minimum " << rams[i]
           << "KiB";

      return {};
    }
  }

  // Second pass: distribute the remaining RAM.
  //
  // We are going to do it in the ram_minimum increments to avoid ending up
  // with odd amounts (while Linux can probably grok anything, who knows
  // about Windows).
  //
  // To make the distribution fair we are going to count how many times we
  // have increased each machine's allocation (the rnds vector).
  //
  for (size_t a (1); ram != 0; ) // Allocation round.
  {
    // Find a machine that would be satisfied with the least amount of RAM
    // but which hasn't yet been given anything on this allocation round.
    //
    size_t min_i;     // Min index.
    size_t min_v (0); // Min value.

    // We are done if we couldn't give out any RAM and haven't seen any
    // machines that have already been given something on this allocation
    // round.
    //
    bool done (true);

    for (size_t i (0); i != rams.size (); ++i)
    {
      if (rnds[i] != a)
      {
        const machine_header_manifest& mm (*mms[i]);

        size_t o (rams[i]);                   // Current allocation.
        size_t v (effective_ram_minimum (mm)); // Increment.

        // Don't allocate past maximum.
        //
        if (mm.ram_maximum && *mm.ram_maximum < o + v)
        {
          v = *mm.ram_maximum - o;

          if (v == 0)
            continue; // Already at maximum.
        }

        if (v <= ram && (min_v == 0 || min_v > v))
        {
          min_i = i;
          min_v = v;
        }
      }
      else
        done = false;
    }

    if (min_v != 0)
    {
      rnds[min_i] = a;
      rams[min_i] += min_v;
      ram -= min_v;
    }
    else
    {
      if (done)
        break; // Cannot give out any more RAM.

      ++a; // Next allocation round.
    }
  }

  return rams;
}

// Stop all the auxiliary machines and clear the passed list.
//
static void
stop_auxiliary_machines (auxiliary_machine_results& amrs)
{
  tracer trace ("stop_auxiliary_machines");

  if (!amrs.empty ())
  {
    // Do it in two passes to make sure all the machines are at least down.
    //
    for (const auxiliary_machine_result& amr: amrs)
    {
      if (amr.machine != nullptr)
      {
        try {amr.machine->forcedown (false);} catch (const failed&) {}
      }
    }

    // Make sure we don't retry the above even if the below fails.
    //
    auxiliary_machine_results tmp;
    tmp.swap (amrs);

    for (const auxiliary_machine_result& amr: tmp)
    {
      if (amr.machine != nullptr)
      {
        amr.machine->cleanup ();
        run_btrfs (trace, "subvolume", "delete", amr.snapshot);
      }
    }
  }
}

// Start all the auxiliary machines and patch in their combined environment
// into tm.auxiliary_environment.
//
// Return the started machines or empty list if any of them failed to start
// up (which means this function should only be called for non-empty ams).
//
// Note that the order of auxiliary machines in ams may not match that in
// tm.auxiliary_machines.
//
static auxiliary_machine_results
start_auxiliary_machines (const vector& ams,
                          task_manifest& tm,
                          const string& in_name, // Build dir name (see
                                                 // perform_task() below).
                          const dir_path& tftp_put_dir,
                          optional boost_cpus)
{
  tracer trace ("start_auxiliary_machines");

  size_t n (tm.auxiliary_machines.size ());
  assert (n != 0 && ams.size () == n);

  auxiliary_machine_results amrs;

  // Divide the auxiliary RAM among the machines.
  //
  vector rams;
  {
    vector mms;
    mms.reserve (n);

    for (bootstrapped_machine* am: ams)
      mms.push_back (&am->manifest.machine);

    rams = divide_auxiliary_ram (mms);
    if (rams.empty ())
      return amrs; // Insufficient RAM (diagnostics already issued).

    if (verb > 3) // l3
      for (size_t i (0); i != n; ++i)
        trace << mms[i]->name << " allocated " << rams[i] << "KiB";
  }

  // Start the machines.
  //
  // Let's use the order in which they were specified in the task manifest
  // (which will naturally be the order in which they are specified in the
  // package manifest). This way amrs and tm.auxiliary_machines will be
  // parallel.
  //
  string envs; // Combined environments.

  // If anything below throws, stop whatever machines we have started.
  //
  auto amg (
    make_exception_guard (
      [&amrs] ()
      {
        if (!amrs.empty ())
        {
          info << "trying to force auxiliary machines down";
          stop_auxiliary_machines (amrs);
        }
      }));

  for (size_t i (0); i != n; ++i)
  {
    const auxiliary_machine& tam (tm.auxiliary_machines[i]);

    // Find the bootstrapped machine corresponding to this task manifest
    // entry (ams may be in a different order; see note above).
    //
    auto b (ams.begin ()), e (ams.end ());
    auto j (find_if (b, e,
                     [&tam] (const bootstrapped_machine* m)
                     {
                       return m->manifest.machine.name == tam.name;
                     }));
    assert (j != e);

    // Note: can throw interrupt.
    //
    pair p (
      start_auxiliary_machine (**j,
                               tam.environment_name,
                               i + 1, // 1-based machine number.
                               rams[j - b], // Parallel to ams.
                               in_name,
                               tftp_put_dir,
                               boost_cpus));

    if (p.first.machine == nullptr)
    {
      // Failed to start up: stop the already-started machines and bail out
      // with the empty list.
      //
      if (!amrs.empty ())
      {
        info << "trying to force auxiliary machines down";
        stop_auxiliary_machines (amrs); // amrs is now empty.
      }

      return amrs;
    }

    amrs.push_back (move (p.first));

    // Add the machine name as a header before its environment.
    //
    if (i != 0) envs += '\n';
    envs += "# ";
    envs += tam.name;
    envs += '\n';
    envs += "#\n";
    envs += p.second; // Always includes trailing newline.
  }

  tm.auxiliary_environment = move (envs);

  return amrs;
}

// Result of performing a build task: the working directory and the result
// manifest, plus, optionally, the archive of build artifacts to upload.
//
struct perform_task_result
{
  auto_rmdir work_dir;      // The build directory under the TFTP root
                            // (removed when the result goes out of scope).
  result_manifest manifest;

  // Uploaded archive, if any (somewhere inside work_dir).
  //
  optional upload_archive;

  // Create the special empty result.
  //
  perform_task_result () = default;

  // Create task result without build artifacts.
  //
  explicit
  perform_task_result (auto_rmdir&& d, result_manifest&& m)
      : work_dir (move (d)),
        manifest (move (m)) {}

  // Create task result with build artifacts.
  //
  perform_task_result (auto_rmdir&& d, result_manifest&& m, path&& a)
      : work_dir (move (d)),
        manifest (move (m)),
        upload_archive (move (a)) {}
};

// Perform the build task on the specified build machine, with the specified
// auxiliary machines, if any. Throw interrupt if interrupted via SIGUSR1.
//
// Note that the task manifest is not const since we may need to patch in the
// auxiliary_environment value.
//
static perform_task_result
perform_task (toolchain_lock tl,          // Note: assumes ownership.
              bootstrapped_machine& bm,   // Build machine.
              const vector& ams,          // Auxiliary machines.
              task_manifest& tm,
              optional boost_cpus)
try
{
  tracer trace ("perform_task", bm.path.string ().c_str ());

  // Arm the interrupt handler and release the global toolchain lock.
  //
  // Note that there can be no interrupt while we are holding the global
  // lock.
  //
  sigurs1.store (0, std::memory_order_release);
  tl.unlock ();

  machine_lock& ml (bm.lock);
  const dir_path& md (bm.path);
  const bootstrapped_machine_manifest& mm (bm.manifest);

  // Build directory name and its location under the TFTP root.
  //
  const string in_name (tc_name + '-' + to_string (inst));
  auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name);
  try_mkdir_p (arm.path);

  // Start with the aborted status (overwritten on success).
  //
  result_manifest r {
    tm.name,
    tm.version,
    result_status::abort,
    operation_results {},
    nullopt /* worker_checksum */,
    nullopt /* dependency_checksum */};

  if (ops.fake_build ())
    return perform_task_result (move (arm), move (r));

  // The overall plan is as follows:
  //
  // 1. Snapshot the (bootstrapped) build machine.
  //
  // 2. Save the task manifest to the TFTP directory (to be accessed by the
  //    worker).
  //
  // 3. Start the TFTP server and the machine.
  //
  // 4. Serve TFTP requests while watching out for the result manifest and
  //    interrupts.
  //
  // 5. Clean up (force the machine down and delete the snapshot).
  //
  // If the task requires any auxiliary machines, then for each such machine
  // perform the following steps 1-3 before step 1 above, and step 4 after
  // step 5 above (that is, start all the auxiliary machines before the
  // build machine and clean them up after):
  //
  // 1. Snapshot the (bootstrapped) auxiliary machine.
  //
  // 2. Start the TFTP server and the machine.
  //
  // 3. Handle TFTP upload requests until received the environment upload.
  //
  // 4. Clean up (force the machine down and delete the snapshot).

  // TFTP server mapping (server chroot is --tftp):
  //
  //   GET requests to .../build/{in_name}/get/*
  //   PUT requests to .../build/{in_name}/put/*
  //
  dir_path gd (dir_path (arm.path) /= "get");
  dir_path pd (dir_path (arm.path) /= "put");

  try_mkdir_p (gd);
  try_mkdir_p (pd);

  path tf (gd / "task.manifest");       // Task manifest file.
  path rf (pd / "result.manifest.lz4"); // Result manifest file.
  path af (pd / "upload.tar");          // Archive of build artifacts to
                                        // upload.

  if (ops.fake_machine_specified ())
  {
    // Note: not handling interrupts here. Nor starting any auxiliary
    // machines, naturally.

    serialize_manifest (tm, tf, "task");

    // Simply wait for the file to appear.
    //
    for (size_t i (0);; sleep (1))
    {
      if (file_not_empty (rf))
      {
        // Wait a bit to make sure we see complete manifest.
        //
        sleep (2);
        break;
      }

      if (i++ % 10 == 0)
        l3 ([&]{trace << "waiting for result manifest";});
    }

    r = parse_manifest (rf, "result");
  }
  else
  {
    // Start the auxiliary machines if any.
    //
    // If anything goes wrong, force them all down (failed that, the machine
    // destructor will block waiting for their exit).
    //
    auxiliary_machine_results amrs;

    auto amg (
      make_exception_guard (
        [&amrs] ()
        {
          if (!amrs.empty ())
          {
            info << "trying to force auxiliary machines down";
            stop_auxiliary_machines (amrs);
          }
        }));

    if (!ams.empty ())
    {
      amrs = start_auxiliary_machines (ams, tm, in_name, pd, boost_cpus);

      if (amrs.empty ())
        return perform_task_result (move (arm), move (r)); // Abort.
    }

    // Note: tm.auxiliary_environment patched in by
    // start_auxiliary_machines().
    //
    serialize_manifest (tm, tf, "task");

    // Start the build machine and perform the build.
    //
    try_rmfile (rf);
    try_rmfile (af);

    // Path of the snapshot subvolume we will run the machine on.
    //
    const dir_path xp (snapshot_path (md));

    for (size_t retry (0);; ++retry)
    {
      if (retry != 0)
        run_btrfs (trace, "subvolume", "delete", xp);

      run_btrfs (trace, "subvolume", "snapshot", md, xp);

      // Start the TFTP server.
      //
      tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" +
                         "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
                         ops.tftp_port () + offset + 0 /* build machine */);

      l3 ([&]{trace << "tftp server on port " << tftpd.port ();});

      // Note: the machine handling logic is similar to bootstrap. Except
      // here we have to cleanup the snapshot ourselves in case of
      // suspension or unexpected exit.
      //
      // NOTE: see similar code in start_auxiliary_machine() above.
      //
      {
        // Start the machine.
        //
        unique_ptr m (
          start_machine (xp,
                         mm.machine,
                         0 /* machine_num (build) */,
                         boost_cpus ? *boost_cpus : ops.cpu (),
                         ops.build_ram (),
                         mm.machine.mac,
                         ops.bridge (),
                         tftpd.port (),
                         tm.interactive.has_value () /* public_vnc */));

        // If anything below throws, force the machine down (unless already
        // disposed of by setting m to NULL).
        //
        auto mg (
          make_exception_guard (
            [&m, &xp] ()
            {
              if (m != nullptr)
              {
                info << "trying to force machine " << xp << " down";
                try {m->forcedown (false);} catch (const failed&) {}
              }
            }));

        // Suspend the build machine for investigation (stopping the
        // auxiliary machines) and return the current (abort) result.
        //
        auto soft_fail = [&trace, &amrs, &ml, &xp, &m, &arm, &r]
                         (const char* msg)
        {
          {
            diag_record dr (error);
            dr << msg << " for machine " << xp << ", suspending";
            m->print_info (dr);
          }

          // What should we do about auxiliary machines? We could force them
          // all down before suspending (and thus freeing them for use).
          // That is the easy option. We could suspend them as well, but
          // that feels like it will be a pain (will need to resume all of
          // them when done investigating). Theoretically we could just let
          // them run, but that won't play well with our interrupt logic
          // since someone may attempt to interrupt us via one of them. So
          // let's do easy for now.
          //
          // Note: always stop/suspend the build machine before the
          // auxiliary machines to avoid any errors due the auxiliary
          // machines being unavailable.

          try
          {
            // Update the information in the machine lock to signal that
            // the machine is suspended and cannot be interrupted.
            //
            ml.suspend_task ();

            m->suspend (false);
            stop_auxiliary_machines (amrs);
            m->wait (false);
            m->cleanup ();
            m = nullptr; // Disable exceptions guard above.
            run_btrfs (trace, "subvolume", "delete", xp);
            info << "resuming after machine suspension";
          }
          catch (const failed&) {}

          return perform_task_result (move (arm), move (r));
        };

        // Return true if the build machine is still running, false (with
        // diagnostics) if it exited unexpectedly.
        //
        auto check_machine = [&xp, &m] ()
        {
          try
          {
            size_t t (0);
            if (!m->wait (t /* seconds */, false /* fail_hard */))
              return true; // Still running.
          }
          catch (const failed&) {}

          diag_record dr (warn);
          dr << "machine " << xp << " exited unexpectedly";
          m->print_info (dr);

          return false;
        };

        // Return true if all the auxiliary machines are still running,
        // false (with diagnostics) if any of them exited unexpectedly.
        //
        auto check_auxiliary_machines = [&amrs] ()
        {
          for (auxiliary_machine_result& amr: amrs)
          {
            try
            {
              size_t t (0);
              if (!amr.machine->wait (t /* seconds */,
                                      false /* fail_hard */))
                continue; // Still running.
            }
            catch (const failed&) {}

            diag_record dr (warn);
            dr << "machine " << amr.snapshot << " exited unexpectedly";
            amr.machine->print_info (dr);

            return false;
          }

          return true;
        };

        // Check for a SIGUSR1-signalled interrupt and, if received, take
        // all the machines down, clean up, and throw interrupt.
        //
        auto check_interrupt = [&trace, &amrs, &xp, &m] ()
        {
          if (sigurs1.load (std::memory_order_consume) == 0)
            return;

          l2 ([&]{trace << "machine " << xp << " interruped";});

          try {m->forcedown (false);} catch (const failed&) {}
          stop_auxiliary_machines (amrs);
          m->cleanup ();
          m = nullptr; // Disable exceptions guard above.
          run_btrfs (trace, "subvolume", "delete", xp);

          throw interrupt ();
        };

        // The first request should be the task manifest download. Wait for
        // up to 4 minutes (by default) for that to arrive (again, that long
        // to deal with flaky Windows networking, etc). In a sense we use it
        // as an indication that the machine has booted and the worker
        // process has started.
        //
        size_t to;
        const size_t startup_to (ops.build_startup ());
        const size_t build_to (tm.interactive
                               ? ops.intactive_timeout ()
                               : ops.build_timeout ());

        // Wait periodically making sure the machine is still alive and
        // checking for interrupts.
        //
        for (to = startup_to; to != 0; )
        {
          check_interrupt ();

          if (tftpd.serve (to, 2))
            break; // First request served: the worker is up.

          bool bm; // Build machine still running.
          if (!(bm = check_machine ()) || !check_auxiliary_machines ())
          {
            if (bm) try {m->forcedown (false);} catch (const failed&) {}
            stop_auxiliary_machines (amrs);
            m = nullptr; // Disable exceptions guard above.
            run_btrfs (trace, "subvolume", "delete", xp);
            return perform_task_result (move (arm), move (r));
          }
        }

        if (to == 0)
        {
          if (retry > ops.build_retries ())
            return soft_fail ("build startup timeout");

          // Note: keeping the logs behind (no cleanup).

          diag_record dr (warn);
          dr << "machine " << mm.machine.name << " mis-booted, retrying";
          m->print_info (dr);

          try {m->forcedown (false);} catch (const failed&) {}
          m = nullptr; // Disable exceptions guard above.
          continue;
        }

        l3 ([&]{trace << "completed startup in " << startup_to - to
                      << "s";});

        // Next the worker builds things and then uploads optional archive
        // of build artifacts and the result manifest afterwards. So on our
        // side we serve TFTP requests while checking for the manifest
        // file. To workaround some obscure filesystem races (the file's
        // mtime/size is updated several seconds later; maybe tmpfs issue?),
        // we periodically re-check.
        //
        for (to = build_to; to != 0; )
        {
          check_interrupt ();

          if (tftpd.serve (to, 2))
            continue;

          bool bm; // Build machine still running.
          if (!(bm = check_machine ()) || !check_auxiliary_machines ())
          {
            // If the build machine is still up but an auxiliary machine
            // died, or the build machine died without uploading the result,
            // then this build has failed.
            //
            if (bm || !file_not_empty (rf))
            {
              if (bm) try {m->forcedown (false);} catch (const failed&) {}
              stop_auxiliary_machines (amrs);
              m = nullptr; // Disable exceptions guard above.
              run_btrfs (trace, "subvolume", "delete", xp);
              return perform_task_result (move (arm), move (r));
            }
          }

          // Once the result manifest appears (and is synced), serve a bit
          // longer to make sure the upload has fully completed.
          //
          if (file_not_empty (rf))
          {
            if (!tftpd.serve (to, 5))
              break;
          }
        }

        if (to != 0)
        {
          l3 ([&]{trace << "completed build in " << build_to - to << "s";});

          // Parse the result manifest.
          //
          try
          {
            r = parse_manifest (rf, "result", false);
          }
          catch (const failed&)
          {
            r.status = result_status::abnormal; // Soft-fail below.
          }
        }
        else
        {
          // Suspend the machine for non-interactive builds and fall through
          // to abort for interactive (i.e., "the user went for lunch"
          // case).
          //
          if (!tm.interactive)
            return soft_fail ("build timeout");
        }

        if (r.status == result_status::abnormal)
        {
          // If the build terminated abnormally, suspend the machine for
          // investigation.
          //
          return soft_fail ("build terminated abnormally");
        }
        else
        {
          // Force the machine down (there is no need wasting time on clean
          // shutdown since the next step is to drop the snapshot). Also
          // fail softly if things go badly.
          //
          // One thing to keep in mind are DHCP leases: with this approach
          // they will not be released. However, since we reuse the same
          // MAC address since bootstrap, on the next build we should get
          // the same lease instead of a new one.
          //
          try {m->forcedown (false);} catch (const failed&) {}
          stop_auxiliary_machines (amrs);
          m->cleanup ();
          m = nullptr; // Disable exceptions guard above.
        }
      }

      run_btrfs (trace, "subvolume", "delete", xp);
      break;
    }
  }

  // Update package name/version if the returned value is "unknown".
  //
  if (r.version == bpkg::version ("0"))
  {
    assert (r.status == result_status::abnormal);
    r.name = tm.name;
    r.version = tm.version;
  }

  // Only attach the upload archive on success (and if it exists).
  //
  return (!r.status || !file_exists (af)
          ? perform_task_result (move (arm), move (r))
          : perform_task_result (move (arm), move (r), move (af)));
}
catch (const system_error& e)
{
  fail << "build error: " << e << endf;
}

static const string agent_checksum ("2"); // Logic version.

int
main (int argc, char* argv[])
try
{
  cli::argv_scanner scan (argc, argv, true);
  ops.parse (scan);
  verb = ops.verbose ();

#if 0
  // Ad hoc divide_auxiliary_ram() test:
  //
  // ./bbot-agent --auxiliary-ram 4194304
  //
  machine_header_manifest m1 {
    "m1", "m1", "m1", machine_role::auxiliary, 512*1024, nullopt};
  machine_header_manifest m2 {
    "m2", "m2", "m2", machine_role::auxiliary, 1024*1024, 3*512*1024};
  vector mms {&m1, &m2};
  vector rams (divide_auxiliary_ram (mms));
  for (size_t i (0); i != rams.size (); ++i)
    text << mms[i]->name << ' ' << rams[i] / 1024;
  return 0;
#endif

  // @@ systemd 231 added JOURNAL_STREAM environment variable which allows
  //    detecting if stderr is connected to the journal.
  //
  if (ops.systemd_daemon ())
    systemd_diagnostics (true); // With critical errors.

  tracer trace ("main");

  uid = getuid ();
  uname = getpwuid (uid)->pw_name;

  // Obtain our hostname.
  //
  {
    char buf[HOST_NAME_MAX + 1];

    if (gethostname (buf, sizeof (buf)) == -1)
      fail << "unable to obtain hostname: "
           << system_error (errno, std::generic_category ()); // Sanitize.

    hname = buf;
  }

  // Obtain our IP address as a first discovered non-loopback IPv4 address.
  //
  // Note: Linux-specific implementation.
  //
  {
    ifaddrs* i;
    if (getifaddrs (&i) == -1)
      fail << "unable to obtain IP addresses: "
           << system_error (errno, std::generic_category ()); // Sanitize.

    unique_ptr deleter (i, freeifaddrs);

    for (; i != nullptr; i = i->ifa_next)
    {
      sockaddr* sa (i->ifa_addr);

      if (sa != nullptr &&                      // Configured.
          (i->ifa_flags & IFF_LOOPBACK) == 0 && // Not a loopback interface.
          (i->ifa_flags & IFF_UP) != 0 &&       // Up.
          sa->sa_family == AF_INET)             // Ignore IPv6 for now.
      {
        char buf[INET_ADDRSTRLEN]; // IPv4 address.
if (inet_ntop (AF_INET, &reinterpret_cast (sa)->sin_addr, buf, sizeof (buf)) == nullptr) fail << "unable to obtain IPv4 address: " << system_error (errno, std::generic_category ()); // Sanitize. hip = buf; break; } } if (hip.empty ()) fail << "no IPv4 address configured"; } // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if // the pipe reading end is closed. Note that by default this signal // terminates a process. Also note that there is no way to disable this // behavior on a file descriptor basis or for the write() function call. // if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) fail << "unable to ignore broken pipe (SIGPIPE) signal: " << system_error (errno, std::generic_category ()); // Sanitize. // Version. // if (ops.version ()) { cout << "bbot-agent " << BBOT_VERSION_ID << endl << "libbbot " << LIBBBOT_VERSION_ID << endl << "libbpkg " << LIBBPKG_VERSION_ID << endl << "libbutl " << LIBBUTL_VERSION_ID << endl << "Copyright (c) " << BBOT_COPYRIGHT << "." << endl << "This is free software released under the MIT license." << endl; return 0; } // Help. // if (ops.help ()) { pager p ("bbot-agent help", false); print_bbot_agent_usage (p.stream ()); // If the pager failed, assume it has issued some diagnostics. // return p.wait () ? 0 : 1; } tc_name = ops.toolchain_name (); tc_num = ops.toolchain_num (); if (ops.toolchain_lock_specified ()) { const string& l (ops.toolchain_lock ()); if (!l.empty ()) { tc_lock = path (l); if (!tc_lock.absolute ()) fail << "--toolchain-lock value '" << l << "' should be absolute path"; } } else if (!(ops.fake_bootstrap () || ops.fake_build () || ops.fake_machine_specified () || ops.fake_request_specified ())) tc_lock = path ("/var/lock/bbot-agent-" + tc_name + ".lock"); tc_ver = (ops.toolchain_ver_specified () ? 
ops.toolchain_ver () : standard_version (BBOT_VERSION_STR)); tc_id = ops.toolchain_id (); if (tc_num == 0 || tc_num > 9) fail << "--toolchain-num value " << tc_num << " out of range"; inst = ops.instance (); if (inst == 0 || inst > 99) fail << "--instance value " << inst << " out of range"; inst_max = ops.instance_max (); // The last decimal position is used for machine number, 0 for the build // machine, non-0 for auxiliary machines (of which we can have maximum 9). // offset = (tc_num - 1) * 1000 + inst * 10; // Controller priority to URLs map. // std::map controllers; for (int i (1); i != argc; ++i) { // [=] // string a (argv[i]); // See if we have priority, falling back to priority 0 if absent. // uint64_t prio (0); // Note that we can also have `=` in (e.g., parameters) so we will // only consider `=` as ours if prior to it we only have digits. // size_t p (a.find ('=')); if (p != string::npos && a.find_first_not_of ("0123456789") == p) { // Require exactly four or five digits in case we later need to extend // the priority levels beyond the 10 possible values (e.g., DDCCBBAA). // if (p != 4 && p != 5) fail << "four or five-digit controller url priority expected in '" << a << "'"; char* e; errno = 0; prio = strtoull (a.c_str (), &e, 10); assert (errno != ERANGE && e == a.c_str () + p); if (prio > 19999) fail << "out of bounds controller url priority in '" << a << "'"; a.erase (0, p + 1); } controllers[prio].push_back (move (a)); } if (controllers.empty ()) { if (ops.dump_machines () || ops.fake_request_specified ()) { controllers[0].push_back ("https://example.org"); } else fail << "controller url expected" << info << "run " << argv[0] << " --help for details"; } // Handle SIGHUP and SIGTERM. // if (signal (SIGHUP, &handle_signal) == SIG_ERR || signal (SIGTERM, &handle_signal) == SIG_ERR || signal (SIGUSR1, &handle_signal) == SIG_ERR) fail << "unable to set signal handler: " << system_error (errno, std::generic_category ()); // Sanitize. 
optional fingerprint; if (ops.auth_key_specified ()) try { // Note that the process always prints to STDERR, so we redirect it to the // null device. We also check for the key file existence to print more // meaningful error message if that's not the case. // if (!file_exists (ops.auth_key ())) throw_generic_error (ENOENT); openssl os (trace, ops.auth_key (), path ("-"), fdopen_null (), ops.openssl (), "rsa", ops.openssl_option (), "-pubout", "-outform", "DER"); fingerprint = sha256 (os.in).string (); os.in.close (); if (!os.wait ()) throw_generic_error (EIO); } catch (const system_error& e) { fail << "unable to obtain authentication public key: " << e; } if (ops.systemd_daemon ()) { diag_record dr; dr << info << "bbot agent " << BBOT_VERSION_ID; dr << info << "cpu(s) " << ops.cpu () << info << "build ram(KiB) " << ops.build_ram () << info << "auxil ram(KiB) " << ops.auxiliary_ram () << info << "bridge " << ops.bridge (); if (fingerprint) dr << info << "auth key fp " << *fingerprint; dr << info << "interactive " << to_string (ops.interactive()) << info << "toolchain name " << tc_name << info << "toolchain num " << tc_num << info << "toolchain ver " << tc_ver.string () << info << "toolchain id " << tc_id << info << "instance num " << inst; if (inst_max != 0) dr << info << "instance max " << inst_max; // Note: keep last since don't restore fill/setw. // for (const pair& p: controllers) { for (const string& u: p.second) { dr.os.fill ('0'); dr << info << "controller url " << std::setw (4) << p.first << '=' << u; } } } // The work loop. The steps we go through are: // // 1. Enumerate the available machines, (re-)bootstrapping any if necessary. // // 2. Poll controller(s) for build tasks. // // 3. If no build tasks are available, go to #1 (after sleeping a bit). // // 4. If a build task is returned, do it, upload the result, and go to #1 // (immediately). // // NOTE: consider updating agent_checksum if making any logic changes. 
// auto rand_sleep = [] () { return std::uniform_int_distribution (50, 60) (rand_gen); }; optional imode; optional ilogin; if (ops.interactive () != interactive_mode::false_) { imode = ops.interactive (); ilogin = machine_vnc (0 /* machine_num */, true /* public */); } // Use the pkeyutl openssl command for signing the task response challenge // if openssl version is greater or equal to 3.0.0 and the rsautl command // otherwise. // // Note that openssl 3.0.0 deprecates rsautl in favor of pkeyutl. // const char* sign_cmd; try { optional oi (openssl::info (trace, 2, ops.openssl ())); sign_cmd = oi && oi->name == "OpenSSL" && oi->version >= semantic_version {3, 0, 0} ? "pkeyutl" : "rsautl"; } catch (const system_error& e) { fail << "unable to obtain openssl version: " << e << endf; } for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0) { pair er ( enumerate_machines (ops.machines ())); toolchain_lock& tl (er.first); bootstrapped_machines& ms (er.second); // Determine the existing task priority range (using [0,0] if there are // none) as well as whether we we should operate in the priority monitor // mode. // uint64_t prio_min (~uint64_t (0)); uint64_t prio_max (0); bool prio_mon (false); { uint16_t busy (0); // Number of build machines locked by other processes. bool task (false); // There is a build machine performing a task. for (const bootstrapped_machine& m: ms) { if (!m.lock.locked ()) { if (m.lock.prio) // Not bootstrapping/suspended. { if (m.manifest.machine.effective_role () != machine_role::auxiliary) { ++busy; task = true; if (prio_min > *m.lock.prio) prio_min = *m.lock.prio; if (prio_max < *m.lock.prio) prio_max = *m.lock.prio; } } else ++busy; // Assume build machine (see enumerate_machines()). } } if (prio_min > prio_max) // No tasks. prio_min = prio_max; if (inst_max != 0) { assert (busy <= inst_max); if (busy == inst_max) { if (!task) // All bootstrapping/suspended. 
{ sleep = rand_sleep (); continue; } l2 ([&]{trace << "priority monitor, range [" << prio_min << ", " << prio_max << "]";}); prio_mon = true; } } } // If we get a task, these contain all the corresponding information. // task_request_manifest tq; task_response_manifest tr; uint64_t prio; string url; // Iterate over controller priorities in reverse, that is, from highest to // lowest (see the agent(1) man page for background on the priority // levels). // // The following factors determine the lower bound of priorities we should // consider: // // 1. If in the priority monitor mode, then we should only consider // priorities that can interrupt the existing task with the lowest // priority. // // Here is a representative sample of existing/interrupt priorities // from which we derive the below formulae (remember that we only start // interrupting from priority level 3): // // existing interrupt // -------- --------- // 5 >= 100 // 55 >= 100 // 555 >= 600 // 999 >= 1000 // 5055 >= 5100 // 5555 >= 5600 // 9999 >= 10000 // // Essentially, what we need to do is discard the lowest 2 levels and // add 100, moving the priority to the next 3rd level. // // 2. Otherwise, we should factor in the "don't ask for lower-priority // tasks" semantics that applies from the second priority level. // // Note also that the other half of this logic is below where we determine // which machines we offer for each priority. // auto ce (controllers.end ()); auto cb (controllers.lower_bound ( prio_mon ? ((prio_min / 100) * 100) + 100 : prio_max >= 10 ? prio_max - 1 : // Including this priority. 0)); // Any priority. for (; cb != ce; ) { const pair& pu (*--ce); prio = pu.first; const strings& urls (pu.second); // Prepare task request (it will be the same within a given priority). // tq = task_request_manifest { hname, tc_name, tc_ver, imode, ilogin, fingerprint, ops.auxiliary_ram (), machine_header_manifests {}}; // Determine which machines we need to offer for this priority. 
// bool aux_only (true); // Only auxiliary machines are available. { bool interruptable (false); // There is build machine we can interrupt. for (const bootstrapped_machine& m: ms) { const machine_manifest& mm (m.manifest.machine); machine_role role (mm.effective_role ()); if (!m.lock.locked ()) { if (!m.lock.prio) // Skip bootstrapping/suspended. continue; uint64_t eprio (*m.lock.prio); // Determine if our priority can interrupt the existing task. // // Based on the above discussion of the priority lower bound // determination (and some menditation) it's clear that we can // only interrupt the existing task if our priority is (at least) // on a higher 3rd level. // if ((prio / 100) <= (eprio / 100)) continue; if (role != machine_role::auxiliary) interruptable = true; } tq.machines.emplace_back (mm.id, mm.name, mm.summary, role, effective_ram_minimum (mm), mm.ram_maximum); aux_only = aux_only && role == machine_role::auxiliary; } // Sanity check: in the priority monitor mode we should only ask for a // task if we can interrupt one (this should be taken care of by the // priority lower bound calculation above). // assert (!prio_mon || interruptable); } if (ops.dump_machines ()) { for (const machine_header_manifest& m: tq.machines) serialize_manifest (m, cout, "stdout", "machine"); return 0; } if (aux_only) tq.machines.clear (); if (tq.machines.empty ()) { // If we have no build machines for this priority then we won't have // any for any lower priority so bail out. // break; } // Send task requests. // // Note that we have to do it while holding the lock on all the machines // since we don't know which machine(s) we will need. 
// vector rurls (urls.size ()); std::iota (rurls.begin (), rurls.end (), urls.begin ()); std::shuffle (rurls.begin (), rurls.end (), rand_gen); for (strings::const_iterator i: rurls) { const string& u (*i); if (ops.fake_request_specified ()) { auto t (parse_manifest (ops.fake_request (), "task")); tr = task_response_manifest { "fake-session", // Dummy session. nullopt, // No challenge. string (), // Empty result URL. vector (), agent_checksum, move (t)}; url = u; break; } task_response_manifest r; try { http_curl c (trace, path ("-"), path ("-"), curl::post, u, "--header", "Content-Type: text/manifest", "--retry", ops.request_retries (), "--retry-max-time", ops.request_timeout (), "--max-time", ops.request_timeout (), "--connect-timeout", ops.connect_timeout ()); // This is tricky/hairy: we may fail hard parsing the output before // seeing that curl exited with an error and failing softly. // bool f (false); try { serialize_manifest (tq, c.out, u, "task request", false /* fail_hard */); } catch (const failed&) {f = true;} c.out.close (); if (!f) try { r = parse_manifest ( c.in, u, "task response", false); } catch (const failed&) {f = true;} c.in.close (); if (!c.wait () || f) throw_generic_error (EIO); } catch (const system_error& e) { error << "unable to request task from " << u << ": " << e; continue; } if (r.challenge && !fingerprint) // Controller misbehaves. { error << "unexpected challenge from " << u << ": " << *r.challenge; continue; } if (!r.session.empty ()) // Got a task. { const task_manifest& t (*r.task); // For security reasons let's require the repository location to // be remote. // if (t.repository.local ()) { error << "local repository from " << u << ": " << t.repository; continue; } // Make sure that the task interactivity matches the requested mode. 
// if (( t.interactive && !imode) || (!t.interactive && imode && *imode == interactive_mode::true_)) { if (t.interactive) error << "interactive task from " << u << ": " << *t.interactive; else error << "non-interactive task from " << u; continue; } l2 ([&]{trace << "task for " << t.name << '/' << t.version << " " << "on " << t.machine << " " << "from " << u << " " << "priority " << prio;}); tr = move (r); url = u; break; } } // url loop. if (!tr.session.empty ()) // Got a task. break; } // prio loop. if (tq.machines.empty ()) // No machines (auxiliary-only already handled). { // Normally this means all the machines are busy so sleep a bit less. // l2 ([&]{trace << "all machines are busy, sleeping";}); sleep = rand_sleep () / 2; continue; } if (tr.session.empty ()) // No task from any of the controllers. { l2 ([&]{trace << "no tasks from any controllers, sleeping";}); sleep = rand_sleep (); continue; } // We have a build task. // task_manifest& t (*tr.task); // First verify the requested machines are from those we sent in tq and // their roles match. // // Also verify the same machine is not picked multiple times by blanking // out the corresponding entry in tq.machines. (Currently we are only // capable of running one instance of each machine though we may want to // relax that in the future, at which point we should send as many entries // for the same machine in the task request as we are capable of running, // applying the priority logic for each entry, etc). // { auto check = [&tq, &url] (const string& name, machine_role r) { auto i (find_if (tq.machines.begin (), tq.machines.end (), [&name] (const machine_header_manifest& mh) { return mh.name == name; // Yes, names, not ids. })); if (i == tq.machines.end ()) { error << "task from " << url << " for unknown machine " << name; return false; } if (i->effective_role () != r) { error << "task from " << url << " with mismatched role " << " for machine " << name; return false; } i->name.clear (); // Blank out. 
return true; }; auto check_aux = [&check] (const vector& ams) { for (const auxiliary_machine& am: ams) if (!check (am.name, machine_role::auxiliary)) return false; return true; }; if (!check (t.machine, machine_role::build) || !check_aux (t.auxiliary_machines)) { if (ops.dump_task ()) return 0; continue; } } // Also verify there are no more than 9 auxiliary machines (see the offset // global variable for details). // if (t.auxiliary_machines.size () > 9) { error << "task from " << url << " with more than 9 auxiliary machines"; if (ops.dump_task ()) return 0; continue; } if (ops.dump_task ()) { serialize_manifest (t, cout, "stdout", "task"); return 0; } // If we have our own repository certificate fingerprints, then use them // to replace what we have received from the controller. // if (!ops.trust ().empty ()) t.trust = ops.trust (); // Reset the worker checksum if the task's agent checksum doesn't match // the current one. // // Note that since the checksums are hierarchical, such reset will trigger // resets of the "subordinate" checksums (dependency checksum, etc). // if (!tr.agent_checksum || *tr.agent_checksum != agent_checksum) t.worker_checksum = nullopt; // Handle interrupts. // // Note that the interrupt can be triggered both by another process (the // interrupt exception is thrown from perform_task()) as well as by this // process in case we were unable to interrupt the other process (seeing // that we have already received a task, responding with an interrupt // feels like the most sensible option). // perform_task_result r; bootstrapped_machine* pm (nullptr); // Build machine. vector ams; // Auxiliary machines. try { // First find the bootstrapped_machine instance in ms corresponding to // the requested build machine. Also unlock all the other machines. 
// // While at it also see if we need to interrupt the selected machine (if // busy), one of the existing (if we are at the max allowed instances, // that is in the priority monitor mode), or all existing (if this is a // priority level 4 task). // // Auxiliary machines complicate the matter a bit: we may now need to // interrupt some subset of {build machine, auxiliary machines} that are // necessary to perform this task. Note, however, that auxiliary // machines are always subordinate to build machines, meaning that if // there is a busy auxiliary machine, then there will be a busy build // machine with the same pid/priority (and so if we interrup one // auxiliary, then we will also interrupt the corresponding build plus // any other auxiliaries it may be using). Based on that let's try to // divide and conquer this by first dealing with build machines and then // adding any auxiliary ones. // vector ims; // Machines to be interrupted. size_t imt (0); // Number of "target" machines to interrupt (see below). // First pass: build machines. // for (bootstrapped_machine& m: ms) { const machine_manifest& mm (m.manifest.machine); if (mm.effective_role () == machine_role::auxiliary) continue; if (mm.name == t.machine) { assert (pm == nullptr); // Sanity check. pm = &m; } else if (m.lock.locked ()) m.lock.unlock (); else if (m.lock.prio) // Not bootstrapping/suspended. { // Only consider machines that we can interrupt (see above). // if ((prio / 100) > (*m.lock.prio / 100)) { if (prio >= 1000) // Priority level 4 (interrupt all). ims.push_back (&m); else if (prio_mon) { // Find the lowest priority task to interrupt. // if (ims.empty ()) ims.push_back (&m); else if (*m.lock.prio < *ims.back ()->lock.prio) ims.back () = &m; } } } } assert (pm != nullptr); // Sanity check. if (!pm->lock.locked ()) { assert (pm->lock.prio); // Sanity check (not bootstrapping/suspended). if (prio >= 1000) ims.insert (ims.begin (), pm); // Interrupt first (see below). 
else ims = {pm}; imt++; } // Second pass: auxiliary machines. // for (bootstrapped_machine& m: ms) { const machine_manifest& mm (m.manifest.machine); if (mm.effective_role () != machine_role::auxiliary) continue; if (find_if (t.auxiliary_machines.begin (), t.auxiliary_machines.end (), [&mm] (const auxiliary_machine& am) { return am.name == mm.name; }) != t.auxiliary_machines.end ()) { if (!m.lock.locked ()) { assert (m.lock.prio); // Sanity check (not bootstrapping/suspended). if (ims.empty ()) { ims.push_back (&m); } else if (ims.front () == pm) { ims.insert (ims.begin () + 1, &m); // Interrupt early (see below). } else if (prio < 1000 && prio_mon && ams.empty () /* first */) { // Tricky: replace the lowest priority task we have picked on // the first pass with this one. // assert (ims.size () == 1); // Sanity check. ims.back () = &m; } else ims.insert (ims.begin (), &m); // Interrupt first (see below). imt++; } ams.push_back (&m); } else if (m.lock.locked ()) m.lock.unlock (); } // Note: the order of machines may not match. // assert (ams.size () == t.auxiliary_machines.size ()); // Sanity check. assert (!prio_mon || !ims.empty ()); // We should have at least one. // Move the toolchain lock into this scope so that it's automatically // released on any failure (on the happy path it is released by // perform_task()). // toolchain_lock& rtl (tl); toolchain_lock tl (move (rtl)); // Interrupt the machines, if necessary. // // Note that if we are interrupting multiple machines, then the target // build machine, if needs to be interrupted, must be first, followed // but all the target auxiliary machines. This way if we are unable to // successfully interrupt them, we don't interrupt the rest. // vector pids; // Avoid re-interrupting the same pid. for (size_t i (0); i != ims.size (); ++i) { bootstrapped_machine* im (ims[i]); // Sanity checks. // assert (!im->lock.locked () && im->lock.prio); assert (im != pm || i == 0); const dir_path& tp (im->path); // - path. 
pid_t pid (im->lock.pid); l2 ([&]{trace << "interrupting " << (i < imt ? "target" : "lower priority") << " machine " << tp << ", pid " << pid;}); // The plan is to send the interrupt and then wait for the lock. // // Note that the interrupt cannot be "lost" (or attributed to a // different task) since we are sending it while holding the global // lock and the other process arms it also while holding the global // lock. // // But what can happen is the other task becomes suspended, which we // will not be able to interrupt. // if (find (pids.begin (), pids.end (), pid) == pids.end ()) { if (kill (pid, SIGUSR1) == -1) { // Ignore the case where there is no such process (e.g., the other // process has terminated in which case the lock should be // released automatically). // if (errno != ESRCH) throw_generic_error (errno); } pids.push_back (pid); } // If we are interrupting additional machine in order to free up // resources, there is no use acquiring their lock (or failing if // unable to) since this is merely a performance optimization. // if (i >= imt) continue; // Try to lock the machine. // // While this normally shouldn't take long, there could be parts of // the perform_task() logic that we do not interrupt and that may take // some time. // machine_lock ml; size_t retry (0); for (; retry != 31; ++retry) { if (retry != 0) ::sleep (1); ml = lock_machine (tl, tp); if (ml.locked ()) break; if (ml.pid != pid) { error << "interrupted machine " << tp << " changed pid"; throw interrupt (); } if (!ml.prio) // Got suspended. { l2 ([&]{trace << "interrupted machine " << tp << " suspended";}); throw interrupt (); } } if (!ml.locked ()) { warn << "unable to lock interrupted machine " << tp << " within " << (retry - 1) << "s"; throw interrupt (); } // This is an interrupted machine (build or auxiliary) that we will be // using. See if it needs a re-bootstrap, the same as in // enumerate_machines(). If not, then transfer the bootstrap manifest // and lock. 
// const machine_manifest& mm (im->manifest.machine); bootstrapped_machine_manifest bmm ( parse_manifest ( tp / "manifest", "bootstrapped machine")); bool rb (false); if (bmm.machine.id != mm.id) { l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";}); rb = true; } if (im == pm) // Only for build machine. { if (!tc_id.empty () && bmm.toolchain.id != tc_id) { l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";}); rb = true; } if (int i = compare_bbot (bmm.bootstrap)) { if (i < 0) { l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";}); rb = true; } else { l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); rb = true; } } } // We are not going to try to re-bootstrap this machine "inline". // if (rb) throw interrupt (); im->manifest = move (bmm); im->lock = move (ml); } // Check if we need to boost the number of CPUs to the full hardware // concurrency. // optional bcpus; if (prio >= 10000) bcpus = std::thread::hardware_concurrency (); pm->lock.perform_task (tl, prio); // Build machine. for (bootstrapped_machine* am: ams) // Auxiliary machines. am->lock.perform_task (tl, prio); r = perform_task (move (tl), *pm, ams, t, bcpus); } catch (const interrupt&) { // Note: no work_dir. // r = perform_task_result ( auto_rmdir (), result_manifest { t.name, t.version, result_status::interrupt, operation_results {}, nullopt /* worker_checksum */, nullopt /* dependency_checksum */}); } // No need to hold the locks any longer. // if (pm != nullptr && pm->lock.locked ()) pm->lock.unlock (); for (bootstrapped_machine* am: ams) if (am->lock.locked ()) am->lock.unlock (); result_manifest& rm (r.manifest); if (ops.dump_result ()) { serialize_manifest (rm, cout, "stdout", "result"); return 0; } // Prepare the answer to the private key challenge. 
// optional> challenge; if (tr.challenge) try { assert (ops.auth_key_specified ()); openssl os (trace, fdstream_mode::text, path ("-"), 2, ops.openssl (), sign_cmd, ops.openssl_option (), "-sign", "-inkey", ops.auth_key ()); os.out << *tr.challenge; os.out.close (); challenge = os.in.read_binary (); os.in.close (); if (!os.wait ()) throw_generic_error (EIO); } catch (const system_error& e) { // The task response challenge is valid (verified by manifest parser), // so there must be something wrong with the setup and the failure is // fatal. // fail << "unable to sign task response challenge: " << e; } // Re-package the build artifacts, if present, into the type/instance- // specific archives and upload them to the type-specific URLs, if // provided. // // Note that the initial upload archive content is organized as a bunch of // upload///*, where the second level directories are the // upload types and the third level sub-directories are their instances. // The resulting .tar archives content (which is what we submit // to the type-specific handler) are organized as /*. // if (r.upload_archive && !tr.upload_urls.empty ()) { const path& ua (*r.upload_archive); // Extract the archive content into the parent directory of the archive // file. But first, make sure the resulting directory doesn't exist. // // Note that while we don't assume where inside the working directory // the archive is, we do assume that there is nothing clashing/precious // in the upload/ directory which we are going to cleanup. // dir_path d (ua.directory ()); const dir_path ud (d / dir_path ("upload")); try_rmdir_r (ud); try { process_exit pe ( process_run_callback ( trace, fdopen_null (), // Don't expect to read from stdin. 2, // Redirect stdout to stderr. 2, "tar", "-xf", ua, "-C", d)); if (!pe) fail << "tar " << pe; } catch (const system_error& e) { // There must be something wrong with the setup or there is no space // left on the disk, thus the failure is fatal. 
// fail << "unable to extract build artifacts from archive: " << e; } try_rmfile (ua); // Let's free up the disk space. // To decrease nesting a bit, let's collect the type-specific upload // directories and the corresponding URLs first. This way we can also // create the archive files as the upload/ directory sub-entries without // interfering with iterating over this directory. // vector> urls; try { for (const dir_entry& te: dir_iterator (ud, dir_iterator::no_follow)) { const string& t (te.path ().string ()); // Can only be a result of the worker malfunction, thus the failure // is fatal. // if (te.type () != entry_type::directory) fail << "unexpected filesystem entry '" << t << "' in " << ud; auto i (find_if (tr.upload_urls.begin (), tr.upload_urls.end (), [&t] (const upload_url& u) {return u.type == t;})); if (i == tr.upload_urls.end ()) continue; urls.emplace_back (ud / path_cast (te.path ()), i->url); } } catch (const system_error& e) { fail << "unable to iterate over " << ud << ": " << e; } // Now create archives and upload. // for (const pair& p: urls) { const dir_path& td (p.first); // / const string& url (p.second); try { for (const dir_entry& ie: dir_iterator (td, dir_iterator::no_follow)) { const string& i (ie.path ().string ()); // // Can only be a result of the worker malfunction, thus the // failure is fatal. // if (ie.type () != entry_type::directory) fail << "unexpected filesystem entry '" << i << "' in " << td; // Archive the upload instance files and, while at it, calculate // the resulting archive checksum. // sha256 sha; auto_rmfile ari (ud / (i + ".tar")); try { // Instruct tar to print the archive to stdout. // fdpipe in_pipe (fdopen_pipe (fdopen_mode::binary)); process pr ( process_start_callback ( trace, fdopen_null (), // Don't expect to read from stdin. in_pipe, 2 /* stderr */, "tar", "--format", "ustar", "-c", "-C", td, i)); // Shouldn't throw, unless something is severely damaged. 
// in_pipe.out.close (); ifdstream is ( move (in_pipe.in), fdstream_mode::skip, ifdstream::badbit); ofdstream os (ari.path, fdopen_mode::binary); char buf[8192]; while (!eof (is)) { is.read (buf, sizeof (buf)); if (size_t n = static_cast (is.gcount ())) { sha.append (buf, n); os.write (buf, n); } } os.close (); if (!pr.wait ()) fail << "tar " << *pr.exit; } catch (const system_error& e) { // There must be something wrong with the setup or there is no // space left on the disk, thus the failure is fatal. // fail << "unable to archive " << td << i << "/: " << e; } // Post the upload instance archive. // using namespace http_service; parameters params ({ {parameter::text, "session", tr.session}, {parameter::text, "instance", i}, {parameter::file, "archive", ari.path.string ()}, {parameter::text, "sha256sum", sha.string ()}}); if (challenge) params.push_back ({ parameter::text, "challenge", base64_encode (*challenge)}); result pr (post (ops, url, params)); // Turn the potential upload failure into the "upload" operation // error, amending the task result manifest. // if (pr.error) { // The "upload" operation result must be present (otherwise // there would be nothing to upload). We can assume it is last. // assert (!rm.results.empty ()); operation_result& r (rm.results.back ()); // The "upload" operation result must be the last, if present. 
// assert (r.operation == "upload"); auto log = [&r, indent = false] (const string& t, const string& l) mutable { if (indent) r.log += " "; else indent = true; r.log += t; r.log += ": "; r.log += l; r.log += '\n'; }; log ("error", "unable to upload " + td.leaf ().string () + '/' + i + " build artifacts"); log ("error", *pr.error); if (!pr.message.empty ()) log ("reason", pr.message); if (pr.reference) log ("reference", *pr.reference); for (const manifest_name_value& nv: pr.body) { if (!nv.name.empty ()) log (nv.name, nv.value); } r.status |= result_status::error; rm.status |= r.status; break; } } // Bail out on the instance archive upload failure. // if (!rm.status) break; } catch (const system_error& e) { fail << "unable to iterate over " << td << ": " << e; } } } result_status rs (rm.status); // Upload the result. // result_request_manifest rq {tr.session, move (challenge), agent_checksum, move (rm)}; { const string& u (*tr.result_url); try { http_curl c (trace, path ("-"), nullfd, // Not expecting any data in response. curl::post, u, "--header", "Content-Type: text/manifest", "--retry", ops.request_retries (), "--retry-max-time", ops.request_timeout (), "--max-time", ops.request_timeout (), "--connect-timeout", ops.connect_timeout ()); // This is tricky/hairy: we may fail hard writing the input before // seeing that curl exited with an error and failing softly. // bool f (false); try { // Don't break lines in the manifest values not to further increase // the size of the result request manifest encoded representation. // Note that this manifest can contain quite a few lines in the // operation logs, potentially truncated to fit the upload limit // (see worker/worker.cxx for details). Breaking these lines can // increase the request size beyond this limit and so we can end up // with the request failure. 
// serialize_manifest (rq, c.out, u, "result request", true /* fail_hard */, true /* long_lines */); } catch (const failed&) {f = true;} c.out.close (); if (!c.wait () || f) throw_generic_error (EIO); } catch (const system_error& e) { error << "unable to upload result to " << u << ": " << e; continue; } } l2 ([&]{trace << "built " << t.name << '/' << t.version << ' ' << "status " << rs << ' ' << "on " << t.machine << ' ' << "for " << url;}); } } catch (const failed&) { return 1; // Diagnostics has already been issued. } catch (const cli::exception& e) { error << e; return 1; } namespace bbot { static unsigned int rand_seed; // Seed for rand_r(); size_t genrand () { if (rand_seed == 0) rand_seed = static_cast ( std::chrono::system_clock::now ().time_since_epoch ().count ()); return static_cast (rand_r (&rand_seed)); } // Note: Linux-specific implementation. // string iface_addr (const string& i) { if (i.size () >= IFNAMSIZ) throw invalid_argument ("interface name too long"); auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)); if (fd.get () == -1) throw_system_error (errno); ifreq ifr; ifr.ifr_addr.sa_family = AF_INET; strcpy (ifr.ifr_name, i.c_str ()); if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1) throw_system_error (errno); char buf[INET_ADDRSTRLEN]; // IPv4 address. if (inet_ntop (AF_INET, &reinterpret_cast (&ifr.ifr_addr)->sin_addr, buf, sizeof (buf)) == nullptr) throw_system_error (errno); return buf; } }