diff options
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r-- | bbot/agent/agent.cxx | 327 |
1 files changed, 243 insertions, 84 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx index 7a64a18..eec6072 100644 --- a/bbot/agent/agent.cxx +++ b/bbot/agent/agent.cxx @@ -4,11 +4,14 @@ #include <bbot/agent/agent.hxx> -#include <pwd.h> // getpwuid() -#include <limits.h> // PATH_MAX -#include <signal.h> // signal() -#include <stdlib.h> // rand_r() -#include <unistd.h> // sleep(), realink(), getuid(), fsync() +#include <pwd.h> // getpwuid() +#include <limits.h> // PATH_MAX +#include <signal.h> // signal() +#include <stdlib.h> // rand_r() +#include <unistd.h> // sleep(), realink(), getuid(), fsync(), [f]stat() +#include <sys/types.h> // stat +#include <sys/stat.h> // [f]stat() +#include <sys/file.h> // flock() #include <net/if.h> // ifreq #include <netinet/in.h> // sockaddr_in @@ -17,12 +20,13 @@ #include <sys/socket.h> #include <chrono> +#include <random> #include <iostream> #include <libbutl/pager.mxx> #include <libbutl/sha256.mxx> #include <libbutl/openssl.mxx> -#include <libbutl/filesystem.mxx> // dir_iterator +#include <libbutl/filesystem.mxx> // dir_iterator, try_rmfile() #include <libbbot/manifest.hxx> @@ -53,6 +57,10 @@ namespace bbot standard_version tc_ver; string tc_id; + uint16_t inst; + + uint16_t offset; + string hname; uid_t uid; string uname; @@ -79,7 +87,7 @@ file_not_empty (const path& f) // The btrfs tool likes to print informational messages, like "Created // snapshot such and such". Luckily, it writes them to stdout while proper -// diagnostics to stderr. +// diagnostics goes to stderr. // template <typename... A> inline void @@ -136,9 +144,10 @@ bootstrap_machine (const dir_path& md, // Start the TFTP server (server chroot is --tftp). Map: // // GET requests to .../toolchains/<name>/* - // PUT requests to .../bootstrap/<name>/* + // PUT requests to .../bootstrap/<name>-<instance>/* // - auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= tc_name); + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name); try_mkdir_p (arm.path); // Bootstrap result manifest. @@ -158,8 +167,8 @@ bootstrap_machine (const dir_path& md, for (size_t retry (0);; ++retry) { tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" + - "Pr ^/?(.+)$ /bootstrap/" + tc_name + "/\\1\n", - ops.tftp_port () + tc_num); + "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n", + ops.tftp_port () + offset); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); @@ -296,16 +305,122 @@ bootstrap_machine (const dir_path& md, return r; } -// Return available machines and their directories as a parallel array. +// Machine locking. +// +// We use flock(2) which is straightforward. The tricky part is cleaning the +// file up. Here we may have a race when two processes are trying to open & +// lock the file that is being unlocked & removed by a third process. In this +// case one of these processes may still open the old file. To resolve this, +// after opening and locking the file, we verify that a new file hasn't +// appeared by stat'ing the path and file descriptor and comparing the inodes. +// +// Note that converting a lock (shared to exclusive or vice versa) is not +// guaranteed to be atomic (in case later we want to support exclusive +// bootstrap and shared build). +// +class machine_lock +{ +public: + machine_lock () = default; // Empty lock. + + ~machine_lock () + { + unlock (true /* ignore_errors */); + } + + void + unlock (bool ignore_errors = false) + { + if (fl_) + { + fl_ = false; // We have tried. + + try_rmfile (fp_, ignore_errors); + + if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors) + throw_generic_error (errno); + } + } + + machine_lock (machine_lock&&) = default; + machine_lock& operator= (machine_lock&&) = default; + + machine_lock (const machine_lock&) = delete; + machine_lock& operator= (const machine_lock&) = delete; + + // Implementation details. + // +public: + machine_lock (path&& fp, auto_fd&& fd) + : fp_ (move (fp)), fd_ (move (fd)), fl_ (true) {} + +private: + path fp_; + auto_fd fd_; + bool fl_ = false; +}; + +// Try to lock the machine given its -<toolchain> directory. +// +static optional<machine_lock> +lock_machine (const dir_path& tp) +{ + path fp (tp + ".lock"); // The -<toolchain>.lock file. + + for (;;) + { + auto_fd fd (fdopen (fp, fdopen_mode::out | fdopen_mode::create)); + + if (flock (fd.get (), LOCK_EX | LOCK_NB) != 0) + { + if (errno == EWOULDBLOCK) + return nullopt; + + throw_generic_error (errno); + } + + struct stat st1, st2; + + if (fstat (fd.get (), &st1) != 0 || + stat (fp.string ().c_str (), &st2) != 0 ) // Both should succeed. + throw_generic_error (errno); + + if (st1.st_ino == st2.st_ino) + return machine_lock (move (fp), move (fd)); + + // Note: unlocked by close(). + } +} + +// Given the toolchain directory (-<toolchain>) return the snapshot path in +// the <name>-<toolchain>-<xxx> form. +// +// We include the instance number into <xxx> for debuggability. // -static pair<bootstrapped_machine_manifests, dir_paths> +static inline dir_path +snapshot_path (const dir_path& tp) +{ + return tp.directory () /= + path::traits::temp_name (tp.leaf ().string () + '-' + to_string (inst)); +} + +// Return available machines, (re-)bootstrapping them if necessary. +// +struct bootstrapped_machine +{ + dir_path path; + bootstrapped_machine_manifest manifest; + machine_lock lock; +}; +using bootstrapped_machines = vector<bootstrapped_machine>; + +static bootstrapped_machines enumerate_machines (const dir_path& machines) try { tracer trace ("enumerate_machines", machines.string ().c_str ()); - bootstrapped_machine_manifests rm; - dir_paths rd; + bootstrapped_machines r; if (ops.fake_machine_specified ()) { @@ -313,25 +428,30 @@ try parse_manifest<machine_header_manifest> ( ops.fake_machine (), "machine header")); - rm.push_back ( - bootstrapped_machine_manifest { - machine_manifest { - mh.id, - mh.name, - mh.summary, - machine_type::kvm, - string ("de:ad:be:ef:de:ad"), - nullopt, - strings ()}, - toolchain_manifest {tc_id}, - bootstrap_manifest {} - }); - - rd.push_back (dir_path (ops.machines ()) /= mh.name); // For diagnostics. - - return make_pair (move (rm), move (rd)); + r.push_back ( + bootstrapped_machine { + dir_path (ops.machines ()) /= mh.name, // For diagnostics. + bootstrapped_machine_manifest { + machine_manifest { + move (mh.id), + move (mh.name), + move (mh.summary), + machine_type::kvm, + string ("de:ad:be:ef:de:ad"), + nullopt, + strings ()}, + toolchain_manifest {tc_id}, + bootstrap_manifest {}}, + machine_lock ()}); + + return r; } + // Notice and warn if there are no machines (as opposed to all of them being + // locked). + // + bool none (true); + // The first level are machine volumes. // for (const dir_entry& ve: dir_iterator (machines, @@ -350,8 +470,7 @@ try // try { - for (const dir_entry& me: dir_iterator (vd, - false /* ignore_dangling */)) + for (const dir_entry& me: dir_iterator (vd, false /* ignore_dangling */)) { const string mn (me.path ().string ()); @@ -361,28 +480,32 @@ try const dir_path md (dir_path (vd) /= mn); // Our endgoal here is to obtain a bootstrapped snapshot of this - // machine while watching out for potential race conditions (machines - // being added/upgraded/removed; see the manual for details). + // machine while watching out for potential race conditions (other + // instances as well as machines being added/upgraded/removed; see the + // manual for details). // // So here is our overall plan: // // 1. Resolve current subvolume link for our bootstrap protocol. // - // 2. If there is no link, cleanup and ignore this machine. + // 2. Lock the machine. This excludes any other instance from trying + // to perform the following steps. // - // 3. Try to create a snapshot of current subvolume (this operation is + // 3. If there is no link, cleanup old bootstrap (if any) and ignore + // this machine. + // + // 4. Try to create a snapshot of current subvolume (this operation is // atomic). If failed (e.g., someone changed the link and removed // the subvolume in the meantime), retry from #1. // - // 4. Compare the snapshot to the already bootstrapped version (if + // 5. Compare the snapshot to the already bootstrapped version (if // any) and see if we need to re-bootstrap. If so, use the snapshot // as a starting point. Rename to bootstrapped at the end (atomic). // dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P> dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain> - bool te (dir_exists (tp)); - auto delete_t = [&tp, &trace] () + auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>. { run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false"); run_btrfs (trace, "subvolume", "delete", tp); @@ -421,15 +544,29 @@ try fail << "unable to read subvolume link " << lp << ": " << e; } + none = none && sp.empty (); + + // Try to lock the machine, skipping it if already locked. + // + optional<machine_lock> ml (lock_machine (tp)); + + if (!ml) + { + l4 ([&]{trace << "skipping " << md << ": locked";}); + break; + } + + bool te (dir_exists (tp)); + // If the resolution fails, then this means there is no current // machine subvolume (for this bootstrap protocol). In this case we - // clean up our toolchain subvolume (<name>-<toolchain>) and ignore - // this machine. + // clean up our toolchain subvolume (-<toolchain>, if any) and + // ignore this machine. // if (sp.empty ()) { if (te) - delete_t (); + delete_bootstrapped (); l3 ([&]{trace << "skipping " << md << ": no subvolume link";}); break; @@ -437,8 +574,7 @@ try // <name>-<toolchain>-<xxx> // - const dir_path xp ( - dir_path (md) /= path::traits::temp_name (mn + '-' + tc_name)); + const dir_path xp (snapshot_path (tp)); if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0) { @@ -525,7 +661,7 @@ try } if (!te) - delete_t (); + delete_bootstrapped (); } else l3 ([&]{trace << "bootstrapping " << tp;}); @@ -576,8 +712,8 @@ try // Add the machine to the lists. // - rm.push_back (move (*bmm)); - rd.push_back (move (tp)); + r.push_back ( + bootstrapped_machine {move (tp), move (*bmm), move (*ml)}); break; } @@ -589,7 +725,10 @@ try } } - return make_pair (move (rm), move (rd)); + if (none) + warn << "no build machines for toolchain " << tc_name; + + return r; } catch (const system_error& e) { @@ -629,10 +768,11 @@ try // TFTP server mapping (server chroot is --tftp): // - // GET requests to .../build/<name>/get/* - // PUT requests to .../build/<name>/put/* + // GET requests to .../build/<name>-<instance>/get/* + // PUT requests to .../build/<name>-<instance>/put/* // - auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= tc_name); + const string in_name (tc_name + '-' + to_string (inst)); + auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name); dir_path gd (dir_path (arm.path) /= "get"); dir_path pd (dir_path (arm.path) /= "put"); @@ -671,8 +811,7 @@ try // <name>-<toolchain>-<xxx> // - const dir_path xp ( - md.directory () /= path::traits::temp_name (md.leaf ().string ())); + const dir_path xp (snapshot_path (md)); string br ("br1"); // Using private bridge for now. @@ -685,9 +824,9 @@ try // Start the TFTP server. // - tftp_server tftpd ("Gr ^/?(.+)$ /build/" + tc_name + "/get/\\1\n" + - "Pr ^/?(.+)$ /build/" + tc_name + "/put/\\1\n", - ops.tftp_port () + tc_num); + tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" + + "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n", + ops.tftp_port () + offset); l3 ([&]{trace << "tftp server on port " << tftpd.port ();}); @@ -903,6 +1042,15 @@ try : standard_version (BBOT_VERSION_STR)); tc_id = ops.toolchain_id (); + if (tc_num == 0 || tc_num > 99) + fail << "invalid --toolchain-num value " << tc_num; + + inst = ops.instance (); + + if (inst == 0 || inst > 99) + fail << "invalid --instance value " << inst; + + offset = (tc_num - 1) * 100 + inst; // Controller URLs. // @@ -968,6 +1116,7 @@ try info << "toolchain num " << tc_num << info << "toolchain ver " << tc_ver.string () << info << "toolchain id " << tc_id << + info << "instance num " << inst << info << "CPU(s) " << ops.cpu () << info << "RAM(kB) " << ops.ram (); @@ -986,13 +1135,14 @@ try // 4. If a build task is returned, do it, upload the result, and go to #1 // (immediately). // - for (bool sleep (false);; ::sleep (sleep ? 60 : 0), sleep = false) + auto rand_sleep = [g = std::mt19937 (std::random_device {} ())] () mutable { - // Enumerate the machines. - // - auto mp (enumerate_machines (ops.machines ())); - bootstrapped_machine_manifests& ms (mp.first); - dir_paths& ds (mp.second); + return std::uniform_int_distribution<unsigned int> (50, 60) (g); + }; + + for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0) + { + bootstrapped_machines ms (enumerate_machines (ops.machines ())); // Prepare task request. // @@ -1004,10 +1154,12 @@ try machine_header_manifests {} }; - for (const bootstrapped_machine_manifest& m: ms) - tq.machines.emplace_back (m.machine.id, - m.machine.name, - m.machine.summary); + // Note: below we assume tq.size () == ms.size (). + // + for (const bootstrapped_machine& m: ms) + tq.machines.emplace_back (m.manifest.machine.id, + m.manifest.machine.name, + m.manifest.machine.summary); if (ops.dump_machines ()) { @@ -1019,13 +1171,17 @@ try if (tq.machines.empty ()) { - warn << "no build machines for toolchain " << tc_name; - sleep = true; + // Normally this means all the machines are locked so sleep a bit less. + // + sleep = rand_sleep () / 2; continue; } // Send task requests. // + // Note that we have to do it while holding the lock on all the machines + // since we don't know which machine we will need. + // string url; task_response_manifest tr; @@ -1131,22 +1287,22 @@ try if (tr.session.empty ()) // No task from any of the controllers. { l2 ([&]{trace << "no tasks from any controllers, sleeping";}); - sleep = true; + sleep = rand_sleep (); continue; } // We have a build task. // - // First find the index of the machine we were asked to use (and also - // verify it is one of those we sent). + // First find the index of the machine we were asked to use (and verify it + // is one of those we sent). Also unlock all the other machines. // - size_t i (0); - for (const machine_header_manifest& m: tq.machines) + size_t i (ms.size ()); + for (size_t j (0); j != ms.size (); ++j) { - if (m.name == tr.task->machine) - break; - - ++i; + if (tq.machines[j].name == tr.task->machine) + i = j; + else + ms[j].lock.unlock (); } if (i == ms.size ()) @@ -1174,10 +1330,12 @@ try if (!ops.trust ().empty ()) t.trust = ops.trust (); - const dir_path& d (ds[i]); // The -<toolchain> directory. - const bootstrapped_machine_manifest& m (ms[i]); + const dir_path& d (); // The -<toolchain> directory. + const bootstrapped_machine_manifest& m (); + + result_manifest r (perform_task (ms[i].path, ms[i].manifest, t)); - result_manifest r (perform_task (d, m, t)); + ms[i].lock.unlock (); // No need to hold the lock any longer. if (ops.dump_result ()) { @@ -1185,7 +1343,7 @@ try return 0; } - // Prepare answer to the private key challenge. + // Prepare the answer to the private key challenge. // optional<vector<char>> challenge; @@ -1211,7 +1369,8 @@ try catch (const system_error& e) { // The task response challenge is valid (verified by manifest parser), - // so there is something wrong with setup, and so the failure is fatal. + // so there must be something wrong with the setup and the failure is + // fatal. // fail << "unable to sign task response challenge: " << e; } |