//------------------------------------------------------------------------------
// File: PersistentSharedHash.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/shared/PersistentSharedHash.hh"
#include "qclient/Logger.hh"
#include "qclient/utils/Macros.hh"
#include "qclient/MultiBuilder.hh"
#include "qclient/shared/SharedManager.hh"
#include "qclient/QClient.hh"
#include "qclient/SSTR.hh"
#include "qclient/pubsub/Subscriber.hh"
#include "qclient/pubsub/Message.hh"
#include "qclient/ResponseBuilder.hh"
#include "qclient/shared/SharedHashSubscription.hh"
#include
namespace qclient {
//------------------------------------------------------------------------------
// Constructor - supply a SharedManager object. I'll keep a reference to it
// throughout my lifetime - don't destroy it before me!
//------------------------------------------------------------------------------
PersistentSharedHash::PersistentSharedHash(SharedManager *sm_, const std::string &key_,
const std::shared_ptr &sub)
: sm(sm_), key(key_), currentVersion(0u) {
mHashSubscriber = sub;
logger = sm->getLogger();
qcl = sm->getQClient();
qcl->attachListener(this);
subscription = sm->getSubscriber()->subscribe(SSTR("__vhash@" << key));
using namespace std::placeholders;
subscription->attachCallback(std::bind(&PersistentSharedHash::processIncoming, this, _1));
triggerResilvering();
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
PersistentSharedHash::~PersistentSharedHash()
{
if (qcl) {
qcl->detachListener(this);
}
}
//------------------------------------------------------------------------------
// Read contents of the specified field.
//
// Eventually consistent read - it could be that a different client has
// set this field to a different value _and received an acknowledgement_ at
// the time we call get(), but our local value has not been updated yet
// due to network latency.
//
// Returns true if found, false otherwise.
//------------------------------------------------------------------------------
bool
PersistentSharedHash::get(const std::string &field, std::string& value)
{
checkFuture();
std::shared_lock lock(contentsMutex);
auto it = contents.find(field);
if(it == contents.end()) {
return false;
}
value = it->second;
return true;
}
//------------------------------------------------------------------------------
// Get vector of keys in the current hash
//------------------------------------------------------------------------------
std::vector
PersistentSharedHash::getKeys()
{
checkFuture();
std::vector keys;
std::shared_lock lock(contentsMutex);
for (const auto& elem: contents) {
keys.push_back(elem.first);
}
return keys;
}
//------------------------------------------------------------------------------
// Get contents of the hash
//------------------------------------------------------------------------------
std::map
PersistentSharedHash::getContents()
{
checkFuture();
std::shared_lock lock(contentsMutex);
return contents;
}
//------------------------------------------------------------------------------
// Set contents of the specified field, or batch of values.
// Not guaranteed to succeed in case of network instabilities.
//------------------------------------------------------------------------------
std::future PersistentSharedHash::set(const std::string &field, const std::string &value) {
std::map batch;
batch[field] = value;
return this->set(batch);
}
std::future PersistentSharedHash::set(const std::map &batch) {
qclient::MultiBuilder multi;
for(auto it = batch.begin(); it != batch.end(); it++) {
if(it->second.empty()) {
multi.emplace_back("VHDEL", key, it->first);
}
else {
multi.emplace_back("VHSET", key, it->first, it->second);
}
}
return sm->getQClient()->execute(multi.getDeque());
}
//------------------------------------------------------------------------------
// Delete the specified field.
// Not guaranteed to succeed in case of network instabilities.
//------------------------------------------------------------------------------
std::future PersistentSharedHash::del(const std::string &field) {
std::map batch;
batch[field] = "";
return this->set(batch);
}
//------------------------------------------------------------------------------
// Get current version
//------------------------------------------------------------------------------
uint64_t PersistentSharedHash::getCurrentVersion() {
checkFuture();
std::shared_lock lock(contentsMutex);
return currentVersion;
}
//------------------------------------------------------------------------------
// Listen for reconnection events
//------------------------------------------------------------------------------
void PersistentSharedHash::notifyConnectionLost(int64_t epoch, int errc, const std::string &msg) {}
void PersistentSharedHash::notifyConnectionEstablished(int64_t epoch) {
triggerResilvering();
checkFuture();
}
//------------------------------------------------------------------------------
// Asynchronously trigger resilvering
//------------------------------------------------------------------------------
void PersistentSharedHash::triggerResilvering() {
std::lock_guard lock(futureReplyMtx);
futureReply = qcl->exec("VHGETALL", key);
}
//------------------------------------------------------------------------------
// Check future
//------------------------------------------------------------------------------
void PersistentSharedHash::checkFuture() {
std::lock_guard lock(futureReplyMtx);
if(futureReply.valid() && futureReply.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
handleResponse(futureReply.get());
}
}
//------------------------------------------------------------------------------
// Parse serialized version + string map
//------------------------------------------------------------------------------
bool PersistentSharedHash::parseReply(redisReplyPtr &reply, uint64_t &revision, std::map &contents) {
contents.clear();
if(reply == nullptr || reply->type != REDIS_REPLY_ARRAY || reply->elements != 2) {
return false;
}
if(reply->element[0]->type != REDIS_REPLY_INTEGER) {
return false;
}
revision = reply->element[0]->integer;
redisReply *contentArray = reply->element[1];
if(!contentArray || contentArray->type != REDIS_REPLY_ARRAY || contentArray->elements % 2 != 0) {
return false;
}
for(size_t i = 0; i < contentArray->elements; i += 2) {
if(contentArray->element[i]->type != REDIS_REPLY_STRING || contentArray->element[i+1]->type != REDIS_REPLY_STRING) {
return false;
}
std::string key;
std::string value;
key = std::string(contentArray->element[i]->str, contentArray->element[i]->len);
value = std::string(contentArray->element[i+1]->str, contentArray->element[i+1]->len);
contents[key] = value;
}
return true;
}
//------------------------------------------------------------------------------
// Listen for resilvering responses
//------------------------------------------------------------------------------
void PersistentSharedHash::handleResponse(redisReplyPtr &&reply) {
uint64_t revision;
std::map contents;
if(!parseReply(reply, revision, contents)) {
QCLIENT_LOG(logger, LogLevel::kWarn, "SharedHash could not parse incoming resilvering message: " <<
qclient::describeRedisReply(reply));
return;
}
//----------------------------------------------------------------------------
// VHGETALL parsed successfully, apply
//----------------------------------------------------------------------------
return resilver(revision, std::move(contents));
}
//------------------------------------------------------------------------------
// Process incoming message
//------------------------------------------------------------------------------
void PersistentSharedHash::processIncoming(Message &&msg) {
checkFuture();
if(msg.getMessageType() != MessageType::kMessage) return;
redisReplyPtr payload = ResponseBuilder::parseRedisEncodedString(msg.getPayload());
if(!payload) return;
uint64_t revision;
std::map update;
if(!parseReply(payload, revision, update)) {
QCLIENT_LOG(logger, LogLevel::kWarn, "SharedHash could not parse incoming revision update: " <<
qclient::describeRedisReply(payload));
return;
}
// Payload parsed successfully, apply
if(!feedRevision(revision, update)) {
triggerResilvering();
}
}
//------------------------------------------------------------------------------
// Feed a single key-value update. Assumes lock is taken.
//------------------------------------------------------------------------------
void PersistentSharedHash::feedSingleKeyValue(const std::string &key, const std::string &value) {
if(value.empty()) {
// Deletion
contents.erase(key);
return;
}
// Insert
contents[key] = value;
}
//------------------------------------------------------------------------------
// Notify the hash of a new update. Two possibilities:
// - The hash is up-to-date, and is able to apply this revision. This
// function returns true.
// - The hash is out-of-date, and needs to be reset with the complete
// contents. The change is not applied - a return value of false means
// "please bring me up-to-date by calling resilver function"
//------------------------------------------------------------------------------
bool PersistentSharedHash::feedRevision(uint64_t revision, const std::map &updates) {
std::unique_lock lock(contentsMutex);
if(revision <= currentVersion) {
// I have a newer version than current revision, nothing to do
return true;
}
if(revision >= currentVersion+2) {
// We have a discontinuity in received revisions, cannot bring up to date
// Warn, because this should not happen often, means network instability
QCLIENT_LOG(logger, LogLevel::kWarn, "SharedHash with key " << key <<
" went out of date; received revision " << revision << ", but my last " <<
"version is " << currentVersion << ", asking for resilvering");
return false;
}
qclient_assert(revision == currentVersion+1);
for(auto it = updates.begin(); it != updates.end(); it++) {
feedSingleKeyValue(it->first, it->second);
}
currentVersion = revision;
lock.unlock();
if(mHashSubscriber) {
for(auto it = updates.begin(); it != updates.end(); it++) {
qclient::SharedHashUpdate hashUpdate;
hashUpdate.key = it->first;
hashUpdate.value = it->second;
mHashSubscriber->feedUpdate(hashUpdate);
}
}
return true;
}
//------------------------------------------------------------------------------
// Same as above, but the given revision updates only a single
// key-value pair
//------------------------------------------------------------------------------
bool PersistentSharedHash::feedRevision(uint64_t revision, const std::string &key,
const std::string &value) {
std::map batch;
batch[key] = value;
return feedRevision(revision, batch);
}
//------------------------------------------------------------------------------
// "Resilver" แนซhe hash, flushing all previous contents with new ones.
//------------------------------------------------------------------------------
void
PersistentSharedHash::resilver(uint64_t revision,
std::map &&newContents)
{
std::unique_lock lock(contentsMutex);
QCLIENT_LOG(logger, LogLevel::kWarn, "SharedHash with key " << key <<
" being resilvered with revision " << revision << " from " << currentVersion);
currentVersion = revision;
contents = std::move(newContents);
if(mHashSubscriber) {
for(auto it = contents.begin(); it != contents.end(); it++) {
qclient::SharedHashUpdate hashUpdate;
hashUpdate.key = it->first;
hashUpdate.value = it->second;
mHashSubscriber->feedUpdate(hashUpdate);
}
}
}
}