// file      : libbuild2/scheduler.cxx -*- C++ -*-
// license   : MIT; see accompanying LICENSE file

#include <libbuild2/scheduler.hxx>

#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
#  include <pthread.h>
#  ifdef __FreeBSD__
#    include <pthread_np.h> // pthread_attr_get_np()
#  endif
#endif

#ifndef _WIN32
#  include <thread> // this_thread::sleep_for()
#else
#  include <libbutl/win32-utility.hxx>

#  include <chrono>
#endif

#include <cerrno>

#include <libbuild2/diagnostics.hxx>

using namespace std;

namespace build2
{
  // TLS cache of thread's task queue.
  //
  // Note that scheduler::task_queue struct is private (which is why the
  // pointer is stored as void* and cast back in queue() below).
  //
  static
#ifdef __cpp_thread_local
  thread_local
#else
  __thread
#endif
  void* scheduler_queue = nullptr;

  // Return the calling thread's cached task queue or NULL if none has been
  // set for this thread yet.
  //
  scheduler::task_queue* scheduler::
  queue () noexcept
  {
    return static_cast<scheduler::task_queue*> (scheduler_queue);
  }

  // Cache the specified task queue as the calling thread's queue.
  //
  void scheduler::
  queue (scheduler::task_queue* q) noexcept
  {
    scheduler_queue = q;
  }

  // Wait until the task count drops to start_count or below and return the
  // last observed task count value.
  //
  // Unless wq is work_none, first try to work off the tasks from the calling
  // thread's own queue (if it has one). If wq is work_one, then the task
  // count is re-checked after each such task. If the count is still above
  // start_count once the queue is exhausted, suspend the thread (see
  // suspend()).
  //
  size_t scheduler::
  wait (size_t start_count, const atomic_count& task_count, work_queue wq)
  {
    // Note that task_count is a synchronization point.
    //
    size_t tc;

    if ((tc = task_count.load (memory_order_acquire)) <= start_count)
      return tc;

    assert (max_active_ != 1); // Serial execution, nobody to wait for.

    // See if we can run some of our own tasks.
    //
    if (wq != work_none)
    {
      // If we are waiting on someone else's task count then there might still
      // be no queue (set by async()).
      //
      if (task_queue* tq = queue ())
      {
        for (lock ql (tq->mutex); !tq->shutdown && !empty_back (*tq); )
        {
          pop_back (*tq, ql);

          if (wq == work_one)
          {
            if ((tc = task_count.load (memory_order_acquire)) <= start_count)
              return tc;
          }
        }

        // Note that empty task queue doesn't automatically mean the task
        // count has been decremented (some might still be executing
        // asynchronously).
        //
        if ((tc = task_count.load (memory_order_acquire)) <= start_count)
          return tc;
      }
    }

    return suspend (start_count, task_count);
  }

  // Mark the calling thread as no longer active (it becomes a waiter and,
  // if external is true, also an external waiter) and hand the freed-up
  // active slot to someone else: a ready thread if there is one, otherwise a
  // helper if there are queued tasks. If nobody is left active and there are
  // no external waiters, poke the deadlock monitor.
  //
  // Does nothing in the serial execution mode (max_active_ == 1).
  //
  void scheduler::
  deactivate (bool external)
  {
    if (max_active_ == 1) // Serial execution.
      return;

    lock l (mutex_);

    active_--;
    waiting_++;
    if (external)
      external_++;
    progress_.fetch_add (1, memory_order_relaxed);

    // Keep track of the high-water mark for statistics (see shutdown()).
    //
    if (waiting_ > stat_max_waiters_)
      stat_max_waiters_ = waiting_;

    // A spare active thread has become available. If there are ready masters
    // or eager helpers, wake someone up.
    //
    if (ready_ != 0)
    {
      ready_condv_.notify_one ();
    }
    else if (queued_task_count_.load (std::memory_order_consume) != 0 &&
             activate_helper (l))
      ; // A helper has been woken up or created; nothing else to do.
    else if (active_ == 0 && external_ == 0)
    {
      // Note that we tried to handle this directly in this thread but that
      // wouldn't work for the phase lock case where we call deactivate and
      // then go wait on a condition variable: we would be doing deadlock
      // detection while holding the lock that prevents other threads from
      // making progress! So it has to be a separate monitoring thread.
      //
      dead_condv_.notify_one ();
    }
  }

