//------------------------------------------------------------------------------
// File: BackgroundFlusher.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/BackgroundFlusher.hh"
#include "qclient/Utils.hh"
using namespace qclient;
#define SSTR(message) static_cast(std::ostringstream().flush() << message).str()
BackgroundFlusher::FlusherCallback::FlusherCallback(BackgroundFlusher *prnt) : parent(prnt) {}
void BackgroundFlusher::FlusherCallback::handleResponse(redisReplyPtr &&reply) {
if(reply == nullptr) {
//--------------------------------------------------------------------------
// The only valid case where we might legitimately receive a nullptr is
// during BackgroundFlusher shutdown.
//--------------------------------------------------------------------------
if(parent->inShutdown) {
return;
}
//--------------------------------------------------------------------------
// Nope, panic.
//--------------------------------------------------------------------------
parent->notifier.eventUnexpectedResponse("received nullptr in BackgroundFlusher::FlusherCallback::handleResponse, should never happen");
std::terminate();
}
if(reply->type == REDIS_REPLY_ERROR) {
std::string err(reply->str, reply->len);
parent->notifier.eventUnexpectedResponse(SSTR("Unexpected backend response: " << err));
std::terminate();
}
parent->itemWasAcknowledged();
}
BackgroundFlusher::~BackgroundFlusher() {
inShutdown = true;
}
BackgroundFlusher::BackgroundFlusher(Members members, qclient::Options &&opts,
Notifier ¬if, BackgroundFlusherPersistency *pers)
: persistency(pers),
callback(this),
options(std::move(opts)),
notifier(notif) {
//----------------------------------------------------------------------------
// Overwrite certain QClient options.
//----------------------------------------------------------------------------
options.transparentRedirects = true;
options.retryStrategy = RetryStrategy::InfiniteRetries();
//----------------------------------------------------------------------------
// Initialize QClient object.
//----------------------------------------------------------------------------
qclient.reset(new QClient(members, std::move(options)));
//----------------------------------------------------------------------------
// Restore contents from persistency layer, if there are any.
//----------------------------------------------------------------------------
for(ItemIndex i = persistency->getStartingIndex(); i != persistency->getEndingIndex(); i++) {
std::vector contents;
if(!persistency->retrieve(i, contents)) {
std::cerr << "BackgroundFlusher corruption, could not retrieve entry with index " << i << std::endl;
std::terminate();
}
qclient->execute(&callback, contents);
}
}
size_t BackgroundFlusher::size() const {
return persistency->getEndingIndex() - persistency->getStartingIndex();
}
// Return number of enqueued items since last time this function was called.
int64_t BackgroundFlusher::getEnqueuedAndClear() {
int64_t retvalue = enqueued.exchange(0);
return retvalue;
}
// Return number of acknowledged (dequeued) items since last time this function was called.
int64_t BackgroundFlusher::getAcknowledgedAndClear() {
int64_t retvalue = acknowledged.exchange(0);
return retvalue;
}
void BackgroundFlusher::pushRequest(const std::vector &operation) {
std::lock_guard lock(newEntriesMtx);
persistency->record(persistency->getEndingIndex(), operation);
qclient->execute(&callback, operation);
enqueued++;
}
void BackgroundFlusher::itemWasAcknowledged() {
{
std::lock_guard lock(newEntriesMtx);
persistency->pop();
}
acknowledged++;
acknowledgementCV.notify_all();
}