// file      : libbuild2/scheduler.hxx -*- C++ -*-
// license   : MIT; see accompanying LICENSE file

#ifndef LIBBUILD2_SCHEDULER_HXX
#define LIBBUILD2_SCHEDULER_HXX

#include <list>
#include <mutex>
#include <tuple>
#include <atomic>
#include <type_traits>        // aligned_storage, etc
#include <condition_variable>

#include <libbuild2/types.hxx>
#include <libbuild2/utility.hxx>

#include <libbuild2/export.hxx>

namespace build2
{
  // Scheduler of tasks and threads. Works best for "substantial" tasks (e.g.,
  // running a process), where in comparison thread synchronization overhead
  // is negligible.
  //
  // A thread (called "master") may need to perform several tasks which can be
  // done in parallel (e.g., update all the prerequisites or run all the
  // tests). To accomplish this, the master, via a call to async(), can ask the
  // scheduler to run a task in another thread (called "helper"). If a helper
  // is available, then the task is executed asynchronously by such a helper.
  // Otherwise, the task is (normally) executed synchronously as part of the
  // wait() call below. However, in certain cases (serial execution or full
  // queue), the task may be executed synchronously as part of the async()
  // call itself. Once the master thread has scheduled all the tasks, it calls
  // wait() to await their completion.
  //
  // The scheduler makes sure that only a certain number of threads (for
  // example, the number of available hardware threads) are "active" at any
  // given time. When a master thread calls wait(), it is "suspended" until
  // all its asynchronous tasks are completed (at which point it becomes
  // "ready"). A suspension of a master results in either another ready master
  // being "resumed" or another helper thread becoming available.
  //
  // On completion of a task a helper thread returns to the scheduler which
  // can again lead either to a ready master being resumed (in which case the
  // helper is suspended) or the helper becoming available to perform another
  // task.
  //
  // Note that suspended threads are not reused as helpers. Rather, a new
  // helper thread is always created if none is available. This is done to
  // allow a ready master to continue as soon as possible. If it were reused
  // as a helper, then it could be blocked on a nested wait() further down the
  // stack. All this means that the number of threads created by the scheduler
  // will normally exceed the maximum active allowed.
  //
  class LIBBUILD2_SYMEXPORT scheduler
  {
  public:
    using atomic_count = std::atomic<size_t>;

    // F should return void and not throw any exceptions. The way the result
    // of a task is communicated back to the master thread is ad hoc, usually
    // via "out" arguments. Such result(s) can only be retrieved by the master
    // once its task count reaches the start count.
    //
    // The argument passing semantics is the same as for std::thread. In
    // particular, lvalue-references are passed as copies (use ref()/cref()
    // for the by-reference semantics), except the case where the task is
    // executed synchronously and as part of the async() call itself (this
    // subtlety can become important when passing shared locks; you would
    // only want it to be copied if the task is queued).
    //
    // Return true if the task was queued and false if it was executed
    // synchronously.
    //
    // If the scheduler is shutdown, throw system_error(ECANCELED).
    //
    template <typename F, typename... A>
    bool
    async (size_t start_count, atomic_count& task_count, F&&, A&&...);

    template <typename F, typename... A>
    bool
    async (atomic_count& task_count, F&& f, A&&... a)
    {
      // Convenience overload: no explicit start count is given so delegate
      // to the main version with the start count of zero.
      //
      const size_t start_count (0);

      return async (start_count,
                    task_count,
                    forward<F> (f),
                    forward<A> (a)...);
    }

    // Wait until the task count reaches the start count or less. If the
    // scheduler is shutdown while waiting, throw system_error(ECANCELED).
    // Return the value of task count. Note that this is a synchronization
    // point (i.e., the task count is checked with memory_order_acquire).
    //
    // Note that it is valid to wait on another thread's task count (that is,
    // without making any async() calls in this thread). However, if the start
    // count differs from the one passed to async(), then whoever sets the
    // start count to this alternative value must also call resume() below
    // in order to signal waiting threads.
    //
    // Note also that in this case (waiting on someone else's start count),
    // the async() call could execute the tasks synchronously without ever
    // incrementing the task count. Thus if waiting on another thread's start
    // count starts before/during async() calls, then it must be "gated" with
    // an alternative (lower) start count.
    //
    // Finally, if waiting on someone else's start count, it may be unsafe
    // (from the deadlock's point of view) to continue working through our own
    // queue (i.e., we may block waiting on a task that has been queued before
    // us which in turn may end up waiting on "us").
    //
    enum work_queue
    {
      // Don't work own queue.
      //
      work_none = 0,

