diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2017-02-09 20:23:47 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2017-02-13 12:42:42 +0200 |
commit | 107357ca4d02341810f47dee20df034e9c4574e0 (patch) | |
tree | 8a276d03c9f0a38f172d850fac033aa83ee2d454 | |
parent | 29e92840c5eb6e56e047d1b0d2b80db66e9eaae5 (diff) |
Various scheduler fixes, enhancements, and tuning
While the task_ flags logic is hairy, it seems to work.
-rw-r--r-- | build2/scheduler | 23 | ||||
-rw-r--r-- | build2/scheduler.cxx | 166 |
2 files changed, 117 insertions, 72 deletions
diff --git a/build2/scheduler b/build2/scheduler index 7447021..1ce98e8 100644 --- a/build2/scheduler +++ b/build2/scheduler @@ -98,13 +98,20 @@ namespace build2 // count starts before/during async() calls, then it must be "gated" with // an alternative (lower) start count. // + // Finally, if waiting on someone else's start count, it is most likely + // unsafe (from the deadlock's point of view) to continue working through + // our own queue (i.e., we may block waiting on a task that has been + // queued before us which in turn may end up waiting on "us"). + // void - wait (size_t start_count, const atomic_count& task_count); + wait (size_t start_count, + const atomic_count& task_count, + bool work_queue = true); void - wait (const atomic_count& task_count) + wait (const atomic_count& task_count, bool work_queue = true) { - wait (0, task_count); + wait (0, task_count, work_queue); } // Resume threads waiting on this task count. @@ -161,6 +168,11 @@ namespace build2 void tune (size_t max_active); + // Return true if the scheduler is running serial. + // + bool + serial () const {return max_active_ == 1;} + // Wait for all the helper threads to terminate. Throw system_error on // failure. Note that the initially active threads are not waited for. // Return scheduling statistics. @@ -338,8 +350,7 @@ namespace build2 // // Each queue has its own mutex. If the task_ flag above is true then // there *might* be a task in one of the queues. If it is false, then - // it means there are either no tasks or someone is busy working on - // them. + // it means there are definitely no tasks. // // For now we only support trivially-destructible tasks. // @@ -378,7 +389,7 @@ namespace build2 std::mutex mutex; bool shutdown = false; - size_t stat_full = 0; // Number of times pop() returned NULL. + size_t stat_full = 0; // Number of times push() returned NULL. // Our task queue is circular with head being the index of the first // element and tail -- of the last. Since this makes the empty and one diff --git a/build2/scheduler.cxx b/build2/scheduler.cxx index 244f6b8..0e5e280 100644 --- a/build2/scheduler.cxx +++ b/build2/scheduler.cxx @@ -11,7 +11,7 @@ using namespace std; namespace build2 { void scheduler:: - wait (size_t start_count, const atomic_count& task_count) + wait (size_t start_count, const atomic_count& task_count, bool work_queue) { if (task_count <= start_count) return; @@ -20,19 +20,23 @@ namespace build2 // See if we can run some of our own tasks. // - // If we are waiting on someone else's task count then there migh still - // be no queue which is set by async(). - // - if (task_queue* tq = task_queue_) + if (work_queue) { - for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); ) - pop_back (*tq, ql); - - // Note that empty task queue doesn't automatically mean the task count - // has been decremented (some might still be executing asynchronously). + // If we are waiting on someone else's task count then there migh still + // be no queue which is set by async(). // - if (task_count <= start_count) - return; + if (task_queue* tq = task_queue_) + { + for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); ) + pop_back (*tq, ql); + + // Note that empty task queue doesn't automatically mean the task + // count has been decremented (some might still be executing + // asynchronously). + // + if (task_count <= start_count) + return; + } } suspend (start_count, task_count); @@ -48,6 +52,7 @@ namespace build2 // { lock l (mutex_); + active_--; waiting_++; @@ -154,11 +159,11 @@ namespace build2 // lock l (mutex_); - // Use 4x max_active on 32-bit and 8x max_active on 64-bit. Unless we were - // asked to run serially. + // Use 8x max_active on 32-bit and 16x max_active on 64-bit. Unless we + // were asked to run serially. // if (max_threads == 0) - max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*)); + max_threads = max_active * (max_active == 1 ? 1 : sizeof (void*) * 2); assert (shutdown_ && init_active != 0 && @@ -177,48 +182,55 @@ namespace build2 ? queue_depth : max_active * sizeof (void*) * 2; - task_queues_.reserve (max_threads_); + if (max_active != 1) + task_queues_.reserve (max_threads_); - // Pick a nice prime for common max_threads numbers. Experience shows that - // we want something close to 2x for small numbers, then reduce to 1.5x - // in-between, and 1x for large ones. + // Pick a nice prime for common max_threads/2 numbers. Experience shows + // that we want something close to 2x for small numbers, then reduce to + // 1.5x in-between, and 1x for large ones. // // Note that Intel Xeons are all over the map when it comes to cores (6, // 8, 10, 12, 14, 16, 18, 20, 22). // - wait_queue_size_ = // HW threads x bits - // - // 2x - // - max_threads == 8 ? 17 : // 2 x 4 - max_threads == 16 ? 31 : // 4 x 4, 2 x 8 - // - // 1.5x - // - max_threads == 32 ? 47 : // 4 x 8 - max_threads == 48 ? 53 : // 6 x 8 - max_threads == 64 ? 67 : // 8 x 8 - max_threads == 80 ? 89 : // 10 x 8 - // - // 1x - // - max_threads == 96 ? 101 : // 12 x 8 - max_threads == 112 ? 127 : // 14 x 8 - max_threads == 128 ? 131 : // 16 x 8 - max_threads == 144 ? 139 : // 18 x 8 - max_threads == 160 ? 157 : // 20 x 8 - max_threads == 176 ? 173 : // 22 x 8 - max_threads == 192 ? 191 : // 24 x 8 - max_threads == 224 ? 223 : // 28 x 8 - max_threads == 256 ? 251 : // 32 x 8 - max_threads == 288 ? 271 : // 36 x 8 - max_threads == 320 ? 313 : // 40 x 8 - max_threads == 352 ? 331 : // 44 x 8 - max_threads == 384 ? 367 : // 48 x 8 - max_threads == 512 ? 499 : // 64 x 8 - max_threads - 1; // Assume max_threads is even. - - wait_queue_.reset (new wait_slot[wait_queue_size_]); + { + size_t n (max_threads / 2); + + wait_queue_size_ = // HW threads x bits + n == 0 ? 0 : // serial + // + // 2x + // + n == 8 ? 17 : // 2 x 4 + n == 16 ? 31 : // 4 x 4, 2 x 8 + // + // 1.5x + // + n == 32 ? 47 : // 4 x 8 + n == 48 ? 53 : // 6 x 8 + n == 64 ? 67 : // 8 x 8 + n == 80 ? 89 : // 10 x 8 + // + // 1x + // + n == 96 ? 101 : // 12 x 8 + n == 112 ? 127 : // 14 x 8 + n == 128 ? 131 : // 16 x 8 + n == 144 ? 139 : // 18 x 8 + n == 160 ? 157 : // 20 x 8 + n == 176 ? 173 : // 22 x 8 + n == 192 ? 191 : // 24 x 8 + n == 224 ? 223 : // 28 x 8 + n == 256 ? 251 : // 32 x 8 + n == 288 ? 271 : // 36 x 8 + n == 320 ? 313 : // 40 x 8 + n == 352 ? 331 : // 44 x 8 + n == 384 ? 367 : // 48 x 8 + n == 512 ? 499 : // 64 x 8 + n - 1; // Assume it is even. + } + + if (wait_queue_size_ != 0) + wait_queue_.reset (new wait_slot[wait_queue_size_]); // Reset stats counters. // @@ -284,7 +296,7 @@ namespace build2 for (unique_ptr<task_queue>& tq: task_queues_) { - lock l (tq->mutex); + lock ql (tq->mutex); r.task_queue_full += tq->stat_full; tq->shutdown = true; } @@ -396,26 +408,48 @@ namespace build2 while (s.task_) // There might be a task. { - s.task_ = false; // We will process all that are currently there. - - // Queues are never removed and there shouldn't be any reallocations - // since we reserve maximum possible size upfront. Which means we - // can get the current number of queues and release the main lock - // while examining each of them. + // The tricky part here is to clear the task_ flag with confidence. + // So we quickly scan the queues for any tasks while holding the + // scheduler lock. If all of them are empty, then we can clear the + // task_ flag. // - size_t n (s.task_queues_.size ()); - l.unlock (); + bool empty (true); - for (size_t i (0); i != n; ++i) + for (size_t i (0), n (s.task_queues_.size ()); i != n; ++i) { task_queue& tq (*s.task_queues_[i]); - for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); ) - s.pop_front (tq, ql); + lock ql (tq.mutex); // Lock the queue. + + if (tq.shutdown) + break; + + if (!s.empty_front (tq)) + { + if (empty) + { + empty = false; + l.unlock (); // Release scheduler lock. + } + + // Work on this queue for as long as we can then continue with + // the rest (in which case empty will be false and scheduler + // lock is already released). + // + // Note that the queues are never removed and there shouldn't be + // any reallocations since we reserve the maximum possible size + // upfront. + // + do + s.pop_front (tq, ql); + while (!tq.shutdown && !s.empty_front (tq)); + } } - l.lock (); - // If task_ became true, then there might be new tasks. + if (empty) + s.task_ = false; // Scheduler still locked. + else + l.lock (); // Relock and rescan. } s.active_--; |