author     Boris Kolpackov <boris@codesynthesis.com>  2016-12-13 12:30:14 +0200
committer  Boris Kolpackov <boris@codesynthesis.com>  2016-12-13 12:30:14 +0200
commit     052dc48939a063b19a13c10cb2c735b4b06a4c4b (patch)
tree       1ea8bd67374ee13cc3640d237d33ba165c495e25
parent     2d2cbcaf1c2afd1565502f8f0c83fb1cd56f6cec (diff)
Various scheduler improvements and fixes
-rw-r--r--  build2/scheduler                 252
-rw-r--r--  build2/scheduler.cxx             129
-rw-r--r--  build2/scheduler.txx              91
-rw-r--r--  unit-tests/scheduler/driver.cxx  125

4 files changed, 415 insertions, 182 deletions
diff --git a/build2/scheduler b/build2/scheduler
index 80b9ec9..ffb8acd 100644
--- a/build2/scheduler
+++ b/build2/scheduler
@@ -8,8 +8,6 @@
 #include <mutex>
 #include <tuple>
 #include <atomic>
-#include <cerrno>
-#include <cassert>
 #include <type_traits>       // aligned_storage, etc
 #include <condition_variable>
@@ -49,15 +47,7 @@ namespace build2
   // allow a ready master to continue as soon as possible. If it were reused
   // as a helper, then it could be blocked on a nested wait() further down the
   // stack. This means that the number of threads created by the scheduler
-  // will normally exceed the maximum active allowed. It should not, however,
-  // get excessive provided the tasks are not too deeply nested.
-  //
-  // @@ Actually, this is not quite correct: this seems to be a function of
-  //    both depth and width of the task tree. Perhaps we should use a hybrid
-  //    approach: when the number of helper threads reaches a certain limit
-  //    we switch to reusing suspended threads? Also, a thread can rake its
-  //    own and its subtask's queues directly in wait() since wait() won't
-  //    return until they are done.
+  // will normally exceed the maximum active allowed.
   //
   class scheduler
   {
@@ -77,68 +67,7 @@ namespace build2
     //
     template <typename F, typename... A>
     void
-    async (atomic_count& task_count, F&& f, A&&... a)
-    {
-      using task = task_type<F, A...>;
-
-      static_assert (sizeof (task) <= sizeof (task_data),
-                     "insufficient space");
-
-      static_assert (std::is_trivially_destructible<task>::value,
-                     "not trivially destructible");
-
-      // Push into the queue unless we are running serially or the queue is
-      // full.
-      //
-      task_data* td (nullptr);
-
-      if (max_active_ != 1)
-      {
-        task_queue& tq (task_queue_ != nullptr
-                        ? *task_queue_
-                        : create_queue ());
-
-        lock ql (tq.mutex);
-
-        if (tq.shutdown)
-          throw system_error (ECANCELED, std::system_category ());
-
-        if ((td = push (tq)) != nullptr)
-        {
-          // Package the task.
-          //
-          new (&td->data) task {
-            &task_count,
-            decay_copy (forward <F> (f)),
-            typename task::args_type (decay_copy (forward <A> (a))...)};
-
-          td->thunk = &task_thunk<F, A...>;
-        }
-        else
-          tq.stat_full++;
-      }
-
-      // If serial/full, then run the task synchronously. In this case
-      // there is no need to mess with task count.
-      //
-      if (td == nullptr)
-      {
-        forward<F> (f) (forward<A> (a)...);
-        return;
-      }
-
-      // Increment the task count.
-      //
-      task_count.fetch_add (1, std::memory_order_release);
-
-      lock l (mutex_);
-      task_ = true;
-
-      // If there is a spare active thread, wake up (or create) the helper.
-      //
-      if (active_ < max_active_)
-        activate_helper (l);
-    }
+    async (atomic_count& task_count, F&&, A&&...);

     // Wait until the task count reaches 0. If the scheduler is shutdown
     // while waiting, throw system_error(ECANCELED).
@@ -163,22 +92,24 @@ namespace build2
     // already active (e.g., the calling thread). It must not be 0 (since
     // someone has to schedule the first task).
     //
-    // If maximum threads is unspecified, then a generally appropriate default
-    // limit it used.
+    // If the maximum threads or task queue depth arguments are unspecified,
+    // then appropriate defaults are used.
     //
     scheduler (size_t max_active,
                size_t init_active = 1,
-               size_t max_threads = 0)
+               size_t max_threads = 0,
+               size_t queue_depth = 0)
     {
-      startup (max_active, init_active, max_threads);
+      startup (max_active, init_active, max_threads, queue_depth);
     }

-    // Note: naturally not thread-safe.
+    // Start the scheduler.
     //
     void
     startup (size_t max_active,
              size_t init_active = 1,
-             size_t max_threads = 0);
+             size_t max_threads = 0,
+             size_t queue_depth = 0);

     // Wait for all the helper threads to terminate. Throw system_error on
     // failure. Note that the initially active threads are not waited for.
@@ -201,6 +132,35 @@ namespace build2
     stat
     shutdown ();

+    // If initially active thread(s) (besides the one that calls startup())
+    // exist before the call to startup(), then they must call join() before
+    // executing any tasks. The two common cases where you don't have to call
+    // join() are a single active thread that calls startup()/shutdown() or
+    // active thread(s) that are created after startup().
+    //
+    void
+    join ()
+    {
+      assert (task_queue_ == nullptr);
+
+      // Lock the mutex to make sure the values set in startup() are visible
+      // in this thread.
+      //
+      lock l (mutex_);
+    }
+
+    // If initially active thread(s) participate in multiple schedulers and/or
+    // sessions (intervals between startup() and shutdown()), then they must
+    // call leave() before joining another scheduler/session. Note that this
+    // applies to the active thread that calls shutdown(). Note that a thread
+    // can only participate in one scheduler at a time.
+    //
+    void
+    leave ()
+    {
+      task_queue_ = nullptr;
+    }
+
     // Return the number of hardware threads or 0 if unable to determine.
     //
     static size_t
@@ -252,21 +212,7 @@ namespace build2
     template <typename F, typename... A>
     static void
-    task_thunk (scheduler& s, lock& ql, void* td)
-    {
-      using task = task_type<F, A...>;
-
-      // Move the data and release the lock.
-      //
-      task t (move (*static_cast<task*> (td)));
-      ql.unlock ();
-
-      t.thunk (std::index_sequence_for<A...> ());
-
-      atomic_count& tc (*t.task_count);
-      if (--tc == 0)
-        s.resume (tc); // Resume a waiter, if any.
-    }
+    task_thunk (scheduler&, lock&, void*);

     template <typename T>
     static std::decay_t<T>
@@ -284,8 +230,7 @@ namespace build2
     // (init_active + helpers) <= max_threads
     //
     // Note that the first three are immutable between startup() and
-    // shutdown() so can be accessed without a lock (@@ What about
-    // the case of multiple initially active?).
+    // shutdown() so can be accessed without a lock (but see join()).
     //
     size_t init_active_ = 0; // Initially active threads.
     size_t max_active_ = 0;  // Maximum number of active threads.
@@ -378,11 +323,18 @@ namespace build2
       size_t stat_full = 0; // Number of times pop() returned NULL.

-      // head <= stop <= tail
+      // Our task queue is circular with head being the index of the first
+      // element and tail -- of the last. Since this makes the empty and one
+      // element cases indistinguishable, we also keep the size.
       //
-      size_t head = 0; // Index of the first element.
-      size_t stop = 0; // Index of the element to stop at in pop_back().
-      size_t tail = 0; // Index of the one past last element.
+      // The mark is an index somewhere between (figuratively speaking) head
+      // and tail, if enabled. If the mark is hit, then it is disabled until
+      // the queue becomes empty.
+      //
+      size_t head = 0;
+      size_t mark = 0;
+      size_t tail = 0;
+      size_t size = 0;

       unique_ptr<task_data[]> data;
@@ -398,40 +350,102 @@ namespace build2
     task_data*
     push (task_queue& tq)
     {
-      return tq.tail != task_queue_depth_ ? &tq.data[tq.tail++] : nullptr;
+      size_t& s (tq.size);
+      size_t& t (tq.tail);
+
+      if (s != task_queue_depth_)
+      {
+        //                                          normal  wrap   empty
+        //                                            |       |      |
+        t = s != 0 ? (t != task_queue_depth_ - 1 ? t + 1 : 0) : t;
+        s++;
+        return &tq.data[t];
+      }
+
+      return nullptr;
     }

     bool
-    empty_front (task_queue& tq) const {return tq.head == tq.tail;}
+    empty_front (task_queue& tq) const {return tq.size == 0;}

     void
     pop_front (task_queue& tq, lock& ql)
     {
-      task_data& td (tq.data[tq.head++]);
+      size_t& s (tq.size);
+      size_t& h (tq.head);
+      size_t& m (tq.mark);
+
+      bool a (h == m); // Adjust mark?
+      task_data& td (tq.data[h]);
+
+      //                                          normal  wrap   empty
+      //                                            |       |      |
+      h = s != 1 ? (h != task_queue_depth_ - 1 ? h + 1 : 0) : h;

-      if (tq.head == tq.tail)
-        tq.stop = tq.head = tq.tail = 0; // Reset.
-      else if (tq.head > tq.stop)
-        tq.stop = tq.head;
+      if (--s == 0 || a)
+        m = h; // Reset or adjust the mark.

       // The thunk moves the task data to its stack, releases the lock,
       // and continues to execute the task.
       //
       td.thunk (*this, ql, &td.data);
+      ql.lock ();
     }

     bool
-    empty_back (task_queue& tq) const {return tq.stop == tq.tail;}
+    empty_back (task_queue& tq) const
+    {
+      return tq.size == 0 || tq.mark == task_queue_depth_;
+    }

     void
     pop_back (task_queue& tq, lock& ql)
     {
-      task_data& td (tq.data[--tq.tail]);
+      size_t& s (tq.size);
+      size_t& t (tq.tail);
+      size_t& m (tq.mark);
+
+      bool a (t == m); // Adjust mark?
-      if (tq.head == tq.tail)
-        tq.stop = tq.head = tq.tail = 0;
+      task_data& td (tq.data[t]);
+
+      // Save the old queue mark and set the new one in case the task we are
+      // about to run adds sub-tasks.
+      //
+      size_t om (m);
+      m = t; // Where next push() will go.
+
+      //                     normal           wrap           empty
+      //                       |               |               |
+      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
+      --s;

       td.thunk (*this, ql, &td.data);
+      ql.lock ();
+
+      // Restore the old mark (which we might have to adjust).
+      //
+      if (s == 0)
+        m = t;                 // Reset the mark.
+      else if (a)
+        m = task_queue_depth_; // Disable the mark.
+      else
+        // What happens if head goes past the old mark? In this case we will
+        // get into the empty queue state before we end up making any (wrong)
+        // decisions based on this value. Unfortunately there is no way to
+        // detect this (and do some sanity asserts) since things can wrap
+        // around.
+        //
+        // To put it another way, the understanding here is that after the
+        // task returns we will either have an empty queue or there will still
+        // be tasks between the old mark and the current tail, something
+        // along these lines:
+        //
+        // OOOOOXXXXOOO
+        //     |  |   |
+        //     m  h   t
+        //
+        m = om;
     }

     // Each thread has its own queue. Instead of allocating all max_threads of
@@ -440,9 +454,19 @@ namespace build2
     //
     vector<unique_ptr<task_queue>> task_queues_;

-    // TLS cache of each thread's task queue.
+    // TLS cache of thread's task queue.
+    //
+    static
+    // Apparently Apple's Clang "temporarily disabled" C++11 thread_local
+    // until they can implement a "fast" version, which reportedly happened in
+    // XCode 8. So for now we will continue using __thread for this target.
     //
-    thread_local static task_queue* task_queue_;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+    __thread
+#else
+    thread_local
+#endif
+    task_queue* task_queue_;

     task_queue&
     create_queue ();
@@ -453,4 +477,6 @@ namespace build2
   extern scheduler sched;
 }

+#include <build2/scheduler.txx>
+
 #endif // BUILD2_SCHEDULER
diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx
index b62d2d4..4619e85 100644
--- a/build2/scheduler.cxx
+++ b/build2/scheduler.cxx
@@ -4,6 +4,8 @@

 #include <build2/scheduler>

+#include <cerrno>
+
 using namespace std;

 namespace build2
@@ -16,27 +18,14 @@ namespace build2
     // See if we can run some of our own tasks.
     //
-    task_queue& tq (*task_queue_); // Must have been initializied by async().
+    task_queue& tq (*task_queue_); // Must have been set by async() or task
+                                   // count would have been 0.

     for (lock ql (tq.mutex); !tq.shutdown && !empty_back (tq); )
-    {
-      // Save the old stop point and set the new one in case the task we are
-      // about to run adds sub-tasks.
-      //
-      size_t stop (tq.stop);
-      tq.stop = tq.tail - 1; // Index of the first sub-task to be added (-1
-                             // is for the following pop_back()).
-
-      pop_back (tq, ql); // Releases the lock.
-      ql.lock ();
-
-      // Restore the old stop point which we might have to adjust.
-      //
-      tq.stop = tq.head > stop ? tq.head : tq.tail < stop ? tq.tail : stop;
-    }
+      pop_back (tq, ql);

-    // Note that empty task queue doesn't automatically mean the task count is
-    // zero (some might still be executing asynchronously).
+    // Note that empty task queue doesn't automatically mean the task count
+    // is zero (some might still be executing asynchronously).
     //
     if (task_count == 0)
       return;
@@ -135,8 +124,16 @@ namespace build2
   }

   void scheduler::
-  startup (size_t max_active, size_t init_active, size_t max_threads)
+  startup (size_t max_active,
+           size_t init_active,
+           size_t max_threads,
+           size_t queue_depth)
   {
+    // Lock the mutex to make sure our changes are visible in (other) active
+    // threads.
+    //
+    lock l (mutex_);
+
     // Use 4x max_active on 32-bit and 8x max_active on 64-bit. Unless we were
     // asked to run serially.
     //
@@ -153,30 +150,53 @@ namespace build2
     max_threads_ = max_threads;

     // This value should be proportional to the amount of hardware concurrency
-    // we have. Note that the queue entry is quite sizable.
+    // we have (no use queuing things if helpers cannot keep up). Note that
+    // the queue entry is quite sizable.
     //
-    task_queue_depth_ = max_active * sizeof (void*) * 4;
+    task_queue_depth_ = queue_depth != 0
+      ? queue_depth
+      : max_active * sizeof (void*) * 2;

-    task_queues_.clear ();
     task_queues_.reserve (max_threads_);

-    // Pick a nice prime for common max_threads numbers. Though Intel Xeons
-    // are all over the map when it comes to cores (6, 8, 10, 12, 14, 16,
-    // 18, 20, 22).
+    // Pick a nice prime for common max_threads numbers. Experience shows that
+    // we want something close to 2x for small numbers, then reduce to 1.5x
+    // in-between, and 1x for large ones.
+    //
+    // Note that Intel Xeons are all over the map when it comes to cores (6,
+    // 8, 10, 12, 14, 16, 18, 20, 22).
     //
     wait_queue_size_ =            // HW threads x bits
+      //
+      // 2x
+      //
       max_threads ==   8 ?   17 : // 2 x 4
       max_threads ==  16 ?   31 : // 4 x 4, 2 x 8
-      max_threads ==  32 ?   63 : // 4 x 8
-      max_threads ==  48 ?   97 : // 6 x 8
-      max_threads ==  64 ?  127 : // 8 x 8
-      max_threads ==  96 ?  191 : // 12 x 8
-      max_threads == 128 ?  257 : // 16 x 8
-      max_threads == 192 ?  383 : // 24 x 8
-      max_threads == 256 ?  509 : // 32 x 8
-      max_threads == 384 ?  769 : // 48 x 8
-      max_threads == 512 ? 1021 : // 64 x 8
-      2 * max_threads - 1;
+      //
+      // 1.5x
+      //
+      max_threads ==  32 ?   47 : // 4 x 8
+      max_threads ==  48 ?   53 : // 6 x 8
+      max_threads ==  64 ?   67 : // 8 x 8
+      max_threads ==  80 ?   89 : // 10 x 8
+      //
+      // 1x
+      //
+      max_threads ==  96 ?  101 : // 12 x 8
+      max_threads == 112 ?  127 : // 14 x 8
+      max_threads == 128 ?  131 : // 16 x 8
+      max_threads == 144 ?  139 : // 18 x 8
+      max_threads == 160 ?  157 : // 20 x 8
+      max_threads == 176 ?  173 : // 22 x 8
+      max_threads == 192 ?  191 : // 24 x 8
+      max_threads == 224 ?  223 : // 28 x 8
+      max_threads == 256 ?  251 : // 32 x 8
+      max_threads == 288 ?  271 : // 36 x 8
+      max_threads == 320 ?  313 : // 40 x 8
+      max_threads == 352 ?  331 : // 44 x 8
+      max_threads == 384 ?  367 : // 48 x 8
+      max_threads == 512 ?  499 : // 64 x 8
+      max_threads - 1;            // Assume max_threads is even.

     wait_queue_.reset (new wait_slot[wait_queue_size_]);
@@ -249,6 +269,11 @@ namespace build2
       l.lock ();
     }

+    // Free the memory.
+    //
+    wait_queue_.reset ();
+    task_queues_.clear ();
+
     r.thread_max_active  = max_active_;
     r.thread_max_total   = max_threads_;
     r.thread_max_waiting = stat_max_waiters_;
@@ -281,7 +306,7 @@ namespace build2
     starting_++;
     l.unlock ();

-    // Restore the counter if the thread creation fails.
+    // Restore the counters if the thread creation fails.
     //
     struct guard
     {
@@ -338,10 +363,8 @@ namespace build2
       {
         task_queue& tq (*s.task_queues_[i]);

-        for (lock ql (tq.mutex);
-             !tq.shutdown && !s.empty_front (tq);
-             ql.lock ())
-          s.pop_front (tq, ql); // Releases the lock.
+        for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
+          s.pop_front (tq, ql);
       }

       l.lock ();
@@ -367,16 +390,30 @@ namespace build2
     s.helpers_--;
   }

-  thread_local scheduler::task_queue* scheduler::task_queue_ = nullptr;
+#if defined(__apple_build_version__) && __apple_build_version__ < 8000000
+  __thread
+#else
+  thread_local
+#endif
+  scheduler::task_queue* scheduler::task_queue_ = nullptr;

   auto scheduler::
   create_queue () -> task_queue&
   {
-    lock l (mutex_);
-    task_queues_.push_back (make_unique<task_queue> (task_queue_depth_));
-    task_queue_ = task_queues_.back ().get ();
-    task_queue_->shutdown = shutdown_;
-    return *task_queue_;
+    // Note that task_queue_depth is immutable between startup() and
+    // shutdown() (but see join()).
+    //
+    unique_ptr<task_queue> tqp (new task_queue (task_queue_depth_));
+    task_queue& tq (*tqp);
+
+    {
+      lock l (mutex_);
+      tq.shutdown = shutdown_;
+      task_queues_.push_back (move (tqp));
+    }
+
+    task_queue_ = &tq;
+    return tq;
   }

   scheduler sched;
diff --git a/build2/scheduler.txx b/build2/scheduler.txx
new file mode 100644
index 0000000..9202f0b
--- /dev/null
+++ b/build2/scheduler.txx
@@ -0,0 +1,91 @@
+// file      : build2/scheduler.txx -*- C++ -*-
+// copyright : Copyright (c) 2014-2016 Code Synthesis Ltd
+// license   : MIT; see accompanying LICENSE file
+
+#include <cerrno>
+
+namespace build2
+{
+  template <typename F, typename... A>
+  void scheduler::
+  async (atomic_count& task_count, F&& f, A&&... a)
+  {
+    using task = task_type<F, A...>;
+
+    static_assert (sizeof (task) <= sizeof (task_data::data),
+                   "insufficient space");
+
+    static_assert (std::is_trivially_destructible<task>::value,
+                   "not trivially destructible");
+
+    // Push into the queue unless we are running serially or the queue is
+    // full.
+    //
+    task_data* td (nullptr);
+
+    if (max_active_ != 1)
+    {
+      task_queue* tq (task_queue_); // Single load.
+      if (tq == nullptr)
+        tq = &create_queue ();
+
+      lock ql (tq->mutex);
+
+      if (tq->shutdown)
+        throw system_error (ECANCELED, std::system_category ());
+
+      if ((td = push (*tq)) != nullptr)
+      {
+        // Package the task.
+        //
+        new (&td->data) task {
+          &task_count,
+          decay_copy (forward <F> (f)),
+          typename task::args_type (decay_copy (forward <A> (a))...)};
+
+        td->thunk = &task_thunk<F, A...>;
+      }
+      else
+        tq->stat_full++;
+    }
+
+    // If serial/full, then run the task synchronously. In this case
+    // there is no need to mess with task count.
+    //
+    if (td == nullptr)
+    {
+      forward<F> (f) (forward<A> (a)...);
+      return;
+    }
+
+    // Increment the task count.
+    //
+    task_count.fetch_add (1, std::memory_order_release);
+
+    lock l (mutex_);
+    task_ = true;
+
+    // If there is a spare active thread, wake up (or create) the helper.
+    //
+    if (active_ < max_active_)
+      activate_helper (l);
+  }
+
+  template <typename F, typename... A>
+  void scheduler::
+  task_thunk (scheduler& s, lock& ql, void* td)
+  {
+    using task = task_type<F, A...>;
+
+    // Move the data and release the lock.
+    //
+    task t (move (*static_cast<task*> (td)));
+    ql.unlock ();
+
+    t.thunk (std::index_sequence_for<A...> ());
+
+    atomic_count& tc (*t.task_count);
+    if (--tc == 0)
+      s.resume (tc); // Resume a waiter, if any.
+  }
+}
diff --git a/unit-tests/scheduler/driver.cxx b/unit-tests/scheduler/driver.cxx
index 726415a..46bd55d 100644
--- a/unit-tests/scheduler/driver.cxx
+++ b/unit-tests/scheduler/driver.cxx
@@ -17,74 +17,133 @@ using namespace std;

 namespace build2
 {
-  // Usage argv[0] <max-active-threads>
+  // Usage argv[0] [-v <volume>] [-d <difficulty>] [-c <concurrency>]
+  //               [-q <queue-depth>]
   //
+  // -v  task tree volume (affects both depth and width), for example 100
+  // -d  computational difficulty of each task, for example 10
+  // -c  max active threads, if unspecified or 0, then hardware concurrency
+  // -q  task queue depth, if unspecified or 0, then appropriate default used
+  //
+  // Specifying any option also turns on the verbose mode.
+  //
+  // Notes on testing:
+  //
+  // 1. Ideally you would want to test things on an SMP machine.
+  //
+  // 2. When you need to compare performance, disable turbo boost since its
+  //    availability depends on CPU utilization/temperature:
+  //
+  //    # echo '1' >/sys/devices/system/cpu/intel_pstate/no_turbo
+  //
+  // 3. Use turbostat(1) to see per-CPU details (utilization, frequency):
+  //
+  //    $ sudo turbostat --interval 1 ./driver -d 8 -v 300
+  //
+  static bool
+  prime (uint64_t);
+
+  // Find # of primes in the [x, y) range.
+  //
+  static void
+  inner (uint64_t x, uint64_t y, uint64_t& r)
+  {
+    for (; x != y; ++x)
+      if (prime (x))
+        r++;
+  }
+
   int
   main (int argc, char* argv[])
   {
     bool verb (false);
+
+    // Adjust assert() below if changing these defaults.
+    //
+    size_t volume (100);
+    uint32_t difficulty (10);
+    size_t max_active (0);
+    size_t queue_depth (0);

-    if (argc > 1)
+    for (int i (1); i != argc; ++i)
     {
+      string a (argv[i]);
+
+      if (a == "-v")
+        volume = stoul (argv[++i]);
+      else if (a == "-d")
+        difficulty = stoul (argv[++i]);
+      else if (a == "-c")
+        max_active = stoul (argv[++i]);
+      else if (a == "-q")
+        queue_depth = stoul (argv[++i]);
+      else
+        assert (false);
+
       verb = true;
-      max_active = stoul (argv[1]);
     }

     if (max_active == 0)
       max_active = scheduler::hardware_concurrency ();

-    scheduler s (max_active);
+    scheduler s (max_active, 1, 0, queue_depth);

-    auto inner = [] (size_t x, size_t y, size_t& out)
+    // Find the number of prime counts of primes in the [i, d*i*i) ranges
+    // for i in (0, n].
+    //
+    auto outer = [difficulty, &s] (size_t n, vector<uint64_t>& o, uint64_t& r)
     {
-      out = x + y;
-      this_thread::sleep_for (chrono::microseconds (out * 10));
-    };
-
-    auto outer = [&s, &inner] (size_t n, size_t& out)
-    {
-      vector<size_t> result (2 * n, 0);
       scheduler::atomic_count task_count (0);

-      for (size_t i (0); i != 2 * n; ++i)
+      for (size_t i (1); i <= n; ++i)
       {
+        o[i - 1] = 0;
+
         s.async (task_count,
                  inner,
                  i,
-                 i,
-                 std::ref (result[i]));
+                 i * i * difficulty,
+                 ref (o[i - 1]));
       }

       s.wait (task_count);
       assert (task_count == 0);

-      for (size_t i (0); i != n; ++i)
-        out += result[i];
-
-      this_thread::sleep_for (chrono::microseconds (out * 10));
+      for (uint64_t v: o)
+        r += prime (v) ? 1 : 0;
     };

-    const size_t tasks (50);
+    vector<uint64_t> r (volume, 0);
+    vector<vector<uint64_t>> o (volume, vector<uint64_t> ());

-    vector<size_t> result (tasks, 0);
     scheduler::atomic_count task_count (0);

-    for (size_t i (0); i != tasks; ++i)
+    for (size_t i (0); i != volume; ++i)
     {
+      o[i].resize (i);
+
       s.async (task_count,
                outer,
                i,
-               std::ref (result[i]));
+               ref (o[i]),
+               ref (r[i]));
     }

     s.wait (task_count);
     assert (task_count == 0);

+    size_t n (0);
+    for (uint64_t v: r)
+      n += v;
+
+    if (volume == 100 && difficulty == 10)
+      assert (n == 580);
+
     scheduler::stat st (s.shutdown ());

     if (verb)
     {
+      cerr << "result             " << n << endl
+           << endl;
+
       cerr << "thread_max_active  " << st.thread_max_active << endl
            << "thread_max_total   " << st.thread_max_total  << endl
            << "thread_helpers     " << st.thread_helpers    << endl
@@ -99,6 +158,26 @@ namespace build2
     return 0;
   }
+
+  static bool
+  prime (uint64_t x)
+  {
+    if (x == 2 || x == 3)
+      return true;
+
+    if (x < 2 || x % 2 == 0 || x % 3 == 0)
+      return false;
+
+    // Test divisors starting from 5 and incrementing alternatively by 2/4.
+    //
+    for (uint64_t d (5), i (2); d * d <= x; d += i, i = 6 - i)
+    {
+      if (x % d == 0)
+        return false;
+    }
+
+    return true;
+  }
 }
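
The new queue discipline is the heart of this commit: helper threads drain the queue FIFO from the head while the owning thread in wait() drains it LIFO from the tail, and the mark keeps pop_back() from descending into tasks that were queued before the wait started. The index arithmetic is easiest to check in isolation, so below is a minimal single-threaded sketch of the same head/mark/tail/size scheme. The names (ring, push_back, etc.) are illustrative; locking, thunks, and the mark save/restore that pop_back() above performs around running a task are omitted, leaving only the pop-then-disable mark handling.

    #include <cassert>
    #include <cstddef>
    #include <iostream>
    #include <vector>

    class ring
    {
    public:
      explicit
      ring (std::size_t n): n_ (n), d_ (n) {}

      bool empty () const {return s_ == 0;}

      bool
      push_back (int v)
      {
        if (s_ == n_)
          return false; // Full.

        // Advance tail unless the queue is empty (in which case head == tail
        // already names the slot to use), wrapping around at the end.
        //
        if (s_ != 0)
          t_ = t_ != n_ - 1 ? t_ + 1 : 0;

        s_++;
        d_[t_] = v;
        return true;
      }

      // FIFO end: what helper threads drain.
      //
      bool
      pop_front (int& v)
      {
        if (empty ())
          return false;

        bool a (h_ == m_); // Popping the marked slot?
        v = d_[h_];

        if (s_ != 1)
          h_ = h_ != n_ - 1 ? h_ + 1 : 0;

        if (--s_ == 0 || a)
          m_ = h_; // Reset or drag the mark along with the head.

        return true;
      }

      // LIFO end: what the owning thread drains in wait(). Refuses to pop
      // once the mark is disabled (m_ == n_).
      //
      bool
      pop_back (int& v)
      {
        if (empty () || m_ == n_)
          return false;

        bool a (t_ == m_); // Popping the marked slot?
        v = d_[t_];

        if (s_ != 1)
          t_ = t_ != 0 ? t_ - 1 : n_ - 1;

        if (--s_ == 0)
          m_ = t_; // Empty: re-enable and reset the mark.
        else if (a)
          m_ = n_; // Hit the mark: disable it until the queue empties.

        return true;
      }

    private:
      std::size_t n_;                             // Capacity.
      std::size_t h_ = 0, m_ = 0, t_ = 0, s_ = 0; // Head, mark, tail, size.
      std::vector<int> d_;
    };

    int
    main ()
    {
      ring q (3);
      q.push_back (1); q.push_back (2); q.push_back (3);

      int v;
      q.pop_front (v); assert (v == 1); // Oldest from the front.
      q.pop_front (v); assert (v == 2);
      q.push_back (4);                  // Tail wraps around to slot 0.
      q.push_back (5);
      q.pop_back (v);  assert (v == 5); // Newest from the back.
      q.pop_back (v);  assert (v == 4); // Tail wraps back.
      q.pop_back (v);  assert (v == 3);
      assert (q.empty ());

      std::cout << "ok" << std::endl;
    }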
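The async()/task_thunk() pair relies on a classic type-erasure trick: the callable and its decay-copied arguments are placement-new'd into a fixed-size, trivially-destructible slot, and a function-template instantiation remembers the concrete type needed to cast the bytes back and make the call. Here is a self-contained sketch of just that mechanism; the names (slot, post) are made up for illustration, and C++17 std::apply stands in for the index_sequence expansion used above.

    #include <cstddef>
    #include <functional> // ref()
    #include <iostream>
    #include <new>        // placement new
    #include <tuple>
    #include <type_traits>
    #include <utility>

    struct slot
    {
      void (*thunk) (void*) = nullptr;
      std::aligned_storage_t<64> data; // Packaged task must fit in here.
    };

    template <typename F, typename... A>
    struct task
    {
      F func;
      std::tuple<A...> args;
    };

    // One instantiation per packaged type: it alone knows what to cast to.
    //
    template <typename F, typename... A>
    static void
    thunk (void* p)
    {
      // Move the task out of the slot and invoke it.
      //
      auto t (std::move (*static_cast<task<F, A...>*> (p)));
      std::apply (std::move (t.func), std::move (t.args));
    }

    template <typename F, typename... A>
    void
    post (slot& s, F&& f, A&&... a)
    {
      using t = task<std::decay_t<F>, std::decay_t<A>...>;

      static_assert (sizeof (t) <= sizeof (s.data), "insufficient space");
      static_assert (std::is_trivially_destructible<t>::value,
                     "not trivially destructible");

      new (&s.data) t {std::forward<F> (f), {std::forward<A> (a)...}};
      s.thunk = &thunk<std::decay_t<F>, std::decay_t<A>...>;
    }

    int
    main ()
    {
      slot s;
      int r (0);

      // ref() survives the decay-copy as reference_wrapper, just like the
      // driver's use of ref() with async() above.
      //
      post (s, [] (int x, int& out) {out = x * 2;}, 21, std::ref (r));
      s.thunk (&s.data);

      std::cout << r << std::endl; // 42
    }

The trivially-destructible requirement is what lets the queue discard or overwrite slots without running destructors, which is why the static_assert in async() enforces it.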
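Putting the API together, a typical client follows the same pattern as the unit test: construct a scheduler, fire tasks against an atomic counter, then wait on that counter. The following usage sketch assumes the build2 source tree is on the include path; square() is a made-up task, but the scheduler calls are the ones declared in the header above.

    #include <build2/scheduler>

    #include <cstddef>
    #include <functional> // ref()
    #include <iostream>
    #include <vector>

    using namespace build2;

    static void
    square (std::size_t x, std::size_t& r)
    {
      r = x * x;
    }

    int
    main ()
    {
      // Defaults: 1 initially active thread, default max_threads, and the
      // default task queue depth (the new fourth constructor argument).
      //
      scheduler s (scheduler::hardware_concurrency ());

      std::vector<std::size_t> r (100, 0);
      scheduler::atomic_count tc (0);

      // Each async() either queues the task or, if running serially or the
      // queue is full, executes it synchronously in the caller.
      //
      for (std::size_t i (0); i != r.size (); ++i)
        s.async (tc, square, i, std::ref (r[i]));

      s.wait (tc); // Helps with queued tasks, then blocks until tc drops to 0.

      scheduler::stat st (s.shutdown ());
      std::cout << r[7] << " (helpers: " << st.thread_helpers << ")"
                << std::endl;
    }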
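The __thread/thread_local dance appears twice (once in the declaration, once in the definition), so if it spreads further it could be factored into a macro along these lines. BUILD2_THREAD_LOCAL is a hypothetical name, not something this commit introduces; __thread suffices for this case because a plain pointer needs neither dynamic initialization nor destruction.

    // Pre-Xcode 8 Apple Clang lacks C++11 thread_local; fall back to the
    // GCC-style __thread, which is fine for trivially constructible and
    // destructible types such as a raw pointer.
    //
    #if defined(__apple_build_version__) && __apple_build_version__ < 8000000
    #  define BUILD2_THREAD_LOCAL __thread
    #else
    #  define BUILD2_THREAD_LOCAL thread_local
    #endif

    // Usage: a per-thread cache pointer, as in the scheduler.
    //
    BUILD2_THREAD_LOCAL int* cache = nullptr;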