      // Work own queue rechecking the task count after every task.
      //
      work_one = 1,

      // Work own queue before rechecking the task count.
      //
      work_all = 2
    };

    size_t
    wait (size_t start_count,
          const atomic_count& task_count,
          work_queue = work_all);

    size_t
    wait (const atomic_count& task_count, work_queue wq = work_all)
    {
      // Same as the three-argument version but with the start count of
      // zero.
      //
      const size_t start_count (0);

      return wait (start_count, task_count, wq);
    }

    // Resume threads waiting on this task count.
    //
    void
    resume (const atomic_count& task_count);

    // An active thread that is about to wait for potentially significant time
    // on something other than task_count (e.g., mutex, condition variable,
    // timer, etc) should deactivate itself with the scheduler and then
    // reactivate once done waiting.
    //
    // The external flag indicates whether the wait is for an event external
    // to the scheduler, that is, triggered by something other than one of the
    // threads managed by the scheduler.
    //
    void
    deactivate (bool external);

    void
    activate (bool external, bool = false);

    // Sleep for the specified duration, deactivating the thread before going
    // to sleep and re-activating it after waking up (which means this
    // function may sleep potentially significantly longer than requested).
    //
    void
    sleep (const duration&);

    // Sleep without deactivating the thread. Essentially a portable
    // std::this_thread::sleep_for() implementation but only with the
    // milliseconds precision on some platforms.
    //
    static void
    active_sleep (const duration&);

    // Startup and shutdown.
    //
  public:
    // Unless already shut down, call shutdown() but ignore errors.
    //
    ~scheduler ();

    // Create a shut down scheduler.
    //
    scheduler () = default;

    // Create a started up scheduler.
    //
    // The initial active argument is the number of threads to assume are
    // already active (e.g., the calling thread). It must not be 0 (since
    // someone has to schedule the first task).
    //
    // If the maximum threads or task queue depth arguments are unspecified,
    // then appropriate defaults are used.
    //
    explicit
    scheduler (size_t max_active,
               size_t init_active = 1,
               size_t max_threads = 0,
               size_t queue_depth = 0,
               optional<size_t> max_stack = nullopt)
    {
      // All the members have their "shut down" defaults at this point so
      // simply forward the arguments, unchanged, to startup().
      //
      startup (max_active, init_active, max_threads, queue_depth, max_stack);
    }

    // Start the scheduler.
    //
    void
    startup (size_t max_active,
             size_t init_active = 1,
             size_t max_threads = 0,
             size_t queue_depth = 0,
             optional<size_t> max_stack = nullopt);

    // Return true if the scheduler was started up.
    //
    // Note: can only be called from threads that have observed creation,
    // startup, or shutdown.
    //
    bool
    started () const
    {
      // Started up is simply the opposite of shut down (the shutdown flag is
      // read here without locking the scheduler mutex).
      //
      return !shutdown_;
    }

    // Tune a started up scheduler.
    //
    // Currently one cannot increase the number of (initial) max_active, only
    // decrease it. Pass 0 to restore the initial value. Returns the old
    // value (0 if it is initial).
    //
    // Note that tuning can only be done while the scheduler is inactive, that
    // is, no threads are executing or waiting on a task. For example, in a
    // setup with a single initial active thread that would be after a return
    // from the top-level wait() call. Tuning the scheduler with more than one
    // initial active threads is currently not supported.
    //
    size_t
    tune (size_t max_active);