  // The reverse of deactivate(): mark the calling thread as no longer
  // waiting and block (as "ready") until there is a spare active slot.
  //
  // If collision is true, also bump the wait queue collision statistics.
  // Throw generic_error(ECANCELED) if the scheduler has been shut down by
  // the time the thread becomes active.
  //
  // Does nothing in the serial execution mode (max_active_ == 1).
  //
  void scheduler::
  activate (bool external, bool collision)
  {
    if (max_active_ == 1) // Serial execution.
      return;

    lock l (mutex_);

    if (collision)
      stat_wait_collisions_++;

    // If we have spare active threads, then become active. Otherwise it
    // enters the ready queue.
    //
    if (external)
      external_--;
    waiting_--;
    ready_++;
    progress_.fetch_add (1, memory_order_relaxed);

    while (!shutdown_ && active_ >= max_active_)
      ready_condv_.wait (l);

    ready_--;
    active_++;
    progress_.fetch_add (1, memory_order_relaxed);

    if (shutdown_)
      throw_generic_error (ECANCELED);
  }

  // Sleep for the specified duration with the calling thread accounted for
  // as an external waiter (so that its active slot can be used by someone
  // else in the meantime).
  //
  void scheduler::
  sleep (const duration& d)
  {
    deactivate (true /* external */);
    active_sleep (d);
    activate (true /* external */);
  }

  // Sleep for the specified duration without touching any of the scheduler's
  // counters (that is, the calling thread remains in its current state).
  //
  void scheduler::
  active_sleep (const duration& d)
  {
    // MinGW GCC 4.9 doesn't implement this_thread so use Win32 Sleep().
    //
#ifndef _WIN32
    this_thread::sleep_for (d);
#else
    using namespace chrono;

    Sleep (static_cast<DWORD> (duration_cast<milliseconds> (d).count ()));
#endif
  }

  // Deactivate the calling thread and block it on the wait queue slot
  // selected by hashing the task count's address until either the task count
  // drops to start_count or below or the slot is shut down. Then re-activate
  // the thread and return the last observed task count (0 if none was
  // observed because of the shutdown).
  //
  size_t scheduler::
  suspend (size_t start_count, const atomic_count& task_count)
  {
    wait_slot& s (
      wait_queue_[
        hash<const atomic_count*> () (&task_count) % wait_queue_size_]);

    // This thread is no longer active.
    //
    deactivate (false /* external */);

    // Note that the task count is checked while holding the lock. We also
    // have to notify while holding the lock (see resume()). The aim here
    // is not to end up with a notification that happens between the check
    // and the wait.
    //
    size_t tc (0);
    bool collision;
    {
      lock l (s.mutex);

      // We have a collision if there is already a waiter for a different
      // task count.
      //
      collision = (s.waiters++ != 0 && s.task_count != &task_count);

      // This is nuanced: we want to always have the task count of the last
      // thread to join the queue. Otherwise, if threads are leaving and
      // joining the queue simultaneously, we may end up with a task count of
      // a thread group that is no longer waiting.
      //
      s.task_count = &task_count;

      // We could probably relax the atomic access since we use a mutex for
      // synchronization though this has a different tradeoff (calling wait
      // because we don't see the count).
      //
      while (!(s.shutdown ||
               (tc = task_count.load (memory_order_acquire)) <= start_count))
        s.condv.wait (l);

      s.waiters--;
    }

    // This thread is no longer waiting.
    //
    activate (false /* external */, collision);

    return tc;
  }

  // Wake up all the threads waiting on the wait queue slot that corresponds
  // to the specified task count (see suspend()). Note that since slots are
  // selected by hash, this may also wake up waiters for other task counts
  // that happen to share the slot (they re-check their condition).
  //
  void scheduler::
  resume (const atomic_count& tc)
  {
    if (max_active_ == 1) // Serial execution, nobody to wake up.
      return;

    wait_slot& s (
      wait_queue_[hash<const atomic_count*> () (&tc) % wait_queue_size_]);

    // See suspend() for why we must hold the lock.
    //
    lock l (s.mutex);

    if (s.waiters != 0)
      s.condv.notify_all ();
  }

  // Shut the scheduler down ignoring any system_error (we cannot throw from
  // a destructor).
  //
  scheduler::
  ~scheduler ()
  {
    try { shutdown (); } catch (system_error&) {}
  }

