// ----------------------------------------------------------------------
// File: SharedHashWrapper.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "SharedHashWrapper.hh"
#include "mq/XrdMqSharedObject.hh"
#include "mq/MessagingRealm.hh"
#include "common/ParseUtils.hh"
#include "common/StringUtils.hh"
#include "common/Locators.hh"
#include
#include
#include
EOSMQNAMESPACE_BEGIN
static std::string LOCAL_PREFIX = "local.";
//------------------------------------------------------------------------------
// Set value, detect based on prefix whether it should be durable,
// transient, or local
//------------------------------------------------------------------------------
void SharedHashWrapper::Batch::Set(const std::string& key,
const std::string& value)
{
if (common::startsWith(key, LOCAL_PREFIX)) {
SetLocal(key, value);
} else if (common::startsWith(key, "stat.")) {
SetTransient(key, value);
} else {
SetDurable(key, value);
}
}
//------------------------------------------------------------------------------
// Set durable value
//------------------------------------------------------------------------------
void SharedHashWrapper::Batch::SetDurable(const std::string& key,
const std::string& value)
{
mDurableUpdates[key] = value;
}
//------------------------------------------------------------------------------
// Set transient value
//------------------------------------------------------------------------------
void SharedHashWrapper::Batch::SetTransient(const std::string& key,
const std::string& value)
{
mTransientUpdates[key] = value;
}
//------------------------------------------------------------------------------
// Set local value
//------------------------------------------------------------------------------
void SharedHashWrapper::Batch::SetLocal(const std::string& key,
const std::string& value)
{
mLocalUpdates[key] = value;
}
//------------------------------------------------------------------------------
// Constructor SharedHashWrapper
//------------------------------------------------------------------------------
SharedHashWrapper::SharedHashWrapper(mq::MessagingRealm* realm,
const common::SharedHashLocator& locator,
bool takeLock, bool create)
: mSom(realm->getSom()), mLocator(locator)
{
if (realm->haveQDB()) {
mSharedHash = realm->getHashProvider()->Get(locator);
} else {
if (takeLock) {
mReadLock.Grab(mSom->HashMutex);
}
mHash = mSom->GetObject(mLocator.getConfigQueue().c_str(), "hash");
if (!mHash && create) {
// Shared hash does not exist, create
mReadLock.Release();
mSom->CreateSharedHash(mLocator.getConfigQueue().c_str(),
mLocator.getBroadcastQueue().c_str(), mSom);
mReadLock.Grab(mSom->HashMutex);
mHash = mSom->GetObject(mLocator.getConfigQueue().c_str(), "hash");
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
mHash->SetBroadCastQueue(mLocator.getBroadcastQueue().c_str());
}
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
SharedHashWrapper::~SharedHashWrapper()
{
releaseLocks();
}
//------------------------------------------------------------------------------
// Subscribe for updates from the underlying hash
//------------------------------------------------------------------------------
std::unique_ptr
SharedHashWrapper::subscribe()
{
if (mSharedHash) {
return mSharedHash->subscribe();
}
return nullptr;
}
//------------------------------------------------------------------------------
// Make global MGM hash
//------------------------------------------------------------------------------
SharedHashWrapper
SharedHashWrapper::makeGlobalMgmHash(mq::MessagingRealm* realm)
{
return SharedHashWrapper(realm, common::SharedHashLocator::makeForGlobalHash());
}
//------------------------------------------------------------------------------
// Release any interal locks - DO NOT use this object any further
//------------------------------------------------------------------------------
void SharedHashWrapper::releaseLocks()
{
mHash = nullptr;
mReadLock.Release();
}
//------------------------------------------------------------------------------
// Set key-value pair
//------------------------------------------------------------------------------
bool SharedHashWrapper::set(const std::string& key, const std::string& value,
bool broadcast)
{
Batch batch;
batch.Set(key, value);
return set(batch);
}
//------------------------------------------------------------------------------
// Set key-value batch
//------------------------------------------------------------------------------
bool SharedHashWrapper::set(const Batch& batch)
{
if (!mSharedHash && !mHash) {
return false;
}
if (mSharedHash) {
qclient::UpdateBatch updateBatch;
for (auto it = batch.mDurableUpdates.begin();
it != batch.mDurableUpdates.end(); it++) {
updateBatch.setDurable(it->first, it->second);
}
for (auto it = batch.mTransientUpdates.begin();
it != batch.mTransientUpdates.end(); it++) {
updateBatch.setTransient(it->first, it->second);
}
for (auto it = batch.mLocalUpdates.begin();
it != batch.mLocalUpdates.end(); it++) {
updateBatch.setLocal(it->first, it->second);
}
std::future reply = mSharedHash->set(updateBatch);
reply.wait();
} else {
// @note this is a hack to avoid boot failures on the FST side when a new fs
// is registered. The problem is that the FST expects all config parameters
// to be available in the shared hash onec it receives an update for the fs id
// This can only be achieved if we make sure the "id" is the last update the
// FST receives after applying all the rest from the current batch.
std::unique_lock lock(mHash->mMutex);
std::map::const_iterator it_id;
bool has_id_update = false;
mHash->OpenTransaction();
for (auto it = batch.mDurableUpdates.begin(); it != batch.mDurableUpdates.end();
it++) {
if (it->first != "id") {
mHash->Set(it->first.c_str(), it->second.c_str(), true);
} else {
has_id_update = true;
it_id = it;
}
}
for (auto it = batch.mTransientUpdates.begin();
it != batch.mTransientUpdates.end(); it++) {
mHash->Set(it->first.c_str(), it->second.c_str(), true);
}
for (auto it = batch.mLocalUpdates.begin(); it != batch.mLocalUpdates.end();
it++) {
mHash->Set(it->first.c_str(), it->second.c_str(), false);
}
mHash->CloseTransaction();
// If there is an id update make sure this is the last one sent
if (has_id_update) {
mHash->Set(it_id->first.c_str(), it_id->second.c_str(), true);
}
}
return true;
}
//------------------------------------------------------------------------------
// Query the given key, return if retrieval successful
//------------------------------------------------------------------------------
bool SharedHashWrapper::get(const std::string& key, std::string& value)
{
if (mSharedHash) {
return mSharedHash->get(key, value);
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
value = mHash->Get(key.c_str());
return true;
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Query the given key
//------------------------------------------------------------------------------
std::string SharedHashWrapper::get(const std::string& key)
{
std::string retval;
bool outcome = this->get(key, retval);
if (!outcome) {
return "";
}
return retval;
}
//------------------------------------------------------------------------------
// Query the given key - convert to long long automatically
//------------------------------------------------------------------------------
long long SharedHashWrapper::getLongLong(const std::string& key)
{
return eos::common::ParseLongLong(get(key));
}
//----------------------------------------------------------------------------
// Query the given key - convert to double automatically
//----------------------------------------------------------------------------
double SharedHashWrapper::getDouble(const std::string& key)
{
return eos::common::ParseDouble(get(key));
}
//------------------------------------------------------------------------------
// Query the given key, return if retrieval successful
//------------------------------------------------------------------------------
bool
SharedHashWrapper::get(const std::vector& keys,
std::map& values)
{
if (mSharedHash) {
return mSharedHash->get(keys, values);
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
std::transform(keys.begin(), keys.end(),
std::inserter(values, values.end()),
[this](const std::string & key) {
return std::make_pair(key, mHash->Get(key.c_str()));
});
return true;
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Delete the given key
//------------------------------------------------------------------------------
bool SharedHashWrapper::del(const std::string& key, bool broadcast)
{
if (mSharedHash) {
qclient::UpdateBatch updateBatch;
if (common::startsWith(key, "stat.")) {
updateBatch.setTransient(key, "");
} else if (common::startsWith(key, "local.")) {
updateBatch.setLocal(key, "");
} else {
updateBatch.setDurable(key, "");
}
std::future reply = mSharedHash->set(updateBatch);
reply.wait();
return true;
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
return mHash->Delete(key.c_str(), broadcast);
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Get all keys in hash
//------------------------------------------------------------------------------
bool SharedHashWrapper::getKeys(std::vector& out)
{
if (mSharedHash) {
out = mSharedHash->getKeys();
return true;
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
out = mHash->GetKeys();
return true;
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Get all hash contents as a map
//------------------------------------------------------------------------------
bool SharedHashWrapper::getContents(std::map& out)
{
if (mSharedHash) {
out = mSharedHash->getContents();
return true;
} else if (mHash) {
std::unique_lock lock(mHash->mMutex);
out = mHash->GetContents();
return true;
} else {
return false;
}
}
//------------------------------------------------------------------------------
// Delete a shared hash, without creating an object first
//------------------------------------------------------------------------------
bool SharedHashWrapper::deleteHash(mq::MessagingRealm* realm,
const common::SharedHashLocator& locator)
{
if (realm->getQSom()) {
realm->getHashProvider()->Delete(locator);
return true;
} else if (realm->getSom()) {
return realm->getSom()->DeleteSharedHash(locator.getConfigQueue().c_str(),
true);
} else {
eos_static_crit("msg=\"no shared object manager\" locator=\"%s\"",
locator.getConfigQueue().c_str());
return false;
}
}
EOSMQNAMESPACE_END