    // Scope guard that calls tune() on construction and, unless disarmed by
    // a move, calls tune() again with the value returned by the first call
    // on destruction.
    //
    // Note that move-assigning to a guard that still holds a scheduler
    // overwrites its state without calling tune(), which means the value it
    // was going to restore is dropped.
    //
    struct tune_guard
    {
      // Create a disarmed guard (the destructor does nothing).
      //
      tune_guard (): s_ (nullptr), o_ (0) {}

      // Tune the scheduler remembering the value returned by tune().
      //
      tune_guard (scheduler& s, size_t ma): s_ (&s), o_ (s_->tune (ma)) {}

      // Take over the restore responsibility from x leaving it disarmed.
      // Both operations only copy a pointer and an integer and therefore
      // cannot throw.
      //
      tune_guard (tune_guard&& x) noexcept
        : s_ (x.s_), o_ (x.o_) {x.s_ = nullptr;}

      tune_guard& operator= (tune_guard&& x) noexcept
      {
        if (&x != this)
        {
          s_ = x.s_;
          o_ = x.o_;
          x.s_ = nullptr;
        }
        return *this;
      }

      // Unless disarmed, pass the remembered value back to tune().
      //
      ~tune_guard ()
      {
        if (s_ != nullptr)
          s_->tune (o_);
      }

    private:
      scheduler* s_; // NULL if disarmed.
      size_t o_;     // Value returned by tune() in the constructor.
    };

    // Return true if the scheduler is configured to run tasks serially.
    //
    // Note: can only be called from threads that have observed startup.
    //
    bool
    serial () const
    {
      // Serial execution means at most one active thread is allowed.
      //
      return max_active_ == 1;
    }

    // Wait for all the helper threads to terminate. Throw system_error on
    // failure. Note that the initially active threads are not waited for.
    // Return scheduling statistics.
    //
    struct stat
    {
      // Thread limits and usage.
      //
      size_t thread_max_active     = 0; // max # of active threads allowed.
      size_t thread_max_total      = 0; // max # of total threads allowed.
      size_t thread_helpers        = 0; // # of helper threads created.
      size_t thread_max_waiting    = 0; // max # of waiters at any time.

      // Task queue capacity and usage.
      //
      size_t task_queue_depth      = 0; // # of entries in a queue (capacity).
      size_t task_queue_full       = 0; // # of times task queue was full.
      size_t task_queue_remain     = 0; // # of tasks remaining in queue.

      // Wait queue size and contention.
      //
      size_t wait_queue_slots      = 0; // # of wait slots (buckets).
      size_t wait_queue_collisions = 0; // # of times slot had been occupied.
    };

    stat
    shutdown ();

    // Progress monitoring.
    //
    // Setting and clearing of the monitor is not thread-safe. That is, it
    // should be set before any tasks are queued and cleared after all of
    // them have completed.
    //
    // The counter must go in one direction, either increasing or decreasing,
    // and should contain the initial value during the call. Zero threshold
    // value is reserved.
    //
    struct monitor_guard
    {
      // Create a guard that clears the monitor of the specified scheduler on
      // destruction or a disarmed guard if the scheduler is NULL.
      //
      explicit
      monitor_guard (scheduler* s = nullptr): s_ (s) {}

      // Take over the responsibility for clearing the monitor from x leaving
      // it disarmed. Both operations only copy a pointer and therefore cannot
      // throw.
      //
      // Note that move-assigning to a guard that is still armed overwrites
      // its scheduler pointer without clearing that scheduler's monitor.
      //
      monitor_guard (monitor_guard&& x) noexcept: s_ (x.s_) {x.s_ = nullptr;}

      monitor_guard& operator= (monitor_guard&& x) noexcept
      {
        if (&x != this)
        {
          s_ = x.s_;
          x.s_ = nullptr;
        }
        return *this;
      }

      // Unless disarmed, wait for the scheduler threads to become idle and
      // then, while holding the returned scheduler lock, reset the monitor
      // counter pointer and function.
      //
      ~monitor_guard ()
      {
        if (s_ != nullptr)
        {
          lock l (s_->wait_idle ()); // See monitor() for details.
          s_->monitor_count_ = nullptr;
          s_->monitor_func_  = nullptr;
        }
      }

