// ---------------------------------------------------------------------- // File: SharedHashWrapper.cc // Author: Georgios Bitzes - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "SharedHashWrapper.hh" #include "mq/XrdMqSharedObject.hh" #include "mq/MessagingRealm.hh" #include "common/ParseUtils.hh" #include "common/StringUtils.hh" #include "common/Locators.hh" #include #include #include EOSMQNAMESPACE_BEGIN static std::string LOCAL_PREFIX = "local."; //------------------------------------------------------------------------------ // Set value, detect based on prefix whether it should be durable, // transient, or local //------------------------------------------------------------------------------ void SharedHashWrapper::Batch::Set(const std::string& key, const std::string& value) { if (common::startsWith(key, LOCAL_PREFIX)) { SetLocal(key, value); } else if (common::startsWith(key, "stat.")) { SetTransient(key, value); } else { SetDurable(key, value); } } //------------------------------------------------------------------------------ // Set durable value //------------------------------------------------------------------------------ void SharedHashWrapper::Batch::SetDurable(const std::string& key, const std::string& value) { mDurableUpdates[key] = value; } //------------------------------------------------------------------------------ // Set transient value //------------------------------------------------------------------------------ void SharedHashWrapper::Batch::SetTransient(const std::string& key, const std::string& value) { mTransientUpdates[key] = value; } //------------------------------------------------------------------------------ // Set local value //------------------------------------------------------------------------------ void SharedHashWrapper::Batch::SetLocal(const std::string& key, const std::string& value) { mLocalUpdates[key] = value; } //------------------------------------------------------------------------------ // Constructor SharedHashWrapper //------------------------------------------------------------------------------ SharedHashWrapper::SharedHashWrapper(mq::MessagingRealm* realm, const common::SharedHashLocator& locator, bool takeLock, bool create) : mSom(realm->getSom()), mLocator(locator) { if (realm->haveQDB()) { mSharedHash = realm->getHashProvider()->Get(locator); } else { if (takeLock) { mReadLock.Grab(mSom->HashMutex); } mHash = mSom->GetObject(mLocator.getConfigQueue().c_str(), "hash"); if (!mHash && create) { // Shared hash does not exist, create mReadLock.Release(); mSom->CreateSharedHash(mLocator.getConfigQueue().c_str(), mLocator.getBroadcastQueue().c_str(), mSom); mReadLock.Grab(mSom->HashMutex); mHash = mSom->GetObject(mLocator.getConfigQueue().c_str(), "hash"); } else if (mHash) { std::unique_lock lock(mHash->mMutex); mHash->SetBroadCastQueue(mLocator.getBroadcastQueue().c_str()); } } } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ SharedHashWrapper::~SharedHashWrapper() { releaseLocks(); } //------------------------------------------------------------------------------ // Subscribe for updates from the underlying hash //------------------------------------------------------------------------------ std::unique_ptr SharedHashWrapper::subscribe() { if (mSharedHash) { return mSharedHash->subscribe(); } return nullptr; } //------------------------------------------------------------------------------ // Make global MGM hash //------------------------------------------------------------------------------ SharedHashWrapper SharedHashWrapper::makeGlobalMgmHash(mq::MessagingRealm* realm) { return SharedHashWrapper(realm, common::SharedHashLocator::makeForGlobalHash()); } //------------------------------------------------------------------------------ // Release any interal locks - DO NOT use this object any further //------------------------------------------------------------------------------ void SharedHashWrapper::releaseLocks() { mHash = nullptr; mReadLock.Release(); } //------------------------------------------------------------------------------ // Set key-value pair //------------------------------------------------------------------------------ bool SharedHashWrapper::set(const std::string& key, const std::string& value, bool broadcast) { Batch batch; batch.Set(key, value); return set(batch); } //------------------------------------------------------------------------------ // Set key-value batch //------------------------------------------------------------------------------ bool SharedHashWrapper::set(const Batch& batch) { if (!mSharedHash && !mHash) { return false; } if (mSharedHash) { qclient::UpdateBatch updateBatch; for (auto it = batch.mDurableUpdates.begin(); it != batch.mDurableUpdates.end(); it++) { updateBatch.setDurable(it->first, it->second); } for (auto it = batch.mTransientUpdates.begin(); it != batch.mTransientUpdates.end(); it++) { updateBatch.setTransient(it->first, it->second); } for (auto it = batch.mLocalUpdates.begin(); it != batch.mLocalUpdates.end(); it++) { updateBatch.setLocal(it->first, it->second); } std::future reply = mSharedHash->set(updateBatch); reply.wait(); } else { // @note this is a hack to avoid boot failures on the FST side when a new fs // is registered. The problem is that the FST expects all config parameters // to be available in the shared hash onec it receives an update for the fs id // This can only be achieved if we make sure the "id" is the last update the // FST receives after applying all the rest from the current batch. std::unique_lock lock(mHash->mMutex); std::map::const_iterator it_id; bool has_id_update = false; mHash->OpenTransaction(); for (auto it = batch.mDurableUpdates.begin(); it != batch.mDurableUpdates.end(); it++) { if (it->first != "id") { mHash->Set(it->first.c_str(), it->second.c_str(), true); } else { has_id_update = true; it_id = it; } } for (auto it = batch.mTransientUpdates.begin(); it != batch.mTransientUpdates.end(); it++) { mHash->Set(it->first.c_str(), it->second.c_str(), true); } for (auto it = batch.mLocalUpdates.begin(); it != batch.mLocalUpdates.end(); it++) { mHash->Set(it->first.c_str(), it->second.c_str(), false); } mHash->CloseTransaction(); // If there is an id update make sure this is the last one sent if (has_id_update) { mHash->Set(it_id->first.c_str(), it_id->second.c_str(), true); } } return true; } //------------------------------------------------------------------------------ // Query the given key, return if retrieval successful //------------------------------------------------------------------------------ bool SharedHashWrapper::get(const std::string& key, std::string& value) { if (mSharedHash) { return mSharedHash->get(key, value); } else if (mHash) { std::unique_lock lock(mHash->mMutex); value = mHash->Get(key.c_str()); return true; } else { return false; } } //------------------------------------------------------------------------------ // Query the given key //------------------------------------------------------------------------------ std::string SharedHashWrapper::get(const std::string& key) { std::string retval; bool outcome = this->get(key, retval); if (!outcome) { return ""; } return retval; } //------------------------------------------------------------------------------ // Query the given key - convert to long long automatically //------------------------------------------------------------------------------ long long SharedHashWrapper::getLongLong(const std::string& key) { return eos::common::ParseLongLong(get(key)); } //---------------------------------------------------------------------------- // Query the given key - convert to double automatically //---------------------------------------------------------------------------- double SharedHashWrapper::getDouble(const std::string& key) { return eos::common::ParseDouble(get(key)); } //------------------------------------------------------------------------------ // Query the given key, return if retrieval successful //------------------------------------------------------------------------------ bool SharedHashWrapper::get(const std::vector& keys, std::map& values) { if (mSharedHash) { return mSharedHash->get(keys, values); } else if (mHash) { std::unique_lock lock(mHash->mMutex); std::transform(keys.begin(), keys.end(), std::inserter(values, values.end()), [this](const std::string & key) { return std::make_pair(key, mHash->Get(key.c_str())); }); return true; } else { return false; } } //------------------------------------------------------------------------------ // Delete the given key //------------------------------------------------------------------------------ bool SharedHashWrapper::del(const std::string& key, bool broadcast) { if (mSharedHash) { qclient::UpdateBatch updateBatch; if (common::startsWith(key, "stat.")) { updateBatch.setTransient(key, ""); } else if (common::startsWith(key, "local.")) { updateBatch.setLocal(key, ""); } else { updateBatch.setDurable(key, ""); } std::future reply = mSharedHash->set(updateBatch); reply.wait(); return true; } else if (mHash) { std::unique_lock lock(mHash->mMutex); return mHash->Delete(key.c_str(), broadcast); } else { return false; } } //------------------------------------------------------------------------------ // Get all keys in hash //------------------------------------------------------------------------------ bool SharedHashWrapper::getKeys(std::vector& out) { if (mSharedHash) { out = mSharedHash->getKeys(); return true; } else if (mHash) { std::unique_lock lock(mHash->mMutex); out = mHash->GetKeys(); return true; } else { return false; } } //------------------------------------------------------------------------------ // Get all hash contents as a map //------------------------------------------------------------------------------ bool SharedHashWrapper::getContents(std::map& out) { if (mSharedHash) { out = mSharedHash->getContents(); return true; } else if (mHash) { std::unique_lock lock(mHash->mMutex); out = mHash->GetContents(); return true; } else { return false; } } //------------------------------------------------------------------------------ // Delete a shared hash, without creating an object first //------------------------------------------------------------------------------ bool SharedHashWrapper::deleteHash(mq::MessagingRealm* realm, const common::SharedHashLocator& locator) { if (realm->getQSom()) { realm->getHashProvider()->Delete(locator); return true; } else if (realm->getSom()) { return realm->getSom()->DeleteSharedHash(locator.getConfigQueue().c_str(), true); } else { eos_static_crit("msg=\"no shared object manager\" locator=\"%s\"", locator.getConfigQueue().c_str()); return false; } } EOSMQNAMESPACE_END