  // Busy-wait (yielding with the mutex released) until the number of active
  // threads is back to the initial value and no helper is still starting.
  // Return the lock on the scheduler's mutex in the locked state.
  //
  // It is a precondition (asserted) that there are no waiting or ready
  // threads.
  //
  auto scheduler::
  wait_idle () -> lock
  {
    lock l (mutex_);

    assert (waiting_ == 0);
    assert (ready_ == 0);

    while (active_ != init_active_ || starting_ != 0)
    {
      l.unlock ();
      this_thread::yield ();
      l.lock ();
    }

    return l;
  }

  // Return a (prime) shard/slot count derived from the maximum number of
  // threads scaled by mul/div. Return 1 if the scheduler is serial
  // (max_threads_ == 1).
  //
  size_t scheduler::
  shard_size (size_t mul, size_t div) const
  {
    size_t n (max_threads_ == 1 ? 0 : max_threads_ * mul / div / 4);

    // Return true if the specified number is prime.
    //
    auto prime = [] (size_t n)
    {
      // Check whether any number from 2 to the square root of n evenly
      // divides n, and return false if that's the case.
      //
      // While iterating i till sqrt(n) would be more efficient let's do
      // without floating arithmetic, since it doesn't make much difference
      // for the numbers we evaluate. Note that checking for i <= n / 2 is
      // just as efficient for small numbers but degrades much faster for
      // bigger numbers.
      //
      for (size_t i (2); i * i <= n; ++i)
      {
        if (n % i == 0)
          return false;
      }

      return n > 1; // 0 and 1 are not prime.
    };

    // Return a prime number that is not less than the specified number.
    //
    auto next_prime = [&prime] (size_t n)
    {
      // Note that there is always a prime number in [n, 2 * n).
      //
      for (;; ++n)
      {
        if (prime (n))
          return n;
      }
    };

    // Experience shows that we want something close to 2x for small numbers,
    // then reduce to 1.5x in-between, and 1x for large ones.
    //
    // Note that Intel Xeons are all over the map when it comes to cores (6,
    // 8, 10, 12, 14, 16, 18, 20, 22).
    //
    //                                          HW threads x arch-bits (see
    //                                          max_threads below).
    //
    return n ==  0 ? 1                      : // serial
           n ==  1 ? 3                      : // odd prime number
           n <= 16 ? next_prime (n * 2)     : // {2, 4} x 4, 2 x 8
           n <= 80 ? next_prime (n * 3 / 2) : // {4, 6, 8, 10} x 8
           next_prime (n)                   ; // {12, 14, 16, ...} x 8, ...
  }

  // Start the scheduler (which must currently be shut down).
  //
  // max_active   Maximum number of simultaneously active threads (1 means
  //              serial execution).
  // init_active  Number of threads that are initially active (non-zero, not
  //              greater than max_active).
  // max_threads  Maximum number of threads; 0 means derive it from
  //              max_active (see below).
  // queue_depth  Task queue depth; 0 means max_active * 4.
  // max_stack    Optional cap on the helper thread stack size (see
  //              create_helper() for semantics).
  //
  // Unless running serially, this also starts the deadlock monitoring
  // thread.
  //
  void scheduler::
  startup (size_t max_active,
           size_t init_active,
           size_t max_threads,
           size_t queue_depth,
           optional<size_t> max_stack)
  {
    // Lock the mutex to make sure our changes are visible in (other) active
    // threads.
    //
    lock l (mutex_);

    max_stack_ = max_stack;

    // Use 8x max_active on 32-bit and 32x max_active on 64-bit. Unless we
    // were asked to run serially.
    //
    if (max_threads == 0)
      max_threads = (max_active == 1 ? 1 :
                     sizeof (void*) < 8 ? 8 : 32) * max_active;

    assert (shutdown_ &&
            init_active != 0 &&
            init_active <= max_active &&
            max_active <= max_threads);

    active_ = init_active_ = init_active;
    max_active_ = orig_max_active_ = max_active;
    max_threads_ = max_threads;

    // This value should be proportional to the amount of hardware concurrency
    // we have (no use queuing things up if helpers cannot keep up). Note that
    // the queue entry is quite sizable.
    //
    // The relationship is as follows: we want to have a deeper queue if the
    // tasks take long (e.g., compilation) and shorter if they are quick
    // (e.g., test execution). If the tasks are quick then the synchronization
    // overhead required for queuing/dequeuing things starts to dominate.
    //
    task_queue_depth_ = queue_depth != 0
      ? queue_depth
      : max_active * 4;

    queued_task_count_.store (0, memory_order_relaxed);

    // No wait queue in the serial mode (nobody to wait for).
    //
    if ((wait_queue_size_ = max_threads == 1 ? 0 : shard_size ()) != 0)
      wait_queue_.reset (new wait_slot[wait_queue_size_]);

    // Reset counters.
    //
    stat_max_waiters_     = 0;
    stat_wait_collisions_ = 0;

    progress_.store (0, memory_order_relaxed);

    for (size_t i (0); i != wait_queue_size_; ++i)
      wait_queue_[i].shutdown = false;

    shutdown_ = false;

    if (max_active_ != 1)
      dead_thread_ = thread (deadlock_monitor, this);
  }

