// ---------------------------------------------------------------------- // File: InFlightTracker.hh // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * quarkdb - a redis-like highly available key-value store * * Copyright (C) 2016 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #ifndef __QUARKDB_IN_FLIGHT_TRACKER_H__ #define __QUARKDB_IN_FLIGHT_TRACKER_H__ #include #include "Macros.hh" #include "CoreLocalArray.hh" namespace quarkdb { struct alignas(CoreLocal::kCacheLine) AlignedAtomicInt64_t { std::atomic value = {0}; }; //------------------------------------------------------------------------------ // Keep track of how many requests are currently in-flight. // It's also possible to use this as a barrier to further requests - useful // when shutting down. //------------------------------------------------------------------------------ class InFlightTracker { public: InFlightTracker(bool accepting = true) : acceptingRequests(accepting) {} int up() { // This contraption (hopefully) ensures that after setAcceptingRequests(false) // takes effect, the following guarantees hold: // - Any subsequent calls to up() will not increase inFlight. // - As soon as we observe an inFlight value of zero, no further requests // will be accepted. // // The second guarantee is necessary for wait(), which checks if inFlight // is zero to tell whether all in-flight requests have been dispatched. // If setAcceptingRequests takes effect here, the request is rejected, as expected. if(!acceptingRequests) return -1; // If setAcceptingRequests takes effect here, no problem. inFlight will // temporarily jump, but the request will be rejected. int coreIdx = inFlightArr.getCoreIndex(); inFlightArr.accessAtCore(coreIdx)->value++; // Same as before. if(!acceptingRequests) { // If we're here, it means setAcceptingRequests has already taken effect. inFlightArr.accessAtCore(coreIdx)->value--; return -1; } // If setAcceptingRequests takes effect here, no problem: // inFlight can NOT be zero at this point, and the spinner will wait. return coreIdx; } void down(int coreIdx) { inFlightArr.accessAtCore(coreIdx)->value--; qdb_assert(inFlightArr.accessAtCore(coreIdx)->value >= 0); } void setAcceptingRequests(bool value) { acceptingRequests = value; } bool isAcceptingRequests() const { return acceptingRequests; } void spinUntilNoRequestsInFlight() const { qdb_assert(!acceptingRequests); while(getInFlight() != 0) ; } int64_t getInFlight() const { int64_t inFlight = 0; for(size_t i = 0; i < inFlightArr.size(); i++) { inFlight += inFlightArr.accessAtCore(i)->value; } return inFlight; } private: std::atomic acceptingRequests {true}; CoreLocalArray inFlightArr; }; class InFlightRegistration { public: InFlightRegistration(InFlightTracker &tracker) : inFlightTracker(tracker) { coreIdx = inFlightTracker.up(); } ~InFlightRegistration() { if(ok()) { inFlightTracker.down(coreIdx); } } bool ok() { return coreIdx >= 0; } private: InFlightTracker &inFlightTracker; int coreIdx; }; } #endif