      // Return true if the guard is armed.
      //
      explicit operator bool () const {return s_ != nullptr;}

    private:
      scheduler* s_; // NULL if disarmed.
    };

    monitor_guard
    monitor (atomic_count&, size_t threshold, function<size_t (size_t)>);

    // If initially active thread(s) (besides the one that calls startup())
    // exist before the call to startup(), then they must call join() before
    // executing any tasks. The two common cases where you don't have to call
    // join are a single active thread that calls startup()/shutdown() or
    // active thread(s) that are created after startup().
    //
    void
    join ()
    {
      // queue() is expected to return NULL here (presumably meaning that
      // this thread has no task queue associated with it yet).
      //
      assert (queue () == nullptr);

      // Acquire and immediately release the scheduler mutex. Nothing is
      // done while holding it: the only purpose is to make sure the values
      // set in startup() are visible in this thread.
      //
      {
        lock sync (mutex_);
      }
    }

    // If initially active thread(s) participate in multiple schedulers and/or
    // sessions (intervals between startup() and shutdown()), then they must
    // call leave() before joining another scheduler/session. Note that this
    // applies to the active thread that calls shutdown(). Note that a thread
    // can only participate in one scheduler at a time.
    //
    void
    leave ()
    {
      // Reset the queue pointer to NULL so that a subsequent join() (which
      // asserts that queue() returns NULL) can succeed.
      //
      queue (nullptr);
    }

    // Return the number of hardware threads or 0 if unable to determine.
    //
    static size_t
    hardware_concurrency ()
    {
      // The standard library returns unsigned int; widen it to size_t.
      //
      const unsigned int n (std::thread::hardware_concurrency ());
      return n;
    }

    // Return a prime number that can be used as a lock shard size that's
    // appropriate for the scheduler's concurrency. Use power of two values
    // for mul for higher-contention shards and for div for lower-contention
    // ones. Always return 1 for serial execution.
    //
    // Note: can only be called from threads that have observed startup.
    //
    size_t
    shard_size (size_t mul = 1, size_t div = 1) const;

    // Assuming all the tasks have been executed, busy-wait for all the threads
    // to become idle. Return the lock over the scheduler mutex. Normally you
    // don't need to call this function directly.
    //
    using lock = std::unique_lock<std::mutex>;

    lock
    wait_idle ();

  private:
    bool
    activate_helper (lock&);

    void
    create_helper (lock&);

    // We restrict ourselves to a single pointer as an argument in hope of
    // a small object optimization. Return NULL.
    //
    // Note that the return type is void* to make the function usable with
    // pthreads (see scheduler.cxx for details).
    //
    static void*
    helper (void*);

    size_t
    suspend (size_t start_count, const atomic_count& task_count);

    // Task encapsulation.
    //
    template <typename F, typename... A>
    struct task_type
    {
      // The function and its arguments are stored decayed, that is, as
      // copies (or moved-from values) rather than references.
      //
      using func_type = std::decay_t<F>;
      using args_type = std::tuple<std::decay_t<A>...>;

      // NOTE(review): the member order is presumably relied upon by the
      // code that constructs the task (not visible in this header), so do
      // not reorder without checking.
      //
      atomic_count* task_count; // Task count associated with this task.
      size_t start_count;       // Start count associated with this task.
      func_type func;           // Function to call.
      args_type args;           // Arguments to call it with.

      // Call the stored function with the stored arguments, with i being
      // the indexes of the tuple elements to pass. Both the function and
      // the arguments are moved from so this can only be called once.
      //
      template <size_t... i>
      void
      thunk (std::index_sequence<i...>)
      {
        move (func) (std::get<i> (move (args))...);
      }
    };

    template <typename F, typename... A>
    static void
    task_thunk (scheduler&, lock&, void*);

    template <typename T>
    static std::decay_t<T>
    decay_copy (T&& x)
    {
      // The decayed return type forces a copy (for an lvalue) or a move (for
      // an rvalue) of the argument.
      //
      return forward<T> (x);
    }

