// file : bbot/agent.cxx -*- C++ -*- // copyright : Copyright (c) 2014-2017 Code Synthesis Ltd // license : TBC; see accompanying LICENSE file #include #include // getpwuid() #include // PATH_MAX #include // signal() #include // sleep(), realink(), getuid() #include // ifreq #include // sockaddr_in #include // inet_ntop() #include #include #include #include #include // dir_iterator #include #include #include #include #include #include #include using namespace std; using namespace butl; using namespace bbot; namespace bbot { agent_options ops; const string bs_prot ("1"); string tc_name; string tc_num; string tc_id; uid_t uid; string uname; // Note: Linux-specific implementation. // string iface_addr (const string& i) { if (i.size () >= IFNAMSIZ) throw invalid_argument ("interface nama too long"); auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)); if (fd.get () == -1) throw_system_error (errno); ifreq ifr; ifr.ifr_addr.sa_family = AF_INET; strcpy (ifr.ifr_name, i.c_str ()); if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1) throw_system_error (errno); char buf[3 * 4 + 3 + 1]; // IPv4 address. if (inet_ntop (AF_INET, &reinterpret_cast (&ifr.ifr_addr)->sin_addr, buf, sizeof (buf)) == nullptr) throw_system_error (errno); return buf; } } // The btrfs tool likes to print informational messages, like "Created // snapshot such and such". Luckily, it writes them to stdout while proper // diagnostics to stderr. // template inline void btrfs (tracer& t, A&&... a) { if (verb >= 3) run_io (t, fdnull (), 2, 2, "btrfs", forward (a)...); else run_io (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } template inline butl::process_exit::code_type btrfs_exit (tracer& t, A&&... a) { return verb >= 3 ? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward (a)...) : run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } // Bootstrap the machine. Return the bootstrapped machine manifest if // successful and nullopt otherwise (in which case the machine directory // should be cleaned and the machine ignored for now). // static optional bootstrap_machine (const dir_path& md, const machine_manifest& mm, optional obmm) { tracer trace ("bootstrap_machine"); bootstrapped_machine_manifest r { mm, toolchain_manifest {tc_id}, bootstrap_manifest { bootstrap_manifest::versions_type { {"bbot", BBOT_VERSION}, {"libbbot", LIBBBOT_VERSION}, {"libbpkg", LIBBPKG_VERSION}, {"libbutl", LIBBUTL_VERSION} } } }; if (ops.fake_bootstrap ()) { r.machine.mac = "de:ad:be:ef:de:ad"; } else try { string br ("br1"); // Using private bridge for now. // Start the TFTP server (server chroot is /build/tftp). Map: // // GET requests to /build/tftp/toolchain//* // PUT requests to /build/tftp/bootstrap//* // auto_rmdir arm (dir_path ("/build/tftp/bootstrap/" + tc_name)); try_mkdir_p (arm.path ()); // Bootstrap result manifest. // path mf (arm.path () / "manifest"); try_rmfile (mf); tftp_server tftpd ("Gr ^/?(.+)$ /toolchain/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n"); l2 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // unique_ptr m ( start_machine (md, mm, obmm ? obmm->machine.mac : nullopt, br, tftpd.port ())); { // If we are terminating with an exception then force the machine down. // Failed that, the machine's destructor will block waiting for its // completion. // auto mg ( make_exception_guard ( [&m, &md] () { info << "trying to force machine " << md << " down"; try {m->forcedown ();} catch (const failed&) {} })); // What happens if the bootstrap process hangs? The simple thing would // be to force the machine down after some timeout and then fail. But // that won't be very helpful for investigating the cause. So instead // the plan is to suspend it after some timeout, issue diagnostics // (without failing and which Build OS monitor will relay to the admin), // and wait for the external intervention. // auto soft_fail = [&md, &m] (const char* msg) { { diag_record dr (error); dr << msg << " for machine " << md << ", suspending"; m->print_info (dr); } m->suspend (); m->wait (); return nullopt; }; // The first request should be the toolchain download. Wait for up to 60 // seconds for that to arrive. In a sense we use it as an indication // that the machine has booted and the bootstrap process has started. // size_t to; const size_t startup_to (60); const size_t bootstrap_to (ops.bootstrap_timeout ()); const size_t shutdown_to (60); if (!tftpd.serve ((to = startup_to))) return soft_fail ("bootstrap startup timeout"); l2 ([&]{trace << "completed startup in " << startup_to - to << "s";}); // Next the bootstrap process may download additional toolchain // archives, build things, and then upload the result manifest. So on // our side we serve TFTP requests while periodically checking for the // manifest file. // for (to = bootstrap_to; to != 0 && !file_exists (mf); tftpd.serve (to)) ; if (to == 0) return soft_fail ("bootstrap timeout"); l2 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); // Shut the machine down cleanly. // if (!m->shutdown ((to = shutdown_to))) return soft_fail ("bootstrap shutdown timeout"); l2 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); } // Parse the result manifest. // try { r.bootstrap = parse_manifest (mf, "bootstrap"); } catch (const failed&) { error << "invalid bootstrap manifest for machine " << md; return nullopt; } r.machine.mac = m->mac; // Save the MAC address. } catch (const system_error& e) { fail << "tftp server error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } static machine_header_manifests enumerate_machines (const dir_path& rd) try { tracer trace ("enumerate_machines"); machine_header_manifests r; // The first level are machine volumes. // for (const dir_entry& ve: dir_iterator (rd)) { const string vn (ve.path ().string ()); // Ignore hidden directories. // if (ve.type () != entry_type::directory || vn[0] == '.') continue; const dir_path vd (dir_path (rd) /= vn); // Inside we have machines. // try { for (const dir_entry& me: dir_iterator (vd)) { const string mn (me.path ().string ()); if (me.type () != entry_type::directory || mn[0] == '.') continue; const dir_path md (dir_path (vd) /= mn); // Our endgoal here is to obtain a bootstrapped snapshot of this // machine while watching out for potential race conditions (machines // being added/upgraded/removed; see the manual for details). // // So here is our overall plan: // // 1. Resolve current subvolume link for our bootstrap protocol. // // 2. If there is no link, cleanup and ignore this machine. // // 3. Try to create a snapshot of current subvolume (this operation is // atomic). If failed (e.g., someone changed the link and removed // the subvolume in the meantime), retry from #1. // // 4. Compare the snapshot to the already bootstrapped version (if // any) and see if we need to re-bootstrap. If so, use the snapshot // as a starting point. Rename to bootstrapped at the end (atomic). // const dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

const dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - bool te (dir_exists (tp)); auto delete_t = [&tp, &trace] () { btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); btrfs (trace, "subvolume", "delete", tp); }; for (size_t retry (0);; ++retry) { if (retry != 0) sleep (1); // Resolve the link to subvolume path. // dir_path sp; // -

. try { char b [PATH_MAX + 1]; ssize_t r (readlink (lp.string ().c_str (), b, sizeof (b))); if (r == -1) { if (errno != ENOENT) throw_generic_error (errno); } else if (static_cast (r) >= sizeof (b)) throw_generic_error (EINVAL); else { b[r] = '\0'; sp = dir_path (b); if (sp.relative ()) sp = md / sp; } } catch (const system_error& e) { fail << "unable to read subvolume link " << lp << ": " << e; } // If the resolution fails, then this means there is no current // machine subvolume (for this bootstrap protocol). In this case we // clean up our toolchain subvolume (-) and ignore // this machine. // if (sp.empty ()) { if (te) delete_t (); l2 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } // -- // const dir_path xp (dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { if (retry >= 10) fail << "unable to snapshot subvolume " << sp; continue; } // Load the (original) machine manifest. // auto mm ( parse_manifest (sp / "manifest", "machine")); // If we already have -, see if it needs to be re- // bootstrapped. Things that render it obsolete: // // 1. New machine revision (compare machine ids). // 2. New toolchain (compare toolchain ids). // 3. New bbot/libbbot (compare versions). // // The last case has a complication: what should we do if we have // bootstrapped a newer version of bbot? This would mean that we are // about to be stopped and upgraded (and the upgraded version will // probably be able to use the result). So we simply ignore this // machine for this run. // Return -1 if older, 0 if the same, and +1 if newer. // auto compare_bbot = [] (const bootstrap_manifest& m) -> int { auto cmp = [&m] (const string& n, uint64_t v) -> int { auto i = m.versions.find (n); return i == m.versions.end () || i->second < v ? -1 : i->second > v ? 1 : 0; }; // Start from the top assuming a new dependency cannot be added // without changing the dependent's version. // int r; return (r = cmp ("bbot", BBOT_VERSION)) != 0 ? r : (r = cmp ("libbbot", LIBBBOT_VERSION)) != 0 ? r : (r = cmp ("libbpkg", LIBBPKG_VERSION)) != 0 ? r : (r = cmp ("libbutl", LIBBUTL_VERSION)) != 0 ? r : 0; }; optional bmm; if (te) { bmm = parse_manifest ( tp / "manifest", "bootstrapped machine"); if (bmm->machine.id != mm.id) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); te = false; } if (bmm->toolchain.id != tc_id) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); te = false; } if (int i = compare_bbot (bmm->bootstrap)) { if (i < 0) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); te = false; } else { l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); btrfs (trace, "subvolume", "delete", xp); break; } } if (!te) delete_t (); } else l2 ([&]{trace << "bootstrapping " << tp;}); if (!te) { // Use the -- snapshot that we have made to // bootstrap the new machine. Then atomically rename it to // -. // bmm = bootstrap_machine (xp, mm, move (bmm)); if (!bmm) { l2 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); btrfs (trace, "subvolume", "delete", xp); break; } try { mvdir (xp, tp); } catch (const system_error& e) { fail << "unable to rename " << xp << " to " << tp; } // Check the boostrapped bbot version as above and ignore this // machine if it's newer than us. // if (int i = compare_bbot (bmm->bootstrap)) { assert (i > 0); l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); break; } } else btrfs (trace, "subvolume", "delete", xp); // Add the machine to the list. // r.push_back ( machine_header_manifest (move (mm.id), move (mm.name), move (mm.summary))); break; } } } catch (const system_error& e) { fail << "unable to iterate over " << vd << ": " << e << endf; } } return r; } catch (const system_error& e) { fail << "unable to iterate over " << rd << ": " << e << endf; } extern "C" void handle_signal (int sig) { switch (sig) { case SIGHUP: exit (3); // Unimplemented feature. case SIGTERM: exit (0); default: assert (false); } } // Right arrow followed by newline. // const char systemd_indent[] = "\xE2\x86\xB2\n"; int main (int argc, char* argv[]) try { cli::argv_scanner scan (argc, argv, true); ops.parse (scan); verb = ops.verbose (); uid = getuid (); uname = getpwuid (uid)->pw_name; if (ops.systemd_daemon ()) { // Map to systemd severity prefixes (see sd-daemon(3) for details). Note // that here we assume we will never have location (like file name which // would end up being before the prefix). // trace_indent = fail.indent_ = error.indent_ = warn.indent_ = info.indent_ = text.indent_ = systemd_indent; fail.type_ = "<3>"; error.type_ = "<3>"; warn.type_ = "<4>"; info.type_ = "<6>"; trace_type = "<7>"; info << "bbot agent for " << tc_name << '/' << tc_num << info << "toolchain id " << tc_id << info << "CPU(s) " << ops.cpu () << info << "RAM(kB) " << ops.ram (); } tracer trace ("main"); // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if // the pipe reading end is closed. Note that by default this signal // terminates a process. Also note that there is no way to disable this // behavior on a file descriptor basis or for the write() function call. // if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) fail << "unable to ignore broken pipe (SIGPIPE) signal: " << system_error (errno, generic_category ()); // Sanitize. // Version. // if (ops.version ()) { cout << "bbot-agent " << BBOT_VERSION_STR << endl << "libbbot " << LIBBBOT_VERSION_STR << endl << "libbutl " << LIBBUTL_VERSION_STR << endl << "Copyright (c) 2014-2017 Code Synthesis Ltd" << endl << "TBC; All rights reserved" << endl; return 0; } // Help. // if (ops.help ()) { pager p ("bbot-agent help", false); print_bbot_agent_usage (p.stream ()); // If the pager failed, assume it has issued some diagnostics. // return p.wait () ? 0 : 1; } if (argc != 4) fail << "toolchain name/id/num excected" << info << "run " << argv[0] << " --help for details"; tc_name = argv[1]; tc_num = argv[2]; tc_id = argv[3]; // Handle SIGHUP and SIGTERM. // if (signal (SIGHUP, &handle_signal) == SIG_ERR || signal (SIGTERM, &handle_signal) == SIG_ERR) fail << "unable to set signal handler: " << system_error (errno, generic_category ()); // Sanitize. // The work loop. The steps we go through are: // // 1. Enumerate the available machines, (re-)bootstrapping any of necessary. // // 2. Poll controller(s) for build tasks. // // 3. If no build tasks are available, go to #1 after sleeping a bit. // // 4. If a build task is returned, do it, upload the result, and go to #1 // immediately. // for (unsigned int s; (s = 60); sleep (s)) { machine_header_manifests mms (enumerate_machines (ops.machines ())); if (ops.dump_machines ()) { for (const machine_header_manifest& mm: mms) serialize_manifest (mm, cout, "stdout", "machine manifest"); return 0; } } } catch (const failed&) { return 1; // Diagnostics has already been issued. } catch (const cli::exception& e) { error << e; return 1; }