aboutsummaryrefslogtreecommitdiff
path: root/bbot/agent
diff options
context:
space:
mode:
Diffstat (limited to 'bbot/agent')
-rw-r--r--bbot/agent/agent.cli139
-rw-r--r--bbot/agent/agent.cxx3320
-rw-r--r--bbot/agent/agent.hxx6
-rw-r--r--bbot/agent/http-service.cxx364
-rw-r--r--bbot/agent/http-service.hxx71
-rw-r--r--bbot/agent/machine.cxx148
-rw-r--r--bbot/agent/machine.hxx15
-rw-r--r--bbot/agent/tftp.cxx4
-rw-r--r--bbot/agent/tftp.hxx4
9 files changed, 3534 insertions, 537 deletions
diff --git a/bbot/agent/agent.cli b/bbot/agent/agent.cli
index b50a43a..23765cf 100644
--- a/bbot/agent/agent.cli
+++ b/bbot/agent/agent.cli
@@ -1,5 +1,5 @@
// file : bbot/agent.cli
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
include <libbbot/manifest.hxx>;
@@ -12,19 +12,77 @@ include <bbot/common.cli>;
namespace bbot
{
{
- "<options> <url>",
+ "<options> <priority> <url>",
"
\h|SYNOPSIS|
\c{\b{bbot-agent --help}\n
\b{bbot-agent --version}\n
- \b{bbot-agent} [<options>] <url>...}
+ \b{bbot-agent} [<options>] [<priority>=]<url>...}
\h|DESCRIPTION|
\cb{bbot-agent} @@ TODO.
+ The controller URL <priority> is a four or five-digit decimal value. If it
+ is absent, then \cb{0} (lowest priority) is assumed. URLs with equal
+ priority are queried at random.
+
+ The <priority> value has the \c{[\i{F}]\i{DCBA}} form which encodes four
+ priority levels (\ci{DCBA}) each occupying one decimal digit (so there are
+ 10 distinct priorities in each level) plus the optional boost flag
+ (\ci{F}). These levels offer different trade-offs between the speed of
+ completing a higher priority task and potentially discarding work that has
+ already been done.
+
+ The first priority level (\ci{A}) is a simple preference: among the URLs
+ with equal values for other levels (\ci{DCB}), those with higher first
+ level priorities are queried first.
+
+ The second priority level (\ci{B}) has the semantics of the first level
+ plus it prevents URLs with lower second priority level from being
+ queried until the task with a higher second priority level has completed,
+ effectively conserving the resources for the higher priority task.
+
+ The third priority level (\ci{C}) has the semantics of the second level
+ plus it may interrupt one lower third priority level task in order to
+ perform the higher third priority task (the interrupt is necessary if the
+ desired machine is used by the lower priority task or the number of tasks
+ already being performed is the maximum allowed to be performed
+ concurrently; see \cb{--instance-max}).
+
+ Finally, the fourth priority level (\ci{D}) has the semantics of the third
+ level except that not one but all the lower fourth priority level tasks
+ are interrupting, effectively dedicating all the available resources to
+ the higher priority task. This level can also be combined with the boost
+ flag \ci{F}. If this flag is \cb{1} then the higher priority task's CPU
+ number (\cb{--cpu}) is boosted to the full number of available hardware
+ threads (or, to view it another way, the fourth priority level has 20
+ possible values, not 10, with the first 0-9 being without the boost while
+ the last 10-19 being with the boost). Note that this boosting semantics
+ may not be accurate if the agent is executed with CPU affinity. Also note
+ that there is no corresponding RAM boosting and it's possible that in some
+ configurations the amount of RAM will be insufficient for the boosted CPU
+ count.
+
+ Note that the priority levels are hierarchical in a sense that within a
+ given higher level URLs can be further prioritized using the lower
+ levels. As an example, consider a deployment with three controller URLs:
+ background package rebuilds (\cb{pkg.example.org}), user-initiated CI
+ (\cb{ci.example.org}), and user-initiated interactive CI
+ (\cb{ici.example.org}). Given the following priorities:
+
+ \
+ 0000=https://pkg.example.org
+ 0100=https://ci.example.org
+ 0101=https://ici.example.org
+ \
+
+ Both types of CI tasks will interrupt one background rebuild task if
+ necessary while the interactive CI tasks will be merely preferred over
+ non-interactive.
+
Note that on termination \cb{bbot-agent} may leave behind a machine lock
and working machine snapshot. It is expected that the caller (normally
Build OS monitor) cleans them up before restarting the agent.
@@ -61,10 +119,20 @@ namespace bbot
"<num>",
"Toolchain number, 1 by default. If agents are running for several
toolchains, then each of them should have a unique toolchain number
- between 1 and 99. This number is used as an offset for network ports,
+ between 1 and 9. This number is used as an offset for network ports,
interfaces, etc."
}
+ string --toolchain-lock // Note: string to allow empty path.
+ {
+ "<path>",
+ "Absolute path to the global toolchain lock file. If unspecified, then
+ \c{\b{/var/lock/bbot-agent-}\i{toolchain-name}\b{.lock}} is used by
+ default. If empty path is specified then no global locking is
+ performed. If one of the \cb{--fake-*} options is specified, then no
+ locking is performed by default."
+ }
+
standard_version --toolchain-ver
{
"<stdver>",
@@ -101,16 +169,44 @@ namespace bbot
network ports, interfaces, etc."
}
+ uint16_t --instance-max = 0
+ {
+ "<num>",
+ "Maximum number of instances that can perform tasks concurrently. If the
+ number of instances that have been started is greater than this number
+ (normally by just one), then when the maximum number of tasks is
+ already being performed, the extra instances operate in the \i{priority
+ monitor} mode: they only query controller URLs with priorities higher
+ than of the existing tasks and can only perform a task by interrupting
+ one of them. If the maximum number of instances is \cb{0} (default),
+ then it is assumed the number of instances started is the maximum
+ number, essentially disabling the priority monitor functionality."
+ }
+
size_t --cpu = 1
{
"<num>",
"Number of CPUs (threads) to use, 1 by default."
}
- size_t --ram (1024 * 1024) // 1G
+ size_t --build-ram (4 * 1024 * 1024) // 4GiB
{
"<num>",
- "Amount of RAM (in kB) to use, 1G by default."
+ "Amount of RAM (in KiB) to use for the build machine, 4GiB by default."
+ }
+
+ size_t --auxiliary-ram = 0
+ {
+ "<num>",
+ "Amount of RAM (in KiB) to use for auxiliary machines. To disable
+ running auxiliary machines, specify \cb{0}. If unspecified, then
+ currently the behavior is the same as specifying \cb{0} but this
+ may change in the future (for example, to support a more dynamic
+ allocation strategy)."
+
+ // Note: it's not going to be easy to set it to unspecified in
+ // bbot-agent@.service so we may have to invent some special value,
+ // like `auto`.
}
string --bridge = "br1"
@@ -155,21 +251,35 @@ namespace bbot
}
// Low 23401+, 23501+, 23601+, etc., all look good collision-wise with
- // with anything useful.
+ // anything useful.
//
uint16_t --tftp-port = 23400
{
"<num>",
"TFTP server port base, 23400 by default. The actual port is calculated
- by adding an offset calculated based on the toolchain and instance
- numbers."
+ by adding an offset calculated based on the toolchain, instance, and
+ machine numbers."
+ }
+
+ size_t --bootstrap-startup = 300
+ {
+ "<sec>",
+ "Maximum number of seconds to wait for build machine bootstrap startup,
+ 300 (5 minutes) by default."
}
size_t --bootstrap-timeout = 3600
{
"<sec>",
- "Maximum number of seconds to wait for machine bootstrap completion,
- 3600 (60 minutes) by default."
+ "Maximum number of seconds to wait for build machine bootstrap
+ completion, 3600 (60 minutes) by default."
+ }
+
+ size_t --bootstrap-auxiliary = 900
+ {
+ "<sec>",
+ "Maximum number of seconds to wait for auxiliary machine bootstrap
+ completion, 900 (15 minutes) by default."
}
size_t --bootstrap-retries = 2
@@ -179,6 +289,13 @@ namespace bbot
by default."
}
+ size_t --build-startup = 240
+ {
+ "<sec>",
+ "Maximum number of seconds to wait for build startup, 240 (4 minutes) by
+ default. This value is used for both build and auxiliary machines."
+ }
+
size_t --build-timeout = 5400
{
"<sec>",
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index 60f7271..0b57208 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -1,15 +1,16 @@
// file : bbot/agent/agent.cxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#include <bbot/agent/agent.hxx>
#include <pwd.h> // getpwuid()
#include <limits.h> // PATH_MAX
-#include <signal.h> // signal()
-#include <stdlib.h> // rand_r()
-#include <unistd.h> // sleep(), getuid(), fsync(), [f]stat()
+#include <signal.h> // signal(), kill()
+#include <stdlib.h> // rand_r(), strto[u]ll()
+#include <string.h> // strchr()
+#include <unistd.h> // sleep(), getpid(), getuid(), fsync(), [f]stat()
#include <ifaddrs.h> // getifaddrs(), freeifaddrs()
-#include <sys/types.h> // stat
+#include <sys/types.h> // stat, pid_t
#include <sys/stat.h> // [f]stat()
#include <sys/file.h> // flock()
@@ -19,15 +20,22 @@
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <map>
+#include <atomic>
#include <chrono>
+#include <thread> // thread::hardware_concurrency()
#include <random>
+#include <iomanip> // setw()
+#include <numeric> // iota()
#include <iostream>
#include <system_error> // generic_category()
-#include <libbutl/pager.mxx>
-#include <libbutl/sha256.mxx>
-#include <libbutl/openssl.mxx>
-#include <libbutl/filesystem.mxx> // dir_iterator, try_rmfile(), readsymlink()
+#include <libbutl/pager.hxx>
+#include <libbutl/base64.hxx>
+#include <libbutl/sha256.hxx>
+#include <libbutl/openssl.hxx>
+#include <libbutl/filesystem.hxx> // dir_iterator, try_rmfile(), readsymlink()
+#include <libbutl/semantic-version.hxx>
#include <libbbot/manifest.hxx>
@@ -40,6 +48,7 @@
#include <bbot/agent/tftp.hxx>
#include <bbot/agent/machine.hxx>
+#include <bbot/agent/http-service.hxx>
using namespace butl;
using namespace bbot;
@@ -47,6 +56,57 @@ using namespace bbot;
using std::cout;
using std::endl;
+// If RAM minimum is not specified for a machine, then let's assume something
+// plausible like 256MiB. This way we won't end up with degenerate cases where
+// we attempt to start a machine with some absurd amount of RAM.
+//
+const std::uint64_t default_ram_minimum = 262144;
+
+static inline std::uint64_t
+effective_ram_minimum (const machine_header_manifest& m)
+{
+ // Note: neither ram_minimum nor ram_maximum should be 0.
+ //
+ assert ((!m.ram_minimum || *m.ram_minimum != 0) &&
+ (!m.ram_maximum || *m.ram_maximum != 0));
+
+ return (m.ram_minimum
+ ? *m.ram_minimum
+ : (m.ram_maximum && *m.ram_maximum < default_ram_minimum
+ ? *m.ram_maximum
+ : default_ram_minimum));
+}
+
+static std::mt19937 rand_gen (std::random_device {} ());
+
+// According to the standard, atomic's use in the signal handler is only safe
+// if it's lock-free.
+//
+#if !defined(ATOMIC_INT_LOCK_FREE) || ATOMIC_INT_LOCK_FREE != 2
+#error int is not lock-free on this architecture
+#endif
+
+// While we can use memory_order_relaxed in a single-threaded program, let's
+// use consume/release in case this process becomes multi-threaded in the
+// future.
+//
+static std::atomic<unsigned int> sigurs1;
+
+using std::memory_order_consume;
+using std::memory_order_release;
+
+extern "C" void
+handle_signal (int sig)
+{
+ switch (sig)
+ {
+ case SIGHUP: exit (3); // Unimplemented feature.
+ case SIGTERM: exit (0);
+ case SIGUSR1: sigurs1.fetch_add (1, std::memory_order_release); break;
+ default: assert (false);
+ }
+}
+
namespace bbot
{
agent_options ops;
@@ -55,10 +115,12 @@ namespace bbot
string tc_name;
uint16_t tc_num;
+ path tc_lock; // Empty if no locking.
standard_version tc_ver;
string tc_id;
- uint16_t inst;
+ uint16_t inst; // 1-based.
+ uint16_t inst_max; // 0 if priority monitoring is disabled.
uint16_t offset;
@@ -112,16 +174,16 @@ btrfs_exit (tracer& t, A&&... a)
"btrfs", forward<A> (a)...);
}
-// Bootstrap the machine. Return the bootstrapped machine manifest if
-// successful and nullopt otherwise (in which case the machine directory
-// should be cleaned and the machine ignored for now).
+// Bootstrap a build machine. Return the bootstrapped machine manifest if
+// successful and nullopt otherwise (in which case the caller should clean up
+// the machine directory and ignore the machine for now).
//
static optional<bootstrapped_machine_manifest>
-bootstrap_machine (const dir_path& md,
- const machine_manifest& mm,
- optional<bootstrapped_machine_manifest> obmm)
+bootstrap_build_machine (const dir_path& md,
+ const machine_manifest& mm,
+ optional<bootstrapped_machine_manifest> obmm)
{
- tracer trace ("bootstrap_machine", md.string ().c_str ());
+ tracer trace ("bootstrap_build_machine", md.string ().c_str ());
bootstrapped_machine_manifest r {
mm,
@@ -143,10 +205,12 @@ bootstrap_machine (const dir_path& md,
else
try
{
+ // Note: similar code in bootstrap_auxiliary_machine().
+
// Start the TFTP server (server chroot is --tftp). Map:
//
- // GET requests to .../toolchains/<name>/*
- // PUT requests to .../bootstrap/<name>-<instance>/*
+ // GET requests to .../toolchains/<toolchain>/*
+ // PUT requests to .../bootstrap/<toolchain>-<instance>/*
//
const string in_name (tc_name + '-' + to_string (inst));
auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name);
@@ -170,7 +234,7 @@ bootstrap_machine (const dir_path& md,
{
tftp_server tftpd ("Gr ^/?(.+)$ /toolchains/" + tc_name + "/\\1\n" +
"Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n",
- ops.tftp_port () + offset);
+ ops.tftp_port () + offset + 0 /* build machine */);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
@@ -179,6 +243,9 @@ bootstrap_machine (const dir_path& md,
unique_ptr<machine> m (
start_machine (md,
mm,
+ 0 /* machine_num (build) */,
+ ops.cpu (),
+ ops.build_ram (),
obmm ? obmm->machine.mac : nullopt,
ops.bridge (),
tftpd.port (),
@@ -193,8 +260,11 @@ bootstrap_machine (const dir_path& md,
make_exception_guard (
[&m, &md] ()
{
- info << "trying to force machine " << md << " down";
- try {m->forcedown (false);} catch (const failed&) {}
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << md << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
}));
// What happens if the bootstrap process hangs? The simple thing would
@@ -218,6 +288,8 @@ bootstrap_machine (const dir_path& md,
m->wait (false);
m->cleanup ();
info << "resuming after machine suspension";
+
+ // Note: snapshot cleaned up by the caller.
}
catch (const failed&) {}
@@ -250,13 +322,14 @@ bootstrap_machine (const dir_path& md,
};
// The first request should be the toolchain download. Wait for up to
- // 5 minutes for that to arrive. In a sense we use it as an indication
- // that the machine has booted and the bootstrap process has started.
- // Why wait so long you may wonder? Well, we may be using a new MAC
- // address and operating systems like Windows may need to digest that.
+ // 5 minutes (by default) for that to arrive. In a sense we use it as
+ // an indication that the machine has booted and the bootstrap process
+ // has started. Why wait so long you may wonder? Well, we may be using
+ // a new MAC address and operating systems like Windows may need to
+ // digest that.
//
size_t to;
- const size_t startup_to (5 * 60);
+ const size_t startup_to (ops.bootstrap_startup ());
const size_t bootstrap_to (ops.bootstrap_timeout ());
const size_t shutdown_to (5 * 60);
@@ -268,7 +341,9 @@ bootstrap_machine (const dir_path& md,
break;
if (!check_machine ())
- return nullopt;
+ {
+ return nullopt; // Note: snapshot cleaned up by the caller.
+ }
}
// This can mean two things: machine mis-configuration or what we
@@ -289,6 +364,7 @@ bootstrap_machine (const dir_path& md,
m->print_info (dr);
try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exception guard above.
continue;
}
@@ -311,7 +387,9 @@ bootstrap_machine (const dir_path& md,
// The exit/upload is racy so we re-check.
//
if (!(file_not_empty (mf) || file_not_empty (mfo)))
- return nullopt;
+ {
+ return nullopt; // Note: snapshot cleaned up by the caller.
+ }
}
bool old (false);
@@ -361,7 +439,329 @@ bootstrap_machine (const dir_path& md,
return r;
}
-// Machine locking.
+// Bootstrap an auxiliary machine. Return the bootstrapped machine manifest if
+// successful and nullopt otherwise (in which case the caller should clean up
+// the machine directory and ignore the machine for now).
+//
+static vector<size_t>
+divide_auxiliary_ram (const vector<const machine_header_manifest*>&);
+
+static optional<bootstrapped_machine_manifest>
+bootstrap_auxiliary_machine (const dir_path& md,
+ const machine_manifest& mm,
+ optional<bootstrapped_machine_manifest> obmm)
+{
+ tracer trace ("bootstrap_auxiliary_machine", md.string ().c_str ());
+
+ bootstrapped_machine_manifest r {
+ mm,
+ toolchain_manifest {}, // Unused for auxiliary.
+ bootstrap_manifest {} // Unused for auxiliary.
+ };
+
+ if (ops.fake_bootstrap ())
+ {
+ r.machine.mac = "de:ad:be:ef:de:ad";
+ }
+ else
+ try
+ {
+ // Similar to bootstrap_build_machine() except here we just wait for the
+ // upload of the environment.
+
+ // Start the TFTP server (server chroot is --tftp). Map:
+ //
+ // GET requests to /dev/null
+ // PUT requests to .../bootstrap/<toolchain>-<instance>/*
+ //
+ const string in_name (tc_name + '-' + to_string (inst));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "bootstrap") /= in_name);
+ try_mkdir_p (arm.path);
+
+ // Environment upload.
+ //
+ path ef (arm.path / "environment");
+ try_rmfile (ef);
+
+ // Note that unlike build, here we use the same VM snapshot for retries,
+ // which is not ideal.
+ //
+ for (size_t retry (0);; ++retry)
+ {
+ tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' +
+ "Pr ^/?(.+)$ /bootstrap/" + in_name + "/\\1\n",
+ ops.tftp_port () + offset + 1 /* auxiliary machine */);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // If the machine specified RAM minimum, use that to make sure the
+ // machine can actually function with this amount of RAM. Otherwise, use
+ // the minimum of RAM maximum (if specified) and the available auxiliary
+ // RAM (so we know this machine will at least work alone). For the
+ // latter case use divide_auxiliary_ram() to be consistent with the
+ // build case (see that function implementation for nuances).
+ //
+ size_t ram;
+ if (mm.ram_minimum)
+ ram = *mm.ram_minimum;
+ else
+ {
+ vector<size_t> rams (divide_auxiliary_ram ({&mm}));
+ assert (!rams.empty ()); // We should have skipped such a machine.
+ ram = rams.front ();
+ }
+
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (md,
+ mm,
+ 1 /* machine_num (first auxiliary) */,
+ ops.cpu (),
+ ram,
+ obmm ? obmm->machine.mac : nullopt,
+ ops.bridge (),
+ tftpd.port (),
+ false /* pub_vnc */));
+
+ {
+ // NOTE: see bootstrap_build_machine() for comments.
+
+ auto mg (
+ make_exception_guard (
+ [&m, &md] ()
+ {
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << md << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
+ }));
+
+ auto soft_fail = [&md, &m] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << md << ", suspending";
+ m->print_info (dr);
+ }
+
+ try
+ {
+ m->suspend (false);
+ m->wait (false);
+ m->cleanup ();
+ info << "resuming after machine suspension";
+
+ // Note: snapshot cleaned up by the caller.
+ }
+ catch (const failed&) {}
+
+ return nullopt;
+ };
+
+ auto check_machine = [&md, &m] ()
+ {
+ try
+ {
+ size_t t (0);
+ if (!m->wait (t /* seconds */, false /* fail_hard */))
+ return true; // Still running.
+
+ // Exited successfully.
+ }
+ catch (const failed&)
+ {
+ // Failed, exit code diagnostics has already been issued.
+ }
+
+ diag_record dr (error);
+ dr << "machine " << md << " exited unexpectedly";
+ m->print_info (dr);
+
+ return false;
+ };
+
+ // Wait up to the specified timeout for the auxiliary machine to
+ // bootstrap. Note that such a machine may do extra setup work on the
+ // first boot (such as install some packages, etc) which may take some
+ // time.
+ //
+ size_t to;
+ const size_t bootstrap_to (ops.bootstrap_auxiliary ());
+ const size_t shutdown_to (5 * 60);
+
+ // Serve TFTP requests while periodically checking for the environment
+ // file.
+ //
+ for (to = bootstrap_to; to != 0; )
+ {
+ if (tftpd.serve (to, 2))
+ continue;
+
+ if (!check_machine ())
+ {
+ if (!file_not_empty (ef))
+ {
+ return nullopt; // Note: snapshot cleaned up by the caller.
+ }
+ }
+
+ if (file_not_empty (ef))
+ {
+ if (!tftpd.serve (to, 5))
+ break;
+ }
+ }
+
+ if (to == 0)
+ {
+ if (retry > ops.bootstrap_retries ())
+ return soft_fail ("bootstrap timeout");
+
+ // Note: keeping the logs behind (no cleanup).
+
+ diag_record dr (warn);
+ dr << "machine " << mm.name << " mis-booted, retrying";
+ m->print_info (dr);
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exception guard above.
+ continue;
+ }
+
+ l3 ([&]{trace << "completed bootstrap in " << bootstrap_to - to << "s";});
+
+ // Shut the machine down cleanly.
+ //
+ if (!m->shutdown ((to = shutdown_to)))
+ return soft_fail ("bootstrap shutdown timeout");
+
+ l3 ([&]{trace << "completed shutdown in " << shutdown_to - to << "s";});
+
+ m->cleanup ();
+ }
+
+ r.machine.mac = m->mac; // Save the MAC address.
+
+ break;
+ }
+ }
+ catch (const system_error& e)
+ {
+ fail << "bootstrap error: " << e;
+ }
+
+ serialize_manifest (r, md / "manifest", "bootstrapped machine");
+ return r;
+}
+
+// Global toolchain lock.
+//
+// The overall locking protocol is as follows:
+//
+// 1. Before enumerating the machines each agent instance acquires the global
+// toolchain lock.
+//
+// 2. As the agent enumerates over the machines, it tries to acquire the lock
+// for each machine.
+//
+// 3. If the agent encounters a machine that it needs to bootstrap, it
+// releases all the other machine locks followed by the global lock,
+// proceeds to bootstrap the machine, releases its lock, and restarts the
+// process from scratch.
+//
+// 4. Otherwise, upon receiving a task response for one of the machines (plus,
+// potentially, a number of auxiliary machines), the agent releases all the
+// other machine locks followed by the global lock, proceeds to perform the
+// task on the selected machine(s), releases their locks, and restarts the
+// process from scratch.
+//
+// One notable implication of this protocol is that the machine locks are
+// only acquired while holding the global toolchain lock but can be released
+// while not holding this lock.
+//
+// (Note that because of this implication it can theoretically be possible
+// to omit acquiring all the machine locks during the enumeration process,
+// instead only acquiring the lock of the machine we need to bootstrap or
+// build. However, the current approach is simpler since we still need
+// to detect machines that are already locked, which entails acquiring
+// the lock anyway.)
+//
+// Note that unlike the machine lock below, here we don't bother with removing
+// the lock file.
+//
+class toolchain_lock
+{
+public:
+ toolchain_lock () = default; // Empty lock.
+
+ // Note: returns true if locking is disabled.
+ //
+ bool
+ locked () const
+ {
+ return tc_lock.empty () || fl_;
+ }
+
+ void
+ unlock (bool ignore_errors = false)
+ {
+ if (fl_)
+ {
+ fl_ = false; // We have tried.
+
+ if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors)
+ throw_generic_error (errno);
+ }
+ }
+
+ ~toolchain_lock ()
+ {
+ unlock (true /* ignore_errors */);
+ }
+
+ toolchain_lock (toolchain_lock&&) = default;
+ toolchain_lock& operator= (toolchain_lock&&) = default;
+
+ toolchain_lock (const toolchain_lock&) = delete;
+ toolchain_lock& operator= (const toolchain_lock&) = delete;
+
+ // Implementation details.
+ //
+public:
+ explicit
+ toolchain_lock (auto_fd&& fd)
+ : fd_ (move (fd)), fl_ (true) {}
+
+private:
+ auto_fd fd_;
+ bool fl_ = false;
+};
+
+// Note: returns empty lock if toolchain locking is disabled.
+//
+static optional<toolchain_lock>
+lock_toolchain (unsigned int timeout)
+{
+ if (tc_lock.empty ())
+ return toolchain_lock ();
+
+ auto_fd fd (fdopen (tc_lock, fdopen_mode::out | fdopen_mode::create));
+
+ for (; flock (fd.get (), LOCK_EX | LOCK_NB) != 0; sleep (1), --timeout)
+ {
+ if (errno != EWOULDBLOCK)
+ throw_generic_error (errno);
+
+ if (timeout == 0)
+ return nullopt;
+ }
+
+ return toolchain_lock (move (fd));
+}
+
+// Per-toolchain machine lock.
//
// We use flock(2) which is straightforward. The tricky part is cleaning the
// file up. Here we may have a race when two processes are trying to open &
@@ -374,14 +774,29 @@ bootstrap_machine (const dir_path& md,
// guaranteed to be atomic (in case later we want to support exclusive
// bootstrap and shared build).
//
+// Note also that we per-toolchain lock auxiliary machines even though they
+// are not toolchain-specific. Doing it this way allows us to handle both
+// types of machines consistently with regards to priorities, interrupts, etc.
+// It also means we will have each auxiliary machine available per-toolchain
+// rather than a single machine shared between all the toolchains, which is
+// a good thing.
+//
class machine_lock
{
public:
- machine_lock () = default; // Empty lock.
+ // A lock is either locked by this process or it contains information about
+ // the process holding the lock.
+ //
+ pid_t pid; // Process using the machine.
+ optional<uint64_t> prio; // Task priority (absent means being bootstrapped
+ // or have been suspended).
- ~machine_lock ()
+ machine_lock () = default; // Uninitialized lock.
+
+ bool
+ locked () const
{
- unlock (true /* ignore_errors */);
+ return fl_;
}
void
@@ -391,13 +806,69 @@ public:
{
fl_ = false; // We have tried.
- try_rmfile (fp_, ignore_errors);
+ if (fd_ != nullfd)
+ {
+ try_rmfile (fp_, ignore_errors);
- if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors)
- throw_generic_error (errno);
+ if (flock (fd_.get (), LOCK_UN) != 0 && !ignore_errors)
+ throw_generic_error (errno);
+ }
+ }
+ }
+
+ // Write the holding process information to the lock file.
+ //
+ // Must be called while holding the toolchain lock (see the lock_machine()
+ // implementation for rationale).
+ //
+ void
+ bootstrap (const toolchain_lock& tl)
+ {
+ assert (tl.locked () && fl_);
+
+ if (fd_ != nullfd)
+ write (nullopt);
+ }
+
+ void
+ perform_task (const toolchain_lock& tl, uint64_t prio)
+ {
+ assert (tl.locked () && fl_);
+
+ if (fd_ != nullfd)
+ write (prio);
+ }
+
+ // Truncate the holding process information after the call to perform_task()
+ // so that it doesn't contain the priority, marking the machine as being
+ // suspended.
+ //
+ // Note that this one can be called without holding the toolchain lock.
+ //
+ void
+ suspend_task ()
+ {
+ assert (fl_);
+
+ if (fd_ != nullfd)
+ {
+ assert (tp_ != 0); // Must be called after perform_task().
+
+ // While there is no direct statement to this effect in POSIX, the
+ // consensus on the internet is that truncation is atomic, in a sense
+ // that the reader shouldn't see a partially truncated content. Feels
+ // like should be doubly so when actually truncating as opposed to
+ // extending the size, which is what we do.
+ //
+ fdtruncate (fd_.get (), tp_);
}
}
+ ~machine_lock ()
+ {
+ unlock (true /* ignore_errors */);
+ }
+
machine_lock (machine_lock&&) = default;
machine_lock& operator= (machine_lock&&) = default;
@@ -407,30 +878,119 @@ public:
// Implementation details.
//
public:
+ // If fd is nullfd, treat it as a fake lock (used for fake machines).
+ //
machine_lock (path&& fp, auto_fd&& fd)
: fp_ (move (fp)), fd_ (move (fd)), fl_ (true) {}
+ machine_lock (pid_t pi, optional<uint64_t> pr)
+ : pid (pi), prio (pr), fl_ (false) {}
+
private:
- path fp_;
- auto_fd fd_;
- bool fl_ = false;
+ void
+ write (optional<uint64_t> prio)
+ {
+ pid_t pid (getpid ());
+
+ string l (to_string (pid));
+
+ if (prio)
+ {
+ tp_ = l.size (); // Truncate position.
+
+ l += ' ';
+ l += to_string (*prio);
+ }
+
+ auto n (fdwrite (fd_.get (), l.c_str (), l.size ()));
+
+ if (n == -1)
+ throw_generic_ios_failure (errno);
+
+ if (static_cast<size_t> (n) != l.size ())
+ throw_generic_ios_failure (EFBIG);
+ }
+
+private:
+ path fp_;
+ auto_fd fd_;
+ bool fl_ = false;
+ uint64_t tp_ = 0; // Truncate position.
};
-// Try to lock the machine given its -<toolchain> directory.
+// Try to lock the machine given its -<toolchain> directory. Return unlocked
+// lock with pid/prio if already in use. Must be called while holding the
+// toolchain lock.
//
-static optional<machine_lock>
-lock_machine (const dir_path& tp)
+static machine_lock
+lock_machine (const toolchain_lock& tl, const dir_path& tp)
{
+ assert (tl.locked ());
+
path fp (tp + ".lock"); // The -<toolchain>.lock file.
for (;;)
{
- auto_fd fd (fdopen (fp, fdopen_mode::out | fdopen_mode::create));
+ auto_fd fd (fdopen (fp, (fdopen_mode::in |
+ fdopen_mode::out |
+ fdopen_mode::create)));
if (flock (fd.get (), LOCK_EX | LOCK_NB) != 0)
{
if (errno == EWOULDBLOCK)
- return nullopt;
+ {
+ // The file should contain a line in the following format:
+ //
+ // <pid>[ <prio>]
+ //
+ char buf[64]; // Sufficient for 2 64-bit numbers (20 decimals max).
+
+ auto sn (fdread (fd.get (), buf, sizeof (buf)));
+
+ if (sn == -1)
+ throw_generic_ios_failure (errno);
+
+ size_t n (static_cast<size_t> (sn));
+
+ // While there would be a race between locking the file then writing
+ // to it in one process and reading from it in another process, we are
+ // protected by the global toolchain lock, which must be held by both
+ // sides during this dance.
+ //
+ assert (n > 0 && n < sizeof (buf));
+ buf[n] = '\0';
+
+ // Note also that it's possible that by the time we read the pid/prio
+ // the lock has already been released. But this case is no different
+ // from the lock being released after we have read pid/prio but before
+ // acting on this information (e.g., trying to interrupt the other
+ // process), which we have to deal with anyway.
+ //
+ pid_t pid;
+ optional<uint64_t> prio;
+ {
+ char* p (strchr (buf, ' '));
+ char* e;
+
+ {
+ errno = 0;
+ pid = strtoll (buf, &e, 10); // Note: pid_t is signed.
+ assert (errno != ERANGE &&
+ e != buf &&
+ (p != nullptr ? e == p : *e == '\0'));
+ }
+
+ if (p != nullptr)
+ {
+ ++p;
+ errno = 0;
+ prio = strtoull (p, &e, 10);
+ assert (errno != ERANGE && e != p && *e == '\0');
+ }
+ }
+
+ return machine_lock (pid, prio);
+ }
throw_generic_error (errno);
}
@@ -444,7 +1004,7 @@ lock_machine (const dir_path& tp)
if (st1.st_ino == st2.st_ino)
return machine_lock (move (fp), move (fd));
- // Note: unlocked by close().
+ // Retry (note: lock is unlocked by auto_fd::close()).
}
}
@@ -461,375 +1021,1161 @@ snapshot_path (const dir_path& tp)
to_string (inst));
}
-// Return available machines, (re-)bootstrapping them if necessary.
+// Compare bbot and library versions returning -1 if older, 0 if the same,
+// and +1 if newer.
+//
+static int
+compare_bbot (const bootstrap_manifest& m)
+{
+ auto cmp = [&m] (const string& n, const char* v) -> int
+ {
+ standard_version sv (v);
+ auto i = m.versions.find (n);
+
+ return (i == m.versions.end () || i->second < sv
+ ? -1
+ : i->second > sv ? 1 : 0);
+ };
+
+ // Start from the top assuming a new dependency cannot be added without
+ // changing the dependent's version.
+ //
+ int r;
+ return (
+ (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r :
+ (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0);
+};
+
+// Return the global toolchain lock and the list of available machines,
+// (re-)bootstrapping them if necessary.
+//
+// Note that this function returns both machines that this process managed to
+// lock as well as the machines locked by other processes (including those
+// that are being bootstrapped or that have been suspended), in case the
+// caller needs to interrupt one of them for a higher-priority task. In the
+// latter case, the manifest is empty if the machine is bootstrapping or
+// suspended and only has the machine_manifest information otherwise. (The
+// bootstrapping/suspended machines have to be returned to get the correct
+// count of currently active instances for the inst_max comparison.)
+//
+// Note that both build and auxiliary machines are returned. For auxiliary,
+// toolchain and bootstrap manifests are unused and therefore always empty.
//
struct bootstrapped_machine
{
- dir_path path;
- bootstrapped_machine_manifest manifest;
machine_lock lock;
+ const dir_path path;
+ bootstrapped_machine_manifest manifest;
};
using bootstrapped_machines = vector<bootstrapped_machine>;
-static bootstrapped_machines
+static pair<toolchain_lock, bootstrapped_machines>
enumerate_machines (const dir_path& machines)
-try
{
tracer trace ("enumerate_machines", machines.string ().c_str ());
- bootstrapped_machines r;
+ size_t dir_iter_retries (0); // Directory iteration retry count (see below).
- if (ops.fake_machine_specified ())
+ for (;;) // From-scratch retry loop for after bootstrap (see below).
{
- auto mh (
- parse_manifest<machine_header_manifest> (
- ops.fake_machine (), "machine header"));
-
- r.push_back (
- bootstrapped_machine {
- dir_path (ops.machines ()) /= mh.name, // For diagnostics.
- bootstrapped_machine_manifest {
- machine_manifest {
- move (mh.id),
- move (mh.name),
- move (mh.summary),
- machine_type::kvm,
- string ("de:ad:be:ef:de:ad"),
- nullopt,
- strings ()},
- toolchain_manifest {tc_id},
- bootstrap_manifest {}},
- machine_lock ()});
-
- return r;
- }
+ pair<toolchain_lock, bootstrapped_machines> pr;
- // Notice and warn if there are no machines (as opposed to all of them being
- // locked).
- //
- bool none (true);
+ {
+ optional<toolchain_lock> l;
+ while (!(l = lock_toolchain (60 /* seconds */)))
+ {
+ // One typical situation where this can happen is when another agent
+ // takes a while to request a task (e.g., due to network issues). So
+ // this is an info as opposed to a warning.
+ //
+ info << "unable to acquire global toolchain lock " << tc_lock
+ << " for 60s";
+ }
+ pr.first = move (*l);
+ }
- // The first level are machine volumes.
- //
- for (const dir_entry& ve: dir_iterator (machines,
- false /* ignore_dangling */))
- {
- const string vn (ve.path ().string ());
+ toolchain_lock& tl (pr.first);
+ bootstrapped_machines& r (pr.second);
- // Ignore hidden directories.
- //
- if (ve.type () != entry_type::directory || vn[0] == '.')
- continue;
+ if (ops.fake_machine_specified ())
+ {
+ auto mh (
+ parse_manifest<machine_header_manifest> (
+ ops.fake_machine (), "machine header"));
+
+ r.push_back (
+ bootstrapped_machine {
+ machine_lock (path (), nullfd), // Fake lock.
+ dir_path (ops.machines ()) /= mh.name, // For diagnostics.
+ bootstrapped_machine_manifest {
+ machine_manifest {
+ move (mh.id),
+ move (mh.name),
+ move (mh.summary),
+ machine_type::kvm,
+ string ("de:ad:be:ef:de:ad"),
+ nullopt,
+ strings ()},
+ toolchain_manifest {tc_id},
+ bootstrap_manifest {}}});
+
+ return pr;
+ }
- const dir_path vd (dir_path (machines) /= vn);
+ // Notice and warn if there are no build machines (as opposed to all of
+ // them being busy).
+ //
+ bool none (true);
+
+ // We used to (re)-bootstrap machines as we are iterating. But with the
+ // introduction of the priority monitoring functionality we need to
+ // respect the --instance-max value. Which means we first need to try to
+ // lock all the machines in order to determine how many of them are busy
+ // then check this count against --instance-max, and only bootstrap if we
+ // are not over the limit. Which means we have to store all the
+ // information about a (first) machine that needs bootstrapping until
+ // after we have enumerated all of them.
+ //
+ struct pending_bootstrap
+ {
+ machine_lock ml;
+ dir_path tp; // -<toolchain>
+ dir_path xp; // -<toolchain>-<xxx>
+ machine_manifest mm;
+ optional<bootstrapped_machine_manifest> bmm;
+ };
+ optional<pending_bootstrap> pboot;
- // Inside we have machines.
+ // The first level are machine volumes.
//
try
{
- for (const dir_entry& me: dir_iterator (vd, false /* ignore_dangling */))
+ bool dir_iter_retry (false);
+ for (const dir_entry& ve:
+ dir_iterator (machines, dir_iterator::no_follow))
{
- const string mn (me.path ().string ());
+ const string vn (ve.path ().string ());
- if (me.type () != entry_type::directory || mn[0] == '.')
+ // Ignore hidden directories.
+ //
+ if (ve.type () != entry_type::directory || vn[0] == '.')
continue;
- const dir_path md (dir_path (vd) /= mn);
+ const dir_path vd (dir_path (machines) /= vn);
- // Our endgoal here is to obtain a bootstrapped snapshot of this
- // machine while watching out for potential race conditions (other
- // instances as well as machines being added/upgraded/removed; see the
- // manual for details).
- //
- // So here is our overall plan:
- //
- // 1. Resolve current subvolume link for our bootstrap protocol.
- //
- // 2. Lock the machine. This excludes any other instance from trying
- // to perform the following steps.
- //
- // 3. If there is no link, cleanup old bootstrap (if any) and ignore
- // this machine.
- //
- // 4. Try to create a snapshot of current subvolume (this operation is
- // atomic). If failed (e.g., someone changed the link and removed
- // the subvolume in the meantime), retry from #1.
- //
- // 5. Compare the snapshot to the already bootstrapped version (if
- // any) and see if we need to re-bootstrap. If so, use the snapshot
- // as a starting point. Rename to bootstrapped at the end (atomic).
+ // Inside we have machines.
//
- dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
- dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain>
-
- auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>.
- {
- run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
- run_btrfs (trace, "subvolume", "delete", tp);
- };
-
- for (size_t retry (0);; ++retry)
+ try
{
- if (retry != 0)
- sleep (1);
+ for (const dir_entry& me: dir_iterator (vd, dir_iterator::no_follow))
+ {
+ const string mn (me.path ().string ());
- // Resolve the link to subvolume path.
- //
- dir_path sp; // <name>-<P>.<R>
+ if (me.type () != entry_type::directory || mn[0] == '.')
+ continue;
- try
- {
- sp = path_cast<dir_path> (readsymlink (lp));
+ const dir_path md (dir_path (vd) /= mn);
- if (sp.relative ())
- sp = md / sp;
- }
- catch (const system_error& e)
- {
- // Leave the subvolume path empty if the subvolume link doesn't
- // exist and fail on any other error.
+ // Our endgoal here is to obtain a bootstrapped snapshot of this
+ // machine while watching out for potential race conditions (other
+ // instances as well as machines being added/upgraded/removed; see
+ // the manual for details).
//
- if (e.code ().category () != std::generic_category () ||
- e.code ().value () != ENOENT)
- fail << "unable to read subvolume link " << lp << ": " << e;
- }
+ // So here is our overall plan:
+ //
+ // 1. Resolve current subvolume link for our bootstrap protocol.
+ //
+ // 2. Lock the machine. This excludes any other instance from
+ // trying to perform the following steps.
+ //
+ // 3. If there is no link, cleanup old bootstrap (if any) and
+ // ignore this machine.
+ //
+ // 4. Try to create a snapshot of current subvolume (this
+ // operation is atomic). If failed (e.g., someone changed the
+ // link and removed the subvolume in the meantime), retry from
+ // #1.
+ //
+ // 5. Compare the snapshot to the already bootstrapped version (if
+ // any) and see if we need to re-bootstrap. If so, use the
+ // snapshot as a starting point. Rename to bootstrapped at the
+ // end (atomic).
+ //
+ dir_path lp (dir_path (md) /= (mn + '-' + bs_prot)); // -<P>
+ dir_path tp (dir_path (md) /= (mn + '-' + tc_name)); // -<toolchain>
- none = none && sp.empty ();
+ auto delete_bootstrapped = [&tp, &trace] () // Delete -<toolchain>.
+ {
+ run_btrfs (trace, "property", "set", "-ts", tp, "ro", "false");
+ run_btrfs (trace, "subvolume", "delete", tp);
+ };
- // Try to lock the machine, skipping it if already locked.
- //
- optional<machine_lock> ml (lock_machine (tp));
+ for (size_t retry (0);; ++retry)
+ {
+ if (retry != 0)
+ sleep (1);
- if (!ml)
- {
- l4 ([&]{trace << "skipping " << md << ": locked";});
- break;
- }
+ // Resolve the link to subvolume path.
+ //
+ dir_path sp; // <name>-<P>.<R>
- bool te (dir_exists (tp));
+ try
+ {
+ sp = path_cast<dir_path> (readsymlink (lp));
- // If the resolution fails, then this means there is no current
- // machine subvolume (for this bootstrap protocol). In this case we
- // clean up our toolchain subvolume (-<toolchain>, if any) and
- // ignore this machine.
- //
- if (sp.empty ())
- {
- if (te)
- delete_bootstrapped ();
+ if (sp.relative ())
+ sp = md / sp;
+ }
+ catch (const system_error& e)
+ {
+ // Leave the subvolume path empty if the subvolume link
+ // doesn't exist and fail on any other error.
+ //
+ if (e.code ().category () != std::generic_category () ||
+ e.code ().value () != ENOENT)
+ fail << "unable to read subvolume link " << lp << ": " << e;
+ }
- l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
- break;
- }
+ // Try to lock the machine.
+ //
+ machine_lock ml (lock_machine (tl, tp));
- // <name>-<toolchain>-<xxx>
- //
- const dir_path xp (snapshot_path (tp));
+ if (!ml.locked ())
+ {
+ machine_manifest mm;
+ if (ml.prio)
+ {
+ // Get the machine manifest (subset of the steps performed
+ // for the locked case below).
+ //
+ // Note that it's possible the machine we get is not what
+ // was originally locked by the other process (e.g., it has
+ // been upgraded since). It's also possible that if and when
+ // we interrupt and lock this machine, it will be a
+ // different machine (e.g., it has been upgraded since we
+ // read this machine manifest). To deal with all of that we
+ // will be reloading this information if/when we acquire the
+ // lock to this machine.
+ //
+ if (sp.empty ())
+ {
+ l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
+ break;
+ }
+
+ l3 ([&]{trace << "keeping " << md << ": locked by " << ml.pid
+ << " with priority " << *ml.prio;});
+
+ mm = parse_manifest<machine_manifest> (
+ sp / "manifest", "machine");
+
+ none =
+ none && mm.effective_role () == machine_role::auxiliary;
+ }
+ else // Bootstrapping/suspended.
+ {
+ l3 ([&]{trace << "keeping " << md << ": being bootstrapped "
+ << "or suspened by " << ml.pid;});
+
+ // Assume it is a build machine (we cannot determine whether
+ // it is build or auxiliary without loading its manifest).
+ //
+ none = false;
+ }
+
+ // Add the machine to the lists and bail out.
+ //
+ r.push_back (bootstrapped_machine {
+ move (ml),
+ move (tp),
+ bootstrapped_machine_manifest {move (mm), {}, {}}});
- if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0)
- {
- if (retry >= 10)
- fail << "unable to snapshot subvolume " << sp;
+ break;
+ }
- continue;
- }
+ bool te (dir_exists (tp));
- // Load the (original) machine manifest.
- //
- auto mm (
- parse_manifest<machine_manifest> (sp / "manifest", "machine"));
+ // If the resolution fails, then this means there is no current
+ // machine subvolume (for this bootstrap protocol). In this case
+ // we clean up our toolchain subvolume (-<toolchain>, if any)
+ // and ignore this machine.
+ //
+ if (sp.empty ())
+ {
+ if (te)
+ delete_bootstrapped ();
- // If we already have <name>-<toolchain>, see if it needs to be re-
- // bootstrapped. Things that render it obsolete:
- //
- // 1. New machine revision (compare machine ids).
- // 2. New toolchain (compare toolchain ids).
- // 3. New bbot/libbbot (compare versions).
- //
- // The last case has a complication: what should we do if we have
- // bootstrapped a newer version of bbot? This would mean that we are
- // about to be stopped and upgraded (and the upgraded version will
- // probably be able to use the result). So we simply ignore this
- // machine for this run.
+ l3 ([&]{trace << "skipping " << md << ": no subvolume link";});
+ break;
+ }
- // Return -1 if older, 0 if the same, and +1 if newer.
- //
- auto compare_bbot = [] (const bootstrap_manifest& m) -> int
- {
- auto cmp = [&m] (const string& n, const char* v) -> int
- {
- standard_version sv (v);
- auto i = m.versions.find (n);
+ // <name>-<toolchain>-<xxx>
+ //
+ dir_path xp (snapshot_path (tp));
- return (i == m.versions.end () || i->second < sv
- ? -1
- : i->second > sv ? 1 : 0);
- };
+ if (btrfs_exit (trace, "subvolume", "snapshot", sp, xp) != 0)
+ {
+ if (retry >= 10)
+ fail << "unable to snapshot subvolume " << sp;
- // Start from the top assuming a new dependency cannot be added
- // without changing the dependent's version.
- //
- int r;
- return
- (r = cmp ("bbot", BBOT_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbbot", LIBBBOT_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbpkg", LIBBPKG_VERSION_STR)) != 0 ? r :
- (r = cmp ("libbutl", LIBBUTL_VERSION_STR)) != 0 ? r : 0;
- };
-
- optional<bootstrapped_machine_manifest> bmm;
- if (te)
- {
- bmm = parse_manifest<bootstrapped_machine_manifest> (
- tp / "manifest", "bootstrapped machine");
+ continue;
+ }
- if (bmm->machine.id != mm.id)
- {
- l3 ([&]{trace << "re-bootstrapping " << tp << ": new machine";});
- te = false;
- }
+ // Load the (original) machine manifest.
+ //
+ machine_manifest mm (
+ parse_manifest<machine_manifest> (sp / "manifest", "machine"));
- if (!tc_id.empty () && bmm->toolchain.id != tc_id)
- {
- l3 ([&]{trace << "re-bootstrapping " << tp << ": new toolchain";});
- te = false;
- }
+ bool aux (mm.effective_role () == machine_role::auxiliary);
- if (int i = compare_bbot (bmm->bootstrap))
- {
- if (i < 0)
+ // Skip machines for which we don't have sufficient RAM.
+ //
+ if (effective_ram_minimum (mm) >
+ (aux ? ops.auxiliary_ram () : ops.build_ram ()))
+ {
+ l3 ([&]{trace << "skipping " << md << ": insufficient RAM";});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ break;
+ }
+
+ none = none && aux;
+
+ // If we already have <name>-<toolchain>, see if it needs to be
+ // re-bootstrapped. Things that render it obsolete:
+ //
+ // 1. New machine revision (compare machine ids).
+ // 2. New toolchain (compare toolchain ids, not auxiliary).
+ // 3. New bbot/libbbot (compare versions, not auxiliary).
+ //
+ // The last case has a complication: what should we do if we
+ // have bootstrapped a newer version of bbot? This would mean
+ // that we are about to be stopped and upgraded (and the
+ // upgraded version will probably be able to use the result). So
+ // we simply ignore this machine for this run.
+ //
+ // Note: see similar code in the machine interruption logic.
+ //
+ optional<bootstrapped_machine_manifest> bmm;
+ if (te)
{
- l3 ([&]{trace << "re-bootstrapping " << tp << ": new bbot";});
- te = false;
+ bmm = parse_manifest<bootstrapped_machine_manifest> (
+ tp / "manifest", "bootstrapped machine");
+
+ if (bmm->machine.id != mm.id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";});
+ te = false;
+ }
+
+ if (!aux)
+ {
+ if (!tc_id.empty () && bmm->toolchain.id != tc_id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
+ te = false;
+ }
+
+ if (int i = compare_bbot (bmm->bootstrap))
+ {
+ if (i < 0)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";});
+ te = false;
+ }
+ else
+ {
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ break;
+ }
+ }
+ }
+
+ if (!te)
+ delete_bootstrapped ();
}
else
+ l3 ([&]{trace << "bootstrap " << tp;});
+
+ if (!te)
{
- l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
- run_btrfs (trace, "subvolume", "delete", xp);
+ // Ignore any other machines that need bootstrapping.
+ //
+ if (!pboot)
+ {
+ pboot = pending_bootstrap {
+ move (ml), move (tp), move (xp), move (mm), move (bmm)};
+ }
+ else
+ run_btrfs (trace, "subvolume", "delete", xp);
+
break;
}
- }
+ else
+ run_btrfs (trace, "subvolume", "delete", xp);
- if (!te)
- delete_bootstrapped ();
- }
+ // Add the machine to the lists.
+ //
+ r.push_back (
+ bootstrapped_machine {move (ml), move (tp), move (*bmm)});
+
+ break;
+ } // Retry loop.
+ } // Inner dir_iterator loop.
+ }
+ catch (const system_error& e)
+ {
+ // Once in a while we get ENOENT while iterating over the machines
+ // volume directory. This directory contains the machine directories
+ // (not btrfs subvolumes) and is not being changed when we get this
+ // error. Maybe this is due to directory sizes/timestamps changes,
+ // but then we would expect to get this error a lot more often..? So
+ // this feels like a btrfs bug which we are going to retry a few
+ // times. See GH issue #349 for additional information.
+ //
+ dir_iter_retry = (dir_iter_retries++ != 3);
+
+ (dir_iter_retry
+ ? warn
+ : error) << "unable to iterate over " << vd << ": " << e;
+
+ if (dir_iter_retry)
+ break;
else
- l3 ([&]{trace << "bootstrapping " << tp;});
+ throw failed ();
+ }
+ } // Outer dir_iterator loop.
- if (!te)
- {
- // Use the <name>-<toolchain>-<xxx> snapshot that we have made to
- // bootstrap the new machine. Then atomically rename it to
- // <name>-<toolchain>.
- //
- // Also release all the machine locks that we have acquired so far
- // since the bootstrap will take a while and other instances might
- // be able to use them.
- //
- r.clear ();
+ if (dir_iter_retry)
+ continue; // Re-enumerate from scratch.
+ else
+ dir_iter_retries = 0; // Reset for re-enumeration due to other reasons.
+ }
+ catch (const system_error& e)
+ {
+ fail << "unable to iterate over " << machines << ": " << e;
+ }
- bmm = bootstrap_machine (xp, mm, move (bmm));
+ // See if there is a pending bootstrap and whether we can perform it.
+ //
+ // What should we do if we can't (i.e., we are in the priority monitor
+ // mode)? Well, we could have found some machines that are already
+ // bootstrapped (busy or not) and there may be a higher-priority task for
+ // one of them, so it feels natural to return whatever we've got.
+ //
+ if (pboot)
+ {
+ dir_path& tp (pboot->tp);
+ dir_path& xp (pboot->xp);
- if (!bmm)
- {
- l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";});
- run_btrfs (trace, "subvolume", "delete", xp);
- break;
- }
+ // Determine how many machines are busy (locked by other processes) and
+ // make sure it's below the --instance-max limit, if specified.
+ //
+ // We should only count build machines unless being bootstrapped (see
+ // above).
+ //
+ if (inst_max != 0)
+ {
+ size_t busy (0);
+ for (const bootstrapped_machine& m: r)
+ {
+ if (!m.lock.locked () &&
+ (!m.lock.prio ||
+ m.manifest.machine.effective_role () != machine_role::auxiliary))
+ ++busy;
+ }
- try
- {
- mvdir (xp, tp);
- }
- catch (const system_error& e)
- {
- fail << "unable to rename " << xp << " to " << tp;
- }
+ assert (busy <= inst_max);
- l2 ([&]{trace << "bootstrapped " << bmm->machine.name;});
+ if (busy == inst_max)
+ {
+ l3 ([&]{trace << "instance max reached attempting to bootstrap "
+ << tp;});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ return pr;
+ }
+ }
- // Check the bootstrapped bbot version as above and ignore this
- // machine if it's newer than us.
- //
- if (int i = compare_bbot (bmm->bootstrap))
- {
- if (i > 0)
- {
- l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
- break;
- }
- else
- warn << "bootstrapped " << tp << " bbot worker is older "
- << "than agent; assuming test setup";
- }
- }
+ machine_lock& ml (pboot->ml);
+
+ l3 ([&]{trace << "bootstrapping " << tp;});
+
+ // Use the -<toolchain>-<xxx> snapshot that we have made to bootstrap
+ // the new machine. Then atomically rename it to -<toolchain>.
+ //
+ // Also release all the machine locks that we have acquired so far as
+ // well as the global toolchain lock, since the bootstrap will take a
+ // while and other instances might be able to use them. Because we are
+ // releasing the global lock, we have to restart the enumeration process
+ // from scratch.
+ //
+ r.clear ();
+ ml.bootstrap (tl);
+ tl.unlock ();
+
+ bool aux (pboot->mm.effective_role () == machine_role::auxiliary);
+
+ optional<bootstrapped_machine_manifest> bmm (
+ aux
+ ? bootstrap_auxiliary_machine (xp, pboot->mm, move (pboot->bmm))
+ : bootstrap_build_machine (xp, pboot->mm, move (pboot->bmm)));
+
+ if (!bmm)
+ {
+ l3 ([&]{trace << "ignoring " << tp << ": failed to bootstrap";});
+ run_btrfs (trace, "subvolume", "delete", xp);
+ continue;
+ }
+
+ try
+ {
+ mvdir (xp, tp);
+ }
+ catch (const system_error& e)
+ {
+ fail << "unable to rename " << xp << " to " << tp;
+ }
+
+ l2 ([&]{trace << "bootstrapped " << bmm->machine.name;});
+
+ // Check the bootstrapped bbot version as above and ignore this build
+ // machine if it's newer than us.
+ //
+ if (!aux)
+ {
+ if (int i = compare_bbot (bmm->bootstrap))
+ {
+ if (i > 0)
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
else
- run_btrfs (trace, "subvolume", "delete", xp);
+ warn << "bootstrapped " << tp << " bbot worker is older "
+ << "than agent; assuming test setup";
+ }
+ }
- // Add the machine to the lists.
- //
- r.push_back (
- bootstrapped_machine {move (tp), move (*bmm), move (*ml)});
+ continue; // Re-enumerate from scratch.
+ }
+
+ if (none)
+ warn << "no build machines for toolchain " << tc_name;
+
+ return pr;
+
+ } // From-scratch retry loop.
+
+ // Unreachable.
+}
+
+// Perform the build task throwing interrupt if it has been interrupted.
+//
+struct interrupt {};
+
+// Start an auxiliary machine (steps 1-3 described in perform_task() below).
+//
+// Note that if the returned machine is NULL, then it means it has failed to
+// start up (in which case the diagnostics has already been issued and
+// snapshot cleaned up).
+//
+// Note: can throw interrupt.
+//
+struct auxiliary_machine_result
+{
+ dir_path snapshot;
+ unique_ptr<bbot::machine> machine;
+};
+
+using auxiliary_machine_results = vector<auxiliary_machine_result>;
+
+static pair<auxiliary_machine_result, string /* environment */>
+start_auxiliary_machine (bootstrapped_machine& am,
+ const string& env_name,
+ uint16_t machine_num,
+ size_t ram,
+ const string& in_name, // <toolchain>-<instance>
+ const dir_path& tftp_put_dir,
+ optional<size_t> boost_cpus)
+try
+{
+ tracer trace ("start_auxiliary_machine", am.path.string ().c_str ());
+
+ // NOTE: a simplified version of perform_task() below.
+
+ machine_lock& ml (am.lock);
+ const dir_path& md (am.path);
+ const bootstrapped_machine_manifest& mm (am.manifest);
+
+ path ef (tftp_put_dir / "environment"); // Environment upload file.
+ path efm (ef + '-' + mm.machine.name); // Environment upload saved file.
+ try_rmfile (ef);
+ try_rmfile (efm);
+
+ // <name>-<toolchain>-<xxx>
+ //
+ const dir_path xp (snapshot_path (md));
+
+ for (size_t retry (0);; ++retry)
+ {
+ if (retry != 0)
+ run_btrfs (trace, "subvolume", "delete", xp);
+ run_btrfs (trace, "subvolume", "snapshot", md, xp);
+
+ // Start the TFTP server. Map:
+ //
+ // GET requests to /dev/null
+ // PUT requests to .../build/<toolchain>-<instance>/put/*
+ //
+ // Note that we only need to run the TFTP server until we get the
+ // environment upload. Which means we could have reused the same port as
+ // the build machine. But let's keep things parallel to the VNC ports and
+  // use a separate TFTP port for each auxiliary machine.
+ //
+ tftp_server tftpd ("Gr ^/?(.+)$ " + string ("/dev/null") + '\n' +
+ "Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
+ ops.tftp_port () + offset + machine_num);
+
+ l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
+
+ // Note: the machine handling logic is similar to bootstrap. Except here
+ // we have to cleanup the snapshot ourselves in case of suspension or
+ // unexpected exit.
+
+ // Start the machine.
+ //
+ // Note that for now we don't support logging in into auxiliary machines
+ // in the interactive mode. Maybe one day.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ machine_num,
+ boost_cpus ? *boost_cpus : ops.cpu (),
+ ram,
+ mm.machine.mac,
+ ops.bridge (),
+ tftpd.port (),
+ false /* public_vnc */));
+
+ auto mg (
+ make_exception_guard (
+ [&m, &xp] ()
+ {
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
+ }));
+
+ auto soft_fail = [&trace, &ml, &xp, &m] (const char* msg)
+ {
+ {
+ diag_record dr (error);
+ dr << msg << " for machine " << xp << ", suspending";
+ m->print_info (dr);
+ }
+
+ try
+ {
+ // Update the information in the machine lock to signal that the
+ // machine is suspended and cannot be interrupted.
+ //
+ ml.suspend_task ();
+
+ m->suspend (false);
+ m->wait (false);
+ m->cleanup ();
+ run_btrfs (trace, "subvolume", "delete", xp);
+ info << "resuming after machine suspension";
+ }
+ catch (const failed&) {}
+
+ return make_pair (auxiliary_machine_result {move (xp), nullptr},
+ string ());
+ };
+
+ auto check_machine = [&xp, &m] ()
+ {
+ try
+ {
+ size_t t (0);
+ if (!m->wait (t /* seconds */, false /* fail_hard */))
+ return true;
+ }
+ catch (const failed&) {}
+
+ diag_record dr (warn);
+ dr << "machine " << xp << " exited unexpectedly";
+ m->print_info (dr);
+
+ return false;
+ };
+
+ auto check_interrupt = [&trace, &xp, &m] ()
+ {
+ if (sigurs1.load (std::memory_order_consume) == 0)
+ return;
+
+ l2 ([&]{trace << "machine " << xp << " interruped";});
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ throw interrupt ();
+ };
+
+ // Wait for up to 4 minutes (by default) for the environment upload (the
+ // same logic as in bootstrap_auxiliary_machine() except here the machine
+ // cannot just exit).
+ //
+ size_t to;
+ const size_t startup_to (ops.build_startup ());
+
+ for (to = startup_to; to != 0; )
+ {
+ check_interrupt ();
+
+ if (tftpd.serve (to, 2))
+ continue;
+
+ if (!check_machine ())
+ {
+ // An auxiliary machine should not just exit.
+ //
+ return make_pair (auxiliary_machine_result {move (xp), nullptr},
+ string ());
+ }
+
+ if (file_not_empty (ef))
+ {
+ if (!tftpd.serve (to, 5))
break;
+ }
+ }
+
+ if (to == 0)
+ {
+ if (retry > ops.build_retries ())
+ return soft_fail ("build startup timeout");
+
+ // Note: keeping the logs behind (no cleanup).
+
+ diag_record dr (warn);
+ dr << "machine " << mm.machine.name << " mis-booted, retrying";
+ m->print_info (dr);
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
+ continue;
+ }
+
+ l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
+
+ // Read the uploaded environment and, if necessary, append the name prefix
+ // (which we first make a valid C identifier and uppercase).
+ //
+ // Note that it may seem like a good idea to validate the format here.
+ // But that means we will essentially need to parse it twice (here and in
+ // worker). Plus, in worker we can comminucate some diagnostics by writing
+ // it to the build log (here all we can easily do is abort the task). So
+ // here we just append the name prefix to trimmed non-blank/comment lines.
+ //
+ string env_pfx (env_name.empty ()
+ ? string ()
+ : ucase (sanitize_identifier (env_name)) + '_');
+ string env;
+ try
+ {
+ ifdstream is (ef, ifdstream::badbit);
+ for (string l; !eof (getline (is, l)); )
+ {
+ trim (l);
+
+ if (!env_pfx.empty () && !l.empty () && l.front () != '#')
+ l.insert (0, env_pfx);
+
+ env += l; env += '\n';
+ }
+ }
+ catch (const io_error& e)
+ {
+ fail << "unable to read from " << ef << ": " << e;
+ }
+
+ // Rename and keep the environment file for debugging (it will be removed
+ // at the end as part of the tftp_put_dir cleanup).
+ //
+ mvfile (ef, efm);
+
+ return make_pair (auxiliary_machine_result {move (xp), move (m)},
+ move (env));
+ }
+
+ // Unreachable.
+}
+catch (const system_error& e)
+{
+ fail << "auxiliary machine startup error: " << e << endf;
+}
+
+// Divide the auxiliary RAM among the specified machines.
+//
+// Issue diagnostics and return empty vector if the auxiliary RAM is
+// insufficient.
+//
+static vector<size_t> // Parallel to mms.
+divide_auxiliary_ram (const vector<const machine_header_manifest*>& mms)
+{
+ size_t ram (ops.auxiliary_ram ());
+
+ vector<size_t> rams;
+ vector<size_t> rnds; // Allocation rounds (see below).
+
+ // First pass: allocate the minimums.
+ //
+ for (const machine_header_manifest* mm: mms)
+ {
+ size_t v (effective_ram_minimum (*mm));
+
+ assert (!mm->ram_maximum || v <= *mm->ram_maximum); // Sanity check.
+
+ rams.push_back (v);
+ rnds.push_back (0);
+
+ if (ram >= v)
+ ram -= v;
+ else
+ {
+ diag_record dr (error);
+ dr << "insufficient auxiliary RAM " << ops.auxiliary_ram () << "KiB";
+
+ for (size_t i (0); i != rams.size (); ++i)
+ dr << info << mms[i]->name << " requires minimum " << rams[i] << "KiB";
+
+ return {};
+ }
+ }
+
+ // Second pass: distribute the remaining RAM.
+ //
+ // We are going to do it in the ram_minimum increments to avoid ending up
+ // with odd amounts (while Linux can probably grok anything, who knows about
+ // Windows).
+ //
+ // To make the distribution fair we are going to count how many times we
+ // have increased each machine's allocation (the rnds vector).
+ //
+ for (size_t a (1); ram != 0; ) // Allocation round.
+ {
+ // Find a machine that would be satisfied with the least amount of RAM but
+ // which hasn't yet been given anything on this allocation round.
+ //
+ size_t min_i; // Min index.
+ size_t min_v (0); // Min value.
+
+ // We are done if we couldn't give out any RAM and haven't seen any
+ // machines that have already been given something on this allocation
+ // round.
+ //
+ bool done (true);
+
+ for (size_t i (0); i != rams.size (); ++i)
+ {
+ if (rnds[i] != a)
+ {
+ const machine_header_manifest& mm (*mms[i]);
+
+ size_t o (rams[i]);
+ size_t v (effective_ram_minimum (mm));
+
+ // Don't allocate past maximum.
+ //
+ if (mm.ram_maximum && *mm.ram_maximum < o + v)
+ {
+ v = *mm.ram_maximum - o;
+
+ if (v == 0)
+ continue;
+ }
+
+ if (v <= ram && (min_v == 0 || min_v > v))
+ {
+ min_i = i;
+ min_v = v;
}
}
+ else
+ done = false;
}
- catch (const system_error& e)
+
+ if (min_v != 0)
{
- fail << "unable to iterate over " << vd << ": " << e;
+ rnds[min_i] = a;
+ rams[min_i] += min_v;
+ ram -= min_v;
+ }
+ else
+ {
+ if (done)
+ break;
+
+ ++a; // Next allocation round.
}
}
- if (none)
- warn << "no build machines for toolchain " << tc_name;
+ return rams;
+}
- return r;
+// Stop all the auxiliary machines and clear the passed list.
+//
+static void
+stop_auxiliary_machines (auxiliary_machine_results& amrs)
+{
+ tracer trace ("stop_auxiliary_machines");
+
+ if (!amrs.empty ())
+ {
+ // Do it in two passes to make sure all the machines are at least down.
+ //
+ for (const auxiliary_machine_result& amr: amrs)
+ {
+ if (amr.machine != nullptr)
+ {
+ try {amr.machine->forcedown (false);} catch (const failed&) {}
+ }
+ }
+
+ // Make sure we don't retry the above even if the below fails.
+ //
+ auxiliary_machine_results tmp;
+ tmp.swap (amrs);
+
+ for (const auxiliary_machine_result& amr: tmp)
+ {
+ if (amr.machine != nullptr)
+ {
+ amr.machine->cleanup ();
+ run_btrfs (trace, "subvolume", "delete", amr.snapshot);
+ }
+ }
+ }
}
-catch (const system_error& e)
+
+// Start all the auxiliary machines and patch in their combined environment
+// into tm.auxiliary_environment.
+//
+// Return the started machines or empty list if any of them failed to start up
+// (which means this function should only be called for non-empty ams).
+//
+// Note that the order of auxiliary machines in ams may not match that in
+// tm.auxiliary_machines.
+//
+static auxiliary_machine_results
+start_auxiliary_machines (const vector<bootstrapped_machine*>& ams,
+ task_manifest& tm,
+ const string& in_name, // <toolchain>-<instance>
+ const dir_path& tftp_put_dir,
+ optional<size_t> boost_cpus)
{
- fail << "unable to iterate over " << machines << ": " << e << endf;
+ tracer trace ("start_auxiliary_machines");
+
+ size_t n (tm.auxiliary_machines.size ());
+
+ assert (n != 0 && ams.size () == n);
+
+ auxiliary_machine_results amrs;
+
+ // Divide the auxiliary RAM among the machines.
+ //
+ vector<size_t> rams;
+ {
+ vector<const machine_header_manifest*> mms;
+ mms.reserve (n);
+ for (bootstrapped_machine* am: ams)
+ mms.push_back (&am->manifest.machine);
+
+ rams = divide_auxiliary_ram (mms);
+ if (rams.empty ())
+ return amrs;
+
+ if (verb > 3) // l3
+ for (size_t i (0); i != n; ++i)
+ trace << mms[i]->name << " allocated " << rams[i] << "KiB";
+ }
+
+ // Start the machines.
+ //
+ // Let's use the order in which they were specified in the task manifest
+ // (which will naturally be the order in which they are specified in the
+ // package manifest). This way amrs and tm.auxiliary_machines will be
+ // parallel.
+ //
+ string envs; // Combined environments.
+
+ auto amg (
+ make_exception_guard (
+ [&amrs] ()
+ {
+ if (!amrs.empty ())
+ {
+ info << "trying to force auxiliary machines down";
+ stop_auxiliary_machines (amrs);
+ }
+ }));
+
+ for (size_t i (0); i != n; ++i)
+ {
+ const auxiliary_machine& tam (tm.auxiliary_machines[i]);
+
+ auto b (ams.begin ()), e (ams.end ());
+ auto j (find_if (b, e,
+ [&tam] (const bootstrapped_machine* m)
+ {
+ return m->manifest.machine.name == tam.name;
+ }));
+ assert (j != e);
+
+ // Note: can throw interrupt.
+ //
+ pair<auxiliary_machine_result, string> p (
+ start_auxiliary_machine (**j,
+ tam.environment_name,
+ i + 1,
+ rams[j - b], // Parallel to ams.
+ in_name,
+ tftp_put_dir,
+ boost_cpus));
+
+ if (p.first.machine == nullptr)
+ {
+ if (!amrs.empty ())
+ {
+ info << "trying to force auxiliary machines down";
+ stop_auxiliary_machines (amrs); // amrs is now empty.
+ }
+
+ return amrs;
+ }
+
+ amrs.push_back (move (p.first));
+
+ // Add the machine name as a header before its environment.
+ //
+ if (i != 0) envs += '\n';
+ envs += "# "; envs += tam.name; envs += '\n';
+ envs += "#\n";
+ envs += p.second; // Always includes trailing newline.
+ }
+
+ tm.auxiliary_environment = move (envs);
+
+ return amrs;
}
-static result_manifest
-perform_task (const dir_path& md,
- const bootstrapped_machine_manifest& mm,
- const task_manifest& tm)
+struct perform_task_result
+{
+ auto_rmdir work_dir; // <tftp>/build/<toolchain>-<instance>/
+ result_manifest manifest;
+
+ // Uploaded archive, if any (somewhere inside work_dir).
+ //
+ optional<path> upload_archive;
+
+ // Create the special empty result.
+ //
+ perform_task_result () = default;
+
+ // Create task result without build artifacts.
+ //
+ explicit
+ perform_task_result (auto_rmdir&& d, result_manifest&& m)
+ : work_dir (move (d)),
+ manifest (move (m)) {}
+
+ // Create task result with build artifacts.
+ //
+ perform_task_result (auto_rmdir&& d, result_manifest&& m, path&& a)
+ : work_dir (move (d)),
+ manifest (move (m)),
+ upload_archive (move (a)) {}
+};
+
+// Note that the task manifest is not const since we may need to patch in the
+// auxiliary_environment value.
+//
+static perform_task_result
+perform_task (toolchain_lock tl, // Note: assumes ownership.
+ bootstrapped_machine& bm, // Build machine.
+ const vector<bootstrapped_machine*>& ams, // Auxiliary machines.
+ task_manifest& tm,
+ optional<size_t> boost_cpus)
try
{
- tracer trace ("perform_task", md.string ().c_str ());
+ tracer trace ("perform_task", bm.path.string ().c_str ());
+
+ // Arm the interrupt handler and release the global toolchain lock.
+ //
+ // Note that there can be no interrupt while we are holding the global lock.
+ //
+ sigurs1.store (0, std::memory_order_release);
+ tl.unlock ();
+
+ machine_lock& ml (bm.lock);
+ const dir_path& md (bm.path);
+ const bootstrapped_machine_manifest& mm (bm.manifest);
+
+ const string in_name (tc_name + '-' + to_string (inst));
+ auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name);
+
+ try_mkdir_p (arm.path);
result_manifest r {
tm.name,
tm.version,
result_status::abort,
- operation_results {}};
+ operation_results {},
+ nullopt /* worker_checksum */,
+ nullopt /* dependency_checksum */};
if (ops.fake_build ())
- return r;
+ return perform_task_result (move (arm), move (r));
// The overall plan is as follows:
//
- // 1. Snapshot the (bootstrapped) machine.
+ // 1. Snapshot the (bootstrapped) build machine.
//
// 2. Save the task manifest to the TFTP directory (to be accessed by the
// worker).
//
// 3. Start the TFTP server and the machine.
//
- // 4. Serve TFTP requests while watching out for the result manifest.
+ // 4. Serve TFTP requests while watching out for the result manifest and
+ // interrupts.
//
// 5. Clean up (force the machine down and delete the snapshot).
//
+ // If the task requires any auxiliary machines, then for each such machine
+ // perform the following steps 1-3 before step 1 above, and step 4 after
+ // step 5 above (that is, start all the auxiliary machines before the build
+ // machine and clean them up after):
+ //
+ // 1. Snapshot the (bootstrapped) auxiliary machine.
+ //
+ // 2. Start the TFTP server and the machine.
+ //
+ // 3. Handle TFTP upload requests until received the environment upload.
+ //
+ // 4. Clean up (force the machine down and delete the snapshot).
// TFTP server mapping (server chroot is --tftp):
//
- // GET requests to .../build/<name>-<instance>/get/*
- // PUT requests to .../build/<name>-<instance>/put/*
+ // GET requests to .../build/<toolchain>-<instance>/get/*
+ // PUT requests to .../build/<toolchain>-<instance>/put/*
//
- const string in_name (tc_name + '-' + to_string (inst));
- auto_rmdir arm ((dir_path (ops.tftp ()) /= "build") /= in_name);
-
dir_path gd (dir_path (arm.path) /= "get");
dir_path pd (dir_path (arm.path) /= "put");
@@ -838,11 +2184,15 @@ try
path tf (gd / "task.manifest"); // Task manifest file.
path rf (pd / "result.manifest.lz4"); // Result manifest file.
-
- serialize_manifest (tm, tf, "task");
+ path af (pd / "upload.tar"); // Archive of build artifacts to upload.
if (ops.fake_machine_specified ())
{
+ // Note: not handling interrupts here. Nor starting any auxiliary
+ // machines, naturally.
+
+ serialize_manifest (tm, tf, "task");
+
// Simply wait for the file to appear.
//
for (size_t i (0);; sleep (1))
@@ -863,7 +2213,40 @@ try
}
else
{
+ // Start the auxiliary machines if any.
+ //
+ // If anything goes wrong, force them all down (failed that, the machine
+ // destructor will block waiting for their exit).
+ //
+ auxiliary_machine_results amrs;
+
+ auto amg (
+ make_exception_guard (
+ [&amrs] ()
+ {
+ if (!amrs.empty ())
+ {
+ info << "trying to force auxiliary machines down";
+ stop_auxiliary_machines (amrs);
+ }
+ }));
+
+ if (!ams.empty ())
+ {
+ amrs = start_auxiliary_machines (ams, tm, in_name, pd, boost_cpus);
+
+ if (amrs.empty ())
+ return perform_task_result (move (arm), move (r)); // Abort.
+ }
+
+ // Note: tm.auxiliary_environment patched in by start_auxiliary_machines().
+ //
+ serialize_manifest (tm, tf, "task");
+
+ // Start the build machine and perform the build.
+ //
try_rmfile (rf);
+ try_rmfile (af);
// <name>-<toolchain>-<xxx>
//
@@ -880,32 +2263,45 @@ try
//
tftp_server tftpd ("Gr ^/?(.+)$ /build/" + in_name + "/get/\\1\n" +
"Pr ^/?(.+)$ /build/" + in_name + "/put/\\1\n",
- ops.tftp_port () + offset);
+ ops.tftp_port () + offset + 0 /* build machine */);
l3 ([&]{trace << "tftp server on port " << tftpd.port ();});
- // Start the machine.
+ // Note: the machine handling logic is similar to bootstrap. Except here
+ // we have to cleanup the snapshot ourselves in case of suspension or
+ // unexpected exit.
//
- unique_ptr<machine> m (
- start_machine (xp,
- mm.machine,
- mm.machine.mac,
- ops.bridge (),
- tftpd.port (),
- tm.interactive.has_value ()));
-
- // Note: the machine handling logic is similar to bootstrap.
+ // NOTE: see similar code in start_auxiliary_machine() above.
//
{
+ // Start the machine.
+ //
+ unique_ptr<machine> m (
+ start_machine (xp,
+ mm.machine,
+ 0 /* machine_num (build) */,
+ boost_cpus ? *boost_cpus : ops.cpu (),
+ ops.build_ram (),
+ mm.machine.mac,
+ ops.bridge (),
+ tftpd.port (),
+ tm.interactive.has_value () /* public_vnc */));
+
auto mg (
make_exception_guard (
[&m, &xp] ()
{
- info << "trying to force machine " << xp << " down";
- try {m->forcedown (false);} catch (const failed&) {}
+ if (m != nullptr)
+ {
+ info << "trying to force machine " << xp << " down";
+ try {m->forcedown (false);} catch (const failed&) {}
+ }
}));
- auto soft_fail = [&xp, &m, &r] (const char* msg)
+ auto soft_fail = [&trace,
+ &amrs,
+ &ml, &xp, &m,
+ &arm, &r] (const char* msg)
{
{
diag_record dr (error);
@@ -913,16 +2309,36 @@ try
m->print_info (dr);
}
+ // What should we do about auxiliary machines? We could force them
+ // all down before suspending (and thus freeing them for use). That
+ // is the easy option. We could suspend them as well, but that feels
+ // like it will be a pain (will need to resume all of them when done
+ // investigating). Theoretically we could just let them run, but
+ // that won't play well with our interrupt logic since someone may
+ // attempt to interrupt us via one of them. So let's do easy for
+ // now.
+ //
+ // Note: always stop/suspend the build machine before the auxiliary
+      // machines to avoid any errors due to the auxiliary machines being
+ // unavailable.
try
{
+ // Update the information in the machine lock to signal that the
+ // machine is suspended and cannot be interrupted.
+ //
+ ml.suspend_task ();
+
m->suspend (false);
+ stop_auxiliary_machines (amrs);
m->wait (false);
m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
info << "resuming after machine suspension";
}
catch (const failed&) {}
- return r;
+ return perform_task_result (move (arm), move (r));
};
auto check_machine = [&xp, &m] ()
@@ -933,9 +2349,7 @@ try
if (!m->wait (t /* seconds */, false /* fail_hard */))
return true;
}
- catch (const failed&)
- {
- }
+ catch (const failed&) {}
diag_record dr (warn);
dr << "machine " << xp << " exited unexpectedly";
@@ -944,26 +2358,76 @@ try
return false;
};
+ auto check_auxiliary_machines = [&amrs] ()
+ {
+ for (auxiliary_machine_result& amr: amrs)
+ {
+ try
+ {
+ size_t t (0);
+ if (!amr.machine->wait (t /* seconds */, false /* fail_hard */))
+ continue;
+ }
+ catch (const failed&) {}
+
+ diag_record dr (warn);
+ dr << "machine " << amr.snapshot << " exited unexpectedly";
+ amr.machine->print_info (dr);
+
+ return false;
+ }
+
+ return true;
+ };
+
+ auto check_interrupt = [&trace, &amrs, &xp, &m] ()
+ {
+ if (sigurs1.load (std::memory_order_consume) == 0)
+ return;
+
+ l2 ([&]{trace << "machine " << xp << " interruped";});
+
+ try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
+ m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+
+ throw interrupt ();
+ };
+
// The first request should be the task manifest download. Wait for up
- // to 2 minutes for that to arrive (again, that long to deal with
- // flaky Windows networking). In a sense we use it as an indication
- // that the machine has booted and the worker process has started.
+ // to 4 minutes (by default) for that to arrive (again, that long to
+ // deal with flaky Windows networking, etc). In a sense we use it as
+ // an indication that the machine has booted and the worker process
+ // has started.
//
size_t to;
- const size_t startup_to (120);
+ const size_t startup_to (ops.build_startup ());
const size_t build_to (tm.interactive
? ops.intactive_timeout ()
: ops.build_timeout ());
- // Wait periodically making sure the machine is still alive.
+ // Wait periodically making sure the machine is still alive and
+ // checking for interrupts.
//
for (to = startup_to; to != 0; )
{
+ check_interrupt ();
+
if (tftpd.serve (to, 2))
break;
- if (!check_machine ())
- return r;
+ bool bm; // Build machine still running.
+ if (!(bm = check_machine ()) || !check_auxiliary_machines ())
+ {
+ if (bm)
+ try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+ return perform_task_result (move (arm), move (r));
+ }
}
if (to == 0)
@@ -978,26 +2442,38 @@ try
m->print_info (dr);
try {m->forcedown (false);} catch (const failed&) {}
+ m = nullptr; // Disable exceptions guard above.
continue;
}
l3 ([&]{trace << "completed startup in " << startup_to - to << "s";});
- // Next the worker builds things and then uploads the result manifest.
- // So on our side we serve TFTP requests while checking for the
- // manifest file. To workaround some obscure filesystem races (the
- // file's mtime/size is updated several seconds later; maybe tmpfs
- // issue?), we periodically re-check.
+ // Next the worker builds things and then uploads optional archive of
+ // build artifacts and the result manifest afterwards. So on our side
+ // we serve TFTP requests while checking for the manifest file. To
+ // workaround some obscure filesystem races (the file's mtime/size is
+ // updated several seconds later; maybe tmpfs issue?), we periodically
+ // re-check.
//
for (to = build_to; to != 0; )
{
+ check_interrupt ();
+
if (tftpd.serve (to, 2))
continue;
- if (!check_machine ())
+ bool bm; // Build machine still running.
+ if (!(bm = check_machine ()) || !check_auxiliary_machines ())
{
- if (!file_not_empty (rf))
- return r;
+ if (bm || !file_not_empty (rf))
+ {
+ if (bm)
+ try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
+ m = nullptr; // Disable exceptions guard above.
+ run_btrfs (trace, "subvolume", "delete", xp);
+ return perform_task_result (move (arm), move (r));
+ }
}
if (file_not_empty (rf))
@@ -1050,7 +2526,9 @@ try
// lease instead of a new one.
//
try {m->forcedown (false);} catch (const failed&) {}
+ stop_auxiliary_machines (amrs);
m->cleanup ();
+ m = nullptr; // Disable exceptions guard above.
}
}
@@ -1059,7 +2537,7 @@ try
}
}
- // Update package name/version if the returned value as "unknown".
+ // Update package name/version if the returned value is "unknown".
//
if (r.version == bpkg::version ("0"))
{
@@ -1069,23 +2547,16 @@ try
r.version = tm.version;
}
- return r;
+ return (!r.status || !file_exists (af)
+ ? perform_task_result (move (arm), move (r))
+ : perform_task_result (move (arm), move (r), move (af)));
}
catch (const system_error& e)
{
fail << "build error: " << e << endf;
}
-extern "C" void
-handle_signal (int sig)
-{
- switch (sig)
- {
- case SIGHUP: exit (3); // Unimplemented feature.
- case SIGTERM: exit (0);
- default: assert (false);
- }
-}
+static const string agent_checksum ("2"); // Logic version.
int
main (int argc, char* argv[])
@@ -1096,6 +2567,21 @@ try
verb = ops.verbose ();
+#if 0
+ // ./bbot-agent --auxiliary-ram 4194304
+ //
+ machine_header_manifest m1 {
+ "m1", "m1", "m1", machine_role::auxiliary, 512*1024, nullopt};
+ machine_header_manifest m2 {
+ "m2", "m2", "m2", machine_role::auxiliary, 1024*1024, 3*512*1024};
+ vector<const machine_header_manifest*> mms {&m1, &m2};
+ vector<size_t> rams (divide_auxiliary_ram (mms));
+ for (size_t i (0); i != rams.size (); ++i)
+ text << mms[i]->name << ' ' << rams[i] / 1024;
+
+ return 0;
+#endif
+
// @@ systemd 231 added JOURNAL_STREAM environment variable which allows
// detecting if stderr is connected to the journal.
//
@@ -1175,7 +2661,7 @@ try
<< "libbpkg " << LIBBPKG_VERSION_ID << endl
<< "libbutl " << LIBBUTL_VERSION_ID << endl
<< "Copyright (c) " << BBOT_COPYRIGHT << "." << endl
- << "TBC; All rights reserved" << endl;
+ << "This is free software released under the MIT license." << endl;
return 0;
}
@@ -1194,40 +2680,102 @@ try
tc_name = ops.toolchain_name ();
tc_num = ops.toolchain_num ();
+
+ if (ops.toolchain_lock_specified ())
+ {
+ const string& l (ops.toolchain_lock ());
+
+ if (!l.empty ())
+ {
+ tc_lock = path (l);
+
+ if (!tc_lock.absolute ())
+ fail << "--toolchain-lock value '" << l << "' should be absolute path";
+ }
+ }
+ else if (!(ops.fake_bootstrap () ||
+ ops.fake_build () ||
+ ops.fake_machine_specified () ||
+ ops.fake_request_specified ()))
+ tc_lock = path ("/var/lock/bbot-agent-" + tc_name + ".lock");
+
tc_ver = (ops.toolchain_ver_specified ()
? ops.toolchain_ver ()
: standard_version (BBOT_VERSION_STR));
tc_id = ops.toolchain_id ();
- if (tc_num == 0 || tc_num > 99)
- fail << "invalid --toolchain-num value " << tc_num;
+ if (tc_num == 0 || tc_num > 9)
+ fail << "--toolchain-num value " << tc_num << " out of range";
inst = ops.instance ();
if (inst == 0 || inst > 99)
- fail << "invalid --instance value " << inst;
+ fail << "--instance value " << inst << " out of range";
- offset = (tc_num - 1) * 100 + inst;
+ inst_max = ops.instance_max ();
- // Controller URLs.
+ // The last decimal position is used for machine number, 0 for the build
+ // machine, non-0 for auxiliary machines (of which we can have maximum 9).
//
- if (argc < 2 &&
- !ops.dump_machines () &&
- !ops.fake_request_specified ())
- {
- fail << "controller url expected" <<
- info << "run " << argv[0] << " --help for details";
- }
+ offset = (tc_num - 1) * 1000 + inst * 10;
- strings controllers;
+ // Controller priority to URLs map.
+ //
+ std::map<uint64_t, strings> controllers;
for (int i (1); i != argc; ++i)
- controllers.push_back (argv[i]);
+ {
+ // [<prio>=]<url>
+ //
+ string a (argv[i]);
+
+ // See if we have priority, falling back to priority 0 if absent.
+ //
+ uint64_t prio (0);
+
+ // Note that we can also have `=` in <url> (e.g., parameters) so we will
+ // only consider `=` as ours if prior to it we only have digits.
+ //
+ size_t p (a.find ('='));
+ if (p != string::npos && a.find_first_not_of ("0123456789") == p)
+ {
+ // Require exactly four or five digits in case we later need to extend
+ // the priority levels beyond the 10 possible values (e.g., DDCCBBAA).
+ //
+ if (p != 4 && p != 5)
+ fail << "four or five-digit controller url priority expected in '"
+ << a << "'";
+
+ char* e;
+ errno = 0;
+ prio = strtoull (a.c_str (), &e, 10);
+ assert (errno != ERANGE && e == a.c_str () + p);
+
+ if (prio > 19999)
+ fail << "out of bounds controller url priority in '" << a << "'";
+
+ a.erase (0, p + 1);
+ }
+
+ controllers[prio].push_back (move (a));
+ }
+
+ if (controllers.empty ())
+ {
+ if (ops.dump_machines () || ops.fake_request_specified ())
+ {
+ controllers[0].push_back ("https://example.org");
+ }
+ else
+ fail << "controller url expected" <<
+ info << "run " << argv[0] << " --help for details";
+ }
// Handle SIGHUP and SIGTERM.
//
if (signal (SIGHUP, &handle_signal) == SIG_ERR ||
- signal (SIGTERM, &handle_signal) == SIG_ERR)
+ signal (SIGTERM, &handle_signal) == SIG_ERR ||
+ signal (SIGUSR1, &handle_signal) == SIG_ERR)
fail << "unable to set signal handler: "
<< system_error (errno, std::generic_category ()); // Sanitize.
@@ -1267,7 +2815,8 @@ try
dr <<
info << "cpu(s) " << ops.cpu () <<
- info << "ram(kB) " << ops.ram () <<
+ info << "build ram(KiB) " << ops.build_ram () <<
+ info << "auxil ram(KiB) " << ops.auxiliary_ram () <<
info << "bridge " << ops.bridge ();
if (fingerprint)
@@ -1281,8 +2830,19 @@ try
info << "toolchain id " << tc_id <<
info << "instance num " << inst;
- for (const string& u: controllers)
- dr << info << "controller url " << u;
+ if (inst_max != 0)
+ dr << info << "instance max " << inst_max;
+
+ // Note: keep last since don't restore fill/setw.
+ //
+ for (const pair<const uint64_t, strings>& p: controllers)
+ {
+ for (const string& u: p.second)
+ {
+ dr.os.fill ('0');
+ dr << info << "controller url " << std::setw (4) << p.first << '=' << u;
+ }
+ }
}
// The work loop. The steps we go through are:
@@ -1296,9 +2856,11 @@ try
// 4. If a build task is returned, do it, upload the result, and go to #1
// (immediately).
//
- auto rand_sleep = [g = std::mt19937 (std::random_device {} ())] () mutable
+ // NOTE: consider updating agent_checksum if making any logic changes.
+ //
+ auto rand_sleep = [] ()
{
- return std::uniform_int_distribution<unsigned int> (50, 60) (g);
+ return std::uniform_int_distribution<unsigned int> (50, 60) (rand_gen);
};
optional<interactive_mode> imode;
@@ -1307,78 +2869,259 @@ try
if (ops.interactive () != interactive_mode::false_)
{
imode = ops.interactive ();
- ilogin = machine_vnc (true /* public */);
+ ilogin = machine_vnc (0 /* machine_num */, true /* public */);
+ }
+
+ // Use the pkeyutl openssl command for signing the task response challenge
+ // if openssl version is greater or equal to 3.0.0 and the rsautl command
+ // otherwise.
+ //
+ // Note that openssl 3.0.0 deprecates rsautl in favor of pkeyutl.
+ //
+ const char* sign_cmd;
+
+ try
+ {
+ optional<openssl_info> oi (openssl::info (trace, 2, ops.openssl ()));
+
+ sign_cmd = oi &&
+ oi->name == "OpenSSL" &&
+ oi->version >= semantic_version {3, 0, 0}
+ ? "pkeyutl"
+ : "rsautl";
+ }
+ catch (const system_error& e)
+ {
+ fail << "unable to obtain openssl version: " << e << endf;
}
for (unsigned int sleep (0);; ::sleep (sleep), sleep = 0)
{
- bootstrapped_machines ms (enumerate_machines (ops.machines ()));
-
- // Prepare task request.
- //
- task_request_manifest tq {
- hname,
- tc_name,
- tc_ver,
- imode,
- ilogin,
- fingerprint,
- machine_header_manifests {}
- };
+ pair<toolchain_lock, bootstrapped_machines> er (
+ enumerate_machines (ops.machines ()));
- // Note: below we assume tq.size () == ms.size ().
- //
- for (const bootstrapped_machine& m: ms)
- tq.machines.emplace_back (m.manifest.machine.id,
- m.manifest.machine.name,
- m.manifest.machine.summary);
+ toolchain_lock& tl (er.first);
+ bootstrapped_machines& ms (er.second);
- if (ops.dump_machines ())
+ // Determine the existing task priority range (using [0,0] if there are
+      // none) as well as whether we should operate in the priority monitor
+ // mode.
+ //
+ uint64_t prio_min (~uint64_t (0));
+ uint64_t prio_max (0);
+ bool prio_mon (false);
{
- for (const machine_header_manifest& m: tq.machines)
- serialize_manifest (m, cout, "stdout", "machine");
+ uint16_t busy (0); // Number of build machines locked by other processes.
+ bool task (false); // There is a build machine performing a task.
- return 0;
- }
+ for (const bootstrapped_machine& m: ms)
+ {
+ if (!m.lock.locked ())
+ {
+ if (m.lock.prio) // Not bootstrapping/suspended.
+ {
+ if (m.manifest.machine.effective_role () != machine_role::auxiliary)
+ {
+ ++busy;
+ task = true;
- if (tq.machines.empty ())
- {
- // Normally this means all the machines are locked so sleep a bit less.
- //
- sleep = rand_sleep () / 2;
- continue;
+ if (prio_min > *m.lock.prio)
+ prio_min = *m.lock.prio;
+
+ if (prio_max < *m.lock.prio)
+ prio_max = *m.lock.prio;
+ }
+ }
+ else
+ ++busy; // Assume build machine (see enumerate_machines()).
+ }
+ }
+
+ if (prio_min > prio_max) // No tasks.
+ prio_min = prio_max;
+
+ if (inst_max != 0)
+ {
+ assert (busy <= inst_max);
+
+ if (busy == inst_max)
+ {
+ if (!task) // All bootstrapping/suspended.
+ {
+ sleep = rand_sleep ();
+ continue;
+ }
+
+ l2 ([&]{trace << "priority monitor, range [" << prio_min << ", "
+ << prio_max << "]";});
+
+ prio_mon = true;
+ }
+ }
}
- // Send task requests.
+ // If we get a task, these contain all the corresponding information.
//
- // Note that we have to do it while holding the lock on all the machines
- // since we don't know which machine we will need.
- //
- string url;
+ task_request_manifest tq;
task_response_manifest tr;
+ uint64_t prio;
+ string url;
- if (ops.fake_request_specified ())
+ // Iterate over controller priorities in reverse, that is, from highest to
+ // lowest (see the agent(1) man page for background on the priority
+ // levels).
+ //
+ // The following factors determine the lower bound of priorities we should
+ // consider:
+ //
+ // 1. If in the priority monitor mode, then we should only consider
+ // priorities that can interrupt the existing task with the lowest
+ // priority.
+ //
+ // Here is a representative sample of existing/interrupt priorities
+ // from which we derive the below formulae (remember that we only start
+ // interrupting from priority level 3):
+ //
+ // existing interrupt
+ // -------- ---------
+ // 5 >= 100
+ // 55 >= 100
+ // 555 >= 600
+ // 999 >= 1000
+ // 5055 >= 5100
+ // 5555 >= 5600
+ // 9999 >= 10000
+ //
+ // Essentially, what we need to do is discard the lowest 2 levels and
+ // add 100, moving the priority to the next 3rd level.
+ //
+ // 2. Otherwise, we should factor in the "don't ask for lower-priority
+ // tasks" semantics that applies from the second priority level.
+ //
+ // Note also that the other half of this logic is below where we determine
+ // which machines we offer for each priority.
+ //
+ auto ce (controllers.end ());
+ auto cb (controllers.lower_bound (
+ prio_mon ? ((prio_min / 100) * 100) + 100 :
+ prio_max >= 10 ? prio_max - 1 : // Including this priority.
+ 0)); // Any priority.
+
+ for (; cb != ce; )
{
- auto t (parse_manifest<task_manifest> (ops.fake_request (), "task"));
+ const pair<const uint64_t, strings>& pu (*--ce);
- tr = task_response_manifest {
- "fake-session", // Dummy session.
- nullopt, // No challenge.
- url, // Empty result URL.
- move (t)};
+ prio = pu.first;
+ const strings& urls (pu.second);
- url = "http://example.org";
- }
- else
- {
- // Note that after completing each task we always start from the
- // beginning of the list. This fact can be used to implement a poor
- // man's priority system where we will continue serving the first listed
- // controller for as long as it has tasks (and maybe in the future we
- // will implement a proper priority system).
+ // Prepare task request (it will be the same within a given priority).
+ //
+ tq = task_request_manifest {
+ hname,
+ tc_name,
+ tc_ver,
+ imode,
+ ilogin,
+ fingerprint,
+ ops.auxiliary_ram (),
+ machine_header_manifests {}};
+
+ // Determine which machines we need to offer for this priority.
//
- for (const string& u: controllers)
+ bool aux_only (true); // Only auxiliary machines are available.
{
+        bool interruptable (false); // There is a build machine we can interrupt.
+ for (const bootstrapped_machine& m: ms)
+ {
+ const machine_manifest& mm (m.manifest.machine);
+ machine_role role (mm.effective_role ());
+
+ if (!m.lock.locked ())
+ {
+ if (!m.lock.prio) // Skip bootstrapping/suspended.
+ continue;
+
+ uint64_t eprio (*m.lock.prio);
+
+ // Determine if our priority can interrupt the existing task.
+ //
+ // Based on the above discussion of the priority lower bound
+          // determination (and some meditation) it's clear that we can
+ // only interrupt the existing task if our priority is (at least)
+ // on a higher 3rd level.
+ //
+ if ((prio / 100) <= (eprio / 100))
+ continue;
+
+ if (role != machine_role::auxiliary)
+ interruptable = true;
+ }
+
+ tq.machines.emplace_back (mm.id,
+ mm.name,
+ mm.summary,
+ role,
+ effective_ram_minimum (mm),
+ mm.ram_maximum);
+
+ aux_only = aux_only && role == machine_role::auxiliary;
+ }
+
+ // Sanity check: in the priority monitor mode we should only ask for a
+ // task if we can interrupt one (this should be taken care of by the
+ // priority lower bound calculation above).
+ //
+ assert (!prio_mon || interruptable);
+ }
+
+ if (ops.dump_machines ())
+ {
+ for (const machine_header_manifest& m: tq.machines)
+ serialize_manifest (m, cout, "stdout", "machine");
+
+ return 0;
+ }
+
+ if (aux_only)
+ tq.machines.clear ();
+
+ if (tq.machines.empty ())
+ {
+ // If we have no build machines for this priority then we won't have
+ // any for any lower priority so bail out.
+ //
+ break;
+ }
+
+ // Send task requests.
+ //
+ // Note that we have to do it while holding the lock on all the machines
+ // since we don't know which machine(s) we will need.
+ //
+ vector<strings::const_iterator> rurls (urls.size ());
+ std::iota (rurls.begin (), rurls.end (), urls.begin ());
+ std::shuffle (rurls.begin (), rurls.end (), rand_gen);
+
+ for (strings::const_iterator i: rurls)
+ {
+ const string& u (*i);
+
+ if (ops.fake_request_specified ())
+ {
+ auto t (parse_manifest<task_manifest> (ops.fake_request (), "task"));
+
+ tr = task_response_manifest {
+ "fake-session", // Dummy session.
+ nullopt, // No challenge.
+ string (), // Empty result URL.
+ vector<upload_url> (),
+ agent_checksum,
+ move (t)};
+
+ url = u;
+ break;
+ }
+
task_response_manifest r;
try
@@ -1401,7 +3144,11 @@ try
try
{
- serialize_manifest (tq, c.out, u, "task request", false);
+ serialize_manifest (tq,
+ c.out,
+ u,
+ "task request",
+ false /* fail_hard */);
}
catch (const failed&) {f = true;}
@@ -1436,8 +3183,8 @@ try
{
const task_manifest& t (*r.task);
- // For security reasons let's require the repository location to be
- // remote.
+ // For security reasons let's require the repository location to
+ // be remote.
//
if (t.repository.local ())
{
@@ -1460,13 +3207,27 @@ try
l2 ([&]{trace << "task for " << t.name << '/' << t.version << " "
<< "on " << t.machine << " "
- << "from " << u;});
+ << "from " << u << " "
+ << "priority " << prio;});
tr = move (r);
url = u;
break;
}
- }
+ } // url loop.
+
+ if (!tr.session.empty ()) // Got a task.
+ break;
+
+ } // prio loop.
+
+ if (tq.machines.empty ()) // No machines (auxiliary-only already handled).
+ {
+ // Normally this means all the machines are busy so sleep a bit less.
+ //
+ l2 ([&]{trace << "all machines are busy, sleeping";});
+ sleep = rand_sleep () / 2;
+ continue;
}
if (tr.session.empty ()) // No task from any of the controllers.
@@ -1478,22 +3239,69 @@ try
// We have a build task.
//
- // First find the index of the machine we were asked to use (and verify it
- // is one of those we sent). Also unlock all the other machines.
+ task_manifest& t (*tr.task);
+
+ // First verify the requested machines are from those we sent in tq and
+ // their roles match.
+ //
+ // Also verify the same machine is not picked multiple times by blanking
+ // out the corresponding entry in tq.machines. (Currently we are only
+ // capable of running one instance of each machine though we may want to
+ // relax that in the future, at which point we should send as many entries
+ // for the same machine in the task request as we are capable of running,
+ // applying the priority logic for each entry, etc).
//
- size_t i (ms.size ());
- for (size_t j (0); j != ms.size (); ++j)
{
- if (tq.machines[j].name == tr.task->machine)
- i = j;
- else
- ms[j].lock.unlock ();
+ auto check = [&tq, &url] (const string& name, machine_role r)
+ {
+ auto i (find_if (tq.machines.begin (), tq.machines.end (),
+ [&name] (const machine_header_manifest& mh)
+ {
+ return mh.name == name; // Yes, names, not ids.
+ }));
+
+ if (i == tq.machines.end ())
+ {
+ error << "task from " << url << " for unknown machine " << name;
+ return false;
+ }
+
+ if (i->effective_role () != r)
+ {
+ error << "task from " << url << " with mismatched role "
+ << " for machine " << name;
+ return false;
+ }
+
+ i->name.clear (); // Blank out.
+
+ return true;
+ };
+
+ auto check_aux = [&check] (const vector<auxiliary_machine>& ams)
+ {
+ for (const auxiliary_machine& am: ams)
+ if (!check (am.name, machine_role::auxiliary))
+ return false;
+ return true;
+ };
+
+ if (!check (t.machine, machine_role::build) ||
+ !check_aux (t.auxiliary_machines))
+ {
+ if (ops.dump_task ())
+ return 0;
+
+ continue;
+ }
}
- if (i == ms.size ())
+ // Also verify there are no more than 9 auxiliary machines (see the offset
+ // global variable for details).
+ //
+ if (t.auxiliary_machines.size () > 9)
{
- error << "task from " << url << " for unknown machine "
- << tr.task->machine;
+ error << "task from " << url << " with more than 9 auxiliary machines";
if (ops.dump_task ())
return 0;
@@ -1501,8 +3309,6 @@ try
continue;
}
- task_manifest& t (*tr.task);
-
if (ops.dump_task ())
{
serialize_manifest (t, cout, "stdout", "task");
@@ -1515,16 +3321,349 @@ try
if (!ops.trust ().empty ())
t.trust = ops.trust ();
- const dir_path& d (); // The -<toolchain> directory.
- const bootstrapped_machine_manifest& m ();
+ // Reset the worker checksum if the task's agent checksum doesn't match
+ // the current one.
+ //
+ // Note that since the checksums are hierarchical, such reset will trigger
+ // resets of the "subordinate" checksums (dependency checksum, etc).
+ //
+ if (!tr.agent_checksum || *tr.agent_checksum != agent_checksum)
+ t.worker_checksum = nullopt;
+
+ // Handle interrupts.
+ //
+ // Note that the interrupt can be triggered both by another process (the
+ // interrupt exception is thrown from perform_task()) as well as by this
+ // process in case we were unable to interrupt the other process (seeing
+ // that we have already received a task, responding with an interrupt
+ // feels like the most sensible option).
+ //
+ perform_task_result r;
+ bootstrapped_machine* pm (nullptr); // Build machine.
+ vector<bootstrapped_machine*> ams; // Auxiliary machines.
+ try
+ {
+ // First find the bootstrapped_machine instance in ms corresponding to
+ // the requested build machine. Also unlock all the other machines.
+ //
+ // While at it also see if we need to interrupt the selected machine (if
+ // busy), one of the existing (if we are at the max allowed instances,
+ // that is in the priority monitor mode), or all existing (if this is a
+ // priority level 4 task).
+ //
+ // Auxiliary machines complicate the matter a bit: we may now need to
+ // interrupt some subset of {build machine, auxiliary machines} that are
+ // necessary to perform this task. Note, however, that auxiliary
+ // machines are always subordinate to build machines, meaning that if
+ // there is a busy auxiliary machine, then there will be a busy build
+    // machine with the same pid/priority (and so if we interrupt one
+ // auxiliary, then we will also interrupt the corresponding build plus
+ // any other auxiliaries it may be using). Based on that let's try to
+ // divide and conquer this by first dealing with build machines and then
+ // adding any auxiliary ones.
+ //
+ vector<bootstrapped_machine*> ims; // Machines to be interrupted.
+ size_t imt (0); // Number of "target" machines to interrupt (see below).
+
+ // First pass: build machines.
+ //
+ for (bootstrapped_machine& m: ms)
+ {
+ const machine_manifest& mm (m.manifest.machine);
+
+ if (mm.effective_role () == machine_role::auxiliary)
+ continue;
+
+ if (mm.name == t.machine)
+ {
+ assert (pm == nullptr); // Sanity check.
+ pm = &m;
+ }
+ else if (m.lock.locked ())
+ m.lock.unlock ();
+ else if (m.lock.prio) // Not bootstrapping/suspended.
+ {
+ // Only consider machines that we can interrupt (see above).
+ //
+ if ((prio / 100) > (*m.lock.prio / 100))
+ {
+ if (prio >= 1000) // Priority level 4 (interrupt all).
+ ims.push_back (&m);
+ else if (prio_mon)
+ {
+ // Find the lowest priority task to interrupt.
+ //
+ if (ims.empty ())
+ ims.push_back (&m);
+ else if (*m.lock.prio < *ims.back ()->lock.prio)
+ ims.back () = &m;
+ }
+ }
+ }
+ }
+
+ assert (pm != nullptr); // Sanity check.
+
+ if (!pm->lock.locked ())
+ {
+ assert (pm->lock.prio); // Sanity check (not bootstrapping/suspended).
+
+ if (prio >= 1000)
+ ims.insert (ims.begin (), pm); // Interrupt first (see below).
+ else
+ ims = {pm};
+
+ imt++;
+ }
+
+ // Second pass: auxiliary machines.
+ //
+ for (bootstrapped_machine& m: ms)
+ {
+ const machine_manifest& mm (m.manifest.machine);
+
+ if (mm.effective_role () != machine_role::auxiliary)
+ continue;
+
+ if (find_if (t.auxiliary_machines.begin (), t.auxiliary_machines.end (),
+ [&mm] (const auxiliary_machine& am)
+ {
+ return am.name == mm.name;
+ }) != t.auxiliary_machines.end ())
+ {
+ if (!m.lock.locked ())
+ {
+ assert (m.lock.prio); // Sanity check (not bootstrapping/suspended).
+
+ if (ims.empty ())
+ {
+ ims.push_back (&m);
+ }
+ else if (ims.front () == pm)
+ {
+ ims.insert (ims.begin () + 1, &m); // Interrupt early (see below).
+ }
+ else if (prio < 1000 && prio_mon && ams.empty () /* first */)
+ {
+ // Tricky: replace the lowest priority task we have picked on
+ // the first pass with this one.
+ //
+ assert (ims.size () == 1); // Sanity check.
+ ims.back () = &m;
+ }
+ else
+ ims.insert (ims.begin (), &m); // Interrupt first (see below).
+
+ imt++;
+ }
+
+ ams.push_back (&m);
+ }
+ else if (m.lock.locked ())
+ m.lock.unlock ();
+ }
+
+ // Note: the order of machines may not match.
+ //
+ assert (ams.size () == t.auxiliary_machines.size ()); // Sanity check.
+
+ assert (!prio_mon || !ims.empty ()); // We should have at least one.
+
+ // Move the toolchain lock into this scope so that it's automatically
+ // released on any failure (on the happy path it is released by
+ // perform_task()).
+ //
+ toolchain_lock& rtl (tl);
+ toolchain_lock tl (move (rtl));
+
+ // Interrupt the machines, if necessary.
+ //
+ // Note that if we are interrupting multiple machines, then the target
+ // build machine, if needs to be interrupted, must be first, followed
+    // by all the target auxiliary machines. This way if we are unable to
+ // successfully interrupt them, we don't interrupt the rest.
+ //
+ vector<pid_t> pids; // Avoid re-interrupting the same pid.
+ for (size_t i (0); i != ims.size (); ++i)
+ {
+ bootstrapped_machine* im (ims[i]);
+
+ // Sanity checks.
+ //
+ assert (!im->lock.locked () && im->lock.prio);
+ assert (im != pm || i == 0);
- result_manifest r (perform_task (ms[i].path, ms[i].manifest, t));
+ const dir_path& tp (im->path); // -<toolchain> path.
+ pid_t pid (im->lock.pid);
- ms[i].lock.unlock (); // No need to hold the lock any longer.
+ l2 ([&]{trace << "interrupting "
+ << (i < imt ? "target" : "lower priority")
+ << " machine " << tp << ", pid " << pid;});
+
+ // The plan is to send the interrupt and then wait for the lock.
+ //
+ // Note that the interrupt cannot be "lost" (or attributed to a
+ // different task) since we are sending it while holding the global
+ // lock and the other process arms it also while holding the global
+ // lock.
+ //
+ // But what can happen is the other task becomes suspended, which we
+ // will not be able to interrupt.
+ //
+ if (find (pids.begin (), pids.end (), pid) == pids.end ())
+ {
+ if (kill (pid, SIGUSR1) == -1)
+ {
+ // Ignore the case where there is no such process (e.g., the other
+ // process has terminated in which case the lock should be
+ // released automatically).
+ //
+ if (errno != ESRCH)
+ throw_generic_error (errno);
+ }
+
+ pids.push_back (pid);
+ }
+
+      // If we are interrupting additional machines in order to free up
+ // resources, there is no use acquiring their lock (or failing if
+ // unable to) since this is merely a performance optimization.
+ //
+ if (i >= imt)
+ continue;
+
+ // Try to lock the machine.
+ //
+ // While this normally shouldn't take long, there could be parts of
+ // the perform_task() logic that we do not interrupt and that may take
+ // some time.
+ //
+ machine_lock ml;
+
+ size_t retry (0);
+ for (; retry != 31; ++retry)
+ {
+ if (retry != 0)
+ ::sleep (1);
+
+ ml = lock_machine (tl, tp);
+
+ if (ml.locked ())
+ break;
+
+ if (ml.pid != pid)
+ {
+ error << "interrupted machine " << tp << " changed pid";
+ throw interrupt ();
+ }
+
+ if (!ml.prio) // Got suspended.
+ {
+ l2 ([&]{trace << "interrupted machine " << tp << " suspended";});
+ throw interrupt ();
+ }
+ }
+
+ if (!ml.locked ())
+ {
+ warn << "unable to lock interrupted machine " << tp << " within "
+ << (retry - 1) << "s";
+ throw interrupt ();
+ }
+
+ // This is an interrupted machine (build or auxiliary) that we will be
+ // using. See if it needs a re-bootstrap, the same as in
+ // enumerate_machines(). If not, then transfer the bootstrap manifest
+ // and lock.
+ //
+ const machine_manifest& mm (im->manifest.machine);
+
+ bootstrapped_machine_manifest bmm (
+ parse_manifest<bootstrapped_machine_manifest> (
+ tp / "manifest", "bootstrapped machine"));
+
+ bool rb (false);
+
+ if (bmm.machine.id != mm.id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new machine";});
+ rb = true;
+ }
+
+ if (im == pm) // Only for build machine.
+ {
+ if (!tc_id.empty () && bmm.toolchain.id != tc_id)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new toolchain";});
+ rb = true;
+ }
+
+ if (int i = compare_bbot (bmm.bootstrap))
+ {
+ if (i < 0)
+ {
+ l3 ([&]{trace << "re-bootstrap " << tp << ": new bbot";});
+ rb = true;
+ }
+ else
+ {
+ l3 ([&]{trace << "ignoring " << tp << ": old bbot";});
+ rb = true;
+ }
+ }
+ }
+
+ // We are not going to try to re-bootstrap this machine "inline".
+ //
+ if (rb)
+ throw interrupt ();
+
+ im->manifest = move (bmm);
+ im->lock = move (ml);
+ }
+
+ // Check if we need to boost the number of CPUs to the full hardware
+ // concurrency.
+ //
+ optional<size_t> bcpus;
+ if (prio >= 10000)
+ bcpus = std::thread::hardware_concurrency ();
+
+ pm->lock.perform_task (tl, prio); // Build machine.
+ for (bootstrapped_machine* am: ams) // Auxiliary machines.
+ am->lock.perform_task (tl, prio);
+
+ r = perform_task (move (tl), *pm, ams, t, bcpus);
+ }
+ catch (const interrupt&)
+ {
+ // Note: no work_dir.
+ //
+ r = perform_task_result (
+ auto_rmdir (),
+ result_manifest {
+ t.name,
+ t.version,
+ result_status::interrupt,
+ operation_results {},
+ nullopt /* worker_checksum */,
+ nullopt /* dependency_checksum */});
+ }
+
+ // No need to hold the locks any longer.
+ //
+ if (pm != nullptr && pm->lock.locked ())
+ pm->lock.unlock ();
+
+ for (bootstrapped_machine* am: ams)
+ if (am->lock.locked ())
+ am->lock.unlock ();
+
+ result_manifest& rm (r.manifest);
if (ops.dump_result ())
{
- serialize_manifest (r, cout, "stdout", "result");
+ serialize_manifest (rm, cout, "stdout", "result");
return 0;
}
@@ -1539,7 +3678,7 @@ try
openssl os (trace,
fdstream_mode::text, path ("-"), 2,
- ops.openssl (), "rsautl",
+ ops.openssl (), sign_cmd,
ops.openssl_option (), "-sign", "-inkey", ops.auth_key ());
os.out << *tr.challenge;
@@ -1560,9 +3699,258 @@ try
fail << "unable to sign task response challenge: " << e;
}
+ // Re-package the build artifacts, if present, into the type/instance-
+ // specific archives and upload them to the type-specific URLs, if
+ // provided.
+ //
+ // Note that the initial upload archive content is organized as a bunch of
+ // upload/<type>/<instance>/*, where the second level directories are the
+ // upload types and the third level sub-directories are their instances.
+ // The resulting <instance>.tar archives content (which is what we submit
+ // to the type-specific handler) are organized as <instance>/*.
+ //
+ if (r.upload_archive && !tr.upload_urls.empty ())
+ {
+ const path& ua (*r.upload_archive);
+
+ // Extract the archive content into the parent directory of the archive
+ // file. But first, make sure the resulting directory doesn't exist.
+ //
+ // Note that while we don't assume where inside the working directory
+ // the archive is, we do assume that there is nothing clashing/precious
+ // in the upload/ directory which we are going to cleanup.
+ //
+ dir_path d (ua.directory ());
+
+ const dir_path ud (d / dir_path ("upload"));
+ try_rmdir_r (ud);
+
+ try
+ {
+ process_exit pe (
+ process_run_callback (
+ trace,
+ fdopen_null (), // Don't expect to read from stdin.
+ 2, // Redirect stdout to stderr.
+ 2,
+ "tar",
+ "-xf", ua,
+ "-C", d));
+
+ if (!pe)
+ fail << "tar " << pe;
+ }
+ catch (const system_error& e)
+ {
+ // There must be something wrong with the setup or there is no space
+ // left on the disk, thus the failure is fatal.
+ //
+ fail << "unable to extract build artifacts from archive: " << e;
+ }
+
+ try_rmfile (ua); // Let's free up the disk space.
+
+ // To decrease nesting a bit, let's collect the type-specific upload
+ // directories and the corresponding URLs first. This way we can also
+ // create the archive files as the upload/ directory sub-entries without
+ // interfering with iterating over this directory.
+ //
+ vector<pair<dir_path, string>> urls;
+
+ try
+ {
+ for (const dir_entry& te: dir_iterator (ud, dir_iterator::no_follow))
+ {
+ const string& t (te.path ().string ());
+
+ // Can only be a result of the worker malfunction, thus the failure
+ // is fatal.
+ //
+ if (te.type () != entry_type::directory)
+ fail << "unexpected filesystem entry '" << t << "' in " << ud;
+
+ auto i (find_if (tr.upload_urls.begin (), tr.upload_urls.end (),
+ [&t] (const upload_url& u) {return u.type == t;}));
+
+ if (i == tr.upload_urls.end ())
+ continue;
+
+ urls.emplace_back (ud / path_cast<dir_path> (te.path ()), i->url);
+ }
+ }
+ catch (const system_error& e)
+ {
+ fail << "unable to iterate over " << ud << ": " << e;
+ }
+
+ // Now create archives and upload.
+ //
+ for (const pair<dir_path, string>& p: urls)
+ {
+ const dir_path& td (p.first); // <type>/
+ const string& url (p.second);
+
+ try
+ {
+ for (const dir_entry& ie: dir_iterator (td, dir_iterator::no_follow))
+ {
+ const string& i (ie.path ().string ()); // <instance>
+
+ // Can only be a result of the worker malfunction, thus the
+ // failure is fatal.
+ //
+ if (ie.type () != entry_type::directory)
+ fail << "unexpected filesystem entry '" << i << "' in " << td;
+
+ // Archive the upload instance files and, while at it, calculate
+ // the resulting archive checksum.
+ //
+ sha256 sha;
+ auto_rmfile ari (ud / (i + ".tar"));
+
+ try
+ {
+ // Instruct tar to print the archive to stdout.
+ //
+ fdpipe in_pipe (fdopen_pipe (fdopen_mode::binary));
+
+ process pr (
+ process_start_callback (
+ trace,
+ fdopen_null (), // Don't expect to read from stdin.
+ in_pipe,
+ 2 /* stderr */,
+ "tar",
+ "--format", "ustar",
+ "-c",
+ "-C", td,
+ i));
+
+ // Shouldn't throw, unless something is severely damaged.
+ //
+ in_pipe.out.close ();
+
+ ifdstream is (
+ move (in_pipe.in), fdstream_mode::skip, ifdstream::badbit);
+
+ ofdstream os (ari.path, fdopen_mode::binary);
+
+ char buf[8192];
+ while (!eof (is))
+ {
+ is.read (buf, sizeof (buf));
+
+ if (size_t n = static_cast<size_t> (is.gcount ()))
+ {
+ sha.append (buf, n);
+ os.write (buf, n);
+ }
+ }
+
+ os.close ();
+
+ if (!pr.wait ())
+ fail << "tar " << *pr.exit;
+ }
+ catch (const system_error& e)
+ {
+ // There must be something wrong with the setup or there is no
+ // space left on the disk, thus the failure is fatal.
+ //
+ fail << "unable to archive " << td << i << "/: " << e;
+ }
+
+ // Post the upload instance archive.
+ //
+ using namespace http_service;
+
+ parameters params ({
+ {parameter::text, "session", tr.session},
+ {parameter::text, "instance", i},
+ {parameter::file, "archive", ari.path.string ()},
+ {parameter::text, "sha256sum", sha.string ()}});
+
+ if (challenge)
+ params.push_back ({
+ parameter::text, "challenge", base64_encode (*challenge)});
+
+ result pr (post (ops, url, params));
+
+ // Turn the potential upload failure into the "upload" operation
+ // error, amending the task result manifest.
+ //
+ if (pr.error)
+ {
+ // The "upload" operation result must be present (otherwise
+ // there would be nothing to upload). We can assume it is last.
+ //
+ assert (!rm.results.empty ());
+
+ operation_result& r (rm.results.back ());
+
+ // The "upload" operation result must be the last, if present.
+ //
+ assert (r.operation == "upload");
+
+ auto log = [&r, indent = false] (const string& t,
+ const string& l) mutable
+ {
+ if (indent)
+ r.log += " ";
+ else
+ indent = true;
+
+ r.log += t;
+ r.log += ": ";
+ r.log += l;
+ r.log += '\n';
+ };
+
+ log ("error",
+ "unable to upload " + td.leaf ().string () + '/' + i +
+ " build artifacts");
+
+ log ("error", *pr.error);
+
+ if (!pr.message.empty ())
+ log ("reason", pr.message);
+
+ if (pr.reference)
+ log ("reference", *pr.reference);
+
+ for (const manifest_name_value& nv: pr.body)
+ {
+ if (!nv.name.empty ())
+ log (nv.name, nv.value);
+ }
+
+ r.status |= result_status::error;
+ rm.status |= r.status;
+
+ break;
+ }
+ }
+
+ // Bail out on the instance archive upload failure.
+ //
+ if (!rm.status)
+ break;
+ }
+ catch (const system_error& e)
+ {
+ fail << "unable to iterate over " << td << ": " << e;
+ }
+ }
+ }
+
+ result_status rs (rm.status);
+
// Upload the result.
//
- result_request_manifest rq {tr.session, move (challenge), move (r)};
+ result_request_manifest rq {tr.session,
+ move (challenge),
+ agent_checksum,
+ move (rm)};
{
const string& u (*tr.result_url);
@@ -1586,7 +3974,20 @@ try
try
{
- serialize_manifest (rq, c.out, u, "task request");
+ // Don't break lines in the manifest values not to further increase
+ // the size of the result request manifest encoded representation.
+ // Note that this manifest can contain quite a few lines in the
+ // operation logs, potentially truncated to fit the upload limit
+ // (see worker/worker.cxx for details). Breaking these lines can
+ // increase the request size beyond this limit and so we can end up
+ // with the request failure.
+ //
+ serialize_manifest (rq,
+ c.out,
+ u,
+ "result request",
+ true /* fail_hard */,
+ true /* long_lines */);
}
catch (const failed&) {f = true;}
@@ -1602,8 +4003,9 @@ try
}
}
- l2 ([&]{trace << "built " << t.name << '/' << t.version << " "
- << "on " << t.machine << " "
+ l2 ([&]{trace << "built " << t.name << '/' << t.version << ' '
+ << "status " << rs << ' '
+ << "on " << t.machine << ' '
<< "for " << url;});
}
}
diff --git a/bbot/agent/agent.hxx b/bbot/agent/agent.hxx
index ba3719e..9c8400f 100644
--- a/bbot/agent/agent.hxx
+++ b/bbot/agent/agent.hxx
@@ -1,5 +1,5 @@
// file : bbot/agent/agent.hxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#ifndef BBOT_AGENT_AGENT_HXX
#define BBOT_AGENT_AGENT_HXX
@@ -22,14 +22,14 @@ namespace bbot
extern standard_version tc_ver; // Toolchain version.
extern string tc_id; // Toolchain id.
- extern uint16_t inst; // Instance number.
+ extern uint16_t inst; // Instance number (1-based).
extern string hname; // Our host name.
extern string hip; // Our IP address.
extern uid_t uid; // Our effective user id.
extern string uname; // Our effective user name.
- extern uint16_t offset; // Agent offset.
+ extern uint16_t offset; // Agent offset (10-9990; used for ports).
// Random number generator (currently not MT-safe and limited to RAND_MAX).
//
diff --git a/bbot/agent/http-service.cxx b/bbot/agent/http-service.cxx
new file mode 100644
index 0000000..1921edc
--- /dev/null
+++ b/bbot/agent/http-service.cxx
@@ -0,0 +1,364 @@
+// file : bbot/agent/http-service.cxx -*- C++ -*-
+// license : MIT; see accompanying LICENSE file
+
+#include <bbot/agent/http-service.hxx>
+
+#include <libbutl/curl.hxx>
+
+#include <bbot/diagnostics.hxx>
+
+using namespace std;
+using namespace butl;
+
+namespace bbot
+{
+ namespace http_service
+ {
+ result
+ post (const agent_options& o, const string& u, const parameters& params)
+ {
+ tracer trace ("http_service::post");
+
+ using parser = manifest_parser;
+ using parsing = manifest_parsing;
+ using name_value = manifest_name_value;
+
+ // The overall plan is to post the data using the curl program, read
+ // the HTTP response status and content type, read and parse the body
+ // according to the content type, and obtain the result message and
+ // optional reference in case of both the request success and failure.
+ //
+ // The successful request response (HTTP status code 200) is expected to
+ // contain the result manifest (text/manifest content type). The faulty
+ // response (HTTP status code other than 200) can either contain the
+ // result manifest or a plain text error description (text/plain content
+ // type) or some other content (for example text/html). We will return
+ // the manifest message value, if available or the first line of the
+ // plain text error description or, as a last resort, construct the
+ // message from the HTTP status code and reason phrase. We will also
+ // return the error description if anything goes wrong with the HTTP
+ // request or the response manifest status value is not 200.
+ //
+ string message;
+ optional<uint16_t> status; // Request result manifest status value.
+ optional<string> reference;
+ vector<name_value> body;
+ optional<string> error;
+
+ // None of the 3XX redirect code semantics assume automatic re-posting.
+ // We will treat all such codes as failures, adding the location header
+ // value to the message for troubleshooting.
+ //
+ optional<string> location;
+
+ // Convert the submit arguments to curl's --form* options and cache the
+ // pointer to the file_text parameter value, if present, for writing
+ // into curl's stdin.
+ //
+ strings fos;
+ const string* file_text (nullptr);
+
+ for (const parameter& p: params)
+ {
+ if (p.type == parameter::file_text)
+ {
+ assert (file_text == nullptr);
+ file_text = &p.value;
+ }
+
+ fos.emplace_back (p.type == parameter::file ||
+ p.type == parameter::file_text
+ ? "--form"
+ : "--form-string");
+
+ fos.emplace_back (
+ p.type == parameter::file ? p.name + "=@" + p.value :
+ p.type == parameter::file_text ? p.name + "=@-" :
+ p.name + '=' + p.value);
+ }
+
+ // Note that we prefer the low-level process API for running curl over
+ // using butl::curl because in this context it is restrictive and
+ // inconvenient.
+ //
+ // Start curl program.
+ //
+ // Text mode seems appropriate.
+ //
+ fdpipe in_pipe;
+ fdpipe out_pipe;
+ process pr;
+
+ try
+ {
+ in_pipe = fdopen_pipe ();
+
+ out_pipe = (file_text != nullptr
+ ? fdopen_pipe ()
+ : fdpipe {fdopen_null (), nullfd});
+
+ pr = process_start_callback (trace,
+ out_pipe.in.get () /* stdin */,
+ in_pipe /* stdout */,
+ 2 /* stderr */,
+ "curl",
+
+ // Include the response headers in the
+ // output so we can get the status
+ // code/reason, content type, and the
+ // redirect location.
+ //
+ "--include",
+
+ "--max-time", o.request_timeout (),
+ "--connect-timeout", o.connect_timeout (),
+ fos,
+ u);
+
+ // Shouldn't throw, unless something is severely damaged.
+ //
+ in_pipe.out.close ();
+ out_pipe.in.close ();
+ }
+ catch (const process_error& e)
+ {
+ fail << "unable to execute curl: " << e;
+ }
+ catch (const io_error& e)
+ {
+ fail << "unable to open pipe: " << e;
+ }
+
+ auto finish = [&pr, &error] (bool io_read = false, bool io_write = false)
+ {
+ if (!pr.wait ())
+ error = "curl " + to_string (*pr.exit);
+ else if (io_read)
+ error = "error reading curl output";
+ else if (io_write)
+ error = "error writing curl input";
+ };
+
+ bool io_write (false);
+ bool io_read (false);
+
+ try
+ {
+ // First we read the HTTP response status line and headers. At this
+ // stage we will read until the empty line (containing just CRLF). Not
+ // being able to reach such a line is an error, which is the reason
+ // for the exception mask choice.
+ //
+ ifdstream is (
+ move (in_pipe.in),
+ fdstream_mode::skip,
+ ifdstream::badbit | ifdstream::failbit | ifdstream::eofbit);
+
+ if (file_text != nullptr)
+ {
+ ofdstream os (move (out_pipe.out));
+ os << *file_text;
+ os.close ();
+
+ // Indicate to the potential IO error handling that we are done with
+ // writing.
+ //
+ file_text = nullptr;
+ }
+
+ auto bad_response = [] (const string& d) {throw runtime_error (d);};
+
+ curl::http_status rs;
+
+ try
+ {
+ rs = curl::read_http_status (is, false /* skip_headers */);
+ }
+ catch (const invalid_argument& e)
+ {
+ bad_response (
+ string ("unable to read HTTP response status line: ") + e.what ());
+ }
+
+ // Read through the response headers until the empty line is
+ // encountered and obtain the content type and/or the redirect
+ // location, if present.
+ //
+ optional<string> ctype;
+
+ // Check if the line contains the specified header and return its
+ // value if that's the case. Return nullopt otherwise.
+ //
+ // Note that we don't expect the header values that we are interested
+ // in to span over multiple lines.
+ //
+ string l;
+ auto header = [&l] (const char* name) -> optional<string>
+ {
+ size_t n (string::traits_type::length (name));
+ if (!(icasecmp (name, l, n) == 0 && l[n] == ':'))
+ return nullopt;
+
+ string r;
+ size_t p (l.find_first_not_of (' ', n + 1)); // The value begin.
+ if (p != string::npos)
+ {
+ size_t e (l.find_last_not_of (' ')); // The value end.
+ assert (e != string::npos && e >= p);
+
+ r = string (l, p, e - p + 1);
+ }
+
+ return optional<string> (move (r));
+ };
+
+ while (!(l = curl::read_http_response_line (is)).empty ())
+ {
+ if (optional<string> v = header ("Content-Type"))
+ ctype = move (v);
+ else if (optional<string> v = header ("Location"))
+ {
+ if ((rs.code >= 301 && rs.code <= 303) || rs.code == 307)
+ location = move (v);
+ }
+ }
+
+ assert (!eof (is)); // Would have already failed otherwise.
+
+ // Now parse the response payload if the content type is specified and
+ // is recognized (text/manifest or text/plain), skip it (with the
+ // ifdstream's close() function) otherwise.
+ //
+ // Note that eof and getline() fail conditions are not errors anymore,
+ // so we adjust the exception mask accordingly.
+ //
+ is.exceptions (ifdstream::badbit);
+
+ if (ctype)
+ {
+ if (icasecmp ("text/manifest", *ctype, 13) == 0)
+ {
+ parser p (is, "manifest");
+ name_value nv (p.next ());
+
+ if (nv.empty ())
+ bad_response ("empty manifest");
+
+ const string& n (nv.name);
+ string& v (nv.value);
+
+ // The format version pair is verified by the parser.
+ //
+ assert (n.empty () && v == "1");
+
+ body.push_back (move (nv)); // Save the format version pair.
+
+ auto bad_value = [&p, &nv] (const string& d) {
+ throw parsing (p.name (), nv.value_line, nv.value_column, d);};
+
+ // Get and verify the HTTP status.
+ //
+ nv = p.next ();
+ if (n != "status")
+ bad_value ("no status specified");
+
+ uint16_t c (curl::parse_http_status_code (v));
+ if (c == 0)
+ bad_value ("invalid HTTP status '" + v + '\'');
+
+ if (c != rs.code)
+ bad_value ("status " + v + " doesn't match HTTP response "
+ "code " + to_string (rs.code));
+
+ // Get the message.
+ //
+ nv = p.next ();
+ if (n != "message" || v.empty ())
+ bad_value ("no message specified");
+
+ message = move (v);
+
+ // Try to get an optional reference.
+ //
+ nv = p.next ();
+
+ if (n == "reference")
+ {
+ if (v.empty ())
+ bad_value ("empty reference specified");
+
+ reference = move (v);
+
+ nv = p.next ();
+ }
+
+ // Save the remaining name/value pairs.
+ //
+ for (; !nv.empty (); nv = p.next ())
+ body.push_back (move (nv));
+
+ status = c;
+ }
+ else if (icasecmp ("text/plain", *ctype, 10) == 0)
+ getline (is, message); // Can result in the empty message.
+ }
+
+ is.close (); // Detect errors.
+
+ // The only meaningful result we expect is the manifest (status code
+      // is not necessarily 200). We are unable to interpret any other cases
+ // so report them as a bad response.
+ //
+ if (!status)
+ {
+ if (rs.code == 200)
+ bad_response ("manifest expected");
+
+ if (message.empty ())
+ {
+ message = "HTTP status code " + to_string (rs.code);
+
+ if (!rs.reason.empty ())
+ message += " (" + lcase (rs.reason) + ')';
+ }
+
+ if (location)
+ message += ", new location: " + *location;
+
+ bad_response ("bad server response");
+ }
+ }
+ catch (const io_error&)
+ {
+ // Presumably the child process failed and issued diagnostics so let
+ // finish() try to deal with that first.
+ //
+ (file_text != nullptr ? io_write : io_read) = true;
+ }
+ // Handle all parsing errors, including the manifest_parsing exception
+ // that inherits from the runtime_error exception.
+ //
+ // Note that the io_error class inherits from the runtime_error class,
+ // so this catch-clause must go last.
+ //
+ catch (const runtime_error& e)
+ {
+ finish (); // Sets the error variable on process failure.
+
+ if (!error)
+ error = e.what ();
+ }
+
+ if (!error)
+ finish (io_read, io_write);
+
+ assert (error || (status && !message.empty ()));
+
+ if (!error && *status != 200)
+ error = "status code " + to_string (*status);
+
+ return result {
+ move (error), move (message), move (reference), move (body)};
+ }
+ }
+}
diff --git a/bbot/agent/http-service.hxx b/bbot/agent/http-service.hxx
new file mode 100644
index 0000000..b50c6b7
--- /dev/null
+++ b/bbot/agent/http-service.hxx
@@ -0,0 +1,71 @@
+// file : bbot/agent/http-service.hxx -*- C++ -*-
+// license : MIT; see accompanying LICENSE file
+
+#ifndef BBOT_AGENT_HTTP_SERVICE_HXX
+#define BBOT_AGENT_HTTP_SERVICE_HXX
+
+#include <libbutl/manifest-types.hxx>
+
+#include <bbot/types.hxx>
+#include <bbot/utility.hxx>
+
+#include <bbot/agent/agent-options.hxx>
+
+// NOTE: this implementation is inspired by the bdep's http_service::post()
+// function. The key difference is the result::error member which is used
+// to return rather than fail on upload errors.
+
+namespace bbot
+{
+ namespace http_service
+ {
+ // If type is file, then the value is a path to be uploaded. If type is
+ // file_text, then the value is a file content to be uploaded.
+ //
+ struct parameter
+ {
+ enum {text, file, file_text} type;
+ string name;
+ string value;
+ };
+ using parameters = vector<parameter>;
+
+ struct result
+ {
+ // If error is present, then it contains the description of why the
+ // upload failed. In this case message contains additional information.
+ //
+ optional<string> error;
+ string message;
+ optional<string> reference;
+
+ // Does not include status, message, or reference.
+ //
+ vector<butl::manifest_name_value> body;
+ };
+
+ // Submit text parameters and/or upload files to an HTTP service via the
+ // POST method. Use the multipart/form-data content type if any files are
+ // uploaded and application/x-www-form-urlencoded otherwise.
+ //
+ // Note: currently only one file_text parameter can be specified.
+ //
+ // Return the response manifest message and reference (if present, see
+ // below) and the rest of the manifest values, if any. If unable to
+ // retrieve the response manifest, the message can also be set to the
+ // first line of the plain text error description or, as a last resort,
+ // constructed from the HTTP status code and reason phrase. Issue
+ // diagnostics and throw failed if something is wrong with the setup
+ // (unable to execute curl, etc).
+ //
+ // Note that the HTTP service is expected to respond with the result
+ // manifest that starts with the 'status' (HTTP status code) and 'message'
+ // (diagnostics message) values optionally followed by 'reference' and
+ // then other manifest values.
+ //
+ result
+ post (const agent_options&, const string& url, const parameters&);
+ }
+}
+
+#endif // BBOT_AGENT_HTTP_SERVICE_HXX
diff --git a/bbot/agent/machine.cxx b/bbot/agent/machine.cxx
index a7dc192..74c9b93 100644
--- a/bbot/agent/machine.cxx
+++ b/bbot/agent/machine.cxx
@@ -1,5 +1,5 @@
// file : bbot/agent/machine.cxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#include <bbot/agent/machine.hxx>
@@ -83,9 +83,9 @@ namespace bbot
}
static string
- create_tap (const string& br, uint16_t port)
+ create_tap (const string& br, uint16_t machine_num, uint16_t port)
{
- string t ("tap" + to_string (offset));
+ string t ("tap" + to_string (offset + machine_num));
tracer trace ("create_tap", t.c_str ());
@@ -126,8 +126,10 @@ namespace bbot
string bridge; // Bridge interface to which this tap belongs
uint16_t port; // UDP port to forward TFTP traffic to.
- tap (string b, uint16_t p)
- : iface (create_tap (b, p)), bridge (move (b)), port (p) {}
+ tap (string b, uint16_t machine_num, uint16_t p)
+ : iface (create_tap (b, machine_num, p)),
+ bridge (move (b)),
+ port (p) {}
~tap ()
{
@@ -169,6 +171,9 @@ namespace bbot
public:
kvm_machine (const dir_path&,
const machine_manifest&,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
@@ -213,71 +218,69 @@ namespace bbot
kvm_machine::
kvm_machine (const dir_path& md,
const machine_manifest& mm,
+ uint16_t m_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& omac,
const string& br,
- uint16_t port,
+ uint16_t tftp_port,
bool pub_vnc)
: machine (mm.mac ? *mm.mac : // Fixed mac from machine manifest.
omac ? *omac : // Generated mac from previous bootstrap.
generate_mac ()),
kvm ("kvm"),
- net (br, port),
- vnc (machine_vnc (pub_vnc)),
+ net (br, m_num, tftp_port),
+ vnc (machine_vnc (m_num, pub_vnc)),
monitor ("/tmp/monitor-" + tc_name + '-' + to_string (inst))
{
tracer trace ("kvm_machine", md.string ().c_str ());
+ // Monitor path.
+ //
+ if (m_num != 0)
+ {
+ monitor += '-';
+ monitor += to_string (m_num);
+ }
+
if (sizeof (sockaddr_un::sun_path) <= monitor.size ())
throw invalid_argument ("monitor unix socket path too long");
// Machine name.
//
// While we currently can only have one running machine per toolchain, add
- // the instance number for debuggability.
+ // the instance number and non-0 machine number for debuggability.
//
string name (mm.name + '-' + tc_name + '-' + to_string (inst));
+ if (m_num != 0)
+ {
+ name += '-';
+ name += to_string (m_num);
+ }
+
// Machine log. Note that it is only removed with an explicit cleanup()
// call.
//
log = path ("/tmp/" + path::traits_type::temp_name (name) + ".log");
- // Map logical CPUs to sockets/cores/threads keeping the number of and
- // cores even. Failed that, QEMU just makes it a machine with that number
- // of sockets and some operating systems (like Windows) can only do two.
+ // Map logical CPUs to sockets/cores/threads keeping the number of sockets
+ // and cores even. Failing that, QEMU just makes it a machine with that
+ // number of sockets and some operating systems (like Windows) can only do
+ // two.
//
// Note that for best results you may want to adjust (e.g., by over-
// committing) the number of CPUs to be power of 2.
//
- size_t cpus (ops.cpu ()), cores (cpus);
+ size_t cores (cpus);
- size_t sockets (cores >= 16 && cores % 4 == 0 ? 2 :
- cores >= 64 && cores % 8 == 0 ? 4 : 1);
+ size_t sockets (cores >= 256 && cores % 8 == 0 ? 4 :
+ cores >= 128 && cores % 4 == 0 ? 2 : 1);
cores /= sockets;
- size_t threads (cores >= 8 && cores % 4 == 0 ? 2 : 1);
+ size_t threads (cores >= 16 && cores % 4 == 0 ? 2 : 1);
cores /= threads;
- // We probably don't want to commit all the available RAM to the VM since
- // some of it could be used on the host side for caching, etc. So the
- // heuristics that we will use is 4G or 1G per CPU, whichever is greater
- // and the rest divide equally between the host and the VM.
- //
- // But the experience showed that we actually want to be able to precisely
- // control the amount of RAM assigned to VMs (e.g., for tmpfs size) and
- // without back-fudging for this heuristics.
- //
-#if 0
- size_t ram ((cpus < 4 ? 4 : cpus) * 1024 * 1024); // Kb.
-
- if (ram > ops.ram ())
- ram = ops.ram ();
- else
- ram += (ops.ram () - ram) / 2;
-#else
- size_t ram (ops.ram ());
-#endif
-
// If we have options, use that instead of the default network and
// disk configuration.
//
@@ -309,11 +312,21 @@ namespace bbot
}
else
{
- auto add = [&os] (string o, string v)
+ // @@ TMP: libstud-optional issue #1.
+ //
+#if 0
+ auto add = [&os] (string o, optional<string> v = {})
{
os.push_back (move (o));
- os.push_back (move (v));
+ if (v) os.push_back (move (*v));
};
+#else
+ auto add = [&os] (string o, string v = {})
+ {
+ os.push_back (move (o));
+ if (!v.empty ()) os.push_back (move (v));
+ };
+#endif
// Network.
//
@@ -334,6 +347,17 @@ namespace bbot
//"-drive", "if=none,id=disk0,format=raw,file=disk.img"
//"-device", "virtio-scsi-pci,id=scsi"
//"-device", "scsi-hd,drive=disk0"
+
+ // USB settings.
+ //
+ // These options should make graphical VMs usable from VNC.
+ //
+ // Note that the "standard" USB bus may not be available on
+ // architectures other than x86 (e.g., aarch64).
+ //
+ add ("-usb");
+ add ("-device", "usb-kbd");
+ add ("-device", "usb-tablet");
}
// Setup QMP (QEMU Machine Protocol) monitor to act as a log.
@@ -373,6 +397,7 @@ namespace bbot
2, // 1>&2 (QMP goes to stdout)
qmp_out,
process_env (kvm, md, env), // Run from the machine's directory.
+ "-enable-kvm",
"-name", name + ",debug-threads=on",
"-S", // Start suspended.
"-boot", "c", // Boot from disk.
@@ -382,14 +407,10 @@ namespace bbot
// RTC settings.
//
"-rtc", "clock=vm,driftfix=slew",
+#ifdef __x86_64__
"-no-hpet",
"-global", "kvm-pit.lost_tick_policy=discard",
-
- // USB settings.
- //
- // This option should make graphical VMs usable from VNC.
- //
- "-usb", "-device", "usb-tablet",
+#endif
// These can override the above but not below.
//
@@ -397,7 +418,7 @@ namespace bbot
// RAM and CPU configuration.
//
- "-m", to_string (ram / 1024) + "M",
+ "-m", to_string (ram / 1024) + 'M',
"-smp", (to_string (cpus) +
",sockets=" + to_string (sockets) +
",cores=" + to_string (cores) +
@@ -413,7 +434,7 @@ namespace bbot
// collision-wise with anything useful.
//
"-vnc",
- (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset), // 5900 + offset
+ (pub_vnc ? ":" : "127.0.0.1:") + to_string (offset + m_num), // 5900-base
// QMP.
//
@@ -422,7 +443,7 @@ namespace bbot
// Monitor.
//
- "-chardev", "socket,id=mon,path=" + monitor.string () + ",server,nowait",
+ "-chardev", "socket,id=mon,path=" + monitor.string () + ",server=on,wait=off",
"-mon", "chardev=mon,mode=readline");
qmp_out.close ();
@@ -441,7 +462,8 @@ namespace bbot
}
catch (const io_error& e)
{
- fail << "unable to initialize QMP: " << e;
+ fail << "unable to initialize QMP: " << e <<
+ info << "see " << log;
}
// Start execution.
@@ -452,7 +474,8 @@ namespace bbot
}
catch (const system_error& e)
{
- fail << "unable to communicate with qemu monitor: " << e;
+ fail << "unable to communicate with qemu monitor: " << e <<
+ info << "see " << log;
}
}
@@ -504,7 +527,8 @@ namespace bbot
if (r)
return true;
- fail << "unable to communicate with qemu monitor: " << e;
+ fail << "unable to communicate with qemu monitor: " << e <<
+ info << "see " << log;
}
return wait (seconds);
@@ -525,7 +549,8 @@ namespace bbot
if (wait (t, fh))
return;
- fail (fh) << "unable to communicate with qemu monitor: " << e;
+ fail (fh) << "unable to communicate with qemu monitor: " << e <<
+ info << "see " << log;
}
wait (fh);
@@ -540,7 +565,8 @@ namespace bbot
}
catch (const system_error& e)
{
- fail (fh) << "unable to communicate with qemu monitor: " << e;
+ fail (fh) << "unable to communicate with qemu monitor: " << e <<
+ info << "see " << log;
}
}
@@ -575,7 +601,8 @@ namespace bbot
}
catch (const process_error& e)
{
- fail (fh) << "unable to wait for " << kvm << ": " << e << endf;
+ fail (fh) << "unable to wait for " << kvm << ": " << e <<
+ info << "see " << log << endf;
}
}
@@ -645,30 +672,37 @@ namespace bbot
unique_ptr<machine>
start_machine (const dir_path& md,
const machine_manifest& mm,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram,
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
bool pub_vnc)
{
+ assert (machine_num < 10);
+
switch (mm.type)
{
case machine_type::kvm:
return make_unique<kvm_machine> (
- md, mm, mac, br_iface, tftp_port, pub_vnc);
+ md, mm, machine_num, cpus, ram, mac, br_iface, tftp_port, pub_vnc);
case machine_type::nspawn:
- assert (false); //@@ TODO
+ assert (false); // @@ TODO
}
return nullptr;
}
string
- machine_vnc (bool pub)
+ machine_vnc (uint16_t num, bool pub)
{
+ assert (num < 10);
+
string r (pub ? hip : "127.0.0.1");
r += ':';
- r += to_string (5900 + offset);
+ r += to_string (5900 + offset + num);
return r;
}
}
diff --git a/bbot/agent/machine.hxx b/bbot/agent/machine.hxx
index 9a47d12..13646db 100644
--- a/bbot/agent/machine.hxx
+++ b/bbot/agent/machine.hxx
@@ -1,5 +1,5 @@
// file : bbot/agent/machine.hxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#ifndef BBOT_AGENT_MACHINE_HXX
#define BBOT_AGENT_MACHINE_HXX
@@ -78,19 +78,28 @@ namespace bbot
class machine_manifest;
+ // The machine number should be between 0-9 with 0 for the build machine and
+ // 1-9 for the auxiliary machines.
+ //
+ // Note that tftp_port is not a base (in other words, it is expected to
+ // already be appropriately offset).
+ //
unique_ptr<machine>
start_machine (const dir_path&,
const machine_manifest&,
+ uint16_t machine_num,
+ size_t cpus,
+ size_t ram, // In KiB.
const optional<string>& mac,
const string& br_iface,
uint16_t tftp_port,
- bool pub_vnc);
+ bool public_vnc);
// Return the machine's public or private VNC session endpoint in the
// '<ip>:<port>' form.
//
string
- machine_vnc (bool pub_vnc);
+ machine_vnc (uint16_t machine_num, bool public_vnc);
}
#endif // BBOT_AGENT_MACHINE_HXX
diff --git a/bbot/agent/tftp.cxx b/bbot/agent/tftp.cxx
index 0df0d1b..58aaabc 100644
--- a/bbot/agent/tftp.cxx
+++ b/bbot/agent/tftp.cxx
@@ -1,5 +1,5 @@
// file : bbot/agent/tftp.cxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#include <bbot/agent/tftp.hxx>
@@ -126,7 +126,7 @@ namespace bbot
ops.tftp ());
// This is not really accurate since tftpd will, for example, serve
- // an upload request until it is complete. But it's close anough for
+ // an upload request until it is complete. But it's close enough for
// our needs.
//
sec -= (inc - static_cast<size_t> (timeout.tv_sec));
diff --git a/bbot/agent/tftp.hxx b/bbot/agent/tftp.hxx
index 2d02b29..5306dd1 100644
--- a/bbot/agent/tftp.hxx
+++ b/bbot/agent/tftp.hxx
@@ -1,5 +1,5 @@
// file : bbot/agent/tftp.hxx -*- C++ -*-
-// license : TBC; see accompanying LICENSE file
+// license : MIT; see accompanying LICENSE file
#ifndef BBOT_AGENT_TFTP_HXX
#define BBOT_AGENT_TFTP_HXX
@@ -29,7 +29,7 @@ namespace bbot
port () const;
// Wait for a TFTP request for up to the specified number of seconds. If
- // increment is not 0, then wait in the specified incremenets (i.e., wait
+ // increment is not 0, then wait in the specified increments (i.e., wait
// for up to that number of seconds; useful when one needs to also
// periodically check for something else). Update the timeout value as
// well as return true if a request was served and false otherwise.