  private:
    // Monitor.
    //
    atomic_count*              monitor_count_ = nullptr;  // NULL if not used.
    atomic_count               monitor_tshold_;           // 0 means locked.
    size_t                     monitor_init_;             // Initial count.
    function<size_t (size_t)>  monitor_func_;

    std::mutex mutex_;
    bool shutdown_ = true;  // Shutdown flag.

    optional<size_t> max_stack_;

    // The constraints that we must maintain:
    //
    //                  active <= max_active
    // (init_active + helpers) <= max_threads (soft; see activate_helper())
    //
    // Note that the first three are immutable between the startup() and
    // shutdown() calls so can be accessed without a lock (but see join() and
    // except for max_active_ which can be changed by tune() but only when the
    // scheduler is idle).
    //
    size_t init_active_ = 0; // Initially active threads.
    size_t max_active_  = 0; // Maximum number of active threads.
    size_t max_threads_ = 0; // Maximum number of total threads.

    size_t helpers_     = 0; // Number of helper threads created so far.

    // Every thread that we manage (except for the special deadlock monitor)
    // must be accounted for in one of these counters. And their sum should
    // equal (init_active + helpers).
    //
    size_t active_   = 0;  // Active master threads executing a task.
    size_t idle_     = 0;  // Idle helper threads waiting for a task.
    size_t waiting_  = 0;  // Suspended master threads waiting on their tasks.
    size_t ready_    = 0;  // Ready master thread waiting to become active.
    size_t starting_ = 0;  // Helper threads starting up.

    // Number of waiting threads that are waiting for an external event.
    //
    size_t external_ = 0;

    // Original values (as specified during startup) that can be altered via
    // tuning.
    //
    size_t orig_max_active_ = 0;

    std::condition_variable idle_condv_;  // Idle helpers queue.
    std::condition_variable ready_condv_; // Ready masters queue.

    // Statistics counters.
    //
    size_t stat_max_waiters_;
    size_t stat_wait_collisions_;

    // Progress counter.
    //
    // We increment it for each active->waiting->ready->active transition
    // and it is used for deadlock detection (see deactivate()).
    //
    // Note that it still serves our purpose even if the value wraps around
    // (e.g., on a 32-bit platform).
    //
    atomic_count progress_;

    // Deadlock detection.
    //
    std::thread             dead_thread_;
    std::condition_variable dead_condv_;

    static void*
    deadlock_monitor (void*);

    // Wait queue.
    //
    // A wait slot blocks a bunch of threads. When they are (all) unblocked,
    // they re-examine their respective conditions and either carry on or
    // block again.
    //
    // The wait queue is a shard of slots. A thread picks a slot based on the
    // address of its task count variable. How many slots do we need? This
    // depends on the number of waiters that we can have which cannot be
    // greater than the total number of threads.
    //
    // The pointer to the task count is used to identify the already waiting
    // group of threads for collision statistics.
    //
    struct wait_slot
    {
      std::mutex mutex;              // Protects the members of this slot.
      std::condition_variable condv; // What the waiting threads block on.
      size_t waiters = 0;            // Number of threads waiting in the slot.

      // Task count of the group of threads that is already waiting in this
      // slot (used for collision statistics). Note: has no default member
      // initializer.
      //
      const atomic_count* task_count;

      bool shutdown = true; // Slot's own shutdown flag (initially set).
    };

    size_t wait_queue_size_; // Proportional to max_threads.
    unique_ptr<wait_slot[]> wait_queue_;

    // Task queue.
    //
    // Each queue has its own mutex plus we have an atomic total count of the
    // queued tasks. Note that it should only be modified while holding one
    // of the queue locks.
    //
    atomic_count queued_task_count_;

    // For now we only support trivially-destructible tasks.
    //
    struct task_data
    {
      // Raw storage for the task object with the size of 8 pointers.
      //
      std::aligned_storage<sizeof (void*) * 8>::type data;

      // Function that runs the task stored in data. It is called by
      // execute() with this scheduler, the queue lock, and the address of
      // data.
      //
      void (*thunk) (scheduler&, lock&, void*);
    };

