RunQueue.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

#include <atomic>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread (owner),
// operations on the back of the queue can be done by multiple threads
// concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but has no
// stray occupied elements). Owner operates on one end of this region, remote
// thread operates on the other end. Synchronization between these threads
// (potential consumption of the last element and claiming of the last empty
// element) happens by means of a state variable in each element. States are:
// empty, busy (in process of insertion or removal) and ready. Threads claim
// elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store as the element is exclusively owned by the current thread.
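//
// In other words, each element cycles through this state machine (a sketch of
// the transitions described above):
//
//   empty --CAS--> busy --plain store--> ready   (insert)
//   ready --CAS--> busy --plain store--> empty   (remove)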
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state, but that would require malloc/free per operation for large, complex
// values (and this is designed to store std::function<void()>).
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_assert((kSize & (kSize - 1)) == 0);
    eigen_assert(kSize > 2);            // why would you do this?
    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Advance the front position by one and bump the modification counter in
    // the upper bits (adding kSize << 1 increments the counter field by one).
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Decrement the position (low bits, modulo 2*kSize) while preserving the
    // modification counter in the upper bits.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Same masked decrement as in PopFront: move the position back by one
    // while preserving the modification counter.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }


  // PopBack removes and returns the last element in the queue.
  // Can fail spuriously (e.g. if the try-lock on mutex_ below fails).
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns up to half of the elements at the back of
  // the queue. Returns the number of elements removed; can also fail spuriously.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    // Walk from mid down to back; the difference is computed in modular
    // unsigned arithmetic, so the signed comparison handles wraparound.
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store a temporary kBusy, we exclusively own these
        // elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to avoid false positives (claiming a non-empty queue to be
    // empty).
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
      int size = (front & kMask2) - (back & kMask2);
      // Fix overflow.
      if (size < 0) size += 2 * kSize;
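      // E.g. with kSize = 4 (kMask2 = 7): front position 1 and back position 6
      // yield 1 - 6 = -5, corrected to -5 + 8 = 3 occupied elements.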
      // Order of modification in push/pop is crafted to make the queue look
      // larger than it is during concurrent modifications. E.g. pop can
      // decrement size before the corresponding push has incremented it.
      // So the computed size can be up to kSize + 1, fix it.
      if (size > static_cast<int>(kSize)) size = kSize;
      return size;
    }
  }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // The low log(kSize) + 1 bits in front_ and back_ contain the rolling index
  // of the front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used only
  // log(kSize) bits for the position, these conditions would be
  // indistinguishable); and (2) obtain a consistent snapshot of front_/back_
  // for the Size operation using the modification counters.
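  // For example, with kSize = 1024 the low 11 bits hold the position in
  // [0, 2*kSize) and the upper bits hold the counter. This is why push/pop
  // advance an index with `+ 1 + (kSize << 1)`: the `+ 1` moves the position
  // and the `+ (kSize << 1)` increments the counter field by one.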
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};
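
// Example (an illustrative sketch, not part of the Eigen API): a worker thread
// owns the queue and works on the front; other threads steal from the back.
//
//   RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread only:
//   std::function<void()> overflow = q.PushFront(work);
//   if (overflow) overflow();  // queue was full, run the task inline
//   if (std::function<void()> w = q.PopFront()) w();
//
//   // Any other thread:
//   if (std::function<void()> stolen = q.PopBack()) stolen();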

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_