aboutsummaryrefslogtreecommitdiff
path: root/bbot/agent/agent.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r--bbot/agent/agent.cxx327
1 files changed, 243 insertions, 84 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index 7a64a18..eec6072 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -4,11 +4,14 @@
#include <bbot/agent/agent.hxx>
-#include <pwd.h> // getpwuid()
-#include <limits.h> // PATH_MAX
-#include <signal.h> // signal()
-#include <stdlib.h> // rand_r()
-#include <unistd.h> // sleep(), realink(), getuid(), fsync()
+#include <pwd.h> // getpwuid()
+#include <limits.h> // PATH_MAX
+#include <signal.h> // signal()
+#include <stdlib.h> // rand_r()
+#include <unistd.h> // sleep(), readlink(), getuid(), fsync(), [f]stat()
+#include <sys/types.h> // stat
+#include <sys/stat.h> // [f]stat()
+#include <sys/file.h> // flock()
#include <net/if.h> // ifreq
#include <netinet/in.h> // sockaddr_in
@@ -17,12 +20,13 @@
#include <sys/socket.h>
#include <chrono>
+#include <random>
#include <iostream>
#include <libbutl/pager.mxx>
#include <libbutl/sha256.mxx>
#include <libbutl/openssl.mxx>
-#include <libbutl/filesystem.mxx> // dir_iterator
+#include <libbutl/filesystem.mxx> // dir_iterator, try_rmfile()
#include <libbbot/manifest.hxx>
@@ -53,6 +57,10 @@ namespace bbot
standard_version tc_ver;
string tc_id;
+ uint16_t inst;
+
+ uint16_t offset;
+
string hname;
uid_t uid;
string uname;
@@ -79,7 +87,7 @@ file_not_empty (const path& f)
// The btrfs tool likes to print informational messages, like "Created
// snapshot such and such". Luckily, it writes them to stdout while proper
-// diagnostics to stderr.
+// diagnostics goes to stderr.
//
template <typename... A>
inline void
@@ -136,9 +144,10 @@ bootstrap_machine (const dir_path& md,
// Start the TFTP server (server chroot is --tftp). Map:
//
// GET requests to .../toolchains/<name>/*
- // PUT requests to .../bootstrap/<name>/*
+ // PUT requests to .../bootstrap/<name>-<instance>/*
//
- auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name);
+ const string in_name (tc_name + '-' + to_string (inst));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name);
try_mkdir_p (arm.path);
// Bootstrap result manifest.
@@ -158,8 +167,8 @@ bootstrap_machine (const dir_path& md,
for (size_t retry (0);; ++retry)
{
tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" +
- "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n",
- ops.tftp_port () + tc_num);
+ "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n",
+ ops.tftp_port () + offset);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
@@ -296,16 +305,122 @@ bootstrap_machine (const dir_path& md,
return r;
}
-// Return available machines and their directories as a parallel array.
+// Machine locking.
+//
+// We use flock(2) which is straightforward. The tricky part is cleaning the
+// file up. Here we may have a race when two processes are trying to open &
+// lock the file that is being unlocked & removed by a third process. In this
+// case one of these processes may still open the old file. To resolve this,
+// after opening and locking the file, we verify that a new file hasn't
+// appeared by stat'ing the path and file descriptor and comparing the inodes.
+//
+// Note that converting a lock (shared to exclusive or vice versa) is not
+// guaranteed to be atomic (in case later we want to support exclusive
+// bootstrap and shared build).
+//
+class machine_lock
+{
+public:
+  // Create an empty lock: nothing is held and unlock() is a noop.
+  //
+  machine_lock () = default;
+
+  // Release the lock if it is still held, ignoring any errors.
+  //
+  ~machine_lock ()
+  {
+    unlock (true /* ignore_errors */);
+  }
+
+  // Remove the lock file and release the flock(2) lock on its descriptor.
+  // Noop if the lock is empty, was moved from, or was already unlocked.
+  //
+  // Unless ignore_errors is true, throw if the file removal or the unlocking
+  // fails. Note that the lock is considered released even if this call
+  // throws (we do not retry).
+  //
+  void
+  unlock (bool ignore_errors = false)
+  {
+    if (fl_)
+    {
+      fl_ = false; // We have tried.
+
+      // Note: the file is removed while we are still holding the lock.
+      //
+      try_rmfile (fp_, ignore_errors);
+
+      if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors)
+        throw_generic_error (errno);
+    }
+  }
+
+  // Move-only. The ownership of the lock is transferred explicitly: the
+  // moved-from object is marked as not holding the lock so that its
+  // destructor does not try to remove the file or unlock the descriptor
+  // (we should not rely on the state of moved-from members for that).
+  //
+  machine_lock (machine_lock&& x)
+    : fp_ (move (x.fp_)), fd_ (move (x.fd_)), fl_ (x.fl_)
+  {
+    x.fl_ = false;
+  }
+
+  machine_lock&
+  operator= (machine_lock&& x)
+  {
+    if (this != &x)
+    {
+      // Release whatever we are currently holding (same as in destructor)
+      // so that its lock file is not left behind.
+      //
+      unlock (true /* ignore_errors */);
+
+      fp_ = move (x.fp_);
+      fd_ = move (x.fd_);
+      fl_ = x.fl_;
+
+      x.fl_ = false;
+    }
+
+    return *this;
+  }
+
+  machine_lock (const machine_lock&) = delete;
+  machine_lock& operator= (const machine_lock&) = delete;
+
+  // Implementation details.
+  //
+public:
+  // Adopt an already opened and locked file (path and descriptor).
+  //
+  machine_lock (path&& fp, auto_fd&& fd)
+    : fp_ (move (fp)), fd_ (move (fd)), fl_ (true) {}
+
+private:
+  path    fp_;         // Lock file path.
+  auto_fd fd_;         // Lock file descriptor.
+  bool    fl_ = false; // True if we are holding the lock.
+};
+
+// Try to lock the machine given its -<toolchain> directory.
+//
+// Return nullopt if the machine is already locked by someone else. Throw
+// system_error if opening, locking, or stat'ing the lock file fails for any
+// other reason.
+//
+static optional<machine_lock>
+lock_machine (const dir_path& tp)
+{
+  path fp (tp + ".lock"); // The -<toolchain>.lock file.
+
+  for (;;)
+  {
+    auto_fd fd (fdopen (fp, fdopen_mode::out | fdopen_mode::create));
+
+    if (flock (fd.get (), LOCK_EX | LOCK_NB) != 0)
+    {
+      if (errno == EWOULDBLOCK)
+        return nullopt;
+
+      throw_generic_error (errno);
+    }
+
+    // Verify that the file we have locked is still the one the path refers
+    // to (the previous owner removes the file before unlocking it).
+    //
+    struct stat st1, st2;
+
+    if (fstat (fd.get (), &st1) != 0)
+      throw_generic_error (errno);
+
+    if (stat (fp.string ().c_str (), &st2) != 0)
+    {
+      // The file we have locked has been removed and nobody has re-created
+      // it yet. This is not an error: start over with a fresh file.
+      //
+      if (errno == ENOENT)
+        continue; // Note: unlocked by close().
+
+      throw_generic_error (errno);
+    }
+
+    // Inode numbers are only unique within a device so compare both.
+    //
+    if (st1.st_dev == st2.st_dev && st1.st_ino == st2.st_ino)
+      return machine_lock (move (fp), move (fd));
+
+    // Note: unlocked by close().
+  }
+}
+
+// Given the toolchain directory (-<toolchain>) return the snapshot path in
+// the <name>-<toolchain>-<xxx> form.
+//
+// We include the instance number into <xxx> for debuggability.
//
-static pair<bootstrapped_machine_manifests, dir_paths>
+static inline dir_path
+snapshot_path (const dir_path& tp)
+{
+  // Temporary name prefix: the -<toolchain> directory name followed by the
+  // instance number.
+  //
+  const string prefix (tp.leaf ().string () + '-' + to_string (inst));
+
+  // Place the snapshot next to the -<toolchain> directory.
+  //
+  dir_path r (tp.directory ());
+  r /= path::traits::temp_name (prefix);
+  return r;
+}
+
+// Return available machines, (re-)bootstrapping them if necessary.
+//
+struct bootstrapped_machine // One entry of the enumerate_machines() result.
+{
+ dir_path path; // The machine's -<toolchain> directory.
+ bootstrapped_machine_manifest manifest; // Machine/toolchain/bootstrap manifests.
+ machine_lock lock; // Lock on the machine (empty for a fake machine).
+};
+using bootstrapped_machines = vector<bootstrapped_machine>; // Note: members above are aggregate-initialized in this order.
+
+static bootstrapped_machines
enumerate_machines (const dir_path& machines)
try
{
tracer trace ("enumerate_machines", machines.string ().c_str ());
- bootstrapped_machine_manifests rm;
- dir_paths rd;
+ bootstrapped_machines r;
if (ops.fake_machine_specified ())
{
@@ -313,25 +428,30 @@ try
parse_manifest<machine_header_manifest> (
ops.fake_machine (), "machine header"));
- rm.push_back (
- bootstrapped_machine_manifest {
- machine_manifest {
- mh.id,
- mh.name,
- mh.summary,
- machine_type::kvm,
- string ("de:ad:be:ef:de:ad"),
- nullopt,
- strings ()},
- toolchain_manifest {tc_id},
- bootstrap_manifest {}
- });
-
- rd.push_back (dir_path (ops.machines ()) /= mh.name); // For diagnostics.
-
- return make_pair (move (rm), move (rd));
+ r.push_back (
+ bootstrapped_machine {
+ dir_path (ops.machines ()) /= mh.name, // For diagnostics.
+ bootstrapped_machine_manifest {
+ machine_manifest {
+ move (mh.id),
+ move (mh.name),
+ move (mh.summary),
+ machine_type::kvm,
+ string ("de:ad:be:ef:de:ad"),
+ nullopt,
+ strings ()},
+ toolchain_manifest {tc_id},
+ bootstrap_manifest {}},
+ machine_lock ()});
+
+ return r;
}
+ // Notice and warn if there are no machines (as opposed to all of them being
+ // locked).
+ //
+ bool none (true);
+
// The first level are machine volumes.
//
for (const dir_entry& ve: dir_iterator (machines,
@@ -350,8 +470,7 @@ try
//
try
{
- for (const dir_entry& me: dir_iterator (vd,
- false /* ignore_dangling */))
+ for (const dir_entry& me: dir_iterator (vd, false /* ignore_dangling */))
{
const string mn (me.path ().string ());
@@ -361,28 +480,32 @@ try
const dir_path md (dir_path (vd) /= mn);
// Our endgoal here is to obtain a bootstrapped snapshot of this
- // machine while watching out for potential race conditions (machines
- // being added/upgraded/removed; see the manual for details).
+ // machine while watching out for potential race conditions (other
+ // instances as well as machines being added/upgraded/removed; see the
+ // manual for details).
//
// So here is our overall plan:
//
// 1. Resolve current subvolume link for our bootstrap protocol.
//
- // 2. If there is no link, cleanup and ignore this machine.
+ // 2. Lock the machine. This excludes any other instance from trying
+ // to perform the following steps.
//
- // 3. Try to create a snapshot of current subvolume (this operation is
+ // 3. If there is no link, cleanup old bootstrap (if any) and ignore
+ // this machine.
+ //
+ // 4. Try to create a snapshot of current subvolume (this operation is
// atomic). If failed (e.g., someone changed the link and removed
// the subvolume in the meantime), retry from #1.
//
- // 4. Compare the snapshot to the already bootstrapped version (if
+ // 5. Compare the snapshot to the already bootstrapped version (if
// any) and see if we need to re-bootstrap. If so, use the snapshot
// as a starting point. Rename to bootstrapped at the end (atomic).
//
dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain>
- bool te (dir_exists (tp));
- auto delete_t = [&tp, &trace] ()
+ auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>.
{
run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
run_btrfs (trace, "subvolume", "delete", tp);
@@ -421,15 +544,29 @@ try
fail << "unable to read subvolume link " << lp << ": " << e;
}
+ none = none && sp.empty ();
+
+ // Try to lock the machine, skipping it if already locked.
+ //
+ optional<machine_lock> ml (lock_machine (tp));
+
+ if (!ml)
+ {
+ l4 ([&]{trace << "skipping " << md << ": locked";});
+ break;
+ }
+
+ bool te (dir_exists (tp));
+
// If the resolution fails, then this means there is no current
// machine subvolume (for this bootstrap protocol). In this case we
- // clean up our toolchain subvolume (<name>-<toolchain>) and ignore
- // this machine.
+ // clean up our toolchain subvolume (-<toolchain>, if any) and
+ // ignore this machine.
//
if (sp.empty ())
{
if (te)
- delete_t ();
+ delete_bootstrapped ();
l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
break;
@@ -437,8 +574,7 @@ try
// <name>-<toolchain>-<xxx>
//
- const dir_path xp (
- dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name));
+ const dir_path xp (snapshot_path (tp));
if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0)
{
@@ -525,7 +661,7 @@ try
}
if (!te)
- delete_t ();
+ delete_bootstrapped ();
}
else
l3 ([&]{trace << "bootstrapping " << tp;});
@@ -576,8 +712,8 @@ try
// Add the machine to the lists.
//
- rm.push_back (move (*bmm));
- rd.push_back (move (tp));
+ r.push_back (
+ bootstrapped_machine {move (tp), move (*bmm), move (*ml)});
break;
}
@@ -589,7 +725,10 @@ try
}
}
- return make_pair (move (rm), move (rd));
+ if (none)
+ warn << "no build machines for toolchain " << tc_name;
+
+ return r;
}
catch (const system_error& e)
{
@@ -629,10 +768,11 @@ try
// TFTP server mapping (server chroot is --tftp):
//
- // GET requests to .../build/<name>/get/*
- // PUT requests to .../build/<name>/put/*
+ // GET requests to .../build/<name>-<instance>/get/*
+ // PUT requests to .../build/<name>-<instance>/put/*
//
- auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name);
+ const string in_name (tc_name + '-' + to_string (inst));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name);
dir_path gd (dir_path (arm.path) /= "get");
dir_path pd (dir_path (arm.path) /= "put");
@@ -671,8 +811,7 @@ try
// <name>-<toolchain>-<xxx>
//
- const dir_path xp (
- md.directory () /= path::traits::temp_name (md.leaf ().string ()));
+ const dir_path xp (snapshot_path (md));
string br ("br1"); // Using private bridge for now.
@@ -685,9 +824,9 @@ try
// Start the TFTP server.
//
- tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" +
- "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n",
- ops.tftp_port () + tc_num);
+ tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" +
+ "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
+ ops.tftp_port () + offset);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
@@ -903,6 +1042,15 @@ try
: standard_version (BBOT_VERSION_STR));
tc_id = ops.toolchain_id ();
+ if (tc_num == 0 || tc_num > 99)
+ fail << "invalid --toolchain-num value " << tc_num;
+
+ inst = ops.instance ();
+
+ if (inst == 0 || inst > 99)
+ fail << "invalid --instance value " << inst;
+
+ offset = (tc_num - 1) * 100 + inst;
// Controller URLs.
//
@@ -968,6 +1116,7 @@ try
info << "toolchain num " << tc_num <<
info << "toolchain ver " << tc_ver.string () <<
info << "toolchain id " << tc_id <<
+ info << "instance num " << inst <<
info << "CPU(s) " << ops.cpu () <<
info << "RAM(kB) " << ops.ram ();
@@ -986,13 +1135,14 @@ try
// 4. If a build task is returned, do it, upload the result, and go to #1
// (immediately).
//
- for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false)
+ auto rand_sleep = [g = std::mt19937 (std::random_device {} ())] () mutable
{
- // Enumerate the machines.
- //
- auto mp (enumerate_machines (ops.machines ()));
- bootstrapped_machine_manifests& ms (mp.first);
- dir_paths& ds (mp.second);
+ return std::uniform_int_distribution<unsigned int> (50, 60) (g);
+ };
+
+ for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0)
+ {
+ bootstrapped_machines ms (enumerate_machines (ops.machines ()));
// Prepare task request.
//
@@ -1004,10 +1154,12 @@ try
machine_header_manifests {}
};
- for (const bootstrapped_machine_manifest& m: ms)
- tq.machines.emplace_back (m.machine.id,
- m.machine.name,
- m.machine.summary);
+ // Note: below we assume tq.machines.size () == ms.size ().
+ //
+ for (const bootstrapped_machine& m: ms)
+ tq.machines.emplace_back (m.manifest.machine.id,
+ m.manifest.machine.name,
+ m.manifest.machine.summary);
if (ops.dump_machines ())
{
@@ -1019,13 +1171,17 @@ try
if (tq.machines.empty ())
{
- warn << "no build machines for toolchain " << tc_name;
- sleep = true;
+ // Normally this means all the machines are locked so sleep a bit less.
+ //
+ sleep = rand_sleep () / 2;
continue;
}
// Send task requests.
//
+ // Note that we have to do it while holding the lock on all the machines
+ // since we don't know which machine we will need.
+ //
string url;
task_response_manifest tr;
@@ -1131,22 +1287,22 @@ try
if (tr.session.empty ()) // No task from any of the controllers.
{
l2 ([&]{trace << "no tasks from any controllers, sleeping";});
- sleep = true;
+ sleep = rand_sleep ();
continue;
}
// We have a build task.
//
- // First find the index of the machine we were asked to use (and also
- // verify it is one of those we sent).
+ // First find the index of the machine we were asked to use (and verify it
+ // is one of those we sent). Also unlock all the other machines.
//
- size_t i (0);
- for (const machine_header_manifest& m: tq.machines)
+ size_t i (ms.size ());
+ for (size_t j (0); j != ms.size (); ++j)
{
- if (m.name == tr.task->machine)
- break;
-
- ++i;
+ if (tq.machines[j].name == tr.task->machine)
+ i = j;
+ else
+ ms[j].lock.unlock ();
}
if (i == ms.size ())
@@ -1174,10 +1330,12 @@ try
if (!ops.trust ().empty ())
t.trust = ops.trust ();
- const dir_path& d (ds[i]); // The -<toolchain> directory.
- const bootstrapped_machine_manifest& m (ms[i]);
+ const dir_path& d (); // The -<toolchain> directory.
+ const bootstrapped_machine_manifest& m ();
+
+ result_manifest r (perform_task (ms[i].path, ms[i].manifest, t));
- result_manifest r (perform_task (d, m, t));
+ ms[i].lock.unlock (); // No need to hold the lock any longer.
if (ops.dump_result ())
{
@@ -1185,7 +1343,7 @@ try
return 0;
}
- // Prepare answer to the private key challenge.
+ // Prepare the answer to the private key challenge.
//
optional<vector<char>> challenge;
@@ -1211,7 +1369,8 @@ try
catch (const system_error& e)
{
// The task response challenge is valid (verified by manifest parser),
- // so there is something wrong with setup, and so the failure is fatal.
+ // so there must be something wrong with the setup and the failure is
+ // fatal.
//
fail << "unable to sign task response challenge: " << e;
}