    // We have two requirements: Firstly, we want to keep the master thread
    // (the one that called wait()) busy working through its own queue for as
    // long as possible before (if at all) it "reincarnates" as a helper. The
    // main reason for this is the limited number of helpers we can create.
    //
    // Secondly, we don't want to block wait() longer than necessary since the
    // master thread can do some work with the result. Plus, overall, we want
    // to "unwind" task hierarchies as soon as possible since they hold up
    // resources such as thread's stack. All this means that the master thread
    // can only work through tasks that it has queued at this "level" of the
    // async()/wait() calls since we know that wait() cannot return until
    // they are done.
    //
    // To satisfy the first requirement, the master and helper threads get the
    // tasks from different ends of the queue: master from the back while
    // helpers from the front. And the master always adds new tasks to the
    // back.
    //
    // To satisfy the second requirement, the master thread stores the index
    // of the first task it has queued at this "level" and makes sure it
    // doesn't try to dequeue any task beyond that.
    //
    size_t task_queue_depth_; // Multiple of max_active.

    struct task_queue
    {
      std::mutex mutex;      // Protects the members of this queue.
      bool shutdown = false; // Queue's own shutdown flag.

      // Number of times push() returned NULL. Note that push() in this
      // header does not update this counter itself (presumably it is
      // maintained by its caller).
      //
      size_t stat_full = 0;

      // Our task queue is circular with head being the index of the first
      // element and tail -- of the last. Since this makes the empty and one
      // element cases indistinguishable, we also keep the size.
      //
      // The mark is an index somewhere between (figuratively speaking) head
      // and tail, if enabled. If the mark is hit, then it is disabled until
      // the queue becomes empty or it is reset by a push.
      //
      // The disabled mark is represented by the queue depth value (which is
      // not a valid index; see push(), empty_back(), and pop_back()).
      //
      size_t head = 0;
      size_t mark = 0;
      size_t tail = 0;
      size_t size = 0;

      // Circular buffer of depth entries.
      //
      unique_ptr<task_data[]> data;

      // Allocate the buffer for the specified number of entries.
      //
      task_queue (size_t depth): data (new task_data[depth]) {}
    };

    // Task queue API. Expects the queue mutex to be locked.
    //

    // Push a new task to the queue returning a pointer to the task data to be
    // filled or NULL if the queue is full.
    //
    task_data*
    push (task_queue& tq)
    {
      // Bail out if the queue is full.
      //
      if (tq.size == task_queue_depth_)
        return nullptr;

      // Advance the tail with wrap-around unless the queue is empty, in
      // which case the new element goes into the current tail position.
      //
      if (tq.size != 0)
      {
        if (tq.tail != task_queue_depth_ - 1)
          ++tq.tail;
        else
          tq.tail = 0;
      }

      ++tq.size;

      // If the mark is disabled, then this is the first push at this level
      // so enable it by pointing to the new element.
      //
      if (tq.mark == task_queue_depth_)
        tq.mark = tq.tail;

      queued_task_count_.fetch_add (1, std::memory_order_release);

      return &tq.data[tq.tail];
    }

    bool
    empty_front (task_queue& tq) const
    {
      // From the front the queue is empty only if it has no elements (the
      // mark is not taken into account).
      //
      return tq.size == 0;
    }

    void
    pop_front (task_queue& tq, lock& ql)
    {
      // Get the task at the head and remember whether the mark points to it
      // (in which case the mark has to move together with the head).
      //
      task_data& front (tq.data[tq.head]);
      const bool mark_at_head (tq.head == tq.mark);

      // Advance the head with wrap-around unless this is the last element,
      // in which case the head stays where it is.
      //
      if (tq.size != 1)
      {
        if (tq.head != task_queue_depth_ - 1)
          ++tq.head;
        else
          tq.head = 0;
      }

      --tq.size;

      // Reset (the queue is now empty) or adjust (the mark was pointing to
      // the popped element) the mark.
      //
      if (tq.size == 0 || mark_at_head)
        tq.mark = tq.head;

      execute (ql, front);
    }