  // Change the maximum number of active threads to the specified value, with
  // 0 meaning restore the original value passed to startup(). The new value
  // must be between init_active and the original max_active.
  //
  // Return the previous value or 0 if the previous value was the original
  // one. If the requested value is the same as the current one, then nothing
  // is changed and the return value describes the current value.
  //
  size_t scheduler::
  tune (size_t max_active)
  {
    // Note that if we tune a parallel scheduler to run serially, we will
    // still have the deadlock monitoring thread running.

    // With multiple initial active threads we will need to make changes to
    // max_active_ visible to other threads and which we currently say can be
    // accessed between startup and shutdown without a lock.
    //
    assert (init_active_ == 1);

    if (max_active == 0)
      max_active = orig_max_active_;

    if (max_active != max_active_)
    {
      assert (max_active >= init_active_ &&
              max_active <= orig_max_active_);

      // The scheduler must not be active though some threads might still be
      // coming off from finishing a task. So we busy-wait for them.
      //
      lock l (wait_idle ());

      // After the swap max_active holds the previous value of max_active_.
      //
      swap (max_active_, max_active);
    }

    return max_active == orig_max_active_ ? 0 : max_active;
  }

  // Shut the scheduler down: flag the shutdown in the scheduler itself as
  // well as in all the wait queue slots and task queues, wait for all the
  // helper threads and the deadlock monitor to terminate, and free the
  // queues.
  //
  // Return the collected statistics. If the scheduler is already shut down,
  // then this is a noop that returns a default-initialized stat.
  //
  auto scheduler::
  shutdown () -> stat
  {
    // Our overall approach to shutdown is not to try and stop everything as
    // quickly as possible but rather to avoid performing any tasks. This
    // avoids having code littered with if(shutdown) on every other line.

    stat r;
    lock l (mutex_);

    if (!shutdown_)
    {
      // Collect statistics.
      //
      r.thread_helpers = helpers_;

      // Signal shutdown.
      //
      shutdown_ = true;

      for (size_t i (0); i != wait_queue_size_; ++i)
      {
        wait_slot& ws (wait_queue_[i]);
        lock l (ws.mutex); // Note: slot lock, shadows the scheduler lock.
        ws.shutdown = true;
      }

      for (task_queue& tq: task_queues_)
      {
        lock ql (tq.mutex);
        r.task_queue_full += tq.stat_full;
        tq.shutdown = true;
      }

      // Wait for all the helpers to terminate waking up any thread that
      // sleeps.
      //
      while (helpers_ != 0)
      {
        // Sample the counters while holding the lock and notify after
        // releasing it. Note that these locals shadow the outer r (stat)
        // and, further down, i is shadowed again by the loop index.
        //
        bool i (idle_ != 0);
        bool r (ready_ != 0);
        bool w (waiting_ != 0);

        l.unlock ();

        if (i)
          idle_condv_.notify_all ();

        if (r)
          ready_condv_.notify_all ();

        if (w)
          for (size_t i (0); i != wait_queue_size_; ++i)
            wait_queue_[i].condv.notify_all ();

        this_thread::yield ();
        l.lock ();
      }

      assert (external_ == 0);

      // Wait for the deadlock monitor (the only remaining thread).
      //
      // NOTE(review): in this branch the scheduler lock is released and not
      // re-acquired so the rest of the function runs without it; presumably
      // this is safe because no other scheduler threads remain -- confirm.
      //
      if (orig_max_active_ != 1) // See tune() for why not max_active_.
      {
        l.unlock ();
        dead_condv_.notify_one ();
        dead_thread_.join ();
      }

      // Free the memory.
      //
      wait_queue_.reset ();
      task_queues_.clear ();

      r.thread_max_active     = orig_max_active_;
      r.thread_max_total      = max_threads_;
      r.thread_max_waiting    = stat_max_waiters_;

      r.task_queue_depth      = task_queue_depth_;
      r.task_queue_remain     = queued_task_count_.load (memory_order_consume);

      r.wait_queue_slots      = wait_queue_size_;
      r.wait_queue_collisions = stat_wait_collisions_;
    }

    return r;
  }

