// ---------------------------------------------------------------------- // File: InFlightTracker.hh // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "mgm/Namespace.hh" #include "common/Logging.hh" #include "common/VirtualIdentity.hh" #include "common/table_formatter/TableFormatterBase.hh" #include #include EOSMGMNAMESPACE_BEGIN //------------------------------------------------------------------------------ //! @brief Keep track of how many requests are currently in-flight. //! It's also possible to use this as a barrier to further requests - useful //! when shutting down. //------------------------------------------------------------------------------ class InFlightTracker: public eos::common::LogId { public: //---------------------------------------------------------------------------- //! Constructor //! //! @param accepting flag to mark that we should accept connections //---------------------------------------------------------------------------- InFlightTracker(bool accepting = true) : mAcceptingRequests(accepting) {} //---------------------------------------------------------------------------- //! Decide whether to account or not for a new connection. This helps to //! keep track of the number of threads inside a critical block of code. //---------------------------------------------------------------------------- bool Up(const eos::common::VirtualIdentity& vid) { // This contraption (hopefully) ensures that after setAcceptingRequests(false) // takes effect, the following guarantees hold: // - Any subsequent calls to up() will not increase mInFlight. // - As soon as we observe an mInFlight value of zero, no further requests // will be accepted. // // The second guarantee is necessary for wait(), which checks if mInFlight // is zero to tell whether all in-flight requests have been dispatched. // If setAcceptingRequests takes effect here, the request is rejected, as expected. if (!mAcceptingRequests) { return false; } // If setAcceptingRequests takes effect here, no problem. mInFlight will // temporarily jump, but the request will be rejected. ++mInFlight; // Same as before. if (!mAcceptingRequests) { // If we're here, it means setAcceptingRequests has already taken effect. --mInFlight; return false; } // If setAcceptingRequests takes effect here, no problem: // mInFlight can NOT be zero at this point, and the spinner will wait. pthread_t myself = pthread_self(); uid_t myuid = vid.uid; std::unique_lock scope_lock(mInFlightPidMutex); if (!mInFlightPids[myself]) { mInFlightUid[myself] = myuid; mInFlightVids[myuid]++; } mInFlightPids[myself]++; return true; } //---------------------------------------------------------------------------- //! Decrement number of inflight tracked requests //---------------------------------------------------------------------------- void Down() { --mInFlight; assert(mInFlight >= 0); pthread_t mythread = pthread_self(); std::unique_lock scope_lock(mInFlightPidMutex); uid_t myuid = mInFlightUid[mythread]; if (!(--mInFlightPids[mythread])) { mInFlightPids.erase(mythread); mInFlightUid.erase(mythread); if (mInFlightVids[myuid]) { if (!--mInFlightVids[myuid]) { mInFlightVids.erase(myuid); mInFlightStalls.erase(myuid); } } } } //---------------------------------------------------------------------------- //! Set if we should accept tracking new requests or not //! //! @param value if true enable tracking, otherwise refuse //---------------------------------------------------------------------------- void SetAcceptingRequests(bool value) { mAcceptingRequests = value; } //---------------------------------------------------------------------------- //! Check if we're accepting requests //---------------------------------------------------------------------------- bool IsAcceptingRequests() const { return mAcceptingRequests; } //---------------------------------------------------------------------------- //! Wait that there are no more tracked requests //---------------------------------------------------------------------------- void SpinUntilNoRequestsInFlight(bool print_log = false, const std::chrono::milliseconds wait_ms = std::chrono::milliseconds::zero()) const { assert(!mAcceptingRequests); auto num = GetInFlight(); while (num) { if (print_log) { eos_info("msg=\"waiting for %li in-flight requests to finish\"", num); } if (wait_ms.count()) { std::this_thread::sleep_for(wait_ms); } num = GetInFlight(); } } //---------------------------------------------------------------------------- //! Get number of in-flight tracked requests //---------------------------------------------------------------------------- int64_t GetInFlight() const { return mInFlight; } std::set getInFlightThreads() { std::unique_lock scope_lock(mInFlightPidMutex); std::set inflight_threads; for (auto it : mInFlightPids) { inflight_threads.insert(it.first); } return inflight_threads; } std::map getInFlightUids() { std::unique_lock scope_lock(mInFlightPidMutex); return mInFlightVids; } size_t getInFlight(uid_t uid) { std::unique_lock scope_lock(mInFlightPidMutex); auto it = mInFlightVids.find(uid); if (it != mInFlightVids.end()) { return it->second; } else { return 0; } } void incStalls(uid_t uid) { std::unique_lock scope_lock(mInFlightPidMutex); mInFlightStalls[uid]++; } size_t getStalls(uid_t uid) { std::unique_lock scope_lock(mInFlightPidMutex); auto it = mInFlightStalls.find(uid); if (it != mInFlightStalls.end()) { return it->second; } else { return 0; } } size_t getStallTime(uid_t uid, size_t& limit); //---------------------------------------------------------------------------- //! Dump user tracking //---------------------------------------------------------------------------- std::string PrintOut(bool monitoring) ; size_t ShouldStall(uid_t uid, bool& saturated, size_t& used_threads); private: std::atomic mAcceptingRequests {true}; std::atomic mInFlight {0}; std::map mInFlightPids; std::map mInFlightUid; std::map mInFlightVids; std::map mInFlightStalls; std::mutex mInFlightPidMutex; }; //------------------------------------------------------------------------------ //! @brief Class InFlightRegistraction helper to account for in-flight //! requests at scope level. //------------------------------------------------------------------------------ class InFlightRegistration { public: //---------------------------------------------------------------------------- //! Constructor //! //! @param tracke tracker object //---------------------------------------------------------------------------- InFlightRegistration(InFlightTracker& tracker, const eos::common::VirtualIdentity& vid) : mInFlightTracker(tracker) { mSucceeded = mInFlightTracker.Up(vid); } //---------------------------------------------------------------------------- //! Destructor - take care of doing the proper accounting when getting out //! of scope. //------------------------------------------------------------------------------ ~InFlightRegistration() { if (mSucceeded) { mInFlightTracker.Down(); } } //------------------------------------------------------------------------------ //! Check if current registration is actually tracked //! //! @return true if current registration is tracked, otherwise false //------------------------------------------------------------------------------ bool IsOK() { return mSucceeded; } std::set getThreads() { return mInFlightTracker.getInFlightThreads(); } private: InFlightTracker& mInFlightTracker; bool mSucceeded; }; EOSMGMNAMESPACE_END