TensorDeviceThreadPool.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Use the NonBlockingThreadPool by default. Define EIGEN_USE_SIMPLE_THREAD_POOL
// to fall back to the older SimpleThreadPool implementation.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif


// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() {
    eigen_assert((state_ >> 1) == 0);
  }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      eigen_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) {
      cv_.wait(l);
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is waiter flag
  bool notified_;
};
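
// A minimal usage sketch (not part of the original header; it assumes an
// Eigen::ThreadPool named `pool`): a Barrier constructed with a count of N
// releases Wait() once Notify() has been called N times, e.g. once per task:
//
//   Barrier barrier(4);
//   for (int i = 0; i < 4; ++i) {
//     pool.Schedule([&barrier]() {
//       // ... do a slice of the work ...
//       barrier.Notify();
//     });
//   }
//   barrier.Wait();  // returns once all four tasks have notified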


// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
  Notification() : Barrier(1) {}
};
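
// A minimal usage sketch (illustrative only, `pool` as in the Barrier sketch
// above): one thread signals the event, any number of threads may wait for it:
//
//   Notification done;
//   pool.Schedule([&done]() {
//     // ... produce a result ...
//     done.Notify();
//   });
//   done.Wait();  // blocks until Notify() has been called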


// Runs an arbitrary function and then calls Notify() on the passed-in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}


// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }
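
  // A minimal construction sketch (illustrative only; the thread count of 8
  // is hypothetical). The pool must outlive the device:
  //
  //   Eigen::ThreadPool pool(8);
  //   Eigen::ThreadPoolDevice device(&pool, 8);
  //
  //   Eigen::Tensor<float, 2> a(100, 100), b(100, 100), c(100, 100);
  //   c.device(device) = a + b;  // the expression is evaluated on the pool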

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
    return n;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
                                                Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    pool_->Schedule(std::bind(f, args...));
  }
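
  // A minimal usage sketch of the enqueue helpers (illustrative only,
  // `device` being a ThreadPoolDevice as constructed above):
  //
  //   Notification* n = device.enqueue([] { /* ... task ... */ });
  //   n->Wait();   // block until the task has run
  //   delete n;    // in this sketch the caller releases the Notification
  //
  //   Barrier barrier(2);
  //   device.enqueue_with_barrier(&barrier, [] { /* task 1 */ });
  //   device.enqueue_with_barrier(&barrier, [] { /* task 2 */ });
  //   barrier.Wait();  // both tasks have completed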

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f with [0, n) arguments in parallel and waits for
  // completion. F accepts a half-open interval [first, last).
  // Block size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Calculate block size based on (1) the iteration cost and (2) parallel
    // efficiency. We want blocks to be not too small (to mitigate
    // parallelization overheads) and not too large (to mitigate tail effect
    // and potential load imbalance), and we also want the number of blocks
    // to divide evenly across threads.

    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
    const Index max_block_size =
        numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);
    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());
    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count; prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less, execute directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and submit to the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
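
  // A minimal usage sketch of parallelFor (illustrative only; the cost
  // figures and the `src`/`dst` buffers are hypothetical):
  //
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/2);
  //   device.parallelFor(n, cost, [&](Eigen::Index first, Eigen::Index last) {
  //     for (Eigen::Index i = first; i < last; ++i) dst[i] = 2.0f * src[i];
  //   });  // blocks until every [first, last) range has been processed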

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};


}  // end namespace Eigen

#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H