// file : bbot/agent.cxx -*- C++ -*- // copyright : Copyright (c) 2014-2017 Code Synthesis Ltd // license : TBC; see accompanying LICENSE file #include #include // getpwuid() #include // PATH_MAX #include // signal() #include // sleep(), realink(), getuid() #include // ifreq #include // sockaddr_in #include // inet_ntop() #include #include #include #include #include // dir_iterator #include #include #include #include #include #include #include using namespace std; using namespace butl; using namespace bbot; namespace bbot { agent_options ops; const string bs_prot ("1"); string tc_name; string tc_num; string tc_id; uid_t uid; string uname; // Note: Linux-specific implementation. // string iface_addr (const string& i) { if (i.size () >= IFNAMSIZ) throw invalid_argument ("interface nama too long"); auto_fd fd (socket (AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0)); if (fd.get () == -1) throw_system_error (errno); ifreq ifr; ifr.ifr_addr.sa_family = AF_INET; strcpy (ifr.ifr_name, i.c_str ()); if (ioctl (fd.get (), SIOCGIFADDR, &ifr) == -1) throw_system_error (errno); char buf[3 * 4 + 3 + 1]; // IPv4 address. if (inet_ntop (AF_INET, &reinterpret_cast (&ifr.ifr_addr)->sin_addr, buf, sizeof (buf)) == nullptr) throw_system_error (errno); return buf; } } // The btrfs tool likes to print informational messages, like "Created // snapshot such and such". Luckily, it writes them to stdout while proper // diagnostics to stderr. // template inline void btrfs (tracer& t, A&&... a) { if (verb >= 3) run_io (t, fdnull (), 2, 2, "btrfs", forward (a)...); else run_io (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } template inline butl::process_exit::code_type btrfs_exit (tracer& t, A&&... a) { return verb >= 3 ? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward (a)...) : run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward (a)...); } static bootstrapped_machine_manifest bootstrap_machine (const dir_path& md, const machine_manifest& mm, optional obmm) { tracer trace ("bootstrap_machine"); bootstrapped_machine_manifest r { mm, toolchain_manifest {tc_id}, bootstrap_manifest { bootstrap_manifest::versions_type { {"bbot", BBOT_VERSION}, {"libbbot", LIBBBOT_VERSION}, {"libbpkg", LIBBPKG_VERSION}, {"libbutl", LIBBUTL_VERSION} } } }; if (ops.fake_bootstrap ()) { r.machine.mac = "de:ad:be:ef:de:ad"; } else try { string br ("br1"); // Use private bridge for now. // Start the TFTP server (server chroot is /build/tftp). Map: // // GET requests to /build/tftp/toolchain//* // PUT requests to /build/tftp/bootstrap//* // auto_rmdir arm (dir_path ("/build/tftp/bootstrap/" + tc_name)); try_mkdir_p (arm.path ()); tftp_server tftpd ("Gr ^/?(.+)$ /toolchain/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n"); l2 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // unique_ptr m ( start_machine (md, mm, obmm ? obmm->machine.mac : nullopt, br, tftpd.port ())); r.machine.mac = m->mac; // The first request should be the toolchain download. Wait for up to 60 // seconds for that to arrive. In a sense we use it as an indication that // the machine has booted and the bootstrap process has started. // size_t timeout (60); if (tftpd.serve (timeout)) { l2 ([&]{trace << "received first request in " << 60 - timeout << "s";}); } else { // @@ What should be do here? Non-fatal? Mark the machine as failed? // error << "bootstrap timeout during first request for machine " << md; m->forcedown (); throw failed (); } if (!m->shutdown ()) { error << "forcing machine " << md << " down"; m->forcedown (); throw failed (); } } catch (const system_error& e) { fail << "tftp server error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } static machine_header_manifests enumerate_machines (const dir_path& rd) try { tracer trace ("enumerate_machines"); machine_header_manifests r; // The first level are machine volumes. // for (const dir_entry& ve: dir_iterator (rd)) { const string vn (ve.path ().string ()); // Ignore hidden directories. // if (ve.type () != entry_type::directory || vn[0] == '.') continue; const dir_path vd (dir_path (rd) /= vn); // Inside we have machines. // try { for (const dir_entry& me: dir_iterator (vd)) { const string mn (me.path ().string ()); if (me.type () != entry_type::directory || mn[0] == '.') continue; const dir_path md (dir_path (vd) /= mn); // Our endgoal here is to obtain a bootstrapped snapshot of this // machine while watching out for potential race conditions (machines // being added/upgraded/removed; see the manual for details). // // So here is our overall plan: // // 1. Resolve current subvolume link for our bootstrap protocol. // // 2. If there is no link, cleanup and ignore this machine. // // 3. Try to create a snapshot of current subvolume (this operation is // atomic). If failed (e.g., someone changed the link and removed // the subvolume in the meantime), retry from #1. // // 4. Compare the snapshot to the already bootstrapped version (if // any) and see if we need to re-bootstrap. If so, use the snapshot // as a starting point. Rename to bootstrapped at the end (atomic). // const dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -

const dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // - bool te (dir_exists (tp)); auto delete_t = [&tp, &trace] () { btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); btrfs (trace, "subvolume", "delete", tp); }; for (size_t retry (0);; ++retry) { if (retry != 0) sleep (1); // Resolve the link to subvolume path. // dir_path sp; // -

. try { char b [PATH_MAX + 1]; ssize_t r (readlink (lp.string ().c_str (), b, sizeof (b))); if (r == -1) { if (errno != ENOENT) throw_generic_error (errno); } else if (static_cast (r) >= sizeof (b)) throw_generic_error (EINVAL); else { b[r] = '\0'; sp = dir_path (b); if (sp.relative ()) sp = md / sp; } } catch (const system_error& e) { fail << "unable to read subvolume link " << lp << ": " << e; } // If the resolution fails, then this means there is no current // machine subvolume (for this bootstrap protocol). In this case we // clean up our toolchain subvolume (-) and ignore // this machine. // if (sp.empty ()) { if (te) delete_t (); l2 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } // -- // const dir_path xp (dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { if (retry >= 10) fail << "unable to snapshot subvolume " << sp; continue; } // Load the (original) machine manifest. // auto mm ( parse_manifest (sp / "manifest", "machine")); // If we already have -, see if it needs to be re- // bootstrapped. Things that render it obsolete: // // 1. New machine revision (compare machine ids). // 2. New toolchain (compare toolchain ids). // 3. New bbot/libbbot (compare versions). // // The last case has a complication: what should we do if we have // bootstrapped a newer version of bbot? This would mean that we are // about to be stopped and upgraded (and the upgraded version will // probably be able to use the result). So we simply ignore this // machine for this run. // Return -1 if older, 0 if the same, and +1 if newer. // auto compare_bbot = [] (const bootstrap_manifest& m) -> int { auto cmp = [&m] (const string& n, uint64_t v) -> int { auto i = m.versions.find (n); return i == m.versions.end () || i->second < v ? -1 : i->second > v ? 1 : 0; }; // Start from the top assuming a new dependency cannot be added // without changing the dependent's version. // int r; return (r = cmp ("bbot", BBOT_VERSION)) != 0 ? r : (r = cmp ("libbbot", LIBBBOT_VERSION)) != 0 ? r : (r = cmp ("libbpkg", LIBBPKG_VERSION)) != 0 ? r : (r = cmp ("libbutl", LIBBUTL_VERSION)) != 0 ? r : 0; }; optional obmm; if (te) { obmm = parse_manifest ( tp / "manifest", "bootstrapped machine"); if (obmm->machine.id != mm.id) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); te = false; } if (obmm->toolchain.id != tc_id) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); te = false; } if (int i = compare_bbot (obmm->bootstrap)) { if (i < 0) { l2 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); te = false; } else { l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); btrfs (trace, "subvolume", "delete", xp); break; } } if (!te) delete_t (); } else l2 ([&]{trace << "bootstrapping " << tp;}); if (!te) { // Use the -- snapshot that we have made to // bootstrap the new machine. Then atomically rename it to // -. // bootstrapped_machine_manifest bmm ( bootstrap_machine (xp, mm, move (obmm))); try { mvdir (xp, tp); } catch (const system_error& e) { fail << "unable to rename " << xp << " to " << tp; } te = true; // Check the boostrapped bbot version as above and ignore this // machine if it's newer than us. // if (int i = compare_bbot (bmm.bootstrap)) { assert (i > 0); l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); break; } } else btrfs (trace, "subvolume", "delete", xp); // Add the machine to the list. // r.push_back ( machine_header_manifest (move (mm.id), move (mm.name), move (mm.summary))); break; } } } catch (const system_error& e) { fail << "unable to iterate over " << vd << ": " << e << endf; } } return r; } catch (const system_error& e) { fail << "unable to iterate over " << rd << ": " << e << endf; } extern "C" void handle_signal (int sig) { switch (sig) { case SIGHUP: exit (3); // Unimplemented feature. case SIGTERM: exit (0); default: assert (false); } } // Right arrow followed by newline. // const char systemd_indent[] = "\xE2\x86\xB2\n"; int main (int argc, char* argv[]) try { cli::argv_scanner scan (argc, argv, true); ops.parse (scan); verb = ops.verbose (); uid = getuid (); uname = getpwuid (uid)->pw_name; if (ops.systemd_daemon ()) { // Map to systemd severity prefixes (see sd-daemon(3) for details). Note // that here we assume we will never have location (like file name which // would end up being before the prefix). // trace_indent = fail.indent_ = error.indent_ = warn.indent_ = info.indent_ = text.indent_ = systemd_indent; fail.type_ = "<3>"; error.type_ = "<3>"; warn.type_ = "<4>"; info.type_ = "<6>"; trace_type = "<7>"; info << "bbot agent for " << tc_name << '/' << tc_num << info << "toolchain id " << tc_id << info << "CPU(s) " << ops.cpu () << info << "RAM(kB) " << ops.ram (); } tracer trace ("main"); // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if // the pipe reading end is closed. Note that by default this signal // terminates a process. Also note that there is no way to disable this // behavior on a file descriptor basis or for the write() function call. // if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) fail << "unable to ignore broken pipe (SIGPIPE) signal: " << system_error (errno, generic_category ()); // Sanitize. // Version. // if (ops.version ()) { cout << "bbot-agent " << BBOT_VERSION_STR << endl << "libbbot " << LIBBBOT_VERSION_STR << endl << "libbutl " << LIBBUTL_VERSION_STR << endl << "Copyright (c) 2014-2017 Code Synthesis Ltd" << endl << "TBC; All rights reserved" << endl; return 0; } // Help. // if (ops.help ()) { pager p ("bbot-agent help", false); print_bbot_agent_usage (p.stream ()); // If the pager failed, assume it has issued some diagnostics. // return p.wait () ? 0 : 1; } if (argc != 4) fail << "toolchain name/id/num excected" << info << "run " << argv[0] << " --help for details"; tc_name = argv[1]; tc_num = argv[2]; tc_id = argv[3]; // Handle SIGHUP and SIGTERM. // if (signal (SIGHUP, &handle_signal) == SIG_ERR || signal (SIGTERM, &handle_signal) == SIG_ERR) fail << "unable to set signal handler: " << system_error (errno, generic_category ()); // Sanitize. // The work loop. The steps we go through are: // // 1. Enumerate the available machines, (re-)bootstrapping any of necessary. // // 2. Poll controller(s) for build tasks. // // 3. If no build tasks are available, go to #1 after sleeping a bit. // // 4. If a build task is returned, do it, upload the result, and go to #1 // immediately. // for (unsigned int s; (s = 60); sleep (s)) { machine_header_manifests mms (enumerate_machines (ops.machines ())); if (ops.dump_machines ()) { for (const machine_header_manifest& mm: mms) serialize_manifest (mm, cout, "stdout", "machine manifest"); return 0; } } } catch (const failed&) { return 1; // Diagnostics has already been issued. } catch (const cli::exception& e) { error << e; return 1; }