  // Start monitoring the specified count: remember its address and its
  // current value as well as the (non-zero) threshold t and the callback f.
  // Return the guard for this monitoring session.
  //
  // It is a precondition (asserted) that no other count is currently being
  // monitored.
  //
  scheduler::monitor_guard scheduler::
  monitor (atomic_count& c, size_t t, function<size_t (size_t)> f)
  {
    assert (monitor_count_ == nullptr && t != 0);

    // While the scheduler must not be active, some threads might still be
    // coming off from finishing a task and trying to report progress. So we
    // busy-wait for them (also in ~monitor_guard()).
    //
    lock l (wait_idle ());

    monitor_count_ = &c;
    monitor_tshold_.store (t, memory_order_relaxed);
    monitor_init_ = c.load (memory_order_relaxed);
    monitor_func_ = move (f);

    return monitor_guard (this);
  }

  // Try to get a helper thread going: wake up an idle one if any, otherwise
  // create a new one if allowed by the thread limit (or if nobody is active
  // but there are queued tasks; see below). Return false if neither was
  // done, including when the scheduler is shut down.
  //
  // The passed lock is the (locked) scheduler lock. Note that it is released
  // if a new helper is successfully created (see create_helper()).
  //
  bool scheduler::
  activate_helper (lock& l)
  {
    if (shutdown_)
      return false;

    if (idle_ != 0)
    {
      idle_condv_.notify_one ();
    }
    //
    // Ignore the max_threads value if we have queued tasks but no active
    // threads. This means everyone is waiting for something to happen but
    // nobody is doing anything (e.g., working the queues). This, for
    // example, can happen if a thread waits for a task that is in its queue
    // but is below the mark.
    //
    else if (init_active_ + helpers_ < max_threads_ ||
             (active_ == 0 &&
              queued_task_count_.load (memory_order_consume) != 0))
    {
      create_helper (l);
    }
    else
      return false;

    return true;
  }

  // Create a new (detached) helper thread running helper().
  //
  // The passed lock is the (locked) scheduler lock. It is released before
  // creating the thread and is left released on success. If the thread
  // creation fails (with an exception), the lock is re-acquired and the
  // helper/starting counters are restored.
  //
  void scheduler::
  create_helper (lock& l)
  {
    helpers_++;
    starting_++;
    l.unlock ();

    // Restore the counters if the thread creation fails.
    //
    struct guard
    {
      lock* l;
      size_t& h;
      size_t& s;

      ~guard () {if (l != nullptr) {l->lock (); h--; s--;}}

    } g {&l, helpers_, starting_};

    // For some platforms/compilers the default stack size for newly created
    // threads may differ from that of the main thread. Here are the default
    // main/new thread sizes (in KB) for some of them:
    //
    // Linux   :   8192 / 8196
    // FreeBSD : 524288 / 2048
    // MacOS   :   8192 /  512
    // MinGW   :   2048 / 2048
    // VC      :   1024 / 1024
    //
    // Provided the main thread size is less-equal than
    // LIBBUILD2_SANE_STACK_SIZE (which defaults to
    // sizeof(void*)*LIBBUILD2_DEFAULT_STACK_SIZE), we make sure that the new
    // thread stack is the same as for the main thread. Otherwise, we cap it
    // at LIBBUILD2_DEFAULT_STACK_SIZE (default: 8MB). This can also be
    // overridden at runtime with the --max-stack build2 driver option
    // (remember to update its documentation if changing anything here).
    //
    // On Windows the stack size is the same for all threads and is customized
    // at the linking stage (see build2/buildfile). Thus neither *_STACK_SIZE
    // nor --max-stack have any effect here.
    //
    // On Linux, FreeBSD and MacOS there is no way to change it once and for
    // all newly created threads. Thus we will use pthreads, creating threads
    // with the stack size of the current thread. This way all threads will
    // inherit the main thread's stack size (since the first helper is always
    // created by the main thread).
    //
    // Note also the interaction with our backtrace functionality: in order to
    // get the complete stack trace we let unhandled exceptions escape the
    // thread function expecting the runtime to still call std::terminate. In
    // particular, having a noexcept function anywhere on the exception's path
    // causes the stack trace to be truncated, at least on Linux.
    //
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)

#ifndef LIBBUILD2_DEFAULT_STACK_SIZE
#  define LIBBUILD2_DEFAULT_STACK_SIZE 8388608 // 8MB
#endif

#ifndef LIBBUILD2_SANE_STACK_SIZE
#  define LIBBUILD2_SANE_STACK_SIZE (sizeof(void*) * LIBBUILD2_DEFAULT_STACK_SIZE)
#endif

