aboutsummaryrefslogtreecommitdiff
path: root/bbot/agent.cxx
diff options
context:
space:
mode:
authorBoris Kolpackov <boris@codesynthesis.com>2017-04-18 13:29:50 +0200
committerBoris Kolpackov <boris@codesynthesis.com>2017-04-18 13:29:50 +0200
commit0e2f76b6f0ecb4b4c00a4c8001843b3c54bc08ad (patch)
tree3f0735a7b7e0be27823b23e24806fe9650548dc6 /bbot/agent.cxx
parent1804e3e8cf3b8f1bb14e197dada1697c40bed144 (diff)
Finish agent and worker logic
Diffstat (limited to 'bbot/agent.cxx')
-rw-r--r--bbot/agent.cxx473
1 files changed, 403 insertions, 70 deletions
diff --git a/bbot/agent.cxx b/bbot/agent.cxx
index 634a94d..eed49d6 100644
--- a/bbot/agent.cxx
+++ b/bbot/agent.cxx
@@ -28,6 +28,7 @@
#include <bbot/tftp>
#include <bbot/machine>
+#include <bbot/machine-manifest>
#include <bbot/bootstrap-manifest>
using namespace std;
@@ -41,9 +42,12 @@ namespace bbot
const string bs_prot ("1");
string tc_name;
- string tc_num;
+ size_t tc_num;
string tc_id;
+ strings controllers;
+
+ string hname;
uid_t uid;
string uname;
@@ -84,9 +88,9 @@ namespace bbot
//
template <typename... A>
inline void
-btrfs (tracer& t, A&&... a)
+run_btrfs (tracer& t, A&&... a)
{
- if (verb >= 3)
+ if (verb >= 4)
run_io (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...);
else
run_io (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...);
@@ -96,7 +100,7 @@ template <typename... A>
inline butl::process_exit::code_type
btrfs_exit (tracer& t, A&&... a)
{
- return verb >= 3
+ return verb >= 4
? run_io_exit (t, fdnull (), 2, 2, "btrfs", forward<A> (a)...)
: run_io_exit (t, fdnull (), fdnull (), 2, "btrfs", forward<A> (a)...);
}
@@ -114,7 +118,7 @@ bootstrap_machine (const dir_path& md,
bootstrapped_machine_manifest r {
mm,
- toolchain_manifest {tc_id},
+ toolchain_manifest {tc_id.empty () ? "bogus" : tc_id},
bootstrap_manifest {
bootstrap_manifest::versions_type {
{"bbot", BBOT_VERSION},
@@ -134,12 +138,12 @@ bootstrap_machine (const dir_path& md,
{
string br ("br1"); // Using private bridge for now.
- // Start the TFTP server (server chroot is /build/tftp). Map:
+ // Start the TFTP server (server chroot is --tftp). Map:
//
- // GET requests to /build/tftp/toolchain/<name>/*
- // PUT requests to /build/tftp/bootstrap/<name>/*
+ // GET requests to .../toolchain/<name>/*
+ // PUT requests to .../bootstrap/<name>/*
//
- auto_rmdir arm (dir_path ("/build/tftp/bootstrap/" + tc_name));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name);
try_mkdir_p (arm.path ());
// Bootstrap result manifest.
@@ -150,7 +154,7 @@ bootstrap_machine (const dir_path& md,
tftp_server tftpd ("Gr ^/?(.+)$ /toolchain/" + tc_name + "/\\1\n" +
"Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n");
- l2 ([&]{trace << "tftp server on port " << tftpd.port ();});
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
// Start the machine.
//
@@ -205,7 +209,7 @@ bootstrap_machine (const dir_path& md,
if (!tftpd.serve ((to = startup_to)))
return soft_fail ("bootstrap startup timeout");
- l2 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
// Next the bootstrap process may download additional toolchain
// archives, build things, and then upload the result manifest. So on
@@ -217,50 +221,45 @@ bootstrap_machine (const dir_path& md,
if (to == 0)
return soft_fail ("bootstrap timeout");
- l2 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";});
+ l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";});
// Shut the machine down cleanly.
//
if (!m->shutdown ((to = shutdown_to)))
return soft_fail ("bootstrap shutdown timeout");
- l2 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";});
+ l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";});
}
// Parse the result manifest.
//
- try
- {
- r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap");
- }
- catch (const failed&)
- {
- error << "invalid bootstrap manifest for machine " << md;
- return nullopt;
- }
+ r.bootstrap = parse_manifest<bootstrap_manifest> (mf, "bootstrap");
r.machine.mac = m->mac; // Save the MAC address.
}
catch (const system_error& e)
{
- fail << "tftp server error: " << e;
+ fail << "bootstrap error: " << e;
}
serialize_manifest (r, md / "manifest", "bootstrapped machine");
return r;
}
-static machine_header_manifests
-enumerate_machines (const dir_path& rd)
+// Return available machines and their directories as a parallel array.
+//
+static pair<bootstrapped_machine_manifests, dir_paths>
+enumerate_machines (const dir_path& machines)
try
{
tracer trace ("enumerate_machines");
- machine_header_manifests r;
+ bootstrapped_machine_manifests rm;
+ dir_paths rd;
// The first level are machine volumes.
//
- for (const dir_entry& ve: dir_iterator (rd))
+ for (const dir_entry& ve: dir_iterator (machines))
{
const string vn (ve.path ().string ());
@@ -269,7 +268,7 @@ try
if (ve.type () != entry_type::directory || vn[0] == '.')
continue;
- const dir_path vd (dir_path (rd) /= vn);
+ const dir_path vd (dir_path (machines) /= vn);
// Inside we have machines.
//
@@ -302,14 +301,14 @@ try
// any) and see if we need to re-bootstrap. If so, use the snapshot
// as a starting point. Rename to bootstrapped at the end (atomic).
//
- const dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
- const dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<too...>
+ dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
+ dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain>
bool te (dir_exists (tp));
auto delete_t = [&tp, &trace] ()
{
- btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
- btrfs (trace, "subvolume", "delete", tp);
+ run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
+ run_btrfs (trace, "subvolume", "delete", tp);
};
for (size_t retry (0);; ++retry)
@@ -355,14 +354,14 @@ try
if (te)
delete_t ();
- l2 ([&]{trace << "skipping " << md << ": no subvolume link";});
+ l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
break;
}
// <name>-<toolchain>-<xxx>
//
- const dir_path xp (dir_path (md) /=
- path::traits::temp_name (mn + '-' + tc_name));
+ const dir_path xp (
+ dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name));
if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0)
{
@@ -422,13 +421,13 @@ try
if (bmm->machine.id != mm.id)
{
- l2 ([&]{trace << "re-bootstrapping " << tp << ": new machine";});
+ l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";});
te = false;
}
- if (bmm->toolchain.id != tc_id)
+ if (!tc_id.empty () && bmm->toolchain.id != tc_id)
{
- l2 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";});
+ l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";});
te = false;
}
@@ -436,13 +435,13 @@ try
{
if (i < 0)
{
- l2 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";});
+ l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";});
te = false;
}
else
{
- l2 ([&]{trace << "ignoring " << tp << ": old bbot";});
- btrfs (trace, "subvolume", "delete", xp);
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ run_btrfs (trace, "subvolume", "delete", xp);
break;
}
}
@@ -451,7 +450,7 @@ try
delete_t ();
}
else
- l2 ([&]{trace << "bootstrapping " << tp;});
+ l3 ([&]{trace << "bootstrapping " << tp;});
if (!te)
{
@@ -463,8 +462,8 @@ try
if (!bmm)
{
- l2 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";});
- btrfs (trace, "subvolume", "delete", xp);
+ l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";});
+ run_btrfs (trace, "subvolume", "delete", xp);
break;
}
@@ -477,25 +476,25 @@ try
fail << "unable to rename " << xp << " to " << tp;
}
- // Check the boostrapped bbot version as above and ignore this
+ l2 ([&]{trace << "bootstrapped " << bmm->machine.name;});
+
+ // Check the bootstrapped bbot version as above and ignore this
// machine if it's newer than us.
//
if (int i = compare_bbot (bmm->bootstrap))
{
assert (i > 0);
- l2 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
break;
}
}
else
- btrfs (trace, "subvolume", "delete", xp);
+ run_btrfs (trace, "subvolume", "delete", xp);
- // Add the machine to the list.
+ // Add the machine to the lists.
//
- r.push_back (
- machine_header_manifest (move (mm.id),
- move (mm.name),
- move (mm.summary)));
+ rm.push_back (move (*bmm));
+ rd.push_back (move (tp));
break;
}
@@ -507,11 +506,161 @@ try
}
}
- return r;
+ return make_pair (move (rm), move (rd));
}
catch (const system_error& e)
{
- fail << "unable to iterate over " << rd << ": " << e << endf;
+ fail << "unable to iterate over " << machines << ": " << e << endf;
+}
+
+static result_manifest
+perform_task (const dir_path& md,
+ const bootstrapped_machine_manifest& mm,
+ const task_manifest& tm)
+{
+ tracer trace ("perform_task");
+
+ result_manifest r {
+ tm.name,
+ tm.version,
+ result_status::abort,
+ operation_results {}};
+
+ if (ops.fake_build ())
+ return r;
+
+ // The overall plan is as follows:
+ //
+ // 1. Snapshot the (bootstrapped) machine.
+ //
+ // 2. Save the task manifest to the TFTP directory (to be accessed by the
+ // worker).
+ //
+ // 3. Start the TFTP server and the machine.
+ //
+ // 4. Serve TFTP requests while watching out for the result manifest.
+ //
+ // 5. Clean up (force the machine down and delete the snapshot).
+ //
+ try
+ {
+ // <name>-<toolchain>-<xxx>
+ //
+ const dir_path xp (
+ md.directory () /= path::traits::temp_name (md.leaf ().string ()));
+
+ run_btrfs (trace, "subvolume", "snapshot", md, xp);
+
+ string br ("br1"); // Using private bridge for now.
+
+ // Start the TFTP server (server chroot is --tftp). Map:
+ //
+ // GET requests to .../build/<name>/get/*
+ // PUT requests to .../build/<name>/put/*
+ //
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name);
+
+ dir_path gd (dir_path (arm.path ()) /= "get");
+ dir_path pd (dir_path (arm.path ()) /= "put");
+
+ try_mkdir_p (gd);
+ try_mkdir_p (pd);
+
+ path tf (gd / "manifest"); // Task manifest file.
+ path rf (pd / "manifest"); // Result manifest file.
+
+ serialize_manifest (tm, tf, "task");
+
+ tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" +
+ "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n");
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ mm.machine.mac,
+ br,
+ tftpd.port ()));
+
+ // Note: the machine handling logic is similar to bootstrap.
+ //
+ {
+ auto mg (
+ make_exception_guard (
+ [&m, &xp] ()
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown ();} catch (const failed&) {}
+ }));
+
+ auto soft_fail = [&xp, &m, &r] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << xp << ", suspending";
+ m->print_info (dr);
+ }
+ m->suspend ();
+ m->wait ();
+ return r;
+ };
+
+ // The first request should be the task manifest download. Wait for up
+ // to 60 seconds for that to arrive. In a sense we use it as an
+ // indication that the machine has booted and the worker process has
+ // started.
+ //
+ size_t to;
+ const size_t startup_to (60);
+ const size_t build_to (ops.build_timeout ());
+
+ if (!tftpd.serve ((to = startup_to)))
+ return soft_fail ("build startup timeout");
+
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+
+ // Next the worker builds things and then uploads the result manifest.
+ // So on our side we serve TFTP requests while checking for the manifest
+ // file.
+ //
+ for (to = build_to; to != 0 && !file_exists (rf); tftpd.serve (to)) ;
+
+ if (to == 0)
+ return soft_fail ("build timeout");
+
+ l3 ([&]{trace << "completed build in " << build_to - to << "s";});
+
+ // Force the machine down (there is no need wasting time on clean
+ // shutdown since the next step is to drop the snapshot).
+ //
+ m->forcedown ();
+ }
+
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ // Parse the result manifest.
+ //
+ r = parse_manifest<result_manifest> (rf, "result");
+
+ // Update package name/version if the returned value as "unknown".
+ //
+ if (r.version == bpkg::version ("0"))
+ {
+ assert (r.status == result_status::abnormal);
+
+ r.name = tm.name;
+ r.version = tm.version;
+ }
+ }
+ catch (const system_error& e)
+ {
+ fail << "build error: " << e;
+ }
+
+ return r;
}
extern "C" void
@@ -538,9 +687,11 @@ try
verb = ops.verbose ();
- uid = getuid ();
- uname = getpwuid (uid)->pw_name;
-
+ // Note that unlike other projects, here we distinguish between fail
+ // (critical error, agent terminates) and error (non-fatal error, agent
+ // continues to run). This means we should be careful not using fail
+ // to report normal errors and vice-versa.
+ //
if (ops.systemd_daemon ())
{
// Map to systemd severity prefixes (see sd-daemon(3) for details). Note
@@ -554,7 +705,7 @@ try
info.indent_ =
text.indent_ = systemd_indent;
- fail.type_ = "<3>";
+ fail.type_ = "<2>";
error.type_ = "<3>";
warn.type_ = "<4>";
info.type_ = "<6>";
@@ -568,6 +719,19 @@ try
tracer trace ("main");
+ uid = getuid ();
+ uname = getpwuid (uid)->pw_name;
+
+ {
+ char buf[HOST_NAME_MAX + 1];
+
+ if (gethostname (buf, sizeof (buf)) == -1)
+ fail << "unable to obtain hostname: "
+ << system_error (errno, generic_category ()); // Sanitize.
+
+ hname = buf;
+ }
+
// On POSIX ignore SIGPIPE which is signaled to a pipe-writing process if
// the pipe reading end is closed. Note that by default this signal
// terminates a process. Also note that there is no way to disable this
@@ -602,13 +766,16 @@ try
return p.wait () ? 0 : 1;
}
- if (argc != 4)
- fail << "toolchain name/id/num excected" <<
+ tc_name = ops.toolchain_name ();
+ tc_num = ops.toolchain_num ();
+ tc_id = ops.toolchain_id ();
+
+ if (argc < 2)
+ fail << "controller url expected" <<
info << "run " << argv[0] << " --help for details";
- tc_name = argv[1];
- tc_num = argv[2];
- tc_id = argv[3];
+ for (int i (1); i != argc; ++i)
+ controllers.push_back (argv[i]);
// Handle SIGHUP and SIGTERM.
//
@@ -619,26 +786,192 @@ try
// The work loop. The steps we go through are:
//
- // 1. Enumerate the available machines, (re-)bootstrapping any of necessary.
+ // 1. Enumerate the available machines, (re-)bootstrapping any if necessary.
//
// 2. Poll controller(s) for build tasks.
//
- // 3. If no build tasks are available, go to #1 after sleeping a bit.
+ // 3. If no build tasks are available, go to #1 (after sleeping a bit).
//
// 4. If a build task is returned, do it, upload the result, and go to #1
- // immediately.
+ // (immediately).
//
- for (unsigned int s; (s = 60); sleep (s))
+ for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false)
{
- machine_header_manifests mms (enumerate_machines (ops.machines ()));
+ // Enumerate the machines.
+ //
+ auto mp (enumerate_machines (ops.machines ()));
+ bootstrapped_machine_manifests& ms (mp.first);
+ dir_paths& ds (mp.second);
+
+ // Prepare task request.
+ //
+ // @@ TODO: key fingerprint.
+ //
+ task_request_manifest tq {hname, "", machine_header_manifests {}};
+
+ for (const bootstrapped_machine_manifest& m: ms)
+ tq.machines.emplace_back (m.machine.id,
+ m.machine.name,
+ m.machine.summary);
if (ops.dump_machines ())
{
- for (const machine_header_manifest& mm: mms)
- serialize_manifest (mm, cout, "stdout", "machine manifest");
+ for (const machine_header_manifest& m: tq.machines)
+ serialize_manifest (m, cout, "stdout", "machine");
return 0;
}
+
+ if (tq.machines.empty ())
+ {
+ warn << "no build machines for toolchain " << tc_name;
+ sleep = true;
+ continue;
+ }
+
+ // Send task requests.
+ //
+ //
+ string url;
+ task_response_manifest tr;
+
+ if (ops.fake_request_specified ())
+ {
+ const path& f (ops.fake_request ());
+ task_manifest t (f.string () != "-"
+ ? parse_manifest<task_manifest> (f, "task")
+ : parse_manifest<task_manifest> (cin, "stdin", "task"));
+
+ url = controllers[0];
+
+ tr = task_response_manifest {
+ "fake-session", // Dummy session.
+ string (), // Empty challange.
+ url, // Empty result URL.
+ move (t)};
+ }
+ else
+ {
+ for (const string& u: controllers)
+ {
+ try
+ {
+ http_curl c (trace,
+ path ("-"),
+ path ("-"),
+ curl::post,
+ u,
+ "--header", "Content-Type: text/manifest",
+ "--max-time", ops.request_timeout ());
+
+ serialize_manifest (tq, c.out, u, "task request");
+ c.out.close ();
+
+ tr = parse_manifest<task_response_manifest> (
+ c.in, u, "task response");
+ c.in.close ();
+
+ if (!c.wait ())
+ throw_generic_error (EIO);
+ }
+ catch (const system_error& e)
+ {
+ error << "unable to request task from " << u << ": " << e;
+ continue;
+ }
+
+ if (!tr.session.empty ()) // Got a task.
+ {
+ url = u;
+ break;
+ }
+ }
+ }
+
+ if (tr.session.empty ()) // No task from any of the controllers.
+ {
+ sleep = true;
+ continue;
+ }
+
+ // We have a build task.
+ //
+ // First find the index of the machine we were asked to use (and also
+ // verify it is one of those we sent).
+ //
+ size_t i (0);
+ for (const bootstrapped_machine_manifest& m: ms)
+ {
+ if (m.machine.name == tr.task->machine)
+ break;
+
+ ++i;
+ }
+
+ if (i == ms.size ())
+ {
+ error << "task from " << url << " for unknown machine "
+ << tr.task->machine;
+
+ if (ops.dump_task ())
+ return 0;
+
+ continue;
+ }
+
+ const task_manifest& t (*tr.task);
+
+ if (ops.dump_task ())
+ {
+ serialize_manifest (t, cout, "stdout", "task");
+ return 0;
+ }
+
+ const dir_path& d (ds[i]); // The -<toolchain> directory.
+ const bootstrapped_machine_manifest& m (ms[i]);
+
+ result_manifest r (perform_task (d, m, t));
+
+ if (ops.dump_result ())
+ {
+ serialize_manifest (r, cout, "stdout", "result");
+ return 0;
+ }
+
+ // Upload the result.
+ //
+ // @@ TODO challange
+ //
+ result_request_manifest rq {tr.session, "", move (r)};
+ {
+ const string& u (*tr.result_url);
+
+ try
+ {
+ http_curl c (trace,
+ path ("-"),
+ nullfd, // Not expecting any data in response.
+ curl::post,
+ u,
+ "--header", "Content-Type: text/manifest",
+ "--max-time", ops.request_timeout ());
+
+ serialize_manifest (rq, c.out, u, "task request");
+ c.out.close ();
+
+ if (!c.wait ())
+ throw_generic_error (EIO);
+ }
+ catch (const system_error& e)
+ {
+ error << "unable to upload result to " << u << ": " << e;
+ continue;
+ }
+ }
+
+ l2 ([&]{trace << "built " << t.name << '/' << t.version << " "
+ << "on " << t.machine << " "
+ << "for " << url;});
}
}
catch (const failed&)