SimpleThreadPool.h
00001 // This file is part of Eigen, a lightweight C++ template library
00002 // for linear algebra.
00003 //
00004 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
00005 //
00006 // This Source Code Form is subject to the terms of the Mozilla
00007 // Public License v. 2.0. If a copy of the MPL was not distributed
00008 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
00009 
00010 #ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
00011 #define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
00012 
00013 namespace Eigen {
00014 
00015 // The implementation of the ThreadPool type ensures that the Schedule method
00016 // runs the functions it is provided in FIFO order when the scheduling is done
00017 // by a single thread.
00018 // Environment provides a way to create threads and also allows to intercept
00019 // task submission and execution.
00020 template <typename Environment>
00021 class SimpleThreadPoolTempl : public ThreadPoolInterface {
00022  public:
00023   // Construct a pool that contains "num_threads" threads.
00024   explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
00025       : env_(env), threads_(num_threads), waiters_(num_threads) {
00026     for (int i = 0; i < num_threads; i++) {
00027       threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); }));
00028     }
00029   }
00030 
00031   // Wait until all scheduled work has finished and then destroy the
00032   // set of threads.
00033   ~SimpleThreadPoolTempl() {
00034     {
00035       // Wait for all work to get done.
00036       std::unique_lock<std::mutex> l(mu_);
00037       while (!pending_.empty()) {
00038         empty_.wait(l);
00039       }
00040       exiting_ = true;
00041 
00042       // Wakeup all waiters.
00043       for (auto w : waiters_) {
00044         w->ready = true;
00045         w->task.f = nullptr;
00046         w->cv.notify_one();
00047       }
00048     }
00049 
00050     // Wait for threads to finish.
00051     for (auto t : threads_) {
00052       delete t;
00053     }
00054   }
00055 
00056   // Schedule fn() for execution in the pool of threads. The functions are
00057   // executed in the order in which they are scheduled.
00058   void Schedule(std::function<void()> fn) final {
00059     Task t = env_.CreateTask(std::move(fn));
00060     std::unique_lock<std::mutex> l(mu_);
00061     if (waiters_.empty()) {
00062       pending_.push_back(std::move(t));
00063     } else {
00064       Waiter* w = waiters_.back();
00065       waiters_.pop_back();
00066       w->ready = true;
00067       w->task = std::move(t);
00068       w->cv.notify_one();
00069     }
00070   }
00071 
00072   int NumThreads() const final {
00073     return static_cast<int>(threads_.size());
00074   }
00075 
00076   int CurrentThreadId() const final {
00077     const PerThread* pt = this->GetPerThread();
00078     if (pt->pool == this) {
00079       return pt->thread_id;
00080     } else {
00081       return -1;
00082     }
00083   }
00084 
00085  protected:
00086   void WorkerLoop(int thread_id) {
00087     std::unique_lock<std::mutex> l(mu_);
00088     PerThread* pt = GetPerThread();
00089     pt->pool = this;
00090     pt->thread_id = thread_id;
00091     Waiter w;
00092     Task t;
00093     while (!exiting_) {
00094       if (pending_.empty()) {
00095         // Wait for work to be assigned to me
00096         w.ready = false;
00097         waiters_.push_back(&w);
00098         while (!w.ready) {
00099           w.cv.wait(l);
00100         }
00101         t = w.task;
00102         w.task.f = nullptr;
00103       } else {
00104         // Pick up pending work
00105         t = std::move(pending_.front());
00106         pending_.pop_front();
00107         if (pending_.empty()) {
00108           empty_.notify_all();
00109         }
00110       }
00111       if (t.f) {
00112         mu_.unlock();
00113         env_.ExecuteTask(t);
00114         t.f = nullptr;
00115         mu_.lock();
00116       }
00117     }
00118   }
00119 
00120  private:
00121   typedef typename Environment::Task Task;
00122   typedef typename Environment::EnvThread Thread;
00123 
00124   struct Waiter {
00125     std::condition_variable cv;
00126     Task task;
00127     bool ready;
00128   };
00129 
00130   struct PerThread {
00131     constexpr PerThread() : pool(NULL), thread_id(-1) { }
00132     SimpleThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
00133     int thread_id;                // Worker thread index in pool.
00134   };
00135 
00136   Environment env_;
00137   std::mutex mu_;
00138   MaxSizeVector<Thread*> threads_;  // All threads
00139   MaxSizeVector<Waiter*> waiters_;  // Stack of waiting threads.
00140   std::deque<Task> pending_;        // Queue of pending work
00141   std::condition_variable empty_;   // Signaled on pending_.empty()
00142   bool exiting_ = false;
00143 
00144   PerThread* GetPerThread() const {
00145     EIGEN_THREAD_LOCAL PerThread per_thread;
00146     return &per_thread;
00147   }
00148 };
00149 
00150 typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
00151 
00152 }  // namespace Eigen
00153 
00154 #endif  // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
 All Classes Functions Variables Typedefs Enumerator