    // Auto-deleter.
    //
    struct attr_deleter
    {
      void
      operator() (pthread_attr_t* a) const
      {
        int r (pthread_attr_destroy (a));

        // We should be able to destroy the valid attributes object, unless
        // something is severely damaged.
        //
        assert (r == 0);
      }
    };

    // Calculate the current thread stack size. Don't forget to update #if
    // conditions above when adding the stack size customization for new
    // platforms/compilers.
    //
    size_t stack_size;
    {
#ifdef __linux__
      // Note that the attributes must not be initialized.
      //
      pthread_attr_t attr;
      int r (pthread_getattr_np (pthread_self (), &attr));

      if (r != 0)
        throw_system_error (r);

      unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);

      r = pthread_attr_getstacksize (&attr, &stack_size);

      if (r != 0)
        throw_system_error (r);

#elif defined(__FreeBSD__)
      // Here, on the other hand, the attributes are initialized first.
      //
      pthread_attr_t attr;
      int r (pthread_attr_init (&attr));

      if (r != 0)
        throw_system_error (r);

      unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);
      r = pthread_attr_get_np (pthread_self (), &attr);

      if (r != 0)
        throw_system_error (r);

      r = pthread_attr_getstacksize (&attr, &stack_size);

      if (r != 0)
        throw_system_error (r);

#else // defined(__APPLE__)
      stack_size = pthread_get_stacksize_np (pthread_self ());
#endif
    }

    // Cap the size if necessary.
    //
    // If the runtime maximum is specified, then it takes precedence over the
    // compile-time limits with the 0 value meaning no cap.
    //
    if (max_stack_)
    {
      if (*max_stack_ != 0 && stack_size > *max_stack_)
        stack_size = *max_stack_;
    }
    else if (stack_size > LIBBUILD2_SANE_STACK_SIZE)
      stack_size = LIBBUILD2_DEFAULT_STACK_SIZE;

    pthread_attr_t attr;
    int r (pthread_attr_init (&attr));

    if (r != 0)
      throw_system_error (r);

    unique_ptr<pthread_attr_t, attr_deleter> ad (&attr);

    // Create the thread already detached.
    //
    r = pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);

    if (r != 0)
      throw_system_error (r);

    r = pthread_attr_setstacksize (&attr, stack_size);

    if (r != 0)
      throw_system_error (r);

    pthread_t t;
    r = pthread_create (&t, &attr, helper, this);

    if (r != 0)
      throw_system_error (r);
#else
    thread t (helper, this);
    t.detach ();
