diff options
author | Boris Kolpackov <boris@codesynthesis.com> | 2021-05-12 10:42:42 +0200 |
---|---|---|
committer | Boris Kolpackov <boris@codesynthesis.com> | 2021-05-12 16:36:20 +0200 |
commit | 330db1f15d95537e288b4c371a26e43b5a9b2196 (patch) | |
tree | 48cd3779bc7849bd0252da28e7b957a2a6c6c615 | |
parent | 88379eedeae654391711d8cdda17dfc2be6367ef (diff) |
Deal with helper thread starvation during phase switching
The implemented solution entails shadowing old phase queues so that helpers
don't pick up old phase tasks and boosting the max_threads count so that we
can create more helpers if all the existing ones are stuck in the old phase.
-rw-r--r-- | libbuild2/adhoc-rule-cxx.cxx | 4 | ||||
-rw-r--r-- | libbuild2/context.cxx | 60 | ||||
-rw-r--r-- | libbuild2/context.hxx | 12 | ||||
-rw-r--r-- | libbuild2/module.cxx | 4 | ||||
-rw-r--r-- | libbuild2/scheduler.cxx | 146 | ||||
-rw-r--r-- | libbuild2/scheduler.hxx | 81 |
6 files changed, 261 insertions, 46 deletions
diff --git a/libbuild2/adhoc-rule-cxx.cxx b/libbuild2/adhoc-rule-cxx.cxx index ece687f..1066f0a 100644 --- a/libbuild2/adhoc-rule-cxx.cxx +++ b/libbuild2/adhoc-rule-cxx.cxx @@ -297,6 +297,10 @@ namespace build2 auto_thread_env penv (nullptr); context& ctx (*t.ctx.module_context); + // Enter a scheduler sub-phase. + // + scheduler::phase_guard pg (ctx.sched); + // Mark the queue so that we don't work any tasks that may already be // there. // diff --git a/libbuild2/context.cxx b/libbuild2/context.cxx index 3ae8a07..e8232c7 100644 --- a/libbuild2/context.cxx +++ b/libbuild2/context.cxx @@ -621,7 +621,7 @@ namespace build2 } bool run_phase_mutex:: - lock (run_phase p) + lock (run_phase n) { bool r; @@ -632,7 +632,7 @@ namespace build2 // Increment the counter. // condition_variable* v (nullptr); - switch (p) + switch (n) { case run_phase::load: lc_++; v = &lv_; break; case run_phase::match: mc_++; v = &mv_; break; @@ -645,13 +645,13 @@ namespace build2 // if (u) { - ctx_.phase = p; + ctx_.phase = n; r = !fail_; } - else if (ctx_.phase != p) + else if (ctx_.phase != n) { ctx_.sched.deactivate (false /* external */); - for (; ctx_.phase != p; v->wait (l)) ; + for (; ctx_.phase != n; v->wait (l)) ; r = !fail_; l.unlock (); // Important: activate() can block. ctx_.sched.activate (false /* external */); @@ -662,7 +662,7 @@ namespace build2 // In case of load, acquire the exclusive access mutex. // - if (p == run_phase::load) + if (n == run_phase::load) { if (!lm_.try_lock ()) { @@ -677,11 +677,11 @@ namespace build2 } void run_phase_mutex:: - unlock (run_phase p) + unlock (run_phase o) { // In case of load, release the exclusive access mutex. // - if (p == run_phase::load) + if (o == run_phase::load) lm_.unlock (); { @@ -690,25 +690,35 @@ namespace build2 // Decrement the counter and see if this phase has become unlocked. // bool u (false); - switch (p) + switch (o) { case run_phase::load: u = (--lc_ == 0); break; case run_phase::match: u = (--mc_ == 0); break; case run_phase::execute: u = (--ec_ == 0); break; } - // If the phase is unlocked, pick a new phase and notify the waiters. - // Note that we notify all load waiters so that they can all serialize - // behind the second-level mutex. + // If the phase became unlocked, pick a new phase and notify the + // waiters. Note that we notify all load waiters so that they can all + // serialize behind the second-level mutex. // if (u) { + run_phase n; condition_variable* v; + if (lc_ != 0) {n = run_phase::load; v = &lv_;} + else if (mc_ != 0) {n = run_phase::match; v = &mv_;} + else if (ec_ != 0) {n = run_phase::execute; v = &ev_;} + else {n = run_phase::load; v = nullptr;} - if (lc_ != 0) {ctx_.phase = run_phase::load; v = &lv_;} - else if (mc_ != 0) {ctx_.phase = run_phase::match; v = &mv_;} - else if (ec_ != 0) {ctx_.phase = run_phase::execute; v = &ev_;} - else {ctx_.phase = run_phase::load; v = nullptr;} + ctx_.phase = n; + + // Enter/leave scheduler sub-phase. See also the other half in + // relock(). + // + if (o == run_phase::match && n == run_phase::execute) + ctx_.sched.push_phase (); + else if (o == run_phase::execute && n == run_phase::match) + ctx_.sched.pop_phase (); if (v != nullptr) { @@ -758,6 +768,14 @@ namespace build2 ctx_.phase = n; r = !fail_; + // Enter/leave scheduler sub-phase. See also the other half in + // unlock(). + // + if (o == run_phase::match && n == run_phase::execute) + ctx_.sched.push_phase (); + else if (o == run_phase::execute && n == run_phase::match) + ctx_.sched.pop_phase (); + // Notify others that could be waiting for this phase. // if (v != nullptr) @@ -846,7 +864,7 @@ namespace build2 phase_lock_instance = prev; ctx.phase_mutex.unlock (phase); - //text << this_thread::get_id () << " phase release " << p; + //text << this_thread::get_id () << " phase release " << phase; } } @@ -913,10 +931,13 @@ namespace build2 if (new_phase == run_phase::load) // Note: load lock is exclusive. ctx.load_generation++; - //text << this_thread::get_id () << " phase switch " << o << " " << n; + //text << this_thread::get_id () << " phase switch " + // << old_phase << " " << new_phase; } #if 0 + // NOTE: see push/pop_phase() logic if trying to enable this. + // phase_switch:: phase_switch (phase_unlock&& u, phase_lock&& l) : old_phase (u.l->phase), new_phase (l.phase) @@ -950,6 +971,7 @@ namespace build2 if (!r && !uncaught_exception ()) throw failed (); - //text << this_thread::get_id () << " phase restore " << n << " " << o; + //text << this_thread::get_id () << " phase restore " + // << new_phase << " " << old_phase; } } diff --git a/libbuild2/context.hxx b/libbuild2/context.hxx index ad89f58..c93c1c9 100644 --- a/libbuild2/context.hxx +++ b/libbuild2/context.hxx @@ -38,8 +38,7 @@ namespace build2 void unlock (run_phase); - // Switch from one phase to another. Semantically, just unlock() followed - // by lock() but more efficient. + // Switch from one phase to another. // bool relock (run_phase unlock, run_phase lock); @@ -116,7 +115,10 @@ namespace build2 // we have switched to another task). // // Note that sharing the same scheduler between multiple top-level contexts - // can currently be problematic due to operation-specific scheduler tuning. + // can currently be problematic due to operation-specific scheduler tuning + // as all as phase pushing/popping (perhaps this suggest that we should + // instead go the multiple communicating schedulers route, a la the job + // server). // // The loaded_modules state (module.hxx) is shared among all the contexts // (there is no way to have multiple shared library loading "contexts") and @@ -635,12 +637,12 @@ namespace build2 wait_guard (context&, atomic_count& task_count, - bool phase = false); + bool unlock_phase = false); wait_guard (context&, size_t start_count, atomic_count& task_count, - bool phase = false); + bool unlock_phase = false); void wait (); diff --git a/libbuild2/module.cxx b/libbuild2/module.cxx index fc50aef..2ee29d6 100644 --- a/libbuild2/module.cxx +++ b/libbuild2/module.cxx @@ -418,6 +418,10 @@ namespace build2 auto_thread_env penv (nullptr); context& ctx (*bs.ctx.module_context); + // Enter a scheduler sub-phase. + // + scheduler::phase_guard pg (ctx.sched); + // Mark the queue so that we don't work any tasks that may already be // there (we could be called in strange ways, for example, as part of // match via dir_search()). diff --git a/libbuild2/scheduler.cxx b/libbuild2/scheduler.cxx index 43da681..8c0ea17 100644 --- a/libbuild2/scheduler.cxx +++ b/libbuild2/scheduler.cxx @@ -113,7 +113,7 @@ namespace build2 { ready_condv_.notify_one (); } - else if (queued_task_count_.load (std::memory_order_consume) != 0 && + else if (queued_task_count_.load (memory_order_consume) != 0 && activate_helper (l)) ; else if (active_ == 0 && external_ == 0) @@ -405,8 +405,12 @@ namespace build2 if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0) wait_queue_.reset (new wait_slot[wait_queue_size_]); - // Reset counters. + // Reset other state. // + phase_.clear (); + + idle_reserve_ = 0; + stat_max_waiters_ = 0; stat_wait_collisions_ = 0; @@ -541,6 +545,132 @@ namespace build2 return r; } + void scheduler:: + push_phase () + { + if (max_active_ == 1) // Serial execution. + return; + + // Note that we cannot "wait out" until all the old phase threads + // deactivate themselves because we are called while holding the phase + // transition lock which may prevent that from happening. + // + lock l (mutex_); + + // Here is the problem: the old phase is likely to have a bunch of waiting + // threads with non-empty queues and after switching the phase new helpers + // are going to start working those queues (and immediately get blocked + // trying to lock the "old" phase). This is further exacerbated by the + // fact that helpers get tasks from the front of the queue while new tasks + // are added at the back. Which means helpers won't see any "new" phase + // tasks until enough of them get "sacrificed" (i.e., blocked) to clear + // the old phase backlog (or more like front-log in this case). + // + // Since none of the old phase tasks can make any progress until we return + // to the old phase, we need to somehow "hide" their tasks from the new + // phase helpers. The way we are going to do it is to temporarily (until + // pop) replace such queues with empty ones. This should be ok since a + // thread with such a "shadowed" queue won't wake up until we return to + // the old phase. + // + // Note also that the assumption here is that while we may still have + // "phase-less" threads milling around (e.g., transitioning from active to + // waiting), they do not access the queue (helpers are a special case that + // we deal with by locking the queue). + // + phase_.emplace_back (task_queues_.size ()); + vector<task_queue_data>& ph (phase_.back ()); + + auto j (ph.begin ()); + for (auto i (task_queues_.begin ()); i != task_queues_.end (); ++i, ++j) + { + task_queue& tq (*i); + lock ql (tq.mutex); + + if (tq.size != 0) + { + queued_task_count_.fetch_sub (tq.size, memory_order_release); + + // @@ TODO: should we make task_queue::data allocation lazy? On the + // other hand, we don't seem to get many non-empty queues here on + // real-world projects. + // + j->data.reset (new task_data[task_queue_depth_]); + tq.swap (*j); + } + } + + assert (queued_task_count_.load (memory_order_consume) == 0); + + // Boost the max_threads limit for the first sub-phase. + // + // Ideally/long-term we want to redo this by waking up one of the old + // phase waiting threads to serve as a helper. + // + if (phase_.size () == 1) + { + size_t cur_threads (init_active_ + helpers_ - idle_reserve_); + + old_eff_max_threads_ = (cur_threads > max_threads_ + ? cur_threads + : max_threads_); + old_max_threads_ = max_threads_; + + max_threads_ = old_eff_max_threads_ + max_threads_ / 2; + idle_reserve_ = 0; + } + } + + void scheduler:: + pop_phase () + { + if (max_active_ == 1) // Serial execution. + return; + + lock l (mutex_); + assert (!phase_.empty ()); + + // Restore the queue sizes. + // + assert (queued_task_count_.load (memory_order_consume) == 0); + + vector<task_queue_data>& ph (phase_.back ()); + + auto i (task_queues_.begin ()); + for (auto j (ph.begin ()); j != ph.end (); ++i, ++j) + { + if (j->size != 0) + { + task_queue& tq (*i); + lock ql (tq.mutex); + tq.swap (*j); + queued_task_count_.fetch_add (tq.size, memory_order_release); + } + } + + phase_.pop_back (); + + // Restore the original limit and reserve idle helpers that we created + // above the old (effective) limit. + // + if (phase_.size () == 0) + { + size_t cur_threads (init_active_ + helpers_); + + if (cur_threads > old_eff_max_threads_) + { + idle_reserve_ = cur_threads - old_eff_max_threads_; + + // Not necessarily the case since some helpers may have still picked + // up tasks from the old phase and are now in waiting_. + // + //assert (idle_reserve_ <= idle_); + } + + max_threads_ = old_max_threads_; + } + } + scheduler::monitor_guard scheduler:: monitor (atomic_count& c, size_t t, function<size_t (size_t)> f) { @@ -566,18 +696,18 @@ namespace build2 if (shutdown_) return false; - if (idle_ != 0) + if (idle_ > idle_reserve_) { idle_condv_.notify_one (); } // // Ignore the max_threads value if we have queued tasks but no active // threads. This means everyone is waiting for something to happen but - // nobody is doing anything (e.g., working the queues). This, for - // example, can happen if a thread waits for a task that is in its queue - // but is below the mark. + // nobody is doing anything (e.g., working the queues). This, for example, + // can happen if a thread waits for a task that is in its queue but is + // below the mark. // - else if (init_active_ + helpers_ < max_threads_ || + else if (init_active_ + helpers_ - idle_reserve_ < max_threads_ || (active_ == 0 && queued_task_count_.load (memory_order_consume) != 0)) { @@ -778,6 +908,8 @@ namespace build2 { s.active_++; + // Note: see the push_phase() logic if changing anything here. + // while (s.queued_task_count_.load (memory_order_consume) != 0) { // Queues are never removed which means we can get the current range diff --git a/libbuild2/scheduler.hxx b/libbuild2/scheduler.hxx index 9556caa..b85d353 100644 --- a/libbuild2/scheduler.hxx +++ b/libbuild2/scheduler.hxx @@ -135,6 +135,35 @@ namespace build2 L& lock, work_queue = work_all); + // Sub-phases. + // + // Note that these functions should be called while holding the lock + // protecting the phase transition, when there are no longer any threads + // in the old phase nor yet any threads in the new phase (or, equivalent, + // for example, if the old phase does not utilize the scheduler). + // + // In particular, for push, while we don't expect any further calls to + // async() or wait() in the old phase (until pop), there could still be + // active threads that haven't had a chance to deactivated themselves yet. + // For pop there should be no remaining tasks queued corresponding to the + // phase being popped. + // + void + push_phase (); + + void + pop_phase (); + + struct phase_guard + { + explicit + phase_guard (scheduler& s): s_ (s) {s_.push_phase ();} + ~phase_guard () {s_.pop_phase ();} + + private: + scheduler& s_; + }; + // Mark the queue so that we don't work any tasks that may already be // there. In the normal "bunch of acync() calls followed by wait()" // cases this happens automatically but in special cases where async() @@ -663,29 +692,43 @@ namespace build2 // size_t task_queue_depth_; // Multiple of max_active. - struct task_queue + // Our task queue is circular with head being the index of the first + // element and tail -- of the last. Since this makes the empty and one + // element cases indistinguishable, we also keep the size. + // + // The mark is an index somewhere between (figuratively speaking) head and + // tail, if enabled. If the mark is hit, then it is disabled until the + // queue becomes empty or it is reset by a push. + // + struct task_queue_data { - std::mutex mutex; - bool shutdown = false; - - size_t stat_full = 0; // Number of times push() returned NULL. - - // Our task queue is circular with head being the index of the first - // element and tail -- of the last. Since this makes the empty and one - // element cases indistinguishable, we also keep the size. - // - // The mark is an index somewhere between (figuratively speaking) head - // and tail, if enabled. If the mark is hit, then it is disabled until - // the queue becomes empty or it is reset by a push. - // size_t head = 0; size_t mark = 0; size_t tail = 0; size_t size = 0; unique_ptr<task_data[]> data; + }; + + struct task_queue: task_queue_data + { + std::mutex mutex; + bool shutdown = false; - task_queue (size_t depth): data (new task_data[depth]) {} + size_t stat_full = 0; // Number of times push() returned NULL. + + task_queue (size_t depth) {data.reset (new task_data[depth]);} + + void + swap (task_queue_data& d) + { + using std::swap; + swap (head, d.head); + swap (mark, d.mark); + swap (tail, d.tail); + swap (size, d.size); + swap (data, d.data); + } }; // Task queue API. Expects the queue mutex to be locked. @@ -857,6 +900,14 @@ namespace build2 static void queue (task_queue*) noexcept; + // Sub-phases. + // + small_vector<vector<task_queue_data>, 2> phase_; + + size_t idle_reserve_; + size_t old_max_threads_; + size_t old_eff_max_threads_; + private: optional<size_t> wait_impl (size_t, const atomic_count&, work_queue); |