Eigen-unsupported 3.3.3
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Use the NonBlockingThreadPool by default; define EIGEN_USE_SIMPLE_THREAD_POOL
// to fall back to the older SimpleThreadPool.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif


// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() {
    eigen_assert((state_ >> 1) == 0);
  }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      eigen_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) {
      cv_.wait(l);
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is waiter flag
  bool notified_;
};


// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
  Notification() : Barrier(1) {}
};


// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

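// Illustrative usage sketch (added for exposition, not part of the original
// source): a Barrier constructed with a count of N blocks in Wait() until
// Notify() has been called N times, and a Notification is simply a Barrier
// with a count of one. With plain std::thread it might look like this
// (do_work_a/do_work_b are hypothetical placeholders for the caller's work):
//
//   Barrier barrier(2);
//   std::thread t1([&] { do_work_a(); barrier.Notify(); });
//   std::thread t2([&] { do_work_b(); barrier.Notify(); });
//   barrier.Wait();  // returns once both Notify() calls have happened
//   t1.join();
//   t2.join();
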

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
    return n;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
                                                Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    pool_->Schedule(std::bind(f, args...));
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. F accepts a half-open interval [first, last).
  // Block size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Calculate block size based on (1) the iteration cost and (2) parallel
    // efficiency. We want blocks to be not too small to mitigate
    // parallelization overheads; not too large to mitigate tail
    // effect and potential load imbalance; and we also want the number
    // of blocks to divide evenly across the threads.
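    // Illustrative walk-through (added for exposition; the concrete numbers
    // are assumptions, not taken from the Eigen sources): with n = 1000,
    // 4 threads, and a cost model that yields an initial block_size of 95,
    // block_count = divup(1000, 95) = 11 and the efficiency is
    // 11 / (divup(11, 4) * 4) = 11/12, roughly 0.92. The coarsening loop
    // below then tries block sizes 100, 112, 125, ... (capped at
    // max_block_size = 190); 125 gives block_count = 8, i.e. exactly two
    // blocks per thread and an efficiency of 8 / (2 * 4) = 1.0, so
    // block_size settles at 125.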

    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
    const Index max_block_size =
        numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);
    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());
    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count; prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less, execute directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and submit to the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};


}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
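For reference, here is a minimal usage sketch (not from the Eigen sources) of driving this device through its parallelFor convenience overload. The pool size, data size, and the per-element TensorOpCost figures are illustrative assumptions only:

#define EIGEN_USE_THREADS
#include <unsupported/Eigen/CXX11/Tensor>

#include <vector>

int main() {
  const int num_threads = 4;                        // assumed core count
  Eigen::ThreadPool pool(num_threads);              // ThreadPool alias from this header
  Eigen::ThreadPoolDevice device(&pool, num_threads);

  std::vector<float> data(1 << 20, 1.0f);

  // Per-element cost estimate: bytes loaded, bytes stored, compute cycles
  // (rough assumptions used only to steer the block-size heuristic).
  const Eigen::TensorOpCost cost(sizeof(float), sizeof(float), /*compute_cycles=*/1);

  // Doubles every element in parallel; f receives half-open ranges [first, last).
  device.parallelFor(static_cast<Eigen::Index>(data.size()), cost,
                     [&data](Eigen::Index first, Eigen::Index last) {
                       for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
                     });
  return 0;
}

Since the device does not take ownership of the pool (see the constructor comment above), the ThreadPool must outlive both the ThreadPoolDevice and any work scheduled on it.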