diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2017-04-18 13:29:50 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2017-04-18 13:29:50 +0200 |
commit | 0e2f76b6f0ecb4b4c00a4c8001843b3c54bc08ad (patch) | |
tree | 3f0735a7b7e0be27823b23e24806fe9650548dc6 /bbot/agent.cxx | |
parent | 1804e3e8cf3b8f1bb14e197dada1697c40bed144 (diff) |
Finish agent and worker logic
Diffstat (limited to 'bbot/agent.cxx')
-rw-r--r-- | bbot/agent.cxx | 473 |
1 files changed, 403 insertions, 70 deletions
diff --git a/bbot/agent.cxx b/bbot/agent.cxx index 634a94d..eed49d6 100644 --- a/bbot/agent.cxx +++ b/bbot/agent.cxx @@ -28,6 +28,7 @@ #include <bbot/tftp> #include <bbot/machine> +#include <bbot/machine-manifest> #include <bbot/bootstrap-manifest> using namespace std; @@ -41,9 +42,12 @@ namespace bbot const string bs_prot ("1"); string tc_name; - string tc_num; + size_t tc_num; string tc_id; + strings controllers; + + string hname; uid_t uid; string uname; @@ -84,9 +88,9 @@ namespace bbot // template <typename... A> inline void -btrfs (tracer& t, A&&... a) +run_btrfs (tracer& t, A&&... a) { - if (verb >= 3) + if (verb >= 4) run_io (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...); else run_io (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...); @@ -96,7 +100,7 @@ template <typename... A> inline butl::process_exit::code_type btrfs_exit (tracer& t, A&&... a) { - return verb >= 3 + return verb >= 4 ? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...) : run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...); } @@ -114,7 +118,7 @@ bootstrap_machine (const dir_path& md, bootstrapped_machine_manifest r { mm, - toolchain_manifest {tc_id}, + toolchain_manifest {tc_id.empty () ? "bogus" : tc_id}, bootstrap_manifest { bootstrap_manifest::versions_type { {"bbot", BBOT_VERSION}, @@ -134,12 +138,12 @@ bootstrap_machine (const dir_path& md, { string br ("br1"); // Using private bridge for now. - // Start the TFTP server (server chroot is /build/tftp). Map: + // Start the TFTP server (server chroot is --tftp). Map: // - // GET requests to /build/tftp/toolchain/<name>/* - // PUT requests to /build/tftp/bootstrap/<name>/* + // GET requests to .../toolchain/<name>/* + // PUT requests to .../bootstrap/<name>/* // - auto_rmdir arm (dir_path ("/build/tftp/bootstrap/" + tc_name)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name); try_mkdir_p (arm.path ()); // Bootstrap result manifest. @@ -150,7 +154,7 @@ bootstrap_machine (const dir_path& md, tftp_server tftpd ("Gr ^/?(.+)$ /toolchain/" + tc_name + "/\\1\n" + "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n"); - l2 ([&]{trace << "tftp server on port " << tftpd.port ();}); + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); // Start the machine. // @@ -205,7 +209,7 @@ bootstrap_machine (const dir_path& md, if (!tftpd.serve ((to = startup_to))) return soft_fail ("bootstrap startup timeout"); - l2 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); // Next the bootstrap process may download additional toolchain // archives, build things, and then upload the result manifest. So on @@ -217,50 +221,45 @@ bootstrap_machine (const dir_path& md, if (to == 0) return soft_fail ("bootstrap timeout"); - l2 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); + l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";}); // Shut the machine down cleanly. // if (!m->shutdown ((to = shutdown_to))) return soft_fail ("bootstrap shutdown timeout"); - l2 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); + l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";}); } // Parse the result manifest. // - try - { - r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap"); - } - catch (const failed&) - { - error << "invalid bootstrap manifest for machine " << md; - return nullopt; - } + r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap"); r.machine.mac = m->mac; // Save the MAC address. } catch (const system_error& e) { - fail << "tftp server error: " << e; + fail << "bootstrap error: " << e; } serialize_manifest (r, md / "manifest", "bootstrapped machine"); return r; } -static machine_header_manifests -enumerate_machines (const dir_path& rd) +// Return available machines and their directories as a parallel array. +// +static pair<bootstrapped_machine_manifests, dir_paths> +enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines"); - machine_header_manifests r; + bootstrapped_machine_manifests rm; + dir_paths rd; // The first level are machine volumes. // - for (const dir_entry& ve: dir_iterator (rd)) + for (const dir_entry& ve: dir_iterator (machines)) { const string vn (ve.path ().string ()); @@ -269,7 +268,7 @@ try if (ve.type () != entry_type::directory || vn[0] == '.') continue; - const dir_path vd (dir_path (rd) /= vn); + const dir_path vd (dir_path (machines) /= vn); // Inside we have machines. // @@ -302,14 +301,14 @@ try // any) and see if we need to re-bootstrap. If so, use the snapshot // as a starting point. Rename to bootstrapped at the end (atomic). // - const dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> - const dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<too...> + dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> + dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain> bool te (dir_exists (tp)); auto delete_t = [&tp, &trace] () { - btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); - btrfs (trace, "subvolume", "delete", tp); + run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); + run_btrfs (trace, "subvolume", "delete", tp); }; for (size_t retry (0);; ++retry) @@ -355,14 +354,14 @@ try if (te) delete_t (); - l2 ([&]{trace << "skipping " << md << ": no subvolume link";}); + l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; } // <name>-<toolchain>-<xxx> // - const dir_path xp (dir_path (md) /= - path::traits::temp_name (mn + '-' + tc_name)); + const dir_path xp ( + dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { @@ -422,13 +421,13 @@ try if (bmm->machine.id != mm.id) { - l2 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); + l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";}); te = false; } - if (bmm->toolchain.id != tc_id) + if (!tc_id.empty () && bmm->toolchain.id != tc_id) { - l2 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); + l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";}); te = false; } @@ -436,13 +435,13 @@ try { if (i < 0) { - l2 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); + l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";}); te = false; } else { - l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); - btrfs (trace, "subvolume", "delete", xp); + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); + run_btrfs (trace, "subvolume", "delete", xp); break; } } @@ -451,7 +450,7 @@ try delete_t (); } else - l2 ([&]{trace << "bootstrapping " << tp;}); + l3 ([&]{trace << "bootstrapping " << tp;}); if (!te) { @@ -463,8 +462,8 @@ try if (!bmm) { - l2 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); - btrfs (trace, "subvolume", "delete", xp); + l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";}); + run_btrfs (trace, "subvolume", "delete", xp); break; } @@ -477,25 +476,25 @@ try fail << "unable to rename " << xp << " to " << tp; } - // Check the boostrapped bbot version as above and ignore this + l2 ([&]{trace << "bootstrapped " << bmm->machine.name;}); + + // Check the bootstrapped bbot version as above and ignore this // machine if it's newer than us. // if (int i = compare_bbot (bmm->bootstrap)) { assert (i > 0); - l2 ([&]{trace << "ignoring " << tp << ": old bbot";}); + l3 ([&]{trace << "ignoring " << tp << ": old bbot";}); break; } } else - btrfs (trace, "subvolume", "delete", xp); + run_btrfs (trace, "subvolume", "delete", xp); - // Add the machine to the list. + // Add the machine to the lists. // - r.push_back ( - machine_header_manifest (move (mm.id), - move (mm.name), - move (mm.summary))); + rm.push_back (move (*bmm)); + rd.push_back (move (tp)); break; } @@ -507,11 +506,161 @@ try } } - return r; + return make_pair (move (rm), move (rd)); } catch (const system_error& e) { - fail << "unable to iterate over " << rd << ": " << e << endf; + fail << "unable to iterate over " << machines << ": " << e << endf; +} + +static result_manifest +perform_task (const dir_path& md, + const bootstrapped_machine_manifest& mm, + const task_manifest& tm) +{ + tracer trace ("perform_task"); + + result_manifest r { + tm.name, + tm.version, + result_status::abort, + operation_results {}}; + + if (ops.fake_build ()) + return r; + + // The overall plan is as follows: + // + // 1. Snapshot the (bootstrapped) machine. + // + // 2. Save the task manifest to the TFTP directory (to be accessed by the + // worker). + // + // 3. Start the TFTP server and the machine. + // + // 4. Serve TFTP requests while watching out for the result manifest. + // + // 5. Clean up (force the machine down and delete the snapshot). + // + try + { + // <name>-<toolchain>-<xxx> + // + const dir_path xp ( + md.directory () /= path::traits::temp_name (md.leaf ().string ())); + + run_btrfs (trace, "subvolume", "snapshot", md, xp); + + string br ("br1"); // Using private bridge for now. + + // Start the TFTP server (server chroot is --tftp). Map: + // + // GET requests to .../build/<name>/get/* + // PUT requests to .../build/<name>/put/* + // + auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name); + + dir_path gd (dir_path (arm.path ()) /= "get"); + dir_path pd (dir_path (arm.path ()) /= "put"); + + try_mkdir_p (gd); + try_mkdir_p (pd); + + path tf (gd / "manifest"); // Task manifest file. + path rf (pd / "manifest"); // Result manifest file. + + serialize_manifest (tm, tf, "task"); + + tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" + + "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n"); + + l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); + + // Start the machine. + // + unique_ptr<machine> m ( + start_machine (xp, + mm.machine, + mm.machine.mac, + br, + tftpd.port ())); + + // Note: the machine handling logic is similar to bootstrap. + // + { + auto mg ( + make_exception_guard ( + [&m, &xp] () + { + info << "trying to force machine " << xp << " down"; + try {m->forcedown ();} catch (const failed&) {} + })); + + auto soft_fail = [&xp, &m, &r] (const char* msg) + { + { + diag_record dr (error); + dr << msg << " for machine " << xp << ", suspending"; + m->print_info (dr); + } + m->suspend (); + m->wait (); + return r; + }; + + // The first request should be the task manifest download. Wait for up + // to 60 seconds for that to arrive. In a sense we use it as an + // indication that the machine has booted and the worker process has + // started. + // + size_t to; + const size_t startup_to (60); + const size_t build_to (ops.build_timeout ()); + + if (!tftpd.serve ((to = startup_to))) + return soft_fail ("build startup timeout"); + + l3 ([&]{trace << "completed startup in " << startup_to - to << "s";}); + + // Next the worker builds things and then uploads the result manifest. + // So on our side we serve TFTP requests while checking for the manifest + // file. + // + for (to = build_to; to != 0 && !file_exists (rf); tftpd.serve (to)) ; + + if (to == 0) + return soft_fail ("build timeout"); + + l3 ([&]{trace << "completed build in " << build_to - to << "s";}); + + // Force the machine down (there is no need wasting time on clean + // shutdown since the next step is to drop the snapshot). + // + m->forcedown (); + } + + run_btrfs (trace, "subvolume", "delete", xp); + + // Parse the result manifest. + // + r = parse_manifest<result_manifest> (rf, "result"); + + // Update package name/version if the returned value as "unknown". + // + if (r.version == bpkg::version ("0")) + { + assert (r.status == result_status::abnormal); + + r.name = tm.name; + r.version = tm.version; + } + } + catch (const system_error& e) + { + fail << "build error: " << e; + } + + return r; } extern "C" void @@ -538,9 +687,11 @@ try verb = ops.verbose (); - uid = getuid (); - uname = getpwuid (uid)->pw_name; - + // Note that unlike other projects, here we distinguish between fail + // (critical error, agent terminates) and error (non-fatal error, agent + // continues to run). This means we should be careful not using fail + // to report normal errors and vice-versa. + // if (ops.systemd_daemon ()) { // Map to systemd severity prefixes (see sd-daemon(3) for details). Note @@ -554,7 +705,7 @@ try info.indent_ = text.indent_ = systemd_indent; - fail.type_ = "<3>"; + fail.type_ = "<2>"; error.type_ = "<3>"; warn.type_ = "<4>"; info.type_ = "<6>"; @@ -568,6 +719,19 @@ try tracer trace ("main"); + uid = getuid (); + uname = getpwuid (uid)->pw_name; + + { + char buf[HOST_NAME_MAX + 1]; + + if (gethostname (buf, sizeof (buf)) == -1) + fail << "unable to obtain hostname: " + << system_error (errno, generic_category ()); // Sanitize. + + hname = buf; + } + // On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if // the pipe reading end is closed. Note that by default this signal // terminates a process. Also note that there is no way to disable this @@ -602,13 +766,16 @@ try return p.wait () ? 0 : 1; } - if (argc != 4) - fail << "toolchain name/id/num excected" << + tc_name = ops.toolchain_name (); + tc_num = ops.toolchain_num (); + tc_id = ops.toolchain_id (); + + if (argc < 2) + fail << "controller url expected" << info << "run " << argv[0] << " --help for details"; - tc_name = argv[1]; - tc_num = argv[2]; - tc_id = argv[3]; + for (int i (1); i != argc; ++i) + controllers.push_back (argv[i]); // Handle SIGHUP and SIGTERM. // @@ -619,26 +786,192 @@ try // The work loop. The steps we go through are: // - // 1. Enumerate the available machines, (re-)bootstrapping any of necessary. + // 1. Enumerate the available machines, (re-)bootstrapping any if necessary. // // 2. Poll controller(s) for build tasks. // - // 3. If no build tasks are available, go to #1 after sleeping a bit. + // 3. If no build tasks are available, go to #1 (after sleeping a bit). // // 4. If a build task is returned, do it, upload the result, and go to #1 - // immediately. + // (immediately). // - for (unsigned int s; (s = 60); sleep (s)) + for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false) { - machine_header_manifests mms (enumerate_machines (ops.machines ())); + // Enumerate the machines. + // + auto mp (enumerate_machines (ops.machines ())); + bootstrapped_machine_manifests& ms (mp.first); + dir_paths& ds (mp.second); + + // Prepare task request. + // + // @@ TODO: key fingerprint. + // + task_request_manifest tq {hname, "", machine_header_manifests {}}; + + for (const bootstrapped_machine_manifest& m: ms) + tq.machines.emplace_back (m.machine.id, + m.machine.name, + m.machine.summary); if (ops.dump_machines ()) { - for (const machine_header_manifest& mm: mms) - serialize_manifest (mm, cout, "stdout", "machine manifest"); + for (const machine_header_manifest& m: tq.machines) + serialize_manifest (m, cout, "stdout", "machine"); return 0; } + + if (tq.machines.empty ()) + { + warn << "no build machines for toolchain " << tc_name; + sleep = true; + continue; + } + + // Send task requests. + // + // + string url; + task_response_manifest tr; + + if (ops.fake_request_specified ()) + { + const path& f (ops.fake_request ()); + task_manifest t (f.string () != "-" + ? parse_manifest<task_manifest> (f, "task") + : parse_manifest<task_manifest> (cin, "stdin", "task")); + + url = controllers[0]; + + tr = task_response_manifest { + "fake-session", // Dummy session. + string (), // Empty challange. + url, // Empty result URL. + move (t)}; + } + else + { + for (const string& u: controllers) + { + try + { + http_curl c (trace, + path ("-"), + path ("-"), + curl::post, + u, + "--header", "Content-Type: text/manifest", + "--max-time", ops.request_timeout ()); + + serialize_manifest (tq, c.out, u, "task request"); + c.out.close (); + + tr = parse_manifest<task_response_manifest> ( + c.in, u, "task response"); + c.in.close (); + + if (!c.wait ()) + throw_generic_error (EIO); + } + catch (const system_error& e) + { + error << "unable to request task from " << u << ": " << e; + continue; + } + + if (!tr.session.empty ()) // Got a task. + { + url = u; + break; + } + } + } + + if (tr.session.empty ()) // No task from any of the controllers. + { + sleep = true; + continue; + } + + // We have a build task. + // + // First find the index of the machine we were asked to use (and also + // verify it is one of those we sent). + // + size_t i (0); + for (const bootstrapped_machine_manifest& m: ms) + { + if (m.machine.name == tr.task->machine) + break; + + ++i; + } + + if (i == ms.size ()) + { + error << "task from " << url << " for unknown machine " + << tr.task->machine; + + if (ops.dump_task ()) + return 0; + + continue; + } + + const task_manifest& t (*tr.task); + + if (ops.dump_task ()) + { + serialize_manifest (t, cout, "stdout", "task"); + return 0; + } + + const dir_path& d (ds[i]); // The -<toolchain> directory. + const bootstrapped_machine_manifest& m (ms[i]); + + result_manifest r (perform_task (d, m, t)); + + if (ops.dump_result ()) + { + serialize_manifest (r, cout, "stdout", "result"); + return 0; + } + + // Upload the result. + // + // @@ TODO challange + // + result_request_manifest rq {tr.session, "", move (r)}; + { + const string& u (*tr.result_url); + + try + { + http_curl c (trace, + path ("-"), + nullfd, // Not expecting any data in response. + curl::post, + u, + "--header", "Content-Type: text/manifest", + "--max-time", ops.request_timeout ()); + + serialize_manifest (rq, c.out, u, "task request"); + c.out.close (); + + if (!c.wait ()) + throw_generic_error (EIO); + } + catch (const system_error& e) + { + error << "unable to upload result to " << u << ": " << e; + continue; + } + } + + l2 ([&]{trace << "built " << t.name << '/' << t.version << " " + << "on " << t.machine << " " + << "for " << url;}); } } catch (const failed&) |