//------------------------------------------------------------------------------
// File: TransientSharedHash.cc
// Author: Georgios Bitzes - CERN
//------------------------------------------------------------------------------
/************************************************************************
* qclient - A simple redis C++ client with support for redirects *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "qclient/shared/TransientSharedHash.hh"
#include "qclient/pubsub/Subscriber.hh"
#include "qclient/pubsub/Message.hh"
#include "SharedSerialization.hh"
#include "qclient/Logger.hh"
#include "qclient/shared/SharedManager.hh"
#include "qclient/shared/SharedHashSubscription.hh"
namespace qclient {
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
TransientSharedHash::~TransientSharedHash()
{
// empty
}
//------------------------------------------------------------------------------
//! Private constructor - use SharedManager to instantiate this object.
//------------------------------------------------------------------------------
TransientSharedHash::TransientSharedHash(SharedManager *sm,
const std::string &chan, std::unique_ptr sub,
const std::shared_ptr &hashSub)
: sharedManager(sm), channel(chan), subscription(std::move(sub)), mHashSubscriber(hashSub) {
using namespace std::placeholders;
subscription->attachCallback(std::bind(&TransientSharedHash::processIncoming, this, _1));
}
//------------------------------------------------------------------------------
// Process incoming message
//------------------------------------------------------------------------------
void TransientSharedHash::processIncoming(Message &&msg) {
if(msg.getMessageType() != MessageType::kMessage || msg.getChannel() != channel) {
// Ignore, message does not concern us
return;
}
std::map incomingBatch;
if(!parseBatch(msg.getPayload(), incomingBatch)) {
QCLIENT_LOG(logger, LogLevel::kError, "Could not parse message payload (length "
<< msg.getPayload().size() << ") received in channel "
<< channel << ", ignoring");
return;
}
// Batch received and parsed, apply
std::unique_lock lock(contentsMtx);
for(auto it = incomingBatch.begin(); it != incomingBatch.end(); it++) {
contents[it->first] = it->second;
}
lock.unlock();
// Notify subscriber
if(mHashSubscriber) {
for(auto it = incomingBatch.begin(); it != incomingBatch.end(); it++) {
SharedHashUpdate hashUpdate;
hashUpdate.key = it->first;
hashUpdate.value = it->second;
mHashSubscriber->feedUpdate(hashUpdate);
}
}
}
//------------------------------------------------------------------------------
// Set key to the given value.
//------------------------------------------------------------------------------
void TransientSharedHash::set(const std::string &key, const std::string &value)
{
std::map batch;
batch[key] = value;
set(batch);
}
//------------------------------------------------------------------------------
// Set a batch of key-value pairs.
//------------------------------------------------------------------------------
void TransientSharedHash::set(const std::map &batch)
{
std::string serializedBatch = serializeBatch(batch);
sharedManager->publish(channel, serializedBatch);
}
//------------------------------------------------------------------------------
// Get key, if it exists
//------------------------------------------------------------------------------
bool TransientSharedHash::get(const std::string &key, std::string &value) const
{
std::lock_guard lock(contentsMtx);
auto it = contents.find(key);
if(it == contents.end()) {
return false;
}
value = it->second;
return true;
}
//------------------------------------------------------------------------------
// Get vector of keys in the hash
//------------------------------------------------------------------------------
std::vector TransientSharedHash::getKeys() const
{
std::vector keys;
std::lock_guard lock(contentsMtx);
for (const auto& elem: contents) {
keys.push_back(elem.first);
}
return keys;
}
//------------------------------------------------------------------------------
// Get contents of the hash
//------------------------------------------------------------------------------
std::map
TransientSharedHash::getContents() const
{
std::lock_guard lock(contentsMtx);
return contents;
}
}