    bool
    empty_back (task_queue& tq) const
    {
      // Nothing to pop if there are no elements at all.
      //
      if (tq.size == 0)
        return true;

      // Otherwise, the back is considered empty if the mark is disabled
      // (which is represented by the queue depth value).
      //
      return tq.mark == task_queue_depth_;
    }

    void
    pop_back (task_queue& tq, lock& ql)
    {
      // Remove the task at the tail of the queue and execute it. The queue
      // mark is disabled for the duration of the execution and restored (or
      // adjusted) afterwards.
      //
      // Note that s, t, and m are references into the queue so after the
      // execute() call below they reflect whatever changes were made to the
      // queue in the meantime.
      //
      size_t& s (tq.size);
      size_t& t (tq.tail);
      size_t& m (tq.mark);

      bool a (t == m); // Adjust mark?

      task_data& td (tq.data[t]);

      // Save the old queue mark and disable it in case the task we are about
      // to run adds sub-tasks. The first push(), if any, will reset it.
      //
      size_t om (m);
      m = task_queue_depth_;

      //                     normal  wrap                     empty
      //                     |       |                        |
      t = s != 1 ? (t != 0 ? t - 1 : task_queue_depth_ - 1) : t;
      --s;

      execute (ql, td);

      // Restore the old mark (which we might have to adjust).
      //
      if (s == 0)
        m = t;                 // Reset the mark.
      else if (a)
        m = task_queue_depth_; // Disable the mark.
      else
        // What happens if head goes past the old mark? In this case we will
        // get into the empty queue state before we end up making any (wrong)
        // decisions based on this value. Unfortunately there is no way to
        // detect this (and do some sanity asserts) since things can wrap
        // around.
        //
        // To put it another way, the understanding here is that after the
        // task returns we will either have an empty queue or there will still
        // be tasks between the old mark and the current tail, something along
        // these lines:
        //
        // OOOOOXXXXOOO
        //   |  |  |
        //   m  h  t
        //
        m = om;
    }

    void
    execute (lock& ql, task_data& td)
    {
      // Execute the task that has already been removed from the queue by
      // pop_front() or pop_back(), call the progress monitor, if any, and
      // re-acquire the queue lock before returning.
      //
      queued_task_count_.fetch_sub (1, std::memory_order_release);

      // The thunk moves the task data to its stack, releases the lock,
      // and continues to execute the task.
      //
      td.thunk (*this, ql, &td.data);

      // See if we need to call the monitor (see also the serial version
      // in async()).
      //
      if (monitor_count_ != nullptr)
      {
        // Note that we don't care if we don't see the updated values right
        // away.
        //
        // A zero threshold means some other thread is currently calling the
        // monitor, in which case we simply skip it.
        //
        if (size_t t = monitor_tshold_.load (memory_order_relaxed))
        {
          // "Lock" the monitor by setting threshold to 0.
          //
          // NOTE(review): the success order here is release so the exchange
          // itself does not acquire the writes made by the previous "lock"
          // holder. Confirm that the monitor function does not depend on
          // such ordering.
          //
          if (monitor_tshold_.compare_exchange_strong (
                t,
                0,
                memory_order_release,
                memory_order_relaxed))
          {
            // Now we are the only ones messing with this.
            //
            size_t v (monitor_count_->load (memory_order_relaxed));

            if (v != monitor_init_)
            {
              // See which direction we are going.
              //
              // If the threshold has been reached, call the monitor function
              // and use its return value as the new threshold.
              //
              if (v > monitor_init_ ? (v >= t) : (v <= t))
                t = monitor_func_ (v);
            }

            // "Unlock" the monitor by storing the (possibly new) non-zero
            // threshold.
            //
            monitor_tshold_.store (t, memory_order_release);
          }
        }
      }

      ql.lock ();
    }

    // Each thread has its own queue which are stored in this list.
    //
    std::list<task_queue> task_queues_;

    task_queue&
    create_queue ();

    static task_queue*
    queue () noexcept;

    static void
    queue (task_queue*) noexcept;
  };
}

#include <libbuild2/scheduler.txx>

#endif // LIBBUILD2_SCHEDULER_HXX