aboutsummaryrefslogtreecommitdiff
path: root/bbot/agent/agent.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'bbot/agent/agent.cxx')
-rw-r--r--bbot/agent/agent.cxx235
1 files changed, 156 insertions, 79 deletions
diff --git a/bbot/agent/agent.cxx b/bbot/agent/agent.cxx
index a8b7c77..850f2b1 100644
--- a/bbot/agent/agent.cxx
+++ b/bbot/agent/agent.cxx
@@ -20,8 +20,10 @@
#include <sys/ioctl.h>
#include <sys/socket.h>
+#include <map>
#include <chrono>
#include <random>
+#include <iomanip> // setw()
#include <iostream>
#include <system_error> // generic_category()
@@ -1635,20 +1637,57 @@ try
offset = (tc_num - 1) * 100 + inst;
- // Controller URLs.
+ // Controller priority to URLs map.
//
- if (argc < 2 &&
- !ops.dump_machines () &&
- !ops.fake_request_specified ())
+ std::map<uint64_t, strings> controllers;
+
+ for (int i (1); i != argc; ++i)
{
- fail << "controller url expected" <<
- info << "run " << argv[0] << " --help for details";
- }
+ // [<prio>=]<url>
+ //
+ string a (argv[i]);
+
+ // See if we have priority, falling back to priority 0 if absent.
+ //
+ uint64_t prio (0);
- strings controllers;
+ // Note that we can also have `=` in <url> (e.g., parameters) so we will
+ // only consider `=` as ours if prior to it we only have digits.
+ //
+ size_t p (a.find ('='));
+ if (p != string::npos && a.find_first_not_of ("0123456789") == p)
+ {
+ // Require exactly four or five digits in case we later need to extend
+ // the priority levels beyond the 10 possible values (e.g., DDCCBBAA).
+ //
+ if (p != 4 && p != 5)
+ fail << "four or five-digit controller url priority expected in '"
+ << a << "'";
- for (int i (1); i != argc; ++i)
- controllers.push_back (argv[i]);
+ char* e;
+ errno = 0;
+ prio = strtoull (a.c_str (), &e, 10);
+ assert (errno != ERANGE && e == a.c_str () + p);
+
+ if (prio > 19999)
+ fail << "out of bounds controller url priority in '" << a << "'";
+
+ a.erase (0, p + 1);
+ }
+
+ controllers[prio].push_back (move (a));
+ }
+
+ if (controllers.empty ())
+ {
+ if (ops.dump_machines () || ops.fake_request_specified ())
+ {
+ controllers[0].push_back ("https://example.org");
+ }
+ else
+ fail << "controller url expected" <<
+ info << "run " << argv[0] << " --help for details";
+ }
// Handle SIGHUP and SIGTERM.
//
@@ -1710,8 +1749,16 @@ try
if (inst_max != 0)
dr << info << "instance max " << inst_max;
- for (const string& u: controllers)
- dr << info << "controller url " << u;
+ // Note: keep last since we don't restore fill/setw.
+ //
+ for (const pair<const uint64_t, strings>& p: controllers)
+ {
+ for (const string& u: p.second)
+ {
+ dr.os.fill ('0');
+ dr << info << "controller url " << std::setw (4) << p.first << '=' << u;
+ }
+ }
}
// The work loop. The steps we go through are:
@@ -1815,79 +1862,94 @@ try
continue;
}
- // Prepare task request.
+ // If we get a task, these contain all the corresponding information.
//
- task_request_manifest tq {
- hname,
- tc_name,
- tc_ver,
- imode,
- ilogin,
- fingerprint,
- machine_header_manifests {}
- };
-
- // Note: do not assume tq.machines.size () == ms.size ().
+ task_request_manifest tq;
+ task_response_manifest tr;
+ uint64_t prio;
+ string url;
+
+ // Iterate over controller priorities in reverse, that is, from highest to
+ // lowest.
//
- for (const bootstrapped_machine& m: ms)
- {
- // @@ For now skip machines locked by other processes.
- //
- // @@ Note: skip machines being bootstrapped.
- //
- if (m.lock.locked ())
- tq.machines.emplace_back (m.manifest.machine.id,
- m.manifest.machine.name,
- m.manifest.machine.summary);
- }
+ // @@ Note: doing it in terms of direct iterators in anticipation of
+ // lower_bound().
+ //
+ auto cb (controllers.begin ());
+ auto ce (controllers.end ());
- if (ops.dump_machines ())
+ for (; cb != ce; )
{
- for (const machine_header_manifest& m: tq.machines)
- serialize_manifest (m, cout, "stdout", "machine");
+ const pair<const uint64_t, strings>& pu (*--ce);
- return 0;
- }
+ prio = pu.first;
+ const strings& urls (pu.second);
- if (tq.machines.empty ())
- {
- // Normally this means all the machines are locked so sleep a bit less.
+ // Prepare task request (it will be the same within a given priority).
//
- sleep = rand_sleep () / 2;
- continue;
- }
+ tq = task_request_manifest {
+ hname,
+ tc_name,
+ tc_ver,
+ imode,
+ ilogin,
+ fingerprint,
+ machine_header_manifests {}};
+
+ // Note: do not assume tq.machines.size () == ms.size ().
+ //
+ for (const bootstrapped_machine& m: ms)
+ {
+ // @@ For now skip machines locked by other processes.
+ //
+ // @@ Note: skip machines being bootstrapped.
+ //
+ if (m.lock.locked ())
+ tq.machines.emplace_back (m.manifest.machine.id,
+ m.manifest.machine.name,
+ m.manifest.machine.summary);
+ }
- // Send task requests.
- //
- // Note that we have to do it while holding the lock on all the machines
- // since we don't know which machine we will need.
- //
- string url;
- task_response_manifest tr;
+ if (ops.dump_machines ())
+ {
+ for (const machine_header_manifest& m: tq.machines)
+ serialize_manifest (m, cout, "stdout", "machine");
- if (ops.fake_request_specified ())
- {
- auto t (parse_manifest<task_manifest> (ops.fake_request (), "task"));
+ return 0;
+ }
- tr = task_response_manifest {
- "fake-session", // Dummy session.
- nullopt, // No challenge.
- url, // Empty result URL.
- agent_checksum,
- move (t)};
+ if (tq.machines.empty ())
+ {
+ // If we have no machines for this priority then we won't have any
+ // for any lower priority so bail out.
+ //
+ break;
+ }
- url = "http://example.org";
- }
- else
- {
- // Note that after completing each task we always start from the
- // beginning of the list. This fact can be used to implement a poor
- // man's priority system where we will continue serving the first listed
- // controller for as long as it has tasks (and maybe in the future we
- // will implement a proper priority system).
+ // Send task requests.
+ //
+ // Note that we have to do it while holding the lock on all the machines
+ // since we don't know which machine we will need.
+ //
+ // @@ TODO: need to iterate in random order somehow.
//
- for (const string& u: controllers)
+ for (const string& u: urls)
{
+ if (ops.fake_request_specified ())
+ {
+ auto t (parse_manifest<task_manifest> (ops.fake_request (), "task"));
+
+ tr = task_response_manifest {
+ "fake-session", // Dummy session.
+ nullopt, // No challenge.
+ string (), // Empty result URL.
+ agent_checksum,
+ move (t)};
+
+ url = u;
+ break;
+ }
+
task_response_manifest r;
try
@@ -1903,8 +1965,9 @@ try
"--max-time", ops.request_timeout (),
"--connect-timeout", ops.connect_timeout ());
- // This is tricky/hairy: we may fail hard parsing the output before
- // seeing that curl exited with an error and failing softly.
+ // This is tricky/hairy: we may fail hard parsing the output
+ // before seeing that curl exited with an error and failing
+ // softly.
//
bool f (false);
@@ -1949,8 +2012,8 @@ try
{
const task_manifest& t (*r.task);
- // For security reasons let's require the repository location to be
- // remote.
+ // For security reasons let's require the repository location to
+ // be remote.
//
if (t.repository.local ())
{
@@ -1973,13 +2036,27 @@ try
l2 ([&]{trace << "task for " << t.name << '/' << t.version << " "
<< "on " << t.machine << " "
- << "from " << u;});
+ << "from " << u << " "
+ << "priority " << prio;});
tr = move (r);
url = u;
break;
}
- }
+ } // url loop.
+
+ if (!tr.session.empty ()) // Got a task.
+ break;
+
+ } // prio loop.
+
+ if (tq.machines.empty ()) // No machines.
+ {
+ // Normally this means all the machines are busy so sleep a bit less.
+ //
+ l2 ([&]{trace << "all machines are busy, sleeping";});
+ sleep = rand_sleep () / 2;
+ continue;
}
if (tr.session.empty ()) // No task from any of the controllers.
@@ -2006,7 +2083,7 @@ try
{
if (mh.name == m.manifest.machine.name)
{
- m.lock.write (tl, 1234 /* prio */);
+ m.lock.write (tl, prio);
pm = &m;
}
else