author    | Boris Kolpackov <boris@codesynthesis.com>  | 2019-06-24 12:01:19 +0200
committer | Karen Arutyunov <karen@codesynthesis.com>  | 2019-07-01 18:13:55 +0300
commit    | 977d07a3ae47ef204665d1eda2d642e5064724f3 (patch)
tree      | 525a3d6421f61ce789b690191d3c30fc09be3517 /libbuild2/scheduler.hxx
parent    | 7161b24963dd9da4d218f92c736b77c35c328a2d (diff)
Split build system into library and driver
Diffstat (limited to 'libbuild2/scheduler.hxx')
-rw-r--r-- | libbuild2/scheduler.hxx | 709
1 file changed, 709 insertions, 0 deletions
diff --git a/libbuild2/scheduler.hxx b/libbuild2/scheduler.hxx
new file mode 100644
index 0000000..09c9e02
--- /dev/null
+++ b/libbuild2/scheduler.hxx
@@ -0,0 +1,709 @@
// file      : libbuild2/scheduler.hxx -*- C++ -*-
// copyright : Copyright (c) 2014-2019 Code Synthesis Ltd
// license   : MIT; see accompanying LICENSE file

#ifndef LIBBUILD2_SCHEDULER_HXX
#define LIBBUILD2_SCHEDULER_HXX

#include <list>
#include <mutex>
#include <tuple>
#include <atomic>
#include <type_traits>        // aligned_storage, etc.
#include <condition_variable>

#include <libbuild2/types.hxx>
#include <libbuild2/utility.hxx>

#include <libbuild2/export.hxx>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks
  // (e.g., running a process), where, in comparison, thread synchronization
  // overhead is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can
  // be done in parallel (e.g., update all the prerequisites or run all the
  // tests). To accomplish this, the master, via a call to async(), can ask
  // the scheduler to run a task in another thread (called "helper"). If a
  // helper is available, then the task is executed asynchronously by such a
  // helper. Otherwise, the task is (normally) executed synchronously as
  // part of the wait() call below. However, in certain cases (serial
  // execution or full queue), the task may be executed synchronously as
  // part of the async() call itself. Once the master thread has scheduled
  // all the tasks, it calls wait() to await their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time. When a master thread calls wait(), it is "suspended" until
  // all its asynchronous tasks are completed (at which point it becomes
  // "ready"). A suspension of a master results in either another ready
  // master being "resumed" or another helper thread becoming available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case
  // the helper is suspended) or to the helper becoming available to perform
  // another task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were reused
  // as a helper, then it could be blocked on a nested wait() further down
  // the stack. All this means that the number of threads created by the
  // scheduler will normally exceed the maximum active allowed.
  //
  class LIBBUILD2_SYMEXPORT scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the result
    // of a task is communicated back to the master thread is ad hoc, usually
    // via "out" arguments. Such result(s) can only be retrieved by the
    // master once its task count reaches the start count.
    //
    // The argument passing semantics are the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for by-reference semantics), except in the case where the task is
    // executed synchronously as part of the async() call itself (this
    // subtlety can become important when passing shared locks; you would
    // only want the lock to be copied if the task is queued).
    //
    // Return true if the task was queued and false if it was executed
    // synchronously.
    //
    // If the scheduler is shut down, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    bool
    async (size_t start_count, atomic_count& task_count, F&&, A&&...);

    template <typename F, typename... A>
    bool
    async (atomic_count& task_count, F&& f, A&&... a)
    {
      return async (0, task_count, forward<F> (f), forward<A> (a)...);
    }
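
    // A minimal usage sketch (here s is a started up scheduler, ts a
    // sequence of targets, and update() a hypothetical task function that
    // must not throw; none of these are part of this interface):
    //
    //   scheduler::atomic_count task_count (0);
    //
    //   for (target& t: ts)
    //     s.async (task_count,
    //              [] (target& t) {update (t);},
    //              ref (t)); // ref() for by-reference semantics.
    //
    //   s.wait (task_count); // Also works own queue; see below.
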
    // Wait until the task count reaches the start count or less. If the
    // scheduler is shut down while waiting, throw system_error(ECANCELED).
    // Return the value of the task count. Note that this is a
    // synchronization point (i.e., the task count is checked with
    // memory_order_acquire).
    //
    // Note that it is valid to wait on another thread's task count (that
    // is, without making any async() calls in this thread). However, if the
    // start count differs from the one passed to async(), then whoever sets
    // the start count to this alternative value must also call resume()
    // below in order to signal the waiting threads.
    //
    // Note also that in this case (waiting on someone else's start count),
    // the async() call could execute the tasks synchronously without ever
    // incrementing the task count. Thus if waiting on another thread's
    // start count starts before/during the async() calls, then it must be
    // "gated" with an alternative (lower) start count.
    //
    // Finally, if waiting on someone else's start count, it may be unsafe
    // (from the deadlock's point of view) to continue working through our
    // own queue (i.e., we may block waiting on a task that has been queued
    // before us which in turn may end up waiting on "us").
    //
    enum work_queue
    {
      work_none, // Don't work own queue.
      work_one,  // Work own queue rechecking the task count after every task.
      work_all   // Work own queue before rechecking the task count.
    };

    size_t
    wait (size_t start_count,
          const atomic_count& task_count,
          work_queue = work_all);

    size_t
    wait (const atomic_count& task_count, work_queue wq = work_all)
    {
      return wait (0, task_count, wq);
    }

    // Resume threads waiting on this task count.
    //
    void
    resume (const atomic_count& task_count);

    // An active thread that is about to wait, potentially for a significant
    // time, on something other than the task count (e.g., a mutex or a
    // condition variable) should deactivate itself with the scheduler and
    // then reactivate once done waiting.
    //
    void
    deactivate ();

    void
    activate (bool collision = false);

    // Sleep for the specified duration, deactivating the thread before
    // going to sleep and re-activating it after waking up (which means this
    // function may sleep potentially significantly longer than requested).
    //
    void
    sleep (const duration&);
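
    // For example, a minimal sketch of blocking on a contended mutex mx
    // (hypothetical) without tying up an active thread slot:
    //
    //   s.deactivate ();
    //   mx.lock ();      // Potentially significant wait.
    //   s.activate ();
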
    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If the maximum threads or task queue depth arguments are unspecified,
    // then appropriate defaults are used.
    //
    explicit
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0,
               size_t queue_depth = 0,
               optional<size_t> max_stack = nullopt)
    {
      startup (max_active, init_active, max_threads, queue_depth, max_stack);
    }

    // Start the scheduler.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0,
             size_t queue_depth = 0,
             optional<size_t> max_stack = nullopt);

    // Return true if the scheduler was started up.
    //
    // Note: can only be called from threads that have observed creation,
    // startup, or shutdown.
    //
    bool
    started () const {return !shutdown_;}

    // Tune a started up scheduler.
    //
    // Note that currently max_active cannot be increased above its initial
    // value. Pass 0 to restore the initial value.
    //
    // Tuning can only be done while the scheduler is inactive, that is,
    // while no threads are executing a task or are suspended. For example,
    // in a setup with a single initial active thread that would be after a
    // return from the top-level wait() call.
    //
    void
    tune (size_t max_active);

    // Return true if the scheduler is configured to run tasks serially.
    //
    // Note: can only be called from threads that have observed startup.
    //
    bool
    serial () const {return max_active_ == 1;}

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.
      size_t task_queue_remain     = 0; // # of tasks remaining in queue.

      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
    };

    stat
    shutdown ();

    // Progress monitoring.
    //
    // Setting and clearing of the monitor is not thread-safe. That is, it
    // should be set before any tasks are queued and cleared after all of
    // them have completed.
    //
    // The counter must go in one direction, either increasing or
    // decreasing, and should contain the initial value during the monitor()
    // call. The zero threshold value is reserved.
    //
    struct monitor_guard
    {
      explicit
      monitor_guard (scheduler* s = nullptr): s_ (s) {}
      monitor_guard (monitor_guard&& x): s_ (x.s_) {x.s_ = nullptr;}
      monitor_guard& operator= (monitor_guard&& x)
      {
        if (&x != this)
        {
          s_ = x.s_;
          x.s_ = nullptr;
        }
        return *this;
      }

      ~monitor_guard ()
      {
        if (s_ != nullptr)
        {
          lock l (s_->wait_idle ()); // See monitor() for details.
          s_->monitor_count_ = nullptr;
          s_->monitor_func_ = nullptr;
        }
      }

      explicit operator bool () const {return s_ != nullptr;}

    private:
      scheduler* s_;
    };

    monitor_guard
    monitor (atomic_count&, size_t threshold, function<size_t (size_t)>);
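
    // A minimal sketch that reports progress every 100 increments (the
    // counter c and print_progress() are hypothetical; the function result
    // is the next threshold):
    //
    //   scheduler::atomic_count c (0);
    //
    //   scheduler::monitor_guard mg (
    //     s.monitor (c,
    //                100,
    //                [] (size_t v)
    //                {
    //                  print_progress (v);
    //                  return v + 100; // Next threshold.
    //                }));
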
    // If initially active thread(s) (besides the one that calls startup())
    // exist before the call to startup(), then they must call join() before
    // executing any tasks. The two common cases where you don't have to
    // call join() are a single active thread that calls startup()/shutdown()
    // or active thread(s) that are created after startup().
    //
    void
    join ()
    {
      assert (queue () == nullptr);

      // Lock the mutex to make sure the values set in startup() are visible
      // in this thread.
      //
      lock l (mutex_);
    }

    // If initially active thread(s) participate in multiple schedulers
    // and/or sessions (intervals between startup() and shutdown()), then
    // they must call leave() before joining another scheduler/session. Note
    // that this applies to the active thread that calls shutdown(). Note
    // also that a thread can only participate in one scheduler at a time.
    //
    void
    leave ()
    {
      queue (nullptr);
    }

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      return std::thread::hardware_concurrency ();
    }

    // Return a prime number that can be used as a lock shard size that's
    // appropriate for the scheduler's concurrency. Use power of two values
    // for mul for higher-contention shards and for div for lower-contention
    // ones. Always return 1 for serial execution.
    //
    // Note: can only be called from threads that have observed startup.
    //
    size_t
    shard_size (size_t mul = 1, size_t div = 1) const;
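
    // For example, a sketch of a sharded mutex table (the table and the
    // hash() function are hypothetical):
    //
    //   size_t n (s.shard_size (4)); // Higher-contention shard.
    //   unique_ptr<std::mutex[]> shard (new std::mutex[n]);
    //
    //   std::mutex& m (shard[hash (key) % n]);
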
    // Assuming all the tasks have been executed, busy-wait for all the
    // threads to become idle. Return the lock over the scheduler mutex.
    // Normally you don't need to call this function directly.
    //
    using lock = std::unique_lock<std::mutex>;

    lock
    wait_idle ();

  private:
    void
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in hope of
    // a small object optimization. Return NULL.
    //
    // Note that the return type is void* to make the function usable with
    // pthreads (see scheduler.cxx for details).
    //
    static void*
    helper (void*);

    size_t
    suspend (size_t start_count, const atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      using func_type = std::decay_t<F>;
      using args_type = std::tuple<std::decay_t<A>...>;

      atomic_count* task_count;
      size_t start_count;
      func_type func;
      args_type args;

      template <size_t... i>
      void
      thunk (std::index_sequence<i...>)
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler&, lock&, void*);

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x) {return forward<T> (x);}

  private:
    // Monitor.
    //
    atomic_count* monitor_count_ = nullptr; // NULL if not used.
    atomic_count monitor_tshold_;           // 0 means locked.
    size_t monitor_init_;                   // Initial count.
    function<size_t (size_t)> monitor_func_;

    std::mutex mutex_;
    bool shutdown_ = true; // Shutdown flag.

    optional<size_t> max_stack_;

    // The constraints that we must maintain:
    //
    //   active <= max_active
    //   (init_active + helpers) <= max_threads (soft; see activate_helper())
    //
    // Note that the first three counters below are immutable between
    // startup() and shutdown() so they can be accessed without a lock (but
    // see join()).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_  = 0; // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_ = 0; // Number of helper threads created so far.

    // Every thread that we manage must be accounted for in one of these
    // counters. And their sum should equal (init_active + helpers).
    //
    size_t active_   = 0; // Active master threads executing a task.
    size_t idle_     = 0; // Idle helper threads waiting for a task.
    size_t waiting_  = 0; // Suspended master threads waiting for their tasks.
    size_t ready_    = 0; // Ready master threads waiting to become active.
    size_t starting_ = 0; // Helper threads starting up.

    // Original values (as specified during startup) that can be altered via
    // tuning.
    //
    size_t orig_max_active_ = 0;

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Progress counter.
    //
    // We increment it for each active->waiting->ready->active transition
    // and it is used for deadlock detection (see deactivate()).
    //
    size_t progress_;

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
    // they re-examine their respective conditions and either carry on or
    // block again.
    //
    // The wait queue is a shard of slots. A thread picks a slot based on
    // the address of its task count variable. How many slots do we need?
    // This depends on the number of waiters that we can have, which cannot
    // be greater than the total number of threads.
    //
    // The pointer to the task count is used to identify the already waiting
    // group of threads for collision statistics.
    //
    struct wait_slot
    {
      std::mutex mutex;
      std::condition_variable condv;
      size_t waiters = 0;
      const atomic_count* task_count;
      bool shutdown = true;
    };

    size_t wait_queue_size_; // Proportional to max_threads.
    unique_ptr<wait_slot[]> wait_queue_;

    // Task queue.
    //
    // Each queue has its own mutex plus we have an atomic total count of
    // the queued tasks. Note that the count should only be modified while
    // holding one of the queue locks.
    //
    atomic_count queued_task_count_;

    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      std::aligned_storage<sizeof (void*) * 8>::type data;
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master thread
    // (the one that called wait()) busy working through its own queue for
    // as long as possible before (if at all) it "reincarnates" as a helper.
    // The main reason for this is the limited number of helpers we can
    // create.
    //
    // Secondly, we don't want to block wait() longer than necessary since
    // the master thread can do some work with the result. Plus, overall, we
    // want to "unwind" task hierarchies as soon as possible since they hold
    // up resources such as threads' stacks. All this means that the master
    // thread can only work through tasks that it has queued at this "level"
    // of the async()/wait() calls since we know that wait() cannot return
    // until they are done.
    //
    // To satisfy the first requirement, the master and helper threads get
    // the tasks from different ends of the queue: the master from the back
    // while the helpers from the front. And the master always adds new
    // tasks to the back.
    //
    // To satisfy the second requirement, the master thread stores the index
    // of the first task it has queued at this "level" (the mark) and makes
    // sure it doesn't try to dequeue any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.
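
    // For example (a sketch with queue depth 8), two tasks left over from
    // an outer level plus three queued by the master at this level:
    //
    //   XXXXXOOO
    //   | | |
    //   h m t
    //
    // Helpers pop X's at the head (h) while the master pops them at the
    // tail (t), but not past the mark (m).
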

    struct task_queue
    {
      std::mutex mutex;
      bool shutdown = false;

      size_t stat_full = 0; // Number of times push() returned NULL.

      // Our task queue is circular with head being the index of the first
      // element and tail -- of the last. Since this makes the empty and one
      // element cases indistinguishable, we also keep the size.
      //
      // The mark is an index somewhere between (figuratively speaking) head
      // and tail, if enabled. If the mark is hit, then it is disabled until
      // the queue becomes empty or it is reset by a push.
      //
      size_t head = 0;
      size_t mark = 0;
      size_t tail = 0;
      size_t size = 0;

      unique_ptr<task_data[]> data;

      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //

    // Push a new task to the queue returning a pointer to the task data to
    // be filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      if (s != task_queue_depth_)
      {
        //                                        normal wrap empty
        //                                           |     |    |
        t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
        s++;

        if (m == task_queue_depth_) // Enable the mark if first push.
          m = t;

        queued_task_count_.fetch_add (1, std::memory_order_release);
        return &tq.data[t];
      }

      return nullptr;
    }

    bool
    empty_front (task_queue& tq) const {return tq.size == 0;}

    void
    pop_front (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& h (tq.head);
      size_t& m (tq.mark);

      bool a (h == m); // Adjust mark?
      task_data& td (tq.data[h]);

      //                                        normal wrap empty
      //                                           |     |    |
      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

      if (--s == 0 || a)
        m = h; // Reset or adjust the mark.

      execute (ql, td);
    }

    bool
    empty_back (task_queue& tq) const
    {
      return tq.size == 0 || tq.mark == task_queue_depth_;
    }

    void
    pop_back (task_queue& tq, lock& ql)
    {
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      bool a (t == m); // Adjust mark?

      task_data& td (tq.data[t]);

      // Save the old queue mark and disable it in case the task we are
      // about to run adds sub-tasks. The first push(), if any, will reset
      // it.
      //
      size_t om (m);
      m = task_queue_depth_;

      //                    normal           wrap           empty
      //                      |               |               |
      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
      --s;

      execute (ql, td);

      // Restore the old mark (which we might have to adjust).
      //
      if (s == 0)
        m = t;                 // Reset the mark.
      else if (a)
        m = task_queue_depth_; // Disable the mark.
      else
        // What happens if head goes past the old mark? In this case we will
        // get into the empty queue state before we end up making any
        // (wrong) decisions based on this value. Unfortunately there is no
        // way to detect this (and do some sanity asserts) since things can
        // wrap around.
        //
        // To put it another way, the understanding here is that after the
        // task returns we will either have an empty queue or there will
        // still be tasks between the old mark and the current tail,
        // something along these lines:
        //
        //   OOOOOXXXXOOO
        //     |  |  |
        //     m  h  t
        //
        m = om;
    }
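
    // A sketch of how a master could drain its own queue at one level using
    // this API (the real logic is more involved; tq is this thread's queue):
    //
    //   lock ql (tq.mutex);
    //
    //   while (!empty_back (tq))
    //     pop_back (tq, ql); // Unlocks ql to run the task, then re-locks.
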
    void
    execute (lock& ql, task_data& td)
    {
      queued_task_count_.fetch_sub (1, std::memory_order_release);

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);

      // See if we need to call the monitor (see also the serial version
      // in async()).
      //
      if (monitor_count_ != nullptr)
      {
        // Note that we don't care if we don't see the updated values right
        // away.
        //
        if (size_t t = monitor_tshold_.load (memory_order_relaxed))
        {
          // "Lock" the monitor by setting the threshold to 0.
          //
          if (monitor_tshold_.compare_exchange_strong (
                t,
                0,
                memory_order_release,
                memory_order_relaxed))
          {
            // Now we are the only ones messing with this.
            //
            size_t v (monitor_count_->load (memory_order_relaxed));

            if (v != monitor_init_)
            {
              // See which direction we are going.
              //
              if (v > monitor_init_ ? (v >= t) : (v <= t))
                t = monitor_func_ (v);
            }

            monitor_tshold_.store (t, memory_order_release);
          }
        }
      }

      ql.lock ();
    }

    // Each thread has its own queue; the queues are stored in this list.
    //
    std::list<task_queue> task_queues_;

    task_queue&
    create_queue ();

    static task_queue*
    queue () noexcept;

    static void
    queue (task_queue*) noexcept;
  };
}

#include <libbuild2/scheduler.txx>

#endif // LIBBUILD2_SCHEDULER_HXX