![]() |
Eigen-unsupported
3.3.3
|
00001 // This file is part of Eigen, a lightweight C++ template library 00002 // for linear algebra. 00003 // 00004 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com> 00005 // 00006 // This Source Code Form is subject to the terms of the Mozilla 00007 // Public License v. 2.0. If a copy of the MPL was not distributed 00008 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/. 00009 00010 #ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H 00011 #define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H 00012 00013 namespace Eigen { 00014 00015 // The implementation of the ThreadPool type ensures that the Schedule method 00016 // runs the functions it is provided in FIFO order when the scheduling is done 00017 // by a single thread. 00018 // Environment provides a way to create threads and also allows to intercept 00019 // task submission and execution. 00020 template <typename Environment> 00021 class SimpleThreadPoolTempl : public ThreadPoolInterface { 00022 public: 00023 // Construct a pool that contains "num_threads" threads. 00024 explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment()) 00025 : env_(env), threads_(num_threads), waiters_(num_threads) { 00026 for (int i = 0; i < num_threads; i++) { 00027 threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); })); 00028 } 00029 } 00030 00031 // Wait until all scheduled work has finished and then destroy the 00032 // set of threads. 00033 ~SimpleThreadPoolTempl() { 00034 { 00035 // Wait for all work to get done. 00036 std::unique_lock<std::mutex> l(mu_); 00037 while (!pending_.empty()) { 00038 empty_.wait(l); 00039 } 00040 exiting_ = true; 00041 00042 // Wakeup all waiters. 00043 for (auto w : waiters_) { 00044 w->ready = true; 00045 w->task.f = nullptr; 00046 w->cv.notify_one(); 00047 } 00048 } 00049 00050 // Wait for threads to finish. 00051 for (auto t : threads_) { 00052 delete t; 00053 } 00054 } 00055 00056 // Schedule fn() for execution in the pool of threads. The functions are 00057 // executed in the order in which they are scheduled. 00058 void Schedule(std::function<void()> fn) final { 00059 Task t = env_.CreateTask(std::move(fn)); 00060 std::unique_lock<std::mutex> l(mu_); 00061 if (waiters_.empty()) { 00062 pending_.push_back(std::move(t)); 00063 } else { 00064 Waiter* w = waiters_.back(); 00065 waiters_.pop_back(); 00066 w->ready = true; 00067 w->task = std::move(t); 00068 w->cv.notify_one(); 00069 } 00070 } 00071 00072 int NumThreads() const final { 00073 return static_cast<int>(threads_.size()); 00074 } 00075 00076 int CurrentThreadId() const final { 00077 const PerThread* pt = this->GetPerThread(); 00078 if (pt->pool == this) { 00079 return pt->thread_id; 00080 } else { 00081 return -1; 00082 } 00083 } 00084 00085 protected: 00086 void WorkerLoop(int thread_id) { 00087 std::unique_lock<std::mutex> l(mu_); 00088 PerThread* pt = GetPerThread(); 00089 pt->pool = this; 00090 pt->thread_id = thread_id; 00091 Waiter w; 00092 Task t; 00093 while (!exiting_) { 00094 if (pending_.empty()) { 00095 // Wait for work to be assigned to me 00096 w.ready = false; 00097 waiters_.push_back(&w); 00098 while (!w.ready) { 00099 w.cv.wait(l); 00100 } 00101 t = w.task; 00102 w.task.f = nullptr; 00103 } else { 00104 // Pick up pending work 00105 t = std::move(pending_.front()); 00106 pending_.pop_front(); 00107 if (pending_.empty()) { 00108 empty_.notify_all(); 00109 } 00110 } 00111 if (t.f) { 00112 mu_.unlock(); 00113 env_.ExecuteTask(t); 00114 t.f = nullptr; 00115 mu_.lock(); 00116 } 00117 } 00118 } 00119 00120 private: 00121 typedef typename Environment::Task Task; 00122 typedef typename Environment::EnvThread Thread; 00123 00124 struct Waiter { 00125 std::condition_variable cv; 00126 Task task; 00127 bool ready; 00128 }; 00129 00130 struct PerThread { 00131 constexpr PerThread() : pool(NULL), thread_id(-1) { } 00132 SimpleThreadPoolTempl* pool; // Parent pool, or null for normal threads. 00133 int thread_id; // Worker thread index in pool. 00134 }; 00135 00136 Environment env_; 00137 std::mutex mu_; 00138 MaxSizeVector<Thread*> threads_; // All threads 00139 MaxSizeVector<Waiter*> waiters_; // Stack of waiting threads. 00140 std::deque<Task> pending_; // Queue of pending work 00141 std::condition_variable empty_; // Signaled on pending_.empty() 00142 bool exiting_ = false; 00143 00144 PerThread* GetPerThread() const { 00145 EIGEN_THREAD_LOCAL PerThread per_thread; 00146 return &per_thread; 00147 } 00148 }; 00149 00150 typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool; 00151 00152 } // namespace Eigen 00153 00154 #endif // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H