#endif

    g.l = nullptr; // Disarm.
  }

  // Helper thread entry point (see create_helper()). The argument is the
  // scheduler instance.
  //
  // Until shutdown, the helper repeatedly becomes active if there is a spare
  // active slot, works the front of all the task queues while there are
  // queued tasks, and then goes idle waiting for a notification (see
  // activate_helper()). Always return NULL.
  //
  void* scheduler::
  helper (void* d)
  {
    scheduler& s (*static_cast<scheduler*> (d));

    // Note that this thread can be in an in-between state (not active or
    // idle) but only while holding the lock. Which means that if we have the
    // lock then we can account for all of them (this is important during
    // shutdown). Except when the thread is just starting, before acquiring
    // the lock for the first time, which we handle with the starting count.
    //
    lock l (s.mutex_);
    s.starting_--;

    while (!s.shutdown_)
    {
      // If there is a spare active thread, become active and go looking for
      // some work.
      //
      if (s.active_ < s.max_active_)
      {
        s.active_++;

        while (s.queued_task_count_.load (memory_order_consume) != 0)
        {
          // Queues are never removed which means we can get the current range
          // and release the main lock while examining each of them.
          //
          auto it (s.task_queues_.begin ());
          size_t n (s.task_queues_.size ()); // Different to end().
          l.unlock ();

          // Note: we have to be careful not to advance the iterator past the
          // last element (since what's past could be changing).
          //
          for (size_t i (0);; ++it)
          {
            task_queue& tq (*it);

            for (lock ql (tq.mutex); !tq.shutdown && !s.empty_front (tq); )
              s.pop_front (tq, ql);

            if (++i == n)
              break;
          }

          l.lock ();
        }

        s.active_--;

        // While executing the tasks a thread might have become ready
        // (equivalent logic to deactivate()).
        //
        if (s.ready_ != 0)
          s.ready_condv_.notify_one ();
        else if (s.active_ == 0 && s.external_ == 0)
          s.dead_condv_.notify_one ();
      }

      // Become idle and wait for a notification.
      //
      s.idle_++;
      s.idle_condv_.wait (l);
      s.idle_--;
    }

    s.helpers_--;
    return nullptr;
  }

  // Create a new task queue, cache it as the calling thread's queue (see
  // queue()), and return it. The new queue inherits the scheduler's current
  // shutdown state.
  //
  auto scheduler::
  create_queue () -> task_queue&
  {
    // Note that task_queue_depth is immutable between startup() and
    // shutdown() (but see join()).
    //
    task_queue* tq;
    {
      lock l (mutex_);
      task_queues_.emplace_back (task_queue_depth_);
      tq = &task_queues_.back ();
      tq->shutdown = shutdown_;
    }

    queue (tq);
    return *tq;
  }

  // Deadlock monitoring thread entry point (see startup()). The argument is
  // the scheduler instance.
  //
  // Until shutdown, wait to be notified that there are no active threads and
  // no external waiters, then watch the progress counter for a while and, if
  // it hasn't changed, diagnose a suspected deadlock and terminate the
  // process. Return NULL on shutdown.
  //
  void* scheduler::
  deadlock_monitor (void* d)
  {
    using namespace chrono;

    scheduler& s (*static_cast<scheduler*> (d));

    lock l (s.mutex_);
    while (!s.shutdown_)
    {
      s.dead_condv_.wait (l);

      while (s.active_ == 0 && s.external_ == 0 && !s.shutdown_)
      {
        // We may have a deadlock which can happen because of dependency
        // cycles.
        //
        // Relying on the active_ count alone is not precise enough, however:
        // some threads might be transitioning between active/waiting/ready
        // states. Carefully accounting for this is not trivial, to say the
        // least (especially in the face of spurious wakeups). So we are going
        // to do a "fuzzy" deadlock detection by measuring "progress". The
        // idea is that those transitions should be pretty short-lived and so
        // if we wait for a few thousand context switches, then we should be
        // able to distinguish a real deadlock from the transition case.
        //
        size_t op (s.progress_.load (memory_order_relaxed)), np (op);

        l.unlock ();
        for (size_t i (0), n (10000), m (9990); op == np && i != n; ++i)
        {
          // On the last few iterations sleep a bit instead of yielding (in
          // case yield() is a noop; we use the consume order for the same
          // reason).
          //
          // Specifically, the last 9 iterations sleep for 20ms, 40ms, ...,
          // 180ms (900ms in total).
          //
          if (i <= m)
            this_thread::yield ();
          else
            active_sleep ((i - m) * 20ms);

          np = s.progress_.load (memory_order_consume);
        }
        l.lock ();

        // Re-check active/external counts for good measure (in case we were
        // spinning too fast).
        //
        if (np == op && s.active_ == 0 && s.external_ == 0 && !s.shutdown_)
        {
          // Shutting things down cleanly is tricky: we could have handled it
          // in the scheduler (e.g., by setting a flag and then waking
          // everyone up, similar to shutdown). But there could also be
          // "external waiters" that have called deactivate() -- we have no
          // way to wake those up. So for now we are going to abort (the nice
          // thing about abort is if this is not a dependency cycle, then we
          // have a core to examine).
          //
          error << "deadlock suspected, aborting" <<
            info << "deadlocks are normally caused by dependency cycles" <<
            info << "re-run with -s to diagnose dependency cycles";

          terminate (false /* trace */);
        }
      }
    }

    return nullptr;
  }
}