//------------------------------------------------------------------------------
// File: PendingRequestVault.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2020 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/shared/PendingRequestVault.hh"
#include "qclient/utils/Macros.hh"
#include "../Uuid.hh"
#include
namespace qclient {
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
PendingRequestVault::PendingRequestVault() {}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
PendingRequestVault::~PendingRequestVault() {}
//------------------------------------------------------------------------------
// Insert pending request
//------------------------------------------------------------------------------
PendingRequestVault::InsertOutcome PendingRequestVault::insert(const std::string &channel, const std::string &contents,
std::chrono::steady_clock::time_point timepoint) {
std::unique_lock lock(mMutex);
InsertOutcome outcome;
outcome.id = generateUuid();
auto mapIter = mPendingRequests.insert(std::make_pair(outcome.id, Item(outcome.id))).first;
mapIter->second.start = timepoint;
mapIter->second.lastRetry = timepoint;
mapIter->second.channel = channel;
mapIter->second.contents = contents;
outcome.fut = mapIter->second.promise.get_future();
mNextToRetry.push_back(outcome.id);
mapIter->second.listIter = mNextToRetry.end();
--mapIter->second.listIter;
mCV.notify_all();
qclient_assert(mPendingRequests.size() == mNextToRetry.size());
return outcome;
}
//------------------------------------------------------------------------------
// Satisfy pending request
//------------------------------------------------------------------------------
bool PendingRequestVault::satisfy(const RequestID &id, CommunicatorReply &&reply) {
std::unique_lock lock(mMutex);
auto mapIter = mPendingRequests.find(id);
if(mapIter == mPendingRequests.end()) {
return false;
}
mapIter->second.promise.set_value(std::move(reply));
mNextToRetry.erase(mapIter->second.listIter);
mPendingRequests.erase(mapIter);
qclient_assert(mPendingRequests.size() == mNextToRetry.size());
return true;
}
//------------------------------------------------------------------------------
// Get current pending requests
//------------------------------------------------------------------------------
size_t PendingRequestVault::size() const {
std::unique_lock lock(mMutex);
qclient_assert(mPendingRequests.size() == mNextToRetry.size());
return mPendingRequests.size();
}
//------------------------------------------------------------------------------
// Get earliest retry
// - Return value False: Vault is empty
// - Return value True: tp is filled with earliest lastRetry time_point
//------------------------------------------------------------------------------
bool PendingRequestVault::getEarliestRetry(std::chrono::steady_clock::time_point &tp) {
std::unique_lock lock(mMutex);
if(mPendingRequests.empty()) {
return false;
}
tp = mPendingRequests[mNextToRetry.front()].lastRetry;
return true;
}
//------------------------------------------------------------------------------
// Drop front item
//------------------------------------------------------------------------------
void PendingRequestVault::dropFront() {
mPendingRequests.erase(mNextToRetry.front());
mNextToRetry.pop_front();
qclient_assert(mPendingRequests.size() == mNextToRetry.size());
}
//------------------------------------------------------------------------------
// Expire any items which were submitted past the deadline.
// Only the original submission time counts here, not the retries.
//------------------------------------------------------------------------------
size_t PendingRequestVault::expire(std::chrono::steady_clock::time_point deadline) {
std::unique_lock lock(mMutex);
size_t expired = 0;
while(!mPendingRequests.empty()) {
if(mPendingRequests[mNextToRetry.front()].start <= deadline) {
dropFront();
expired++;
}
else {
break;
}
}
return expired;
}
//------------------------------------------------------------------------------
// Retry front item, if it exists
//------------------------------------------------------------------------------
bool PendingRequestVault::retryFrontItem(std::chrono::steady_clock::time_point now,
std::string &channel, std::string &contents, std::string &id) {
std::unique_lock lock(mMutex);
if(mPendingRequests.empty()) {
return false;
}
Item& item = mPendingRequests[mNextToRetry.front()];
channel = item.channel;
contents = item.contents;
id = item.id;
item.lastRetry = now;
mNextToRetry.splice(mNextToRetry.end(), mNextToRetry, mNextToRetry.begin());
qclient_assert(mPendingRequests.size() == mNextToRetry.size());
return true;
}
//------------------------------------------------------------------------------
// Set blocking mode
//------------------------------------------------------------------------------
void PendingRequestVault::setBlockingMode(bool val) {
std::unique_lock lock(mMutex);
mBlockingMode = val;
mCV.notify_all();
}
//------------------------------------------------------------------------------
// Block until there's an item in the queue
//------------------------------------------------------------------------------
void PendingRequestVault::blockUntilNonEmpty() {
std::unique_lock lock(mMutex);
while(mBlockingMode && mPendingRequests.empty()) {
mCV.wait(lock);
}
}
}