//------------------------------------------------------------------------------ // File: Communicator.cc // Author: Georgios Bitzes - CERN //------------------------------------------------------------------------------ /************************************************************************ * qclient - A simple redis C++ client with support for redirects * * Copyright (C) 2020 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "qclient/shared/Communicator.hh" #include "qclient/pubsub/Subscriber.hh" #include "qclient/pubsub/Message.hh" #include "SharedSerialization.hh" #include "qclient/SSTR.hh" #include "qclient/utils/Macros.hh" #include "qclient/utils/SteadyClock.hh" #include "qclient/Debug.hh" namespace qclient { //------------------------------------------------------------------------------ // Convenience class for point-to-point request / response messaging //------------------------------------------------------------------------------ Communicator::Communicator(Subscriber* subscriber, const std::string &channel, SteadyClock* clock, std::chrono::milliseconds retryInterval, std::chrono::seconds deadline) : mSubscriber(subscriber), mChannel(channel), mClock(clock), mQcl(mSubscriber->getQcl()), mRetryInterval(retryInterval), mHardDeadline(deadline) { mSubscription = mSubscriber->subscribe(mChannel); using namespace std::placeholders; mSubscription->attachCallback(std::bind(&Communicator::processIncoming, this, _1)); if(!mClock || !mClock->isFake()) { mThread.reset(&Communicator::backgroundThread, this); } } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ Communicator::~Communicator() { mPendingVault.setBlockingMode(false); } //------------------------------------------------------------------------------ // Cleanup and retry thread //------------------------------------------------------------------------------ void Communicator::backgroundThread(ThreadAssistant &assistant) { while(!assistant.terminationRequested()) { std::chrono::steady_clock::time_point earliestRetry; if(!mPendingVault.getEarliestRetry(earliestRetry)) { // Pending vault empty, sleep mPendingVault.blockUntilNonEmpty(); continue; } std::chrono::steady_clock::time_point now = SteadyClock::now(mClock); if(earliestRetry+mRetryInterval > now) { // Not there yet, need to wait a bit more std::chrono::milliseconds nextRetryIn = std::chrono::duration_cast(now - (earliestRetry+mRetryInterval)); assistant.wait_for(nextRetryIn); continue; } std::string channel, contents, id; if(runNextToRetry(channel, contents, id) && mQcl) { // Go mQcl->exec("PUBLISH", channel, serializeCommunicatorRequest(id, contents)); } } } //------------------------------------------------------------------------------ // Issue a request on the given channel //------------------------------------------------------------------------------ std::future Communicator::issue(const std::string &contents) { std::string unused; return issue(contents, unused); } //------------------------------------------------------------------------------ // Issue a request on the given channel, retrieve ID too //------------------------------------------------------------------------------ std::future Communicator::issue(const std::string &contents, std::string &id) { PendingRequestVault::InsertOutcome outcome = mPendingVault.insert(mChannel, contents, SteadyClock::now(mClock)); id = outcome.id; if(mQcl) { mQcl->exec("PUBLISH", mChannel, serializeCommunicatorRequest(outcome.id, contents)); } return std::move(outcome.fut); } //------------------------------------------------------------------------------ // Run next-to-retry pass // // Return value: // - False: Nothing to retry // - True: We have something to retry //------------------------------------------------------------------------------ bool Communicator::runNextToRetry(std::string &channel, std::string &contents, std::string &id) { mPendingVault.expire(SteadyClock::now(mClock) - mHardDeadline); std::chrono::steady_clock::time_point earliestRetry; if(!mPendingVault.getEarliestRetry(earliestRetry)) { // Empty, nothing to retry return false; } // Are we at least mRetryInterval ahead of last retry? if(earliestRetry+mRetryInterval > SteadyClock::now(mClock)) { return false; } // Let's do it return mPendingVault.retryFrontItem(SteadyClock::now(mClock), channel, contents, id); } //------------------------------------------------------------------------------ // Process incoming message //------------------------------------------------------------------------------ void Communicator::processIncoming(Message &&msg) { if(msg.getMessageType() != MessageType::kMessage) return; if(msg.getChannel() != mChannel) return; std::string uuid; CommunicatorReply reply; if(parseCommunicatorReply(msg.getPayload(), reply, uuid)) { mPendingVault.satisfy(uuid, std::move(reply)); } } }