EventCount.h
00001 // This file is part of Eigen, a lightweight C++ template library
00002 // for linear algebra.
00003 //
00004 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
00005 //
00006 // This Source Code Form is subject to the terms of the Mozilla
00007 // Public License v. 2.0. If a copy of the MPL was not distributed
00008 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
00009 
00010 #ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
00011 #define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
00012 
00013 namespace Eigen {
00014 
00015 // EventCount allows waiting for arbitrary predicates in non-blocking
00016 // algorithms. Think of a condition variable, but the wait predicate does not
00017 // need to be protected by a mutex. Usage:
00018 // Waiting thread does:
00019 //
00020 //   if (predicate)
00021 //     return act();
00022 //   EventCount::Waiter& w = waiters[my_index];
00023 //   ec.Prewait(&w);
00024 //   if (predicate) {
00025 //     ec.CancelWait(&w);
00026 //     return act();
00027 //   }
00028 //   ec.CommitWait(&w);
00029 //
00030 // Notifying thread does:
00031 //
00032 //   predicate = true;
00033 //   ec.Notify(true);
00034 //
00035 // Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
00036 // cheap, but they are executed only if the preceding predicate check has
00037 // failed.
00038 //
00039 // Algorithm outline:
00040 // There are two main variables: predicate (managed by user) and state_.
00041 // Operation closely resembles Dekker's mutual exclusion algorithm:
00042 // https://en.wikipedia.org/wiki/Dekker%27s_algorithm
00043 // The waiting thread sets state_ then checks the predicate; the notifying
00044 // thread sets the predicate then checks state_. Due to the seq_cst fences in
00045 // between these operations it is guaranteed that either the waiter will see
00046 // the predicate change and won't block, or the notifying thread will see the
00047 // state_ change and will unblock the waiter, or both. But it can't happen
00048 // that both threads miss each other's changes, which would lead to deadlock.
class EventCount {
 public:
  class Waiter;

  // |waiters| is an externally owned array of per-thread Waiter slots; its
  // size bounds the number of threads that may be waiting at once (must fit
  // in kWaiterBits). The reference is retained, so |waiters| must outlive
  // this EventCount.
  EventCount(MaxSizeVector<Waiter>& waiters) : waiters_(waiters) {
    eigen_assert(waiters.size() < (1 << kWaiterBits) - 1);
    // Initialize epoch to something close to overflow to test overflow.
    state_ = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
  }

  ~EventCount() {
    // Ensure there are no waiters: the stack field must hold the "empty"
    // sentinel (kStackMask) and the prewait counter must be zero.
    eigen_assert((state_.load() & (kStackMask | kWaiterMask)) == kStackMask);
  }

  // Prewait prepares for waiting.
  // After calling this function the thread must re-check the wait predicate
  // and call either CancelWait or CommitWait passing the same Waiter object.
  void Prewait(Waiter* w) {
    // Snapshot the pre-increment state; CommitWait/CancelWait use it to
    // compute the epoch at which this waiter registered.
    w->epoch = state_.fetch_add(kWaiterInc, std::memory_order_relaxed);
    // Full fence: orders the prewait-counter increment above before the
    // caller's predicate re-check (the Dekker-style handshake with the
    // matching fence in Notify).
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  // CommitWait commits waiting: moves this thread from the prewait counter
  // onto the waiter stack encoded in state_, then blocks in Park until a
  // Notify unparks it (or returns immediately if already notified).
  void CommitWait(Waiter* w) {
    w->state = Waiter::kNotSignaled;
    // Modification epoch of this waiter.
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_seq_cst);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_seq_cst);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter and add it to the waiter list.
      eigen_assert((state & kWaiterMask) != 0);
      uint64_t newstate = state - kWaiterInc + kEpochInc;
      // Push self: store this waiter's array index in the low (stack) bits.
      newstate = (newstate & ~kStackMask) | (w - &waiters_[0]);
      if ((state & kStackMask) == kStackMask)
        w->next.store(nullptr, std::memory_order_relaxed);
      else
        w->next.store(&waiters_[state & kStackMask], std::memory_order_relaxed);
      // Release so the w->next store above is visible to the thread that
      // pops this waiter in Notify.
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_release))
        break;
    }
    Park(w);
  }

  // CancelWait cancels effects of the previous Prewait call: removes this
  // thread from the prewait counter without blocking.
  void CancelWait(Waiter* w) {
    // Modification epoch of this waiter (same encoding as in CommitWait).
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = state_.load(std::memory_order_relaxed);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either CancelWait or CommitWait, or is notified.
        EIGEN_THREAD_YIELD();
        state = state_.load(std::memory_order_relaxed);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter.
      eigen_assert((state & kWaiterMask) != 0);
      if (state_.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
                                       std::memory_order_relaxed))
        return;
    }
  }

  // Notify wakes one or all waiting threads.
  // Must be called after changing the associated wait predicate.
  void Notify(bool all) {
    // Full fence: orders the caller's predicate write before the state_ read
    // below (pairs with the fence in Prewait).
    std::atomic_thread_fence(std::memory_order_seq_cst);
    uint64_t state = state_.load(std::memory_order_acquire);
    for (;;) {
      // Easy case: no waiters (empty stack and zero prewait count).
      if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
        return;
      uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
      uint64_t newstate;
      if (all) {
        // Reset prewait counter and empty wait list.
        newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
      } else if (waiters) {
        // There is a thread in pre-wait state, unblock it.
        newstate = state + kEpochInc - kWaiterInc;
      } else {
        // Pop a waiter from list and unpark it.
        Waiter* w = &waiters_[state & kStackMask];
        Waiter* wnext = w->next.load(std::memory_order_relaxed);
        uint64_t next = kStackMask;
        if (wnext != nullptr) next = wnext - &waiters_[0];
        // Note: we don't add kEpochInc here. ABA problem on the lock-free stack
        // can't happen because a waiter is re-pushed onto the stack only after
        // it was in the pre-wait state which inevitably leads to epoch
        // increment.
        newstate = (state & kEpochMask) + next;
      }
      if (state_.compare_exchange_weak(state, newstate,
                                       std::memory_order_acquire)) {
        if (!all && waiters) return;  // unblocked pre-wait thread
        // Nothing was on the stack, so there is nobody to unpark.
        if ((state & kStackMask) == kStackMask) return;
        Waiter* w = &waiters_[state & kStackMask];
        // notify-one: detach the popped waiter so Unpark wakes only it;
        // notify-all: leave the links intact so Unpark walks the whole list.
        if (!all) w->next.store(nullptr, std::memory_order_relaxed);
        Unpark(w);
        return;
      }
    }
  }

  // Per-thread waiter slot. One Waiter per potentially-blocking thread; the
  // same object must be passed to Prewait and the matching
  // CancelWait/CommitWait.
  class Waiter {
    friend class EventCount;
    // Align to 128 byte boundary to prevent false sharing with other Waiter objects in the same vector.
    // Next waiter in the intrusive stack threaded through state_'s low bits.
    EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<Waiter*> next;
    std::mutex mu;               // protects |state| for the Park/Unpark handshake
    std::condition_variable cv;  // signaled by Unpark when |state| becomes kSignaled
    uint64_t epoch;              // state_ snapshot taken in Prewait
    unsigned state;              // one of the lifecycle values below
    enum {
      kNotSignaled,  // set by CommitWait before publishing the waiter
      kWaiting,      // set by Park while blocked on cv
      kSignaled,     // set by Unpark; Park's exit condition
    };
  };

 private:
  // State_ layout:
  // - low kStackBits is a stack of waiters committed wait
  //   (index of the top waiter in waiters_; kStackMask means empty).
  // - next kWaiterBits is count of waiters in prewait state.
  // - next kEpochBits is modification counter.
  static const uint64_t kStackBits = 16;
  static const uint64_t kStackMask = (1ull << kStackBits) - 1;
  static const uint64_t kWaiterBits = 16;
  static const uint64_t kWaiterShift = 16;
  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
                                      << kWaiterShift;
  static const uint64_t kWaiterInc = 1ull << kWaiterBits;
  static const uint64_t kEpochBits = 32;
  static const uint64_t kEpochShift = 32;
  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
  static const uint64_t kEpochInc = 1ull << kEpochShift;
  std::atomic<uint64_t> state_;
  MaxSizeVector<Waiter>& waiters_;

  // Blocks the calling thread until Unpark marks |w| signaled.
  // The while-loop re-checks after every wakeup, so spurious condition
  // variable wakeups are harmless.
  void Park(Waiter* w) {
    std::unique_lock<std::mutex> lock(w->mu);
    while (w->state != Waiter::kSignaled) {
      w->state = Waiter::kWaiting;
      w->cv.wait(lock);
    }
  }

  // Wakes every waiter reachable from |waiters| via the next links
  // (a single waiter when Notify(false) detached it from the list).
  void Unpark(Waiter* waiters) {
    Waiter* next = nullptr;
    for (Waiter* w = waiters; w; w = next) {
      // Read the link before signaling: once signaled, w may return from
      // CommitWait and be reused.
      next = w->next.load(std::memory_order_relaxed);
      unsigned state;
      {
        std::unique_lock<std::mutex> lock(w->mu);
        state = w->state;
        w->state = Waiter::kSignaled;
      }
      // Avoid notifying if it wasn't waiting.
      if (state == Waiter::kWaiting) w->cv.notify_one();
    }
  }

  // Non-copyable: state_ and the waiters_ reference must not be duplicated.
  EventCount(const EventCount&) = delete;
  void operator=(const EventCount&) = delete;
};
00230 
00231 }  // namespace Eigen
00232 
00233 #endif  // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
 All Classes Functions Variables Typedefs Enumerator