//------------------------------------------------------------------------------ // File: RWMutex.cc //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2018 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "common/StacktraceHere.hh" #include "common/Timing.hh" #include "common/Logging.hh" #include "common/RWMutex.hh" #include "common/PthreadRWMutex.hh" #include "common/SharedMutex.hh" #include #include #include EOSCOMMONNAMESPACE_BEGIN #ifdef EOS_INSTRUMENTED_RWMUTEX std::atomic RWMutex::mRdCumulatedWait_static {0}; std::atomic RWMutex::mWrCumulatedWait_static {0}; std::atomic RWMutex::mRdLockCounterSample_static {0}; std::atomic RWMutex::mWrLockCounterSample_static {0}; std::atomic RWMutex::mRdMaxWait_static {0}; std::atomic RWMutex::mWrMaxWait_static {0}; std::atomic RWMutex::mRdMinWait_static {std::numeric_limits::max()}; std::atomic RWMutex::mWrMinWait_static {std::numeric_limits::max()}; size_t RWMutex::timingCompensation = 0; size_t RWMutex::timingLatency = 0; size_t RWMutex::orderCheckingLatency = 0; size_t RWMutex::lockUnlockDuration = 0; int RWMutex::sSamplingModulo = (int)(0.01 * RAND_MAX); bool RWMutex::staticInitialized = false; bool RWMutex::sEnableGlobalTiming = false; bool RWMutex::sEnableGlobalDeadlockCheck = false; bool RWMutex::sEnableGlobalOrderCheck = false; RWMutex::rules_t* RWMutex::rules_static = NULL; std::map* RWMutex::ruleIndex2Name_static = NULL; std::map* RWMutex::ruleName2Index_static = NULL; thread_local bool* RWMutex::orderCheckReset_staticthread = NULL; thread_local unsigned long RWMutex::ordermask_staticthread[EOS_RWMUTEX_ORDER_NRULES]; std::map* RWMutex::threadOrderCheckResetFlags_static = NULL; pthread_rwlock_t RWMutex::mOrderChkLock; std::mutex RWMutex::sOpMutex; RWMutex::MapMutexNameT RWMutex::sMtxNameMap; RWMutex::MapMutexOpT RWMutex::sTidMtxOpMap; const char* RWMutex::LOCK_STATE[] = {"N", "wLR", "wULR", "LR", "wLW", "wULW", "LW", NULL}; #define EOS_RWMUTEX_CHECKORDER_LOCK if(sEnableGlobalOrderCheck) CheckAndLockOrder(); #define EOS_RWMUTEX_CHECKORDER_UNLOCK if(sEnableGlobalOrderCheck) CheckAndUnlockOrder(); #define EOS_RWMUTEX_TIMER_START \ uint64_t l1stamp = Timing::GetNowInNs(); \ bool issampled = false; uint64_t tstamp = 0; \ if (mEnableTiming || sEnableGlobalTiming) { \ issampled = mEnableSampling ? (!((++mCounter)%mSamplingModulo)) : true; \ if (issampled) tstamp = Timing::GetNowInNs(); \ } // what = mRd or mWr #define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) \ ++(what##LockCounter); \ if(issampled) { \ tstamp = Timing::GetNowInNs() - tstamp; \ if(mEnableTiming) { \ ++(what##LockCounterSample); \ what##CumulatedWait += tstamp; \ bool needloop=true; \ do {size_t mymax = what##MaxWait.load(); \ if (tstamp > mymax) \ needloop = !(what##MaxWait).compare_exchange_strong(mymax, tstamp); \ else needloop = false; \ } while(needloop); \ do {size_t mymin = what##MinWait.load(); \ if (tstamp < mymin) \ needloop = !(what##MinWait).compare_exchange_strong(mymin, tstamp); \ else needloop = false; \ } while(needloop); \ } \ if(sEnableGlobalTiming) { \ ++(what##LockCounterSample_static); \ what##CumulatedWait_static += tstamp; \ bool needloop = true; \ do {size_t mymax = what##MaxWait_static.load(); \ if (tstamp > mymax) \ needloop = !(what##MaxWait_static).compare_exchange_strong(mymax, tstamp); \ else needloop = false; \ } while(needloop); \ do {size_t mymin = what##MinWait_static.load(); \ if (tstamp < mymin) \ needloop = !(what##MinWait_static).compare_exchange_strong(mymin, tstamp); \ else needloop = false; \ } while(needloop); \ } \ } \ uint64_t l2stamp = Timing::GetNowInNs(); \ (what##LockLeadTime) += (l2stamp-l1stamp); #else #define EOS_RWMUTEX_CHECKORDER_LOCK #define EOS_RWMUTEX_CHECKORDER_UNLOCK #define EOS_RWMUTEX_TIMER_START #define EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(what) ++(what##LockCounter); #endif //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ RWMutex::RWMutex(bool prefer_rd): mBlocking(false), mMutexImpl(nullptr), mRdLockCounter(0), mWrLockCounter(0), mPreferRd(prefer_rd), mName("unnamed") { // Try to get write lock in 5 seconds, then release quickly and retry wlocktime.tv_sec = 5; wlocktime.tv_nsec = 0; #ifdef EOS_INSTRUMENTED_RWMUTEX mSamplingModulo = 300; if (!staticInitialized) { staticInitialized = true; InitializeClass(); } mCounter = 0; mEnableTiming = false; mEnableSampling = false; mEnableDeadlockCheck = false; mTransientDeadlockCheck = false; nrules = 0; mCollectionMutex = PTHREAD_MUTEX_INITIALIZER; ResetTimingStatistics(); #endif if (getenv("EOS_USE_PTHREAD_MUTEX")) { mMutexImpl = new PthreadRWMutex(prefer_rd); } else { mMutexImpl = new SharedMutex(); } mBlockedForInterval = 10000; mBlockedStackTracing = false; } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ RWMutex::~RWMutex() { #ifdef EOS_INSTRUMENTED_RWMUTEX pthread_rwlock_rdlock(&mOrderChkLock); std::map >* rules = NULL; for (auto rit = rules_static->begin(); rit != rules_static->end(); rit++) { // for each rule for (auto it = rit->second.begin(); it != rit->second.end(); it++) { // for each RWMutex involved in that rule if ((*it) == dynamic_cast(this)) { if (rules == NULL) { rules = new std::map >(*rules_static); } rules->erase(rit->first); // remove the rule if it contains this } } } pthread_rwlock_unlock(&mOrderChkLock); if (rules != NULL) { ResetOrderRule(); // Inserts the remaining rules for (auto it = rules->begin(); it != rules->end(); it++) { AddOrderRule(it->first, it->second); } delete rules; } #endif delete mMutexImpl; } //------------------------------------------------------------------------------ // Move assignment operator //------------------------------------------------------------------------------ RWMutex& RWMutex::operator=(RWMutex&& other) noexcept { if (this != &other) { this->mMutexImpl = other.mMutexImpl; other.mMutexImpl = nullptr; this->mBlocking = other.mBlocking; } return *this; } //------------------------------------------------------------------------------ // Move constructor //------------------------------------------------------------------------------ RWMutex::RWMutex(RWMutex&& other) noexcept { *this = std::move(other); } //------------------------------------------------------------------------------ // Try to read lock the mutex within the timeout value //------------------------------------------------------------------------------ bool RWMutex::TimedRdLock(uint64_t timeout_ns) { EOS_RWMUTEX_CHECKORDER_LOCK; EOS_RWMUTEX_TIMER_START; #ifdef EOS_INSTRUMENTED_RWMUTEX if (sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = true; } if (mEnableDeadlockCheck || mTransientDeadlockCheck) { EnterCheckDeadlock(true); } #endif int retc = mMutexImpl->TimedRdLock(timeout_ns); #ifdef EOS_INSTRUMENTED_RWMUTEX if (retc && (mEnableDeadlockCheck || mTransientDeadlockCheck)) { ExitCheckDeadlock(true); } #endif EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(mRd); if (retc) { EOS_RWMUTEX_CHECKORDER_UNLOCK; } return (retc == 0); } //------------------------------------------------------------------------------ // Lock for read //------------------------------------------------------------------------------ void RWMutex::LockRead() { EOS_RWMUTEX_CHECKORDER_LOCK; EOS_RWMUTEX_TIMER_START; #ifdef EOS_INSTRUMENTED_RWMUTEX if (sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = true; } if (mEnableDeadlockCheck || mTransientDeadlockCheck) { EnterCheckDeadlock(true); } #endif int retc = 0; if ((retc = mMutexImpl->LockRead())) { fprintf(stderr, "%s Failed to read-lock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(mRd); } //------------------------------------------------------------------------------ // Unlock a read lock //------------------------------------------------------------------------------ void RWMutex::UnLockRead() { EOS_RWMUTEX_CHECKORDER_UNLOCK; #ifdef EOS_INSTRUMENTED_RWMUTEX if (mEnableDeadlockCheck || mTransientDeadlockCheck) { ExitCheckDeadlock(true); } #endif int retc = 0; if ((retc = mMutexImpl->UnLockRead())) { fprintf(stderr, "%s Failed to read-unlock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } #ifdef EOS_INSTRUMENTED_RWMUTEX if (!sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = false; } if (!mEnableDeadlockCheck && !mTransientDeadlockCheck) { DropDeadlockCheck(); } #endif } //------------------------------------------------------------------------------ // Lock for write //------------------------------------------------------------------------------ void RWMutex::LockWrite() { EOS_RWMUTEX_CHECKORDER_LOCK; EOS_RWMUTEX_TIMER_START; #ifdef EOS_INSTRUMENTED_RWMUTEX if (sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = true; } if (mEnableDeadlockCheck || mTransientDeadlockCheck) { EnterCheckDeadlock(false); } #endif int retc = 0; if (mBlocking) { // A blocking mutex is just a normal lock for write if ((retc = mMutexImpl->LockWrite())) { fprintf(stderr, "%s Failed to write-lock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } } else { #ifdef __APPLE__ // Mac does not support timed mutexes if ((retc = mMutexImpl->LockWrite())) { fprintf(stderr, "%s Failed to write-lock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } #else // A non-blocking mutex tries for few seconds to write lock, then releases. // It has the side effect, that it allows dead locked readers to jump ahead // the lock queue. while (true) { uint64_t timeout_ns = wlocktime.tv_nsec + wlocktime.tv_sec * 1e9; int rc = mMutexImpl->TimedWrLock(timeout_ns); if (rc) { if (rc != ETIMEDOUT) { fprintf(stderr, "=== WRITE LOCK EXCEPTION == TID=%llu OBJECT=%llx rc=%d\n", (unsigned long long) XrdSysThread::ID(), (unsigned long long) this, rc); std::terminate(); } else { // fprintf(stderr,"==== WRITE LOCK PENDING ==== TID=%llu OBJECT=%llx\n", // (unsigned long long)XrdSysThread::ID(), (unsigned long long)this); std::this_thread::sleep_for(std::chrono::milliseconds(500)); } } else { // fprintf(stderr,"=== WRITE LOCK ACQUIRED ==== TID=%llu OBJECT=%llx\n", // (unsigned long long)XrdSysThread::ID(), (unsigned long long)this); break; } } #endif } EOS_RWMUTEX_TIMER_STOP_AND_UPDATE(mWr); } //------------------------------------------------------------------------------ // Unlock a write lock //------------------------------------------------------------------------------ void RWMutex::UnLockWrite() { EOS_RWMUTEX_CHECKORDER_UNLOCK; #ifdef EOS_INSTRUMENTED_RWMUTEX if (mEnableDeadlockCheck || mTransientDeadlockCheck) { ExitCheckDeadlock(false); } #endif int retc = 0; if ((retc = mMutexImpl->UnLockWrite())) { fprintf(stderr, "%s Failed to write-unlock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } #ifdef EOS_INSTRUMENTED_RWMUTEX if (!sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = false; if (!mEnableDeadlockCheck) { DropDeadlockCheck(); } } #endif } //------------------------------------------------------------------------------ // Lock for write but give up after wlocktime //------------------------------------------------------------------------------ bool RWMutex::TimedWrLock(uint64_t timeout_ns) { EOS_RWMUTEX_CHECKORDER_LOCK; #ifdef EOS_INSTRUMENTED_RWMUTEX if (sEnableGlobalDeadlockCheck) { mTransientDeadlockCheck = true; } if (mEnableDeadlockCheck || mTransientDeadlockCheck) { EnterCheckDeadlock(false); } #endif int retc = mMutexImpl->TimedWrLock(timeout_ns); #ifdef EOS_INSTRUMENTED_RWMUTEX if (retc && (mEnableDeadlockCheck || mTransientDeadlockCheck)) { ExitCheckDeadlock(false); } #endif if (retc == 0) { } else { EOS_RWMUTEX_CHECKORDER_UNLOCK; } return (retc == 0); } #ifdef EOS_INSTRUMENTED_RWMUTEX //------------------------------------------------------------------------------ // Performs the initialization of the class //------------------------------------------------------------------------------ void RWMutex::InitializeClass() { int retc = 0; if ((retc = pthread_rwlock_init(&mOrderChkLock, NULL))) { fprintf(stderr, "%s Failed to initialize order check lock: %s\n", __FUNCTION__, strerror(retc)); std::terminate(); } rules_static = new RWMutex::rules_t(); RWMutex::ruleIndex2Name_static = new std::map; RWMutex::ruleName2Index_static = new std::map; RWMutex::threadOrderCheckResetFlags_static = new std::map; } //------------------------------------------------------------------------------ // Reset statistics at the instance level //------------------------------------------------------------------------------ void RWMutex::ResetTimingStatistics() { // Might need a mutex or at least a flag!!! mRdMaxWait.store(std::numeric_limits::min()); mWrMaxWait.store(std::numeric_limits::min()); mRdMinWait.store(std::numeric_limits::max()); mWrMinWait.store(std::numeric_limits::max()); mRdLockCounterSample.store(0); mWrLockCounterSample.store(0); mRdCumulatedWait.store(0); mWrCumulatedWait.store(0); } //----------------------------------------------------------------------------- // Reset statistics at the class level //------------------------------------------------------------------------------ void RWMutex::ResetTimingStatisticsGlobal() { mRdMaxWait_static.store(std::numeric_limits::min()); mWrMaxWait_static.store(std::numeric_limits::min()); mRdMinWait_static.store(std::numeric_limits::max()); mWrMinWait_static.store(std::numeric_limits::max()); mRdLockCounterSample_static.store(0); mWrLockCounterSample_static.store(0); mRdCumulatedWait_static.store(0); mWrCumulatedWait_static.store(0); } #ifdef __APPLE__ int RWMutex::round(double number) { return (number < 0.0 ? ceil(number - 0.5) : floor(number + 0.5)); } #endif //------------------------------------------------------------------------------ // Check for deadlocks //------------------------------------------------------------------------------ void RWMutex::EnterCheckDeadlock(bool rd_lock) { std::thread::id tid = std::this_thread::get_id(); pthread_mutex_lock(&mCollectionMutex); if (rd_lock) { auto it = mThreadsRdLock.find(tid); if (it != mThreadsRdLock.end()) { ++it->second; // For non-preferred rd lock - since is a re-entrant read lock, if there // is any write lock pending then this will deadlock if (!mPreferRd && mThreadsWrLock.size()) { std::cerr << eos::common::getStacktrace(); pthread_mutex_unlock(&mCollectionMutex); throw std::runtime_error("double read lock during write lock"); } } else { mThreadsRdLock.insert(std::make_pair(tid, 1)); } } else { if (mThreadsWrLock.find(tid) != mThreadsWrLock.end()) { // This is a case of double write lock std::cerr << eos::common::getStacktrace(); pthread_mutex_unlock(&mCollectionMutex); throw std::runtime_error("double write lock"); } mThreadsWrLock.insert(tid); } pthread_mutex_unlock(&mCollectionMutex); } //------------------------------------------------------------------------------ // Helper function to check for deadlocks //------------------------------------------------------------------------------ void RWMutex::ExitCheckDeadlock(bool rd_lock) { std::thread::id tid = std::this_thread::get_id(); pthread_mutex_lock(&mCollectionMutex); if (rd_lock) { auto it = mThreadsRdLock.find(tid); if (it == mThreadsRdLock.end()) { fprintf(stderr, "%s Extra read unlock\n", __FUNCTION__); pthread_mutex_unlock(&mCollectionMutex); throw std::runtime_error("extra read unlock"); } if (--it->second == 0) { mThreadsRdLock.erase(it); } } else { auto it = mThreadsWrLock.find(tid); if (it == mThreadsWrLock.end()) { fprintf(stderr, "%s Extra write unlock\n", __FUNCTION__); pthread_mutex_unlock(&mCollectionMutex); throw std::runtime_error("extra write unlock"); } mThreadsWrLock.erase(it); } pthread_mutex_unlock(&mCollectionMutex); } //------------------------------------------------------------------------------ // Clear the data structures used for detecting deadlocks //------------------------------------------------------------------------------ void RWMutex::DropDeadlockCheck() { pthread_mutex_lock(&mCollectionMutex); mThreadsRdLock.clear(); mThreadsWrLock.clear(); pthread_mutex_unlock(&mCollectionMutex); } //------------------------------------------------------------------------------ // Enable sampling of timings //------------------------------------------------------------------------------ void RWMutex::SetSampling(bool on, float rate) { mEnableSampling = on; ResetTimingStatistics(); if (rate < 0) { mSamplingModulo = sSamplingModulo; } else #ifdef __APPLE__ mSamplingModulo = std::min(RAND_MAX, std::max(0, (int) round(1.0 / rate))); #else mSamplingModulo = std::min(RAND_MAX, std::max(0, (int) std::round(1.0 / rate))); #endif } //------------------------------------------------------------------------------ // Return the timing sampling rate/status //------------------------------------------------------------------------------ float RWMutex::GetSampling() { if (!mEnableSampling) { return -1.0; } else { return 1.0 / mSamplingModulo; } } //------------------------------------------------------------------------------ // Compute the SamplingRate corresponding to a given CPU overhead //------------------------------------------------------------------------------ float RWMutex::GetSamplingRateFromCPUOverhead(const double& overhead) { RWMutex mutex; bool entimglobbak = RWMutex::GetTimingGlobal(); mutex.SetTiming(true); mutex.SetSampling(true, 1.0); RWMutex::SetTimingGlobal(true); size_t monitoredTiming = Timing::GetNowInNs(); for (int k = 0; k < 1e6; k++) { mutex.LockWrite(); mutex.UnLockWrite(); } monitoredTiming = Timing::GetNowInNs() - monitoredTiming; mutex.SetTiming(false); mutex.SetSampling(false); RWMutex::SetTimingGlobal(false); size_t unmonitoredTiming = Timing::GetNowInNs(); for (int k = 0; k < 1e6; k++) { mutex.LockWrite(); mutex.UnLockWrite(); } unmonitoredTiming = Timing::GetNowInNs() - unmonitoredTiming; RWMutex::SetTimingGlobal(entimglobbak); float mutexShare = unmonitoredTiming; float timingShare = monitoredTiming - unmonitoredTiming; float samplingRate = std::min(1.0, std::max(0.0, overhead * mutexShare / timingShare)); RWMutex::sSamplingModulo = (int)(1.0 / samplingRate); return samplingRate; } //------------------------------------------------------------------------------ // Reset order checking rules //------------------------------------------------------------------------------ void RWMutex::ResetOrderRule() { bool sav = sEnableGlobalOrderCheck; sEnableGlobalOrderCheck = false; // Leave some time to all the threads to finish their book keeping activity // regarding order checking usleep(100000); pthread_rwlock_wrlock(&mOrderChkLock); // Remove all the dead threads from the map // ~~~~~~~~~~~~~~~~~~~~~~~~~~ NOTICE ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // !!! THIS DOESN'T WORK SO DEAD THREADS ARE NOT REMOVED FROM THE MAP // !!! THIS IS BECAUSE THERE IS NO RELIABLE WAY TO CHECK IF A THREAD IS STILL // !!! RUNNING. THIS IS NOT A PROBLEM FOR EOS BECAUSE XROOTD REUSES ITS THREADS // !!! SO THE MAP DOESN'T GO INTO AN INFINITE GROWTH. // ~~~~~~~~~~~~~~~~~~~~~~~~~~ NOTICE ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ #if 0 for (auto it = threadOrderCheckResetFlags_static.begin(); it != threadOrderCheckResetFlags_static.end(); ++it) { if (XrdSysThread::Signal(it->first, 0)) { // this line crashes when the threads is no more valid. threadOrderCheckResetFlags_static.erase(it); it = threadOrderCheckResetFlags_static.begin(); } } #endif // Tell the threads to reset the states of the order mask (because it's thread-local) for (auto it = threadOrderCheckResetFlags_static->begin(); it != threadOrderCheckResetFlags_static->end(); ++it) { it->second = true; } // Tell all the RWMutex that they are not involved in any order checking anymore for (auto rit = rules_static->begin(); rit != rules_static->end(); ++rit) { for (auto it = rit->second.begin(); it != rit->second.end(); ++it) { // For each RWMutex involved in that rule static_cast(*it)->nrules = 0; // no rule involved } } // Clear the manager side. ruleName2Index_static->clear(); ruleIndex2Name_static->clear(); rules_static->clear(); pthread_rwlock_unlock(&mOrderChkLock); sEnableGlobalOrderCheck = sav; } //------------------------------------------------------------------------------ // Remove an order checking rule //------------------------------------------------------------------------------ int RWMutex::RemoveOrderRule(const std::string& rulename) { // Make a local copy of the rules and remove the required rule std::map > rules = (*rules_static); if (!rules.erase(rulename)) { return 0; } // Reset the rules ResetOrderRule(); // Add all the rules but the removed one for (auto it = rules.begin(); it != rules.end(); ++it) { AddOrderRule(it->first, it->second); } return 1; } //------------------------------------------------------------------------------ // Compute the cost in time of taking timings so that it can be compensated in // the statistics //------------------------------------------------------------------------------ size_t RWMutex::EstimateTimingCompensation(size_t loopsize) { size_t t = Timing::GetNowInNs(); for (unsigned long k = 0; k < loopsize; k++) { struct timespec ts; eos::common::Timing::GetTimeSpec(ts); } t = Timing::GetNowInNs() - t; return size_t(double(t) / loopsize); } //------------------------------------------------------------------------------ // Compute the speed for lock/unlock cycle //------------------------------------------------------------------------------ size_t RWMutex::EstimateLockUnlockDuration(size_t loopsize) { RWMutex mutex; bool sav = RWMutex::GetTimingGlobal(); bool sav2 = RWMutex::GetOrderCheckingGlobal(); RWMutex::SetTimingGlobal(false); RWMutex::SetOrderCheckingGlobal(false); mutex.SetTiming(false); mutex.SetSampling(false); size_t t = Timing::GetNowInNs(); for (size_t k = 0; k < loopsize; k++) { mutex.LockWrite(); mutex.UnLockWrite(); } t = Timing::GetNowInNs() - t; RWMutex::SetTimingGlobal(sav); RWMutex::SetOrderCheckingGlobal(sav2); return size_t(double(t) / loopsize); } //------------------------------------------------------------------------------ // Compute the latency introduced by taking timings //------------------------------------------------------------------------------ size_t RWMutex::EstimateTimingAddedLatency(size_t loopsize, bool globaltiming) { RWMutex mutex; bool sav = RWMutex::GetTimingGlobal(); bool sav2 = RWMutex::GetOrderCheckingGlobal(); RWMutex::SetTimingGlobal(globaltiming); RWMutex::SetOrderCheckingGlobal(false); mutex.SetTiming(true); mutex.SetSampling(true, 1.0); size_t t = Timing::GetNowInNs(); for (size_t k = 0; k < loopsize; k++) { mutex.LockWrite(); mutex.UnLockWrite(); } size_t s = Timing::GetNowInNs() - t; RWMutex::SetTimingGlobal(false); mutex.SetTiming(false); mutex.SetSampling(false); t = Timing::GetNowInNs(); for (size_t k = 0; k < loopsize; k++) { mutex.LockWrite(); mutex.UnLockWrite(); } t = Timing::GetNowInNs() - t; RWMutex::SetTimingGlobal(sav); RWMutex::SetOrderCheckingGlobal(sav2); return size_t(double(s - t) / loopsize); } //------------------------------------------------------------------------------ // Compute the latency introduced by checking the mutexes locking orders //------------------------------------------------------------------------------ size_t RWMutex::EstimateOrderCheckingAddedLatency(size_t nmutexes, size_t loopsize) { std::vector mutexes; mutexes.reserve(nmutexes); for (size_t i = 0; i < nmutexes; ++i) { mutexes.push_back(new RWMutex()); } std::vector order; order.reserve(nmutexes); for (auto& mtx : mutexes) { mtx->SetTiming(false); mtx->SetSampling(false); order.push_back(mtx); } RWMutex::AddOrderRule("estimaterule", order); bool sav = RWMutex::GetTimingGlobal(); bool sav2 = RWMutex::GetOrderCheckingGlobal(); RWMutex::SetTimingGlobal(false); RWMutex::SetOrderCheckingGlobal(true); size_t t = Timing::GetNowInNs(); for (size_t k = 0; k < loopsize; k++) { for (auto it = mutexes.begin(); it != mutexes.end(); ++it) { (*it)->LockWrite(); } for (auto it = mutexes.rbegin(); it != mutexes.rend(); ++it) { (*it)->UnLockWrite(); } } size_t s = Timing::GetNowInNs() - t; RWMutex::SetOrderCheckingGlobal(false); t = Timing::GetNowInNs(); for (size_t k = 0; k < loopsize; ++k) { for (auto it = mutexes.begin(); it != mutexes.end(); ++it) { (*it)->LockWrite(); } for (auto it = mutexes.rbegin(); it != mutexes.rend(); ++it) { (*it)->UnLockWrite(); } } t = Timing::GetNowInNs() - t; RWMutex::SetTimingGlobal(sav); RWMutex::SetOrderCheckingGlobal(sav2); RemoveOrderRule("estimaterule"); for (size_t i = 0; i < nmutexes; ++i) { delete mutexes[i]; } return size_t(double(s - t) / (loopsize * nmutexes)); } //------------------------------------------------------------------------------- // Estimate latencies and compensation //------------------------------------------------------------------------------ void RWMutex::EstimateLatenciesAndCompensation(size_t loopsize) { timingCompensation = EstimateTimingCompensation(loopsize); timingLatency = EstimateTimingAddedLatency(loopsize); orderCheckingLatency = EstimateOrderCheckingAddedLatency(3, loopsize); lockUnlockDuration = EstimateLockUnlockDuration(loopsize); std::cerr << " timing compensation = " << timingCompensation << std::endl; std::cerr << " timing latency = " << timingLatency << std::endl; std::cerr << " order latency = " << orderCheckingLatency << std::endl; std::cerr << " lock/unlock duration = " << lockUnlockDuration << std::endl; } //------------------------------------------------------------------------------ // Get the timing statistics at the instance level //------------------------------------------------------------------------------ void RWMutex::GetTimingStatistics(TimingStats& stats, bool compensate) { size_t compensation = (compensate ? timingCompensation : 0); stats.readLockCounterSample.store(mRdLockCounterSample.load()); stats.writeLockCounterSample.store(mWrLockCounterSample.load()); stats.averagewaitread = 0; if (mRdLockCounterSample.load() != 0) { double avg = (double(mRdCumulatedWait.load()) / mRdLockCounterSample.load() - compensation); if (avg > 0) { stats.averagewaitread = avg; } } stats.averagewaitwrite = 0; if (mWrLockCounterSample.load() != 0) { double avg = (double(mWrCumulatedWait.load()) / mWrLockCounterSample.load() - compensation); if (avg > 0) { stats.averagewaitwrite = avg; } } if (mRdMinWait.load() != std::numeric_limits::max()) { long long compensated = mRdMinWait.load() - compensation; if (compensated > 0) { stats.minwaitread = compensated; } else { stats.minwaitread = 0; } } else { stats.minwaitread = std::numeric_limits::max(); } if (mRdMaxWait.load() != std::numeric_limits::min()) { long long compensated = mRdMaxWait.load() - compensation; if (compensated > 0) { stats.maxwaitread = compensated; } else { stats.maxwaitread = 0; } } else { stats.maxwaitread = std::numeric_limits::min(); } if (mWrMinWait.load() != std::numeric_limits::max()) { long long compensated = mWrMinWait.load() - compensation; if (compensated > 0) { stats.minwaitwrite = compensated; } else { stats.minwaitwrite = 0; } } else { stats.minwaitwrite = std::numeric_limits::max(); } if (mWrMaxWait.load() != std::numeric_limits::min()) { long long compensated = mWrMaxWait.load() - compensation; if (compensated > 0) { stats.maxwaitwrite = compensated; } else { stats.maxwaitwrite = 0; } } else { stats.maxwaitwrite = std::numeric_limits::min(); } } //------------------------------------------------------------------------------ // Check the order defined by the rules and update //------------------------------------------------------------------------------ void RWMutex::OrderViolationMessage(unsigned char rule, const std::string& message) { void* array[10]; unsigned long threadid = XrdSysThread::Num(); // Get void*'s for all entries on the stack size_t size = backtrace(array, 10); const std::string& rulename = (*ruleIndex2Name_static)[ruleLocalIndexToGlobalIndex[rule]]; fprintf(stderr, "RWMutex: Order Checking Error in thread %lu\n %s\n in rule " "%s :\nLocking Order should be:\n", threadid, message.c_str(), rulename.c_str()); std::vector order = (*rules_static)[rulename]; for (auto ito = order.begin(); ito != order.end(); ++ito) { fprintf(stderr, "\t%12s (%p)", static_cast(*ito)->mName.c_str(), (*ito)); } fprintf(stderr, "\nThe lock states of these mutexes are (before the violating" " lock/unlock) :\n"); for (unsigned char k = 0; k < order.size(); k++) { unsigned long int mask = (1 << k); fprintf(stderr, "\t%d", int((ordermask_staticthread[rule] & mask) != 0)); } fprintf(stderr, "\n"); backtrace_symbols_fd(array, size, 2); } //------------------------------------------------------------------------------ // Check the orders defined by the rules and update for a lock //------------------------------------------------------------------------------ void RWMutex::CheckAndLockOrder() { // Initialize the thread local ordermask if not already done if (orderCheckReset_staticthread == NULL) { ResetCheckOrder(); } if (*orderCheckReset_staticthread) { ResetCheckOrder(); *orderCheckReset_staticthread = false; } for (unsigned char k = 0; k < nrules; k++) { unsigned long int mask = (1 << rankinrule[k]); // Check if following mutex is already locked in the same thread if (ordermask_staticthread[k] >= mask) { char strmess[1024]; sprintf(strmess, "locking %s at address %p", mName.c_str(), this); OrderViolationMessage(k, strmess); } ordermask_staticthread[k] |= mask; } } //------------------------------------------------------------------------------ // Check the orders defined by the rules and update for an unlock //------------------------------------------------------------------------------ void RWMutex::CheckAndUnlockOrder() { // Initialize the thread local ordermask if not already done if (orderCheckReset_staticthread == NULL) { ResetCheckOrder(); } if (*orderCheckReset_staticthread) { ResetCheckOrder(); *orderCheckReset_staticthread = false; } for (unsigned char k = 0; k < nrules; k++) { unsigned long int mask = (1 << rankinrule[k]); if (0) { // we don't care about unlocking order violations, there is no problem with that // check if following mutex is already locked in the same thread if (ordermask_staticthread[k] >= (mask << 1)) { char strmess[1024]; sprintf(strmess, "unlocking %s at address %p", mName.c_str(), this); OrderViolationMessage(k, strmess); } } ordermask_staticthread[k] &= (~mask); } } //------------------------------------------------------------------------------ // Get the timing statistics at the class level //------------------------------------------------------------------------------ void RWMutex::GetTimingStatisticsGlobal(TimingStats& stats, bool compensate) { size_t compensation = compensate ? timingCompensation : 0; stats.readLockCounterSample.store(mRdLockCounterSample_static.load()); stats.writeLockCounterSample.store(mWrLockCounterSample_static.load()); stats.averagewaitread = 0; if (mRdLockCounterSample_static.load() != 0) { double avg = (double(mRdCumulatedWait_static.load()) / mRdLockCounterSample_static.load() - compensation); if (avg > 0) { stats.averagewaitread = avg; } } stats.averagewaitwrite = 0; if (mWrLockCounterSample_static.load() != 0) { double avg = (double(mWrCumulatedWait_static.load()) / mWrLockCounterSample_static.load() - compensation); if (avg > 0) { stats.averagewaitwrite = avg; } } if (mRdMinWait_static.load() != std::numeric_limits::max()) { long long compensated = mRdMinWait_static.load() - compensation; if (compensated > 0) { stats.minwaitread = compensated; } else { stats.minwaitread = 0; } } else { stats.minwaitread = std::numeric_limits::max(); } if (mWrMaxWait_static.load() != std::numeric_limits::min()) { long long compensated = mWrMaxWait_static.load() - compensation; if (compensated > 0) { stats.maxwaitread = compensated; } else { stats.maxwaitread = 0; } } else { stats.maxwaitread = std::numeric_limits::min(); } if (mWrMinWait_static.load() != std::numeric_limits::max()) { long long compensated = mWrMinWait_static.load() - compensation; if (compensated > 0) { stats.minwaitwrite = compensated; } else { stats.minwaitwrite = 0; } } else { stats.minwaitwrite = std::numeric_limits::max(); } if (mWrMaxWait_static.load() != std::numeric_limits::min()) { long long compensated = mWrMaxWait_static.load() - compensation; if (compensated > 0) { stats.maxwaitwrite = compensated; } else { stats.maxwaitwrite = 0; } } else { stats.maxwaitwrite = std::numeric_limits::min(); } } //------------------------------------------------------------------------------ // Add or overwrite an order checking rule //------------------------------------------------------------------------------ int RWMutex::AddOrderRule(const std::string& rulename, const std::vector& order) { bool sav = sEnableGlobalOrderCheck; sEnableGlobalOrderCheck = false; // Leave time to all the threads to finish their book-keeping activity // regarding order checking usleep(100000); pthread_rwlock_wrlock(&mOrderChkLock); // If we reached the max number of rules, ignore if (rules_static->size() == EOS_RWMUTEX_ORDER_NRULES || order.size() > 63) { sEnableGlobalOrderCheck = sav; pthread_rwlock_unlock(&mOrderChkLock); return -1; } (*rules_static)[rulename] = order; int ruleIdx = rules_static->size() - 1; // update the maps (*ruleName2Index_static)[rulename] = ruleIdx; (*ruleIndex2Name_static)[ruleIdx] = rulename; // update each object unsigned char count = 0; for (auto it = order.begin(); it != order.end(); it++) { // ruleIdx begin at 0 // Each RWMutex has its own number of rules, they are all <= than // EOS_RWMUTEX_ORDER_NRULES static_cast (*it)->rankinrule[static_cast (*it)->nrules] = count; static_cast (*it)->ruleLocalIndexToGlobalIndex[static_cast (*it)->nrules++] = ruleIdx; count++; } pthread_rwlock_unlock(&mOrderChkLock); sEnableGlobalOrderCheck = sav; return 0; } //------------------------------------------------------------------------------ // Reset the order checking mechanism for the current thread //------------------------------------------------------------------------------ void RWMutex::ResetCheckOrder() { // Reset the order mask for (int k = 0; k < EOS_RWMUTEX_ORDER_NRULES; k++) { ordermask_staticthread[k] = 0; } // Update orderCheckReset_staticthread, this memory should be specific to // this thread pthread_t tid = XrdSysThread::ID(); pthread_rwlock_rdlock(&mOrderChkLock); if (threadOrderCheckResetFlags_static->find(tid) == threadOrderCheckResetFlags_static->end()) { pthread_rwlock_unlock(&mOrderChkLock); pthread_rwlock_wrlock(&mOrderChkLock); (*threadOrderCheckResetFlags_static)[tid] = false; } orderCheckReset_staticthread = &(*threadOrderCheckResetFlags_static)[tid]; pthread_rwlock_unlock(&mOrderChkLock); } #endif //------------------------------------------------------------------------------ // Record mutex operation type //------------------------------------------------------------------------------ void RWMutex::RecordMutexOp(uint64_t ptr_val, LOCK_T op) { #ifdef EOS_INSTRUMENTED_RWMUTEX // Only record info about the named mutexes if (sMtxNameMap.find(ptr_val) == sMtxNameMap.end()) { return; } pid_t tid = syscall(SYS_gettid); std::unique_lock lock(sOpMutex); auto& mtx_op_map = sTidMtxOpMap[tid]; mtx_op_map[ptr_val] = op; #endif // EOS_INSTRUMENTED_MUTEX } //------------------------------------------------------------------------------ // Print the status of the mutex locks for the calling thread id //------------------------------------------------------------------------------ void RWMutex::PrintMutexOps(std::ostringstream& oss) { #ifdef EOS_INSTRUMENTED_RWMUTEX pid_t tid = syscall(SYS_gettid); std::unique_lock lock(sOpMutex); const auto it = sTidMtxOpMap.find(tid); if (it == sTidMtxOpMap.end()) { return; } for (const auto& elem : it->second) { std::string name; if (RWMutex::sMtxNameMap.count(elem.first)) { oss << RWMutex::sMtxNameMap[elem.first] << ": " << eos::common::RWMutex::LOCK_STATE[(int)elem.second] << " "; } else { oss << elem.first << ": " << eos::common::RWMutex::LOCK_STATE[(int)elem.second] << " "; } } #endif // EOS_INSTRUMENTED_MUTEX } //------------------------------------------------------------------------------ // ***** Class RWMutexWriteLock ***** //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ RWMutexWriteLock::RWMutexWriteLock(RWMutex& mutex, const char* function, const char* file, int line): mWrMutex(nullptr) { Grab(mutex, function, file, line); } //---------------------------------------------------------------------------- // Grab mutex and write lock it //---------------------------------------------------------------------------- void RWMutexWriteLock::Grab(RWMutex& mutex, const char* function, const char* file, int line) { mFunction = function; mFile = file; mLine = line; if (mWrMutex) { throw std::runtime_error("already holding a mutex"); } mWrMutex = &mutex; RWMutex::RecordMutexOp((uint64_t)mWrMutex->GetRawPtr(), RWMutex::LOCK_T::eWantLockWrite); mWrMutex->LockWrite(); RWMutex::RecordMutexOp((uint64_t)mWrMutex->GetRawPtr(), RWMutex::LOCK_T::eLockWrite); mAcquiredAt = std::chrono::steady_clock::now(); mAcquiredAtSystem = std::chrono::system_clock::now(); } //---------------------------------------------------------------------------- // Release the write lock after grab //---------------------------------------------------------------------------- void RWMutexWriteLock::Release() { if (mWrMutex) { RWMutex::RecordMutexOp((uint64_t)mWrMutex->GetRawPtr(), RWMutex::LOCK_T::eWantUnLockWrite); mWrMutex->UnLockWrite(); RWMutex::RecordMutexOp((uint64_t)mWrMutex->GetRawPtr(), RWMutex::LOCK_T::eNone); int64_t blockedinterval = mWrMutex->BlockedForMsInterval(); mWrMutex->AddWriteLockTime(blockedinterval); bool blockedtracing = mWrMutex->BlockedStackTracing(); mReleasedAt = std::chrono::steady_clock::now(); mReleasedAtSystem = std::chrono::system_clock::now(); mWrMutex->addBlockingTimeInfos(mAcquiredAtSystem,mReleasedAtSystem); std::chrono::milliseconds blockedFor = std::chrono::duration_cast (mReleasedAt - mAcquiredAt); if (blockedFor.count() > blockedinterval) { std::ostringstream ss; ss << "write lock [ " << mWrMutex->getName() << " ] held for " << blockedFor.count() << " milliseconds"; if (blockedtracing) { ss << " : "; ss << eos::common::getStacktrace(); } eos_third_party_warning(mFunction, mFile, mLine, "%s", ss.str().c_str()); } mWrMutex = nullptr; } } //------------------------------------------------------------------------------ // ***** Class RWMutexReadLock ***** //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ RWMutexReadLock::RWMutexReadLock(RWMutex& mutex, const char* function, const char* file, int line): mRdMutex(nullptr) { Grab(mutex, function, file, line); } //---------------------------------------------------------------------------- // Grab mutex and write lock it //---------------------------------------------------------------------------- void RWMutexReadLock::Grab(RWMutex& mutex, const char* function, const char* file, int line) { mFunction = function; mLine = line; mFile = file; if (mRdMutex) { throw std::runtime_error("already holding a mutex"); } mRdMutex = &mutex; RWMutex::RecordMutexOp((uint64_t)mRdMutex->GetRawPtr(), RWMutex::LOCK_T::eWantLockRead); mRdMutex->LockRead(); RWMutex::RecordMutexOp((uint64_t)mRdMutex->GetRawPtr(), RWMutex::LOCK_T::eLockRead); // mAcquiredAt must be updated _after_ we get the lock, since LockRead // may take a long time to complete mAcquiredAt = std::chrono::steady_clock::now(); } //------------------------------------------------------------------------------ // Release the read lock after grab //------------------------------------------------------------------------------ void RWMutexReadLock::Release() { if (mRdMutex) { RWMutex::RecordMutexOp((uint64_t)mRdMutex->GetRawPtr(), RWMutex::LOCK_T::eWantUnLockRead); mRdMutex->UnLockRead(); RWMutex::RecordMutexOp((uint64_t)mRdMutex->GetRawPtr(), RWMutex::LOCK_T::eNone); int64_t blockedinterval = mRdMutex->BlockedForMsInterval(); mRdMutex->AddReadLockTime(blockedinterval); bool blockedtracing = mRdMutex->BlockedStackTracing(); std::chrono::milliseconds blockedFor = std::chrono::duration_cast (std::chrono::steady_clock::now() - mAcquiredAt); if (blockedFor.count() > blockedinterval) { std::ostringstream ss; ss << "read lock [ " << mRdMutex->getName() << " ] held for " << blockedFor.count() << " milliseconds"; if (blockedtracing) { ss << " : " << eos::common::getStacktrace(); } eos_third_party_warning(mFunction, mFile, mLine, "%s", ss.str().c_str()); } mRdMutex = nullptr; } } EOSCOMMONNAMESPACE_END