// ---------------------------------------------------------------------- // File: XrdMqSharedObject.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mq/XrdMqSharedObject.hh" #include "mq/XrdMqMessaging.hh" #include "common/Logging.hh" #include "common/StringConversion.hh" #include "common/ParseUtils.hh" #include "XrdSys/XrdSysTimer.hh" #include "XrdOuc/XrdOucEnv.hh" #include #include #include using eos::common::RWMutexReadLock; using eos::common::RWMutexWriteLock; std::atomic XrdMqSharedObjectManager::sDebug {false}; // Static counters std::atomic XrdMqSharedHash::sSetCounter {0}; std::atomic XrdMqSharedHash::sSetNLCounter {0}; std::atomic XrdMqSharedHash::sGetCounter {0}; thread_local XrdMqSharedObjectChangeNotifier::Subscriber* XrdMqSharedObjectChangeNotifier::tlSubscriber = NULL; //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ #define _NotifierMapUpdate(map,key,subscriber) \ { \ auto entry = map.find(key); \ if( entry != map.end() ) { \ entry->second.mSubscribers.erase(subscriber); \ if(entry->second.mSubscribers.empty()) { \ if(entry->second.mRegex) { \ regfree(entry->second.mRegex); \ delete entry->second.mRegex; \ } \ map.erase(entry); \ } \ } \ } //------------------------------------------------------------------------------ // * * * Class XrdMqSharedHashEntry * * * //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdMqSharedHashEntry::XrdMqSharedHashEntry(): mKey(""), mValue(""), mChangeId(0) { mMtime.tv_sec = 0; mMtime.tv_usec = 0; } //------------------------------------------------------------------------------ // Constructor with parameters //------------------------------------------------------------------------------ XrdMqSharedHashEntry::XrdMqSharedHashEntry(const char* key, const char* value): mChangeId(0) { gettimeofday(&mMtime, 0); mKey = (key ? key : ""); mValue = (value ? value : ""); } //------------------------------------------------------------------------------ // Copy assignment operator //------------------------------------------------------------------------------ XrdMqSharedHashEntry& XrdMqSharedHashEntry::operator=(const XrdMqSharedHashEntry& other) { if (this != &other) { mChangeId = other.mChangeId; mKey = other.mKey; mValue = other.mValue; mMtime.tv_sec = other.mMtime.tv_sec; mMtime.tv_usec = other.mMtime.tv_usec; } return *this; } //------------------------------------------------------------------------------ // Copy constructor //------------------------------------------------------------------------------ XrdMqSharedHashEntry::XrdMqSharedHashEntry(const XrdMqSharedHashEntry& other) { *this = other; } //---------------------------------------------------------------------------- //! Move constructor //---------------------------------------------------------------------------- XrdMqSharedHashEntry::XrdMqSharedHashEntry(XrdMqSharedHashEntry&& other): mKey(std::move(other.mKey)), mValue(std::move(other.mValue)), mChangeId(other.mChangeId), mMtime(other.mMtime) {} //------------------------------------------------------------------------------ // Move assignment operator //------------------------------------------------------------------------------ XrdMqSharedHashEntry& XrdMqSharedHashEntry::operator=(XrdMqSharedHashEntry&& other) { if (this != &other) { mKey = std::move(other.mKey); mValue = std::move(other.mValue); mChangeId = other.mChangeId; mMtime = other.mMtime; } return *this; } //------------------------------------------------------------------------------ // Get age in milliseconds //------------------------------------------------------------------------------ long long XrdMqSharedHashEntry::GetAgeInMilliSeconds() { struct timeval ntime; gettimeofday(&ntime, 0); return (((ntime.tv_sec - mMtime.tv_sec) * 1000) + ((ntime.tv_usec - mMtime.tv_usec) / 1000)); } //------------------------------------------------------------------------------ // Get age in seconds //------------------------------------------------------------------------------ double XrdMqSharedHashEntry::GetAgeInSeconds() { return GetAgeInMilliSeconds() / 1000.0; } //------------------------------------------------------------------------------ // Append entry representation the output string //------------------------------------------------------------------------------ void XrdMqSharedHashEntry::Dump(XrdOucString& out) { char format_line[65536]; snprintf(format_line, sizeof(format_line) - 1, "value:%-32s age:%.2f changeid:%llu", mValue.c_str(), GetAgeInSeconds(), mChangeId); out += format_line; } //------------------------------------------------------------------------------ // * * * Class XrdMqSharedObjectHash * * * //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdMqSharedHash::XrdMqSharedHash(const char* subject, const char* bcast_queue, XrdMqSharedObjectManager* som): mType("hash"), mSOM(som), mSubject((subject ? subject : "")), mIsTransaction(false), mBroadcastQueue((bcast_queue ? bcast_queue : "")), mTransactMutex(new XrdSysMutex()), mStoreMutex(new eos::common::RWMutex()) {} //------------------------------------------------------------------------------ // Move constructor //------------------------------------------------------------------------------ XrdMqSharedHash::XrdMqSharedHash(XrdMqSharedHash&& other): mSOM(nullptr) { *this = std::move(other); } //------------------------------------------------------------------------------ // Move assignment operator //------------------------------------------------------------------------------ XrdMqSharedHash& XrdMqSharedHash::operator=(XrdMqSharedHash&& other) { if (this != &other) { mSOM = nullptr; mTransactMutex.reset(nullptr); mStoreMutex.reset(nullptr); mType = other.mType; std::swap(mSOM, other.mSOM); mSubject = other.mSubject; mIsTransaction = other.mIsTransaction.load(); mBroadcastQueue = other.mBroadcastQueue; std::swap(mStore, other.mStore); std::swap(mDeletions, other.mDeletions); std::swap(mTransactions, other.mTransactions); std::swap(mTransactMutex, other.mTransactMutex); std::swap(mStoreMutex, other.mStoreMutex); } return *this; } //------------------------------------------------------------------------------ // Get size of the hash //------------------------------------------------------------------------------ unsigned int XrdMqSharedHash::GetSize() { RWMutexReadLock rd_lock(*mStoreMutex); return (unsigned int) mStore.size(); } //------------------------------------------------------------------------------ // Get age in milliseconds for a certain key //------------------------------------------------------------------------------ unsigned long long XrdMqSharedHash::GetAgeInMilliSeconds(const char* key) { RWMutexReadLock rd_lock(*mStoreMutex); unsigned long long val = (mStore.count(key) ? mStore[key].GetAgeInMilliSeconds() : 0); return val; } //------------------------------------------------------------------------------ // Get age in seconds for a certain key //------------------------------------------------------------------------------ unsigned long long XrdMqSharedHash::GetAgeInSeconds(const char* key) { RWMutexReadLock rd_lock(*mStoreMutex); unsigned long long val = (mStore.count(key) ? (unsigned long long) mStore[key].GetAgeInSeconds() : (unsigned long long) 0); return val; } //------------------------------------------------------------------------------ // Get entry value for key //------------------------------------------------------------------------------ std::string XrdMqSharedHash::Get(const std::string& key) { sGetCounter++; std::string value = ""; RWMutexReadLock rd_lock(*mStoreMutex); if (mStore.count(key)) { value = mStore[key].GetValue(); } return value; } //------------------------------------------------------------------------------ // Get a copy of all the keys //------------------------------------------------------------------------------ std::vector XrdMqSharedHash::GetKeys() { std::vector keys; RWMutexReadLock rd_lock(*mStoreMutex); for (auto it = mStore.begin(); it != mStore.end(); ++it) { keys.push_back(it->first); } return keys; } //------------------------------------------------------------------------------ // Get a copy of all the keys + values // // @return map containing all the key-value pairs in the hash //------------------------------------------------------------------------------ std::map XrdMqSharedHash::GetContents() { std::map contents; RWMutexReadLock rd_lock(*mStoreMutex); for (auto it = mStore.begin(); it != mStore.end(); ++it) { contents.emplace(it->first, it->second.GetValue()); } return contents; } //------------------------------------------------------------------------------ // Get key value as long long //------------------------------------------------------------------------------ long long XrdMqSharedHash::GetLongLong(const char* key) { return eos::common::ParseLongLong(Get(key)); } //------------------------------------------------------------------------------ // Get key value as double //------------------------------------------------------------------------------ double XrdMqSharedHash::GetDouble(const char* key) { return eos::common::ParseDouble(Get(key)); } //------------------------------------------------------------------------------ // Get key value as unsigned int //------------------------------------------------------------------------------ unsigned int XrdMqSharedHash::GetUInt(const char* key) { return (unsigned int) GetLongLong(key); } //------------------------------------------------------------------------------ // Open transaction //------------------------------------------------------------------------------ bool XrdMqSharedHash::OpenTransaction() { mTransactMutex->Lock(); mTransactions.clear(); mIsTransaction = true; return true; } //------------------------------------------------------------------------------- // Close transaction //------------------------------------------------------------------------------- bool XrdMqSharedHash::CloseTransaction() { bool retval = true; if (mSOM->mBroadcast && mTransactions.size()) { XrdOucString txmessage = ""; MakeUpdateEnvHeader(txmessage); AddTransactionsToEnvString(txmessage, false); if (txmessage.length() > (2 * 1000 * 1000)) { // Set the message size limit to 2M, if the message is bigger then just // send transaction item by item. for (auto it = mTransactions.begin(); it != mTransactions.end(); ++it) { txmessage = ""; MakeUpdateEnvHeader(txmessage); txmessage += "&"; txmessage += XRDMQSHAREDHASH_PAIRS; txmessage += "="; RWMutexReadLock rd_lock(*mStoreMutex); if ((mStore.count(it->c_str()))) { txmessage += "|"; txmessage += it->c_str(); txmessage += "~"; txmessage += mStore[it->c_str()].GetValue(); txmessage += "%"; char cid[1024]; snprintf(cid, sizeof(cid) - 1, "%llu", mStore[it->c_str()].GetChangeId()); txmessage += cid; } XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); retval &= XrdMqMessaging::gMessageClient.SendMessage(message, mBroadcastQueue.c_str(), false, false, true); } } else { XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); retval &= XrdMqMessaging::gMessageClient.SendMessage(message, mBroadcastQueue.c_str(), false, false, true); } } if (mSOM->mBroadcast && mDeletions.size()) { XrdOucString txmessage = ""; MakeDeletionEnvHeader(txmessage); AddDeletionsToEnvString(txmessage); XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); retval &= XrdMqMessaging::gMessageClient.SendMessage(message, mBroadcastQueue.c_str(), false, false, true); } mTransactions.clear(); mIsTransaction = false; mTransactMutex->UnLock(); return retval; } //------------------------------------------------------------------------------- // Construct broadcast env header //------------------------------------------------------------------------------- void XrdMqSharedHash::MakeBroadCastEnvHeader(XrdOucString& out) { out = XRDMQSHAREDHASH_BCREPLY; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += mSubject.c_str(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += mType.c_str(); } //------------------------------------------------------------------------------- // Construct update env header //------------------------------------------------------------------------------- void XrdMqSharedHash::MakeUpdateEnvHeader(XrdOucString& out) { out = XRDMQSHAREDHASH_UPDATE; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += mSubject.c_str(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += mType.c_str(); } //------------------------------------------------------------------------------- // Construct deletion env header //------------------------------------------------------------------------------- void XrdMqSharedHash::MakeDeletionEnvHeader(XrdOucString& out) { out = XRDMQSHAREDHASH_DELETE; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += mSubject.c_str(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += mType.c_str(); } //------------------------------------------------------------------------------- // Construct remove env header //------------------------------------------------------------------------------- void XrdMqSharedHash::MakeRemoveEnvHeader(XrdOucString& out) { out = XRDMQSHAREDHASH_REMOVE; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += mSubject.c_str(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += mType.c_str(); } //------------------------------------------------------------------------------- // Broadcast hash as env string //------------------------------------------------------------------------------- bool XrdMqSharedHash::BroadCastEnvString(const char* receiver) { XrdOucString txmessage = ""; { XrdSysMutexHelper lock(*mTransactMutex); mTransactions.clear(); mIsTransaction = true; { RWMutexReadLock rd_lock(*mStoreMutex); for (auto it = mStore.begin(); it != mStore.end(); ++it) { // @todo(esindril) needs review as there is no clear difference as some // stat. parameters do need to be broadcasted! // // Skip broadcasting transient values // if ((strncmp(it->first.c_str(), "stat.", 5) == 0) && // (it->first != "stat.active")) { // continue; // } mTransactions.insert(it->first); } } MakeBroadCastEnvHeader(txmessage); // This will also clear the mTransactions set AddTransactionsToEnvString(txmessage); mIsTransaction = false; } if (mSOM->mBroadcast) { XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); if (XrdMqSharedObjectManager::sDebug) { fprintf(stderr, "XrdMqSharedObjectManager::BroadCastEnvString=>[%s]=>%s msg=%s\n", mSubject.c_str(), receiver, txmessage.c_str()); } return XrdMqMessaging::gMessageClient.SendMessage(message, receiver, false, false, true); } return true; } //------------------------------------------------------------------------------- // Encode transactions to env string - this must be called with the // mTransactMutex locked. //------------------------------------------------------------------------------- void XrdMqSharedHash::AddTransactionsToEnvString(XrdOucString& out, bool clear_after) { // Encode transactions as // "mysh.pairs=|~%|~%c_str()))) { out += "|"; out += it->c_str(); out += "~"; out += mStore[it->c_str()].GetValue(); out += "%"; char cid[1024]; snprintf(cid, sizeof(cid) - 1, "%llu", mStore[it->c_str()].GetChangeId()); out += cid; } } if (clear_after) { mTransactions.clear(); } } //------------------------------------------------------------------------------- // Encode deletions as env string - this must be called with the mTransactMutex // locked. //------------------------------------------------------------------------------- void XrdMqSharedHash::AddDeletionsToEnvString(XrdOucString& out) { // Encode deletions as "mysh.keys=|| ...." out += "&"; out += XRDMQSHAREDHASH_KEYS; out += "="; RWMutexWriteLock wr_lock(*mStoreMutex); for (auto it = mDeletions.begin(); it != mDeletions.end(); ++it) { out += "|"; out += it->c_str(); } mDeletions.clear(); } //------------------------------------------------------------------------------- // Build and send broadcast request //------------------------------------------------------------------------------- bool XrdMqSharedHash::BroadcastRequest(const char* req_target) { XrdOucString out; XrdMqMessage message("XrdMqSharedHashMessage"); out += XRDMQSHAREDHASH_BCREQUEST; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += mSubject.c_str(); out += "&"; out += XRDMQSHAREDHASH_REPLY; out += "="; out += XrdMqMessaging::gMessageClient.GetClientId(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += mType.c_str(); message.SetBody(out.c_str()); message.MarkAsMonitor(); return XrdMqMessaging::gMessageClient.SendMessage(message, req_target, false, false, true); } //------------------------------------------------------------------------------- // Dump hash map representation to output string //------------------------------------------------------------------------------- void XrdMqSharedHash::Dump(XrdOucString& out) { char key_print[64]; RWMutexReadLock rd_lock(*mStoreMutex); for (auto it = mStore.begin(); it != mStore.end(); ++it) { snprintf(key_print, sizeof(key_print) - 1, "key=%-24s", it->first.c_str()); out += key_print; out += " "; it->second.Dump(out); out += "\n"; } } //------------------------------------------------------------------------------- // Delete key entry //------------------------------------------------------------------------------- bool XrdMqSharedHash::Delete(const std::string& key, bool broadcast) { bool deleted = false; RWMutexWriteLock wr_lock(*mStoreMutex); if (mStore.count(key)) { mStore.erase(key); deleted = true; if (mSOM->mBroadcast && broadcast) { // Emulate transaction for single shot deletions if (!mIsTransaction) { mTransactMutex->Lock(); mTransactions.clear(); } mDeletions.insert(key); mTransactions.erase(key); // Emulate transaction for single shot deletions if (!mIsTransaction) { wr_lock.Release(); CloseTransaction(); } } // Check if we have to post for this subject if (mSOM) { std::string fkey = mSubject.c_str(); fkey += ";"; fkey += key; if (XrdMqSharedObjectManager::sDebug) { fprintf(stderr, "XrdMqSharedObjectManager::Delete=>[%s:%s] notified\n", mSubject.c_str(), key.c_str()); } XrdMqSharedObjectManager::Notification event(fkey, XrdMqSharedObjectManager::kMqSubjectKeyDeletion); mSOM->mSubjectsMutex.Lock(); mSOM->mNotificationSubjects.push_back(event); mSOM->SubjectsSem.Post(); mSOM->mSubjectsMutex.UnLock(); } } return deleted; } //------------------------------------------------------------------------------- // Clear contents of the hash //------------------------------------------------------------------------------- void XrdMqSharedHash::Clear(bool broadcast) { RWMutexWriteLock wr_lock(*mStoreMutex); for (auto it = mStore.begin(); it != mStore.end(); ++it) { if (mIsTransaction) { if (mSOM->mBroadcast && broadcast) { mDeletions.insert(it->first); } mTransactions.erase(it->first); } } mStore.clear(); } //------------------------------------------------------------------------------- // Set entry in hash map //------------------------------------------------------------------------------- bool XrdMqSharedHash::SetImpl(const char* key, const char* value, bool broadcast) { std::string skey = key; { RWMutexWriteLock wr_lock(*mStoreMutex); if (mStore.count(skey) == 0) { mStore.insert(std::make_pair(skey, XrdMqSharedHashEntry(key, value))); } else { mStore[skey] = XrdMqSharedHashEntry(key, value); } } if (mSOM->mBroadcast && broadcast) { bool is_transact = false; // mSOM->IsMuxTransaction is tested first to avoid contention on the // MuxTransactionsMutex and then is tested again when we actually have the // lock to check it didn't change in the meantime - hackish, needs fix! if (mSOM->IsMuxTransaction) { XrdSysMutexHelper lock(mSOM->MuxTransactionsMutex); if (mSOM->IsMuxTransaction) { mSOM->MuxTransactions[mSubject].insert(skey); is_transact = true; } } if (!is_transact) { // Emulate a transaction for a single set operation bool emulate_transact = false; if (!mIsTransaction) { mTransactMutex->Lock(); mTransactions.clear(); emulate_transact = true; } mTransactions.insert(skey); if (emulate_transact) { CloseTransaction(); } } } // Check if we have to post for this subject if (mSOM) { std::string fkey = mSubject.c_str(); fkey += ";"; fkey += skey; if (XrdMqSharedObjectManager::sDebug) { fprintf(stderr, "XrdMqSharedObjectManager::Set=>[%s:%s]=>%s notified\n", mSubject.c_str(), skey.c_str(), value); } XrdSysMutexHelper lock(mSOM->mSubjectsMutex); XrdMqSharedObjectManager::Notification event (fkey, XrdMqSharedObjectManager::kMqSubjectModification); mSOM->mNotificationSubjects.push_back(event); mSOM->SubjectsSem.Post(); } return true; } //------------------------------------------------------------------------------ // * * * Class XrdMqSharedQueue * * * //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdMqSharedQueue::XrdMqSharedQueue(const char* subject, const char* bcast_queue, XrdMqSharedObjectManager* som): XrdMqSharedHash(subject, bcast_queue, som), mQMutex(new XrdSysMutex()), mLastObjId(0) { mType = "queue"; } //------------------------------------------------------------------------------ // Move constructor //------------------------------------------------------------------------------ XrdMqSharedQueue::XrdMqSharedQueue(XrdMqSharedQueue&& other) { *this = std::move(other); } //------------------------------------------------------------------------------ // Move assignment operator //------------------------------------------------------------------------------ XrdMqSharedQueue& XrdMqSharedQueue::operator=(XrdMqSharedQueue&& other) { if (this != &other) { mQMutex.reset(nullptr); XrdMqSharedHash::operator=(std::move(other)); std::swap(mQMutex, other.mQMutex); std::swap(mQueue, other.mQueue); std::swap(mLastObjId, other.mLastObjId); } return *this; } //------------------------------------------------------------------------------ // Delete entry from queue //------------------------------------------------------------------------------ bool XrdMqSharedQueue::Delete(const std::string& key, bool broadcast) { if (!key.empty()) { XrdSysMutexHelper lock(*mQMutex); bool found = false; for (auto it = mQueue.begin(); it != mQueue.end(); ++it) { if (*it == key) { mQueue.erase(it); found = true; break; } } if (found) { return XrdMqSharedHash::Delete(key); } } return false; } //------------------------------------------------------------------------------ // Push back entry into the queue //------------------------------------------------------------------------------ bool XrdMqSharedQueue::PushBack(const std::string& key, const std::string& value) { if (value.empty()) { fprintf(stderr, "Error: key=%s has empty value for queue!\n", key.c_str()); return false; } return SetImpl(key.c_str(), value.c_str(), true); } //------------------------------------------------------------------------------ // Get first entry value from the queue //------------------------------------------------------------------------------ std::string XrdMqSharedQueue::PopFront() { std::string value = ""; XrdSysMutexHelper lock(*mQMutex); if (!mQueue.empty()) { std::string key = mQueue.front(); mQueue.pop_front(); value = XrdMqSharedHash::Get(key); (void) XrdMqSharedHash::Delete(key); } return value; } //------------------------------------------------------------------------------- // Set entry in queue //------------------------------------------------------------------------------- bool XrdMqSharedQueue::SetImpl(const char* key, const char* value, bool broadcast) { std::string uuid; XrdSysMutexHelper lock(*mQMutex); if (!key || (*key == '\0')) { char lld[1024]; mLastObjId++; snprintf(lld, 1023, "%llu", mLastObjId); uuid = lld; } else { uuid = key; } if (mStore.find(uuid) == mStore.end()) { if (XrdMqSharedHash::SetImpl(uuid.c_str(), value, broadcast)) { mQueue.push_back(uuid); return true; } } return false; } //------------------------------------------------------------------------------ // * * * Class XrdMqSharedObjectChangeNotifier * * * //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ bool XrdMqSharedObjectChangeNotifier::SubscribesToSubject(const std::string& subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { eos_static_debug("subscribing to subject %s", subject.c_str()); Subscriber* s = GetSubscriberFromCatalog(subscriber); XrdSysMutexHelper lock(s->WatchMutex); if (s->WatchSubjects[type].count(subject)) { return false; } s->WatchSubjects[type].insert(subject); if (s->Notify) { // if the notification is started for this process, update it if (!StartNotifySubject(s, subject, type)) { return false; } } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::SubscribesToSubjectRegex( const std::string& subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber); XrdSysMutexHelper lock(s->WatchMutex); eos_static_debug("subscribing to subject regex %s", subject.c_str()); if (s->WatchSubjectsRegex[type].count(subject)) { return false; } s->WatchSubjectsRegex[type].insert(subject); if (s->Notify) { // if the notification is started for this process, update it if (!StartNotifySubjectRegex(s, subject, type)) { return false; } } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::SubscribesToKey(const std::string& subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber); XrdSysMutexHelper lock(s->WatchMutex); eos_static_debug("subscribing to key %s", key.c_str()); if (s->WatchKeys[type].count(key)) { return false; } s->WatchKeys[type].insert(key); if (s->Notify) { // if the notification is started for this process, update it if (!StartNotifyKey(s, key, type)) { return false; } } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::SubscribesToKeyRegex(const std::string& subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber); XrdSysMutexHelper lock(s->WatchMutex); eos_static_debug("subscribing to key regex %s", key.c_str()); if (s->WatchKeysRegex[type].count(key)) { return false; } s->WatchKeysRegex[type].insert(key); if (s->Notify) { // if the notification is started for this process, update it if (!StartNotifyKeyRegex(s, key, type)) { return false; } } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::SubscribesToSubjectAndKey( const std::string& subscriber, const std::set& subjects, const std::set& keys, XrdMqSharedObjectChangeNotifier::notification_t type) { eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); Subscriber* s = GetSubscriberFromCatalog(subscriber); XrdSysMutexHelper lock(s->WatchMutex); if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { size_t bufsize = 0; for (auto it = subjects.begin(); it != subjects.end(); ++it) { bufsize += (it->size() + 1); } for (auto it = keys.begin(); it != keys.end(); ++it) { bufsize += (it->size() + 1); } bufsize += 64; int sz; char* buffer = new char[bufsize]; char* buf = buffer; sz = snprintf(buf, bufsize, "subscribing to subjects [ "); buf += sz; bufsize -= sz; for (auto it = subjects.begin(); it != subjects.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "] times keys [ "); buf += sz; bufsize -= sz; for (auto it = keys.begin(); it != keys.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "]"); eos_static_debug("%s", buffer); delete[] buffer; } // firstly update the thread-local vector bool insertIntoExisiting = false; { for (auto it = s->WatchSubjectsXKeys[type].begin(); it != s->WatchSubjectsXKeys[type].end(); ++it) { // { // size_t bufsize=0; // for(auto it2 = it->first.begin(); it2 != it->first.end(); ++it2) // bufsize+=(it2->size()+1); // for(auto it2 = it->second.begin(); it2 != it->second.end(); ++it2) // bufsize+=(it2->size()+1); // bufsize += 64; // int sz; // // char *buffer = new char[bufsize]; // char *buf=buffer; // sz = snprintf(buf,bufsize,"WatchSubjectsXKeys item : subjects [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->first.begin(); it2 != it->first.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // // sz = snprintf(buf,bufsize,"] times keys [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->second.begin(); it2 != it->second.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // sz = snprintf(buf,bufsize,"]"); // // eos_static_info("%s", buffer); // delete[] buffer; // } if (subjects == it->first) { size_t sizeBefore = it->second.size(); it->second.insert(keys.begin(), keys.end()); if (sizeBefore == it->second.size()) { return false; // nothing to insert } else { insertIntoExisiting = true; break; } } else if (keys == it->second) { size_t sizeBefore = it->first.size(); it->first.insert(subjects.begin(), subjects.end()); if (sizeBefore == it->first.size()) { return false; // nothing to insert } else { insertIntoExisiting = true; break; } } } if (!insertIntoExisiting) { s->WatchSubjectsXKeys[type].push_back(make_pair(subjects, keys)); } } if (s->Notify) { // update the ongoing notification return StartNotifySubjectsAndKeys(s, subjects, keys, type); } return true; } bool XrdMqSharedObjectChangeNotifier::SubscribesToSubjectAndKey( const std::string& subscriber, const std::string& subject, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set s, k; s.insert(subject); k.insert(key); return SubscribesToSubjectAndKey(subscriber, s, k, type); } bool XrdMqSharedObjectChangeNotifier::SubscribesToSubjectAndKey( const std::string& subscriber, const std::string& subject, const std::set& keys, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set s; s.insert(subject); return SubscribesToSubjectAndKey(subscriber, s, keys, type); } bool XrdMqSharedObjectChangeNotifier::SubscribesToSubjectAndKey( const std::string& subscriber, const std::set& subjects, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set k; k.insert(key); return SubscribesToSubjectAndKey(subscriber, subjects, k, type); } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubject(const std::string& subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); if (s->Notify) { // if the notification is started for this process, update it if (!StopNotifySubject(s, subject, type)) { return false; } } if (s->empty()) { delete s; s = NULL; } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubjectRegex( const std::string& subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); if (s->Notify) { // if the notification is started for this process, update it if (!StopNotifySubjectRegex(s, subject, type)) { return false; } } if (s->empty()) { delete s; s = NULL; } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToKey(const std::string& subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); if (s->Notify) { // if the notification is started for this process, update it if (!StopNotifyKey(s, key, type)) { return false; } } if (s->empty()) { delete s; s = NULL; } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToKeyRegex( const std::string& subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); if (s->Notify) { // if the notification is started for this process, update it if (!StopNotifyKeyRegex(s, key, type)) { return false; } } if (s->empty()) { delete s; s = NULL; } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToEverything( const std::string& subscriber) { Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); if (s->Notify) { StopNotifyCurrentThread(); } delete s; s = NULL; return true; } ///*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubjectAndKey( const std::string& subscriber, std::set subjects, std::set keys, XrdMqSharedObjectChangeNotifier::notification_t type) { eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { size_t bufsize = 0; for (auto it = subjects.begin(); it != subjects.end(); ++it) { bufsize += (it->size() + 1); } for (auto it = keys.begin(); it != keys.end(); ++it) { bufsize += (it->size() + 1); } bufsize += 64; int sz; char* buffer = new char[bufsize]; char* buf = buffer; sz = snprintf(buf, bufsize, "unsubscribing to subjects [ "); buf += sz; bufsize -= sz; for (auto it = subjects.begin(); it != subjects.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "] times keys [ "); buf += sz; bufsize -= sz; for (auto it = keys.begin(); it != keys.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "]"); eos_static_debug("%s", buffer); delete[] buffer; } Subscriber* s = GetSubscriberFromCatalog(subscriber, false); if (!s) { return false; } XrdSysMutexHelper lock(s->WatchMutex); // firstly update the thread-local vector bool removedAll = false; { for (auto it = s->WatchSubjectsXKeys[type].begin(); it != s->WatchSubjectsXKeys[type].end(); ++it) { if (it->first == subjects && std::includes(it->second.begin(), it->second.end(), keys.begin(), keys.end())) { set newKeys; set_difference(it->second.begin(), it->second.end(), keys.begin(), keys.end(), inserter(newKeys, newKeys.end())); it->second = newKeys; //it->second.erase(keys.begin(),keys.end()); removedAll = true; if (it->second.empty()) { s->WatchSubjectsXKeys[type].erase(it); } break; } else if (it->second == keys && std::includes(it->first.begin(), it->first.end(), subjects.begin(), subjects.end())) { set newSubjects; set_difference(it->first.begin(), it->first.end(), subjects.begin(), subjects.end(), inserter(newSubjects, newSubjects.end())); it->first = newSubjects; //it->first.erase(subjects.begin(),subjects.end()); removedAll = true; if (it->first.empty()) { s->WatchSubjectsXKeys[type].erase(it); } break; } } if (!removedAll) { return false; } } // SYNCHRONIZE AITH GLOBAL MAP if (s->Notify) { return StopNotifySubjectsAndKeys(s, subjects, keys, type); } return true; } bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubjectAndKey( const std::string& subscriber, const std::string& subject, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set s, k; s.insert(subject); k.insert(key); return UnsubscribesToSubjectAndKey(subscriber, s, k, type); } bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubjectAndKey( const std::string& subscriber, const std::string& subject, const std::set& keys, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set s; s.insert(subject); return UnsubscribesToSubjectAndKey(subscriber, s, keys, type); } bool XrdMqSharedObjectChangeNotifier::UnsubscribesToSubjectAndKey( const std::string& subscriber, const std::set& subjects, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { std::set k; k.insert(key); return UnsubscribesToSubjectAndKey(subscriber, subjects, k, type); } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifyKey(Subscriber* subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); return (WatchKeys2Subscribers[type][key].mSubscribers.insert( subscriber)).second; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifyKeyRegex(Subscriber* subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); bool res = (WatchKeys2Subscribers[type][key].mSubscribers.insert( subscriber)).second; if (WatchKeys2Subscribers[type][key].mRegex == NULL) { regex_t* r = new regex_t; if (regcomp(r, key.c_str(), REG_NOSUB)) { WatchKeys2Subscribers[type].erase(key); delete r; return false; } WatchKeys2Subscribers[type][key].mRegex = r; } return res; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifyKey(Subscriber* subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); _NotifierMapUpdate(WatchKeys2Subscribers[type], key, subscriber); return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifyKeyRegex(Subscriber* subscriber, const std::string& key, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); _NotifierMapUpdate(WatchKeys2Subscribers[type], key, subscriber); return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifySubject(Subscriber* subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); return (WatchSubjects2Subscribers[type][subject].mSubscribers.insert( subscriber)).second; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifySubjectRegex(Subscriber* subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); bool res = (WatchSubjects2Subscribers[type][subject].mSubscribers.insert( subscriber)).second; if (WatchSubjects2Subscribers[type][subject].mRegex) { regex_t* r = new regex_t; if (regcomp(r, subject.c_str(), REG_NOSUB)) { WatchSubjects2Subscribers[type].erase(subject); delete r; return false; } WatchSubjects2Subscribers[type][subject].mRegex = r; } return res; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifySubject(Subscriber* subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); _NotifierMapUpdate(WatchSubjects2Subscribers[type], subject, subscriber); return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifySubjectRegex(Subscriber* subscriber, const std::string& subject, XrdMqSharedObjectChangeNotifier::notification_t type) { XrdSysMutexHelper lock(WatchMutex); _NotifierMapUpdate(WatchSubjects2Subscribers[type], subject, subscriber); return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifySubjectsAndKeys( Subscriber* subscriber, const std::set& subjects, const std::set& keys, XrdMqSharedObjectChangeNotifier::notification_t type) { eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { size_t bufsize = 0; for (auto it = subjects.begin(); it != subjects.end(); ++it) { bufsize += (it->size() + 1); } for (auto it = keys.begin(); it != keys.end(); ++it) { bufsize += (it->size() + 1); } bufsize += 64; int sz; char* buffer = new char[bufsize]; char* buf = buffer; sz = snprintf(buf, bufsize, "starting notification for subjects [ "); buf += sz; bufsize -= sz; for (auto it = subjects.begin(); it != subjects.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "] times keys [ "); buf += sz; bufsize -= sz; for (auto it = keys.begin(); it != keys.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "]"); eos_static_debug("%s", buffer); delete[] buffer; } bool insertIntoExisiting = false; XrdSysMutexHelper lock(WatchMutex); for (auto it = WatchSubjectsXKeys2Subscribers[type].begin(); it != WatchSubjectsXKeys2Subscribers[type].end(); ++it) { // { // size_t bufsize=0; // for(auto it2 = it->first.first.begin(); it2 != it->first.first.end(); ++it2) // bufsize+=(it2->size()+1); // for(auto it2 = it->first.second.begin(); it2 != it->first.second.end(); ++it2) // bufsize+=(it2->size()+1); // bufsize += 64; // int sz; // // char *buffer = new char[bufsize]; // char *buf=buffer; // sz = snprintf(buf,bufsize,"WatchSubjectsXKeys2Subscribers item : subjects [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->first.first.begin(); it2 != it->first.first.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // // sz = snprintf(buf,bufsize,"] times keys [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->first.second.begin(); it2 != it->first.second.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // sz = snprintf(buf,bufsize,"]"); // // eos_static_info("%s", buffer); // delete[] buffer; // } if (subjects == it->first.first) { if (it->second.size() == 1 && it->second.count(subscriber)) { // only one subscriber and it's the same, factor size_t sizeBefore = it->first.second.size(); it->first.second.insert(keys.begin(), keys.end()); if (sizeBefore == it->first.second.size()) { return false; // nothing to insert } else { insertIntoExisiting = true; break; } } else if (keys == it->first.second && it->second.count(subscriber) == 0) { it->second.insert( subscriber); // same SubjectXKey without this subscriber -> we insert it break; } } else if (keys == it->first.second) { if (it->second.size() == 1 && it->second.count(subscriber)) { // only one subscriber and it's the same, factor size_t sizeBefore = it->first.first.size(); it->first.first.insert(subjects.begin(), subjects.end()); if (sizeBefore == it->first.first.size()) { return false; // nothing to insert } else { insertIntoExisiting = true; break; } } else if (subjects == it->first.first && it->second.count(subscriber) == 0) { it->second.insert( subscriber); // same SubjectXKey without this subscriber -> we insert it break; } } } if (!insertIntoExisiting) { std::set s; s.insert(subscriber); WatchSubjectsXKeys2Subscribers[type].push_back(make_pair(make_pair(subjects, keys), s)); } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifySubjectsAndKeys( Subscriber* subscriber, const std::set& subjects, const std::set& keys, XrdMqSharedObjectChangeNotifier::notification_t type) { eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { size_t bufsize = 0; for (auto it = subjects.begin(); it != subjects.end(); ++it) { bufsize += (it->size() + 1); } for (auto it = keys.begin(); it != keys.end(); ++it) { bufsize += (it->size() + 1); } bufsize += 64; int sz; char* buffer = new char[bufsize]; char* buf = buffer; sz = snprintf(buf, bufsize, "stopping notifications for subjects [ "); buf += sz; bufsize -= sz; for (auto it = subjects.begin(); it != subjects.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "] times keys [ "); buf += sz; bufsize -= sz; for (auto it = keys.begin(); it != keys.end(); ++it) { sz = snprintf(buf, bufsize, "%s ", it->c_str()); buf += sz; bufsize -= sz; } sz = snprintf(buf, bufsize, "]"); eos_static_debug("%s", buffer); delete[] buffer; } bool removedAll = false; // secondly update the global vector XrdSysMutexHelper lock(WatchMutex); for (auto it = WatchSubjectsXKeys2Subscribers[type].begin(); it != WatchSubjectsXKeys2Subscribers[type].end(); ++it) { // { // size_t bufsize=0; // for(auto it2 = it->first.first.begin(); it2 != it->first.first.end(); ++it2) // bufsize+=(it2->size()+1); // for(auto it2 = it->first.second.begin(); it2 != it->first.second.end(); ++it2) // bufsize+=(it2->size()+1); // bufsize += 64; // int sz; // // char *buffer = new char[bufsize]; // char *buf=buffer; // sz = snprintf(buf,bufsize,"WatchSubjectsXKeys2Subscribers item : subjects [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->first.first.begin(); it2 != it->first.first.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // // sz = snprintf(buf,bufsize,"] times keys [ "); // buf += sz; // bufsize -=sz; // // for(auto it2 = it->first.second.begin(); it2 != it->first.second.end(); ++it2) { // sz = snprintf(buf,bufsize,"%s ",it2->c_str()); // buf += sz; // bufsize -=sz; // } // sz = snprintf(buf,bufsize,"]"); // // eos_static_info("%s", buffer); // delete[] buffer; // } if ((it->first.first == subjects) && std::includes(it->first.second.begin(), it->first.second.end(), keys.begin(), keys.end())) { if (it->second.count(subscriber)) { // if the subscriber is there if (it->second.size() > 1) { // there's some other subscriber, split before update it->second.erase(subscriber); WatchSubjectsXKeys2Subscribers[type].push_back( std::make_pair(it->first, std::set (&subscriber, &subscriber + 1))); it = WatchSubjectsXKeys2Subscribers[type].end() - 1; } if (it->second.size() == 1) { // if the subscriber is the only guy there for (auto itk = keys.begin(); itk != keys.end(); ++itk) { it->first.second.erase(*itk); } if (it->first.second.empty()) { // if this entry is now empty, remove it WatchSubjectsXKeys2Subscribers[type].erase(it); } } removedAll = true; break; } } else if ((it->first.second == keys) && std::includes(it->first.first.begin(), it->first.first.end(), subjects.begin(), subjects.end())) { if (it->second.count(subscriber)) { // if the subscriber is there // eos_static_debug("1 element size is %d vector size is %d", // it->second.size(),WatchSubjectsXKeys2Subscribers[type].size()); if (it->second.size() > 1) { // there's some other subscriber, split before update it->second.erase(subscriber); WatchSubjectsXKeys2Subscribers[type].push_back( std::make_pair(it->first, std::set (&subscriber, &subscriber + 1))); it = WatchSubjectsXKeys2Subscribers[type].end() - 1; } // eos_static_debug("2 element size is %d vector size is %d", // it->second.size(), WatchSubjectsXKeys2Subscribers[type].size()); if (it->second.size() == 1) { // if the subscriber is the only guy there for (auto its = subjects.begin(); its != subjects.end(); ++its) { it->first.first.erase(*its); } // eos_static_debug("3 element size is %d vector size is %d", // it->second.size(),WatchSubjectsXKeys2Subscribers[type].size()); if (it->first.first.empty()) { // if this entry is now empty, remove it WatchSubjectsXKeys2Subscribers[type].erase(it); } } removedAll = true; break; } } } if (!removedAll) { return false; } return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StartNotifyCurrentThread() { // to start notifying, we just copy the references to the watched words // from the thread local to the global map if (!tlSubscriber) { eos_static_err("the current thread is not bound to any subscriber"); return false; } if (tlSubscriber->Notify) { return false; } eos_static_info("Starting notification"); { XrdSysMutexHelper lock1(tlSubscriber->WatchMutex); { XrdSysMutexHelper lock2(WatchMutex); for (int type = 0; type < 5; type++) { for (auto it = tlSubscriber->WatchKeys[type].begin(); it != tlSubscriber->WatchKeys[type].end(); ++it) { WatchKeys2Subscribers[type][*it].mSubscribers.insert(tlSubscriber); } for (auto it = tlSubscriber->WatchSubjects[type].begin(); it != tlSubscriber->WatchSubjects[type].end(); ++it) { WatchSubjects2Subscribers[type][*it].mSubscribers.insert(tlSubscriber); } for (auto it = tlSubscriber->WatchKeysRegex[type].begin(); it != tlSubscriber->WatchKeysRegex[type].end(); ++it) { WatchKeys2Subscribers[type][*it].mSubscribers.insert(tlSubscriber); if (!WatchKeys2Subscribers[type][*it].mRegex) { regex_t* r = new regex_t; if (regcomp(r, it->c_str(), REG_NOSUB)) { WatchKeys2Subscribers[type].erase(*it); delete r; return false; } WatchKeys2Subscribers[type][*it].mRegex = r; } } for (auto it = tlSubscriber->WatchSubjectsRegex[type].begin(); it != tlSubscriber->WatchSubjectsRegex[type].end(); ++it) { WatchSubjects2Subscribers[type][*it].mSubscribers.insert(tlSubscriber); if (!WatchSubjects2Subscribers[type][*it].mRegex) { regex_t* r = new regex_t; if (regcomp(r, it->c_str(), REG_NOSUB)) { WatchSubjects2Subscribers[type].erase(*it); delete r; return false; } WatchSubjects2Subscribers[type][*it].mRegex = r; } } } } } for (int type = 0; type < 5; ++type) { for (auto it = tlSubscriber->WatchSubjectsXKeys[type].begin(); it != tlSubscriber->WatchSubjectsXKeys[type].end(); ++it) { StartNotifySubjectsAndKeys(tlSubscriber, it->first, it->second, static_cast(type)); } } tlSubscriber->Notify = true; return true; } /*----------------------------------------------------------------------------*/ bool XrdMqSharedObjectChangeNotifier::StopNotifyCurrentThread() { if (!tlSubscriber) { eos_static_err("the current thread is not bound to any subscriber"); return false; } // to stop notifying, we just remove the references to the watched words // from the thread local to the global map if (!tlSubscriber->Notify) { return false; } eos_static_info("Stopping notification"); { XrdSysMutexHelper lock1(tlSubscriber->WatchMutex); { XrdSysMutexHelper lock2(WatchMutex); for (int type = 0; type < 5; type++) { for (auto it = tlSubscriber->WatchKeys[type].begin(); it != tlSubscriber->WatchKeys[type].end(); ++it) { _NotifierMapUpdate(WatchKeys2Subscribers[type], *it, tlSubscriber); } for (auto it = tlSubscriber->WatchSubjects[type].begin(); it != tlSubscriber->WatchSubjects[type].end(); ++it) { _NotifierMapUpdate(WatchSubjects2Subscribers[type], *it, tlSubscriber); } for (auto it = tlSubscriber->WatchKeysRegex[type].begin(); it != tlSubscriber->WatchKeysRegex[type].end(); ++it) { _NotifierMapUpdate(WatchKeys2Subscribers[type], *it, tlSubscriber); } for (auto it = tlSubscriber->WatchSubjectsRegex[type].begin(); it != tlSubscriber->WatchSubjectsRegex[type].end(); ++it) { _NotifierMapUpdate(WatchSubjects2Subscribers[type], *it, tlSubscriber); } std::vector toRemove; for (auto it = WatchSubjectsXKeys2Subscribers[type].begin(); it != WatchSubjectsXKeys2Subscribers[type].end(); ++it) { auto entry = it->second.find(tlSubscriber); if (entry != it->second.end()) { // if the current threads is a subscriber of this entry it->second.erase(tlSubscriber); if (it->second.empty()) { // if the set of subscribers is empty, remove this entry toRemove.push_back(it); } } } } } } for (int type = 0; type < 5; type++) { for (auto it = tlSubscriber->WatchSubjectsXKeys[type].begin(); it != tlSubscriber->WatchSubjectsXKeys[type].end(); ++it) { StopNotifySubjectsAndKeys(tlSubscriber, it->first, it->second, static_cast(type)); } } tlSubscriber->Notify = false; return true; } /*----------------------------------------------------------------------------*/ void XrdMqSharedObjectChangeNotifier::SomListener(ThreadAssistant& assistant) noexcept { // thread listening on filesystem errors and configuration changes eos_static_info("%s", "mgm=\"starting SOM listener\""); while (!assistant.terminationRequested()) { SOM->SubjectsSem.Wait(); if (assistant.terminationRequested()) { eos_static_notice("%s", "msg=\"exiting SOM listener thread\""); break; } // we always take a lock to take something from the queue and then release it WatchMutex.Lock(); SOM->mSubjectsMutex.Lock(); std::set notifiedSubscribers; while (SOM->mNotificationSubjects.size()) { XrdMqSharedObjectManager::Notification event; event = SOM->mNotificationSubjects.front(); SOM->mNotificationSubjects.pop_front(); SOM->mSubjectsMutex.UnLock(); std::string newsubject = event.mSubject; //eos_info("msg=\"SOM Listener new notification\" subject=%s, type=%i", // event.mSubject.c_str(), event.mType); int type = static_cast(event.mType); std::set notifiedSubscribersForCurrentEvent; std::string key = newsubject; std::string queue = newsubject; size_t dpos = 0; if ((dpos = queue.find(";")) != std::string::npos) { key.erase(0, dpos + 1); queue.erase(dpos); } // these are useful only if type == 4 std::string newVal; bool newValAsserted = false; bool isNewVal = false; do { // Check if there is a matching key for (auto it = WatchKeys2Subscribers[type].begin(); it != WatchKeys2Subscribers[type].end(); ++it) { if (((it->second.mRegex == NULL) && (key == it->first)) || ((it->second.mRegex != NULL) && !regexec(it->second.mRegex, key.c_str(), 0, NULL, 0))) { if (type == 4) { if (!newValAsserted) { auto lvIt = LastValues.find(newsubject); SOM->HashMutex.LockRead(); XrdMqSharedHash* hash = SOM->GetObject(queue.c_str(), "hash"); SOM->HashMutex.UnLockRead(); if (hash) { newVal = hash->Get(key.c_str()); if (lvIt == LastValues.end() || lvIt->second != newVal) { isNewVal = true; } newValAsserted = true; } else { continue; } } if (isNewVal) { // eos_static_debug("notification on %s : new value %s IS a" // "strict change",newsubject.c_str(),newVal.c_str()); LastValues[newsubject] = newVal; } else { //eos_static_debug("notification on %s : new value %s IS NOT" //" a strict change",newsubject.c_str(),newVal.c_str()); continue; } } for (auto it2 = it->second.mSubscribers.begin(); it2 != it->second.mSubscribers.end(); ++it2) { if (notifiedSubscribersForCurrentEvent.count(*it2) == 0) { // Don't notify twice for the same event (*it2)->mSubjMtx.Lock(); (*it2)->NotificationSubjects.push_back(event); (*it2)->mSubjMtx.UnLock(); notifiedSubscribersForCurrentEvent.insert(*it2); notifiedSubscribers.insert(*it2); } } } } // Check if there is a matching subject for (auto it = WatchSubjects2Subscribers[type].begin(); it != WatchSubjects2Subscribers[type].end(); ++it) { if (((it->second.mRegex == NULL) && (queue == it->first)) || ((it->second.mRegex != NULL) && !regexec(it->second.mRegex, queue.c_str(), 0, NULL, 0))) { if (type == 4) { if (!newValAsserted) { auto lvIt = LastValues.find(newsubject); SOM->HashMutex.LockRead(); XrdMqSharedHash* hash = SOM->GetObject(queue.c_str(), "hash"); SOM->HashMutex.UnLockRead(); if (hash) { newVal = hash->Get(key.c_str()); if (lvIt == LastValues.end() || lvIt->second != newVal) { isNewVal = true; } newValAsserted = true; } } if (isNewVal) { // eos_static_debug("notification on %s : new value %s IS a " // "strict change",newsubject.c_str(),newVal.c_str()); LastValues[newsubject] = newVal; } else { //eos_static_debug("notification on %s : new value %s IS NOT " // "a strict change",newsubject.c_str(),newVal.c_str()); continue; } } for (auto it2 = it->second.mSubscribers.begin(); it2 != it->second.mSubscribers.end(); ++it2) { if (notifiedSubscribersForCurrentEvent.count(*it2) == 0) { // Don't notify twice for the same event (*it2)->mSubjMtx.Lock(); (*it2)->NotificationSubjects.push_back(event); (*it2)->mSubjMtx.UnLock(); notifiedSubscribersForCurrentEvent.insert(*it2); notifiedSubscribers.insert(*it2); } } } else { // eos_static_info("regex %s DID NOT MATCH %s",it->first.c_str(), // queue.c_str()); } } // Check if there is a matching subjectXkey for (auto it = WatchSubjectsXKeys2Subscribers[type].begin(); it != WatchSubjectsXKeys2Subscribers[type].end(); ++it) { if (it->first.first.count(queue) && it->first.second.count(key)) { if (type == 4) { if (!newValAsserted) { auto lvIt = LastValues.find(newsubject); SOM->HashMutex.LockRead(); XrdMqSharedHash* hash = SOM->GetObject(queue.c_str(), "hash"); SOM->HashMutex.UnLockRead(); if (hash) { newVal = hash->Get(key.c_str()); if (lvIt == LastValues.end() || lvIt->second != newVal) { isNewVal = true; } newValAsserted = true; } else { continue; } } if (isNewVal) { //if(key=="id") { // eos_static_warning("WARNING ID CHANGE in queue %s FROM %s " // "to %s",queue.c_str(),LastValues[newsubject].c_str(),newVal.c_str()); //} LastValues[newsubject] = newVal; } else { // eos_static_info("notification on %s : new value %s IS NOT " // "a strict change",newsubject.c_str(),newVal.c_str()); continue; } } for (auto it2 = it->second.begin(); it2 != it->second.end(); ++it2) { if (notifiedSubscribersForCurrentEvent.count(*it2) == 0) { // Don't notify twice for the same event (*it2)->mSubjMtx.Lock(); (*it2)->NotificationSubjects.push_back(event); (*it2)->mSubjMtx.UnLock(); notifiedSubscribersForCurrentEvent.insert(*it2); notifiedSubscribers.insert(*it2); } } } } if (type == 2) { // If it's a modification, check also the strict modifications to be notified type = 4; } else { break; } } while (true); SOM->mSubjectsMutex.Lock(); } // wake up all subscriber threads for (auto it = notifiedSubscribers.begin(); it != notifiedSubscribers.end(); ++it) { (*it)->mSubjSem.Post(); } SOM->mSubjectsMutex.UnLock(); WatchMutex.UnLock(); } } //------------------------------------------------------------------------------ // Start dispatching change thread //------------------------------------------------------------------------------ bool XrdMqSharedObjectChangeNotifier::Start() { try { mDispatchThread.reset(&XrdMqSharedObjectChangeNotifier::SomListener, this); } catch (const std::system_error& e) { eos_static_err("%s", "msg=\"failed to start SOM listener\""); return false; } return true; } //------------------------------------------------------------------------------ // Stop dispatcher thread //------------------------------------------------------------------------------ bool XrdMqSharedObjectChangeNotifier::Stop() { auto start = std::chrono::steady_clock::now(); auto stop_objnotifier = std::thread([&]() { mDispatchThread.join(); }); // We now need to signal to the SomListener thread to unblock it { if (SOM) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); XrdSysMutexHelper lock(SOM->mSubjectsMutex); SOM->SubjectsSem.Post(); } } stop_objnotifier.join(); auto end = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast (end - start); eos_static_notice("msg=\"SOM listener shutdown duration: %llu millisec", duration.count()); return true; } //------------------------------------------------------------------------------ // * * * Class XrdMqSharedObjectManager * * * //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdMqSharedObjectManager::XrdMqSharedObjectManager(): mEnableQueue(false), mDumperFile("") { AutoReplyQueue = ""; AutoReplyQueueDerive = false; IsMuxTransaction = false; { XrdSysMutexHelper mLock(MuxTransactionsMutex); MuxTransactions.clear(); } } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdMqSharedObjectManager::~XrdMqSharedObjectManager() { mDumperTid.join(); for (auto it = mHashSubjects.begin(); it != mHashSubjects.end(); ++it) { delete it->second; } } //---------------------------------------------------------------------------- // Create requested shared object type //---------------------------------------------------------------------------- bool XrdMqSharedObjectManager::CreateSharedObject(const char* subject, const char* bcast_queue, const char* type, XrdMqSharedObjectManager* som) { std::string stype = type; if (stype == "hash") { return CreateSharedHash(subject, bcast_queue, som ? som : this); } if (stype == "queue") { return CreateSharedQueue(subject, bcast_queue, som ? som : this); } return false; } //------------------------------------------------------------------------------ // Create shared hash object //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::CreateSharedHash(const char* subject, const char* broadcastqueue, XrdMqSharedObjectManager* som) { std::string ss = subject; Notification event(ss, XrdMqSharedObjectManager::kMqSubjectCreation); HashMutex.LockWrite(); if (mHashSubjects.count(ss) > 0) { mHashSubjects[ss]->SetBroadCastQueue(broadcastqueue); HashMutex.UnLockWrite(); return false; } else { XrdMqSharedHash* newhash = new XrdMqSharedHash(subject, broadcastqueue, som ? som : this); mHashSubjects.insert(std::pair (ss, newhash)); HashMutex.UnLockWrite(); if (mEnableQueue) { mSubjectsMutex.Lock(); mNotificationSubjects.push_back(event); mSubjectsMutex.UnLock(); SubjectsSem.Post(); } return true; } } //------------------------------------------------------------------------------ // Create shared queue object //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::CreateSharedQueue(const char* subject, const char* broadcastqueue, XrdMqSharedObjectManager* som) { std::string ss = subject; Notification event(ss, XrdMqSharedObjectManager::kMqSubjectCreation); HashMutex.LockWrite(); if (mQueueSubjects.count(ss) > 0) { HashMutex.UnLockWrite(); return false; } else { mQueueSubjects.emplace(ss, XrdMqSharedQueue(subject, broadcastqueue, som ? som : this)); HashMutex.UnLockWrite(); if (mEnableQueue) { mSubjectsMutex.Lock(); mNotificationSubjects.push_back(event); mSubjectsMutex.UnLock(); SubjectsSem.Post(); } return true; } } //------------------------------------------------------------------------------ // Delete shared object type //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::DeleteSharedObject(const char* subject, const char* type, bool broadcast) { std::string stype = type; if (stype == "hash") { return DeleteSharedHash(subject, broadcast); } if (stype == "queue") { return DeleteSharedQueue(subject, broadcast); } return false; } //------------------------------------------------------------------------------ // Delete shared hash object //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::DeleteSharedHash(const char* subject, bool broadcast) { std::string ss = subject; Notification event(ss, XrdMqSharedObjectManager::kMqSubjectDeletion); HashMutex.LockWrite(); if ((mHashSubjects.count(ss) > 0)) { if (mBroadcast && broadcast) { XrdOucString txmessage = ""; mHashSubjects[ss]->MakeRemoveEnvHeader(txmessage); XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); XrdMqMessaging::gMessageClient.SendMessage(message, 0, false, false, true); } delete(mHashSubjects[ss]); mHashSubjects.erase(ss); HashMutex.UnLockWrite(); if (mEnableQueue) { mSubjectsMutex.Lock(); mNotificationSubjects.push_back(event); mSubjectsMutex.UnLock(); SubjectsSem.Post(); } return true; } else { HashMutex.UnLockWrite(); return true; } } //------------------------------------------------------------------------------ // Delete shared queue //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::DeleteSharedQueue(const char* subject, bool broadcast) { std::string ss = subject; Notification event(ss, XrdMqSharedObjectManager::kMqSubjectDeletion); HashMutex.LockWrite(); if ((mQueueSubjects.count(ss) > 0)) { if (mBroadcast && broadcast) { XrdOucString txmessage = ""; mQueueSubjects[ss].MakeRemoveEnvHeader(txmessage); XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); XrdMqMessaging::gMessageClient.SendMessage(message, 0, false, false, true); } mQueueSubjects.erase(ss); HashMutex.UnLockWrite(); if (mEnableQueue) { mSubjectsMutex.Lock(); mNotificationSubjects.push_back(event); mSubjectsMutex.UnLock(); SubjectsSem.Post(); } return true; } else { HashMutex.UnLockWrite(); return true; } } //------------------------------------------------------------------------------ // Get pointer to shared object type //------------------------------------------------------------------------------ XrdMqSharedHash* XrdMqSharedObjectManager::GetObject(const char* subject, const char* type) { std::string stype = type; if (stype == "hash") { return GetHash(subject); } if (stype == "queue") { return GetQueue(subject); } return 0; } //------------------------------------------------------------------------------ // Get shared hash object //------------------------------------------------------------------------------ XrdMqSharedHash* XrdMqSharedObjectManager::GetHash(const char* subject) { std::string ssubject = subject; if (mHashSubjects.count(ssubject)) { return mHashSubjects[ssubject]; } else { return 0; } } //------------------------------------------------------------------------------ // Get shared queue object //------------------------------------------------------------------------------ XrdMqSharedQueue* XrdMqSharedObjectManager::GetQueue(const char* subject) { std::string ssubject = subject; if (mQueueSubjects.count(ssubject)) { return &mQueueSubjects[ssubject]; } else { return 0; } } //------------------------------------------------------------------------------ // Dump contents of all shared objects to the output string //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::DumpSharedObjects(XrdOucString& out) { out = ""; RWMutexReadLock lock(HashMutex); for (auto it = mHashSubjects.begin(); it != mHashSubjects.end(); ++it) { std::unique_lock lock(it->second->mMutex); out += "===================================================\n"; out += it->first.c_str(); out += " [ hash=> "; out += it->second->GetBroadCastQueue(); out += " ]\n"; out += "---------------------------------------------------\n"; it->second->Dump(out); } for (auto it = mQueueSubjects.begin(); it != mQueueSubjects.end(); ++it) { out += "===================================================\n"; out += it->first.c_str(); out += " [ queue=> "; out += it->second.GetBroadCastQueue(); out += " ]\n"; out += "---------------------------------------------------\n"; it->second.Dump(out); } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::StartDumper(const char* file) { mDumperFile = file; try { mDumperTid.reset(&XrdMqSharedObjectManager::FileDumper, this); } catch (const std::system_error& e) { fprintf(stderr, "XrdMqSharedObjectManager::StartDumper=> failed to run " "dumper thread\n"); } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::FileDumper(ThreadAssistant& assistant) noexcept { while (!assistant.terminationRequested()) { XrdOucString s; DumpSharedObjects(s); std::string df = mDumperFile; df += ".tmp"; FILE* f = fopen(df.c_str(), "w+"); if (f) { fprintf(f, "%s\n", s.c_str()); fclose(f); } if (chmod(mDumperFile.c_str(), S_IRWXU | S_IRGRP | S_IROTH)) { fprintf(stderr, "XrdMqSharedObjectManager::FileDumper=> unable to set " "755 permissions on file %s\n", mDumperFile.c_str()); } if (rename(df.c_str(), mDumperFile.c_str())) { fprintf(stderr, "XrdMqSharedObjectManager::FileDumper=> unable to write " "dumper file %s\n", mDumperFile.c_str()); } assistant.wait_for(std::chrono::seconds(60)); } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::ParseEnvMessage(XrdMqMessage* message, XrdOucString& error) { error = ""; std::string subject = ""; std::string reply = ""; std::string type = ""; if (!message) { error = "no message provided"; return false; } XrdOucEnv env(message->GetBody()); int envlen; if (sDebug) { char* senv = env.Env(envlen); fprintf(stderr, "XrdMqSharedObjectManager::ParseEnvMessage=> size=%d text=%s\n", envlen, senv); } if (env.Get(XRDMQSHAREDHASH_SUBJECT)) { subject = env.Get(XRDMQSHAREDHASH_SUBJECT); } else { error = "no subject in message body"; return false; } if (env.Get(XRDMQSHAREDHASH_REPLY)) { reply = env.Get(XRDMQSHAREDHASH_REPLY); } else { reply = ""; } if (env.Get(XRDMQSHAREDHASH_TYPE)) { type = env.Get(XRDMQSHAREDHASH_TYPE); } else { error = "no hash type in message body"; return false; } if (env.Get(XRDMQSHAREDHASH_CMD)) { HashMutex.LockRead(); XrdMqSharedHash* sh = 0; std::vector subjectlist; int wpos = 0; // support 'wild card' broadcasts with /* if ((wpos = subject.find("/*")) != STR_NPOS) { XrdOucString wmatch = subject.c_str(); wmatch.erase(wpos); for (auto it = mHashSubjects.begin(); it != mHashSubjects.end(); ++it) { XrdOucString hs = it->first.c_str(); if (hs.beginswith(wmatch)) { subjectlist.push_back(hs.c_str()); } } for (auto it = mQueueSubjects.begin(); it != mQueueSubjects.end(); ++it) { XrdOucString hs = it->first.c_str(); if (hs.beginswith(wmatch)) { subjectlist.push_back(hs.c_str()); } } } else { // support 'wild card' broadcasts with */ if ((subject.find("*/")) == 0) { XrdOucString wmatch = subject.c_str(); wmatch.erase(0, 2); for (auto it = mHashSubjects.begin(); it != mHashSubjects.end(); ++it) { XrdOucString hs = it->first.c_str(); if (hs.endswith(wmatch)) { subjectlist.push_back(hs.c_str()); } } for (auto it = mQueueSubjects.begin(); it != mQueueSubjects.end(); ++it) { XrdOucString hs = it->first.c_str(); if (hs.endswith(wmatch)) { subjectlist.push_back(hs.c_str()); } } } else { std::string delimiter = "%"; // we support also multiplexed subject updates and split the list eos::common::StringConversion::Tokenize(subject, subjectlist, delimiter); } } XrdOucString ftag = XRDMQSHAREDHASH_CMD; ftag += "="; ftag += env.Get(XRDMQSHAREDHASH_CMD); if (subjectlist.size() > 0) { sh = GetObject(subjectlist[0].c_str(), type.c_str()); } if ((ftag == XRDMQSHAREDHASH_BCREQUEST) || (ftag == XRDMQSHAREDHASH_DELETE) || (ftag == XRDMQSHAREDHASH_REMOVE)) { // if we don't know the subject, we don't create it with a BCREQUEST if ((ftag == XRDMQSHAREDHASH_BCREQUEST) && (reply == "")) { HashMutex.UnLockRead(); error = "bcrequest: no reply address present"; return false; } if (!sh) { if (ftag == XRDMQSHAREDHASH_BCREQUEST) { error = "bcrequest: don't know this subject "; if (!subjectlist.empty()) { error += subjectlist[0].c_str(); } } if (ftag == XRDMQSHAREDHASH_DELETE) { error = "delete: don't know this subject "; if (!subjectlist.empty()) { error += subjectlist[0].c_str(); } } if (ftag == XRDMQSHAREDHASH_REMOVE) { error = "remove: don't know this subject "; if (!subjectlist.empty()) { error += subjectlist[0].c_str(); } } HashMutex.UnLockRead(); return false; } else { HashMutex.UnLockRead(); } } else { // automatically create the subject, if it does not exist if (!sh) { HashMutex.UnLockRead(); if (AutoReplyQueueDerive) { AutoReplyQueue = subject.c_str(); int pos = 0; for (int i = 0; i < 4; i++) { pos = subject.find("/", pos); if (i < 3) { if (pos == STR_NPOS) { AutoReplyQueue = ""; error = "cannot derive the reply queue from "; error += subject.c_str(); return false; } else { pos++; } } else { AutoReplyQueue.erase(pos); } } } // create the list of subjects for (size_t i = 0; i < subjectlist.size(); i++) { if (!CreateSharedObject(subjectlist[i].c_str(), AutoReplyQueue.c_str(), type.c_str())) { error = "cannot create shared object for "; error += subject.c_str(); error += " and type "; error += type.c_str(); eos_err("%s", error.c_str()); return false; } } { RWMutexReadLock lock(HashMutex); sh = GetObject(subject.c_str(), type.c_str()); } } else { HashMutex.UnLockRead(); } } { RWMutexReadLock lock(HashMutex); // from here on we have a read lock on 'sh' if ((ftag == XRDMQSHAREDHASH_UPDATE) || (ftag == XRDMQSHAREDHASH_BCREPLY)) { std::string val = (env.Get(XRDMQSHAREDHASH_PAIRS) ? env.Get( XRDMQSHAREDHASH_PAIRS) : ""); if (val.length() == 0) { error = "no pairs in message body"; return false; } if ((ftag == XRDMQSHAREDHASH_BCREPLY) && sh) { // Don't broadcast this one ... is a broadcast reply sh->Clear(false); } std::string key; std::string value; std::string cid; std::vector keystart; std::vector valuestart; std::vector cidstart; for (unsigned int i = 0; i < val.length(); i++) { if (val.c_str()[i] == '|') { keystart.push_back(i); } if (val.c_str()[i] == '~') { valuestart.push_back(i); } if (val.c_str()[i] == '%') { cidstart.push_back(i); } } if (keystart.size() != valuestart.size()) { error = "update: parsing error in pairs tag"; return false; } if (keystart.size() != cidstart.size()) { error = "update: parsing error in pairs tag"; return false; } int parseindex = 0; for (size_t s = 0; s < subjectlist.size(); s++) { sh = GetObject(subjectlist[s].c_str(), type.c_str()); if (!sh) { error = "update: subject "; error += subjectlist[s].c_str(); error += " does not exist"; return false; } std::string sstr; for (unsigned int i = parseindex; i < keystart.size(); i++) { key.assign(val, keystart[i] + 1, valuestart[i] - 1 - (keystart[i])); value.assign(val, valuestart[i] + 1, cidstart[i] - 1 - (valuestart[i])); if (i == (keystart.size() - 1)) { cid.assign(val, cidstart[i] + 1, val.length() - cidstart[i] - 1); } else { cid.assign(val, cidstart[i] + 1, keystart[i + 1] - 1 - (cidstart[i])); } // eos_info("got bcreply subject=%s, key=%s, val=%s obj_ptr=%p", // subject.c_str(), key.c_str(), value.c_str(), (void *)sh); if (subjectlist.size() > 1) { // This is a multiplexed update, where we have to remove the // subject from the key if there is a match with the current subject // MUX transactions have the ## as key prefix XrdOucString skey = "#"; skey += (int) s; skey += "#"; if (!key.compare(0, skey.length(), skey.c_str())) { // this is the right key for the subject we are dealing with key.erase(0, skey.length()); } else { parseindex = i; break; } } else { // This can be the case for a single multiplexed message, so // we have also to remove the prefix in that case XrdOucString skey = "#"; skey += (int) s; skey += "#"; if (!key.compare(0, skey.length(), skey.c_str())) { // this is the right key for the subject we are dealing with key.erase(0, skey.length()); } } // Set entry without broadcast sh->Set(key.c_str(), value.c_str(), false); } } return true; } if (ftag == XRDMQSHAREDHASH_BCREQUEST) { bool success = true; for (unsigned int l = 0; l < subjectlist.size(); l++) { // try 'queue' and 'hash' to have wildcard broadcasts for both sh = GetObject(subjectlist[l].c_str(), "queue"); if (!sh) { sh = GetObject(subjectlist[l].c_str(), "hash"); } if (sh) { success *= sh->BroadCastEnvString(reply.c_str()); } } return success; } if (ftag == XRDMQSHAREDHASH_DELETE) { std::string val = (env.Get(XRDMQSHAREDHASH_KEYS) ? env.Get( XRDMQSHAREDHASH_KEYS) : ""); if (val.length() <= 1) { error = "no keys in message body : "; error += env.Env(envlen); return false; } std::string key; std::vector keystart; for (unsigned int i = 0; i < val.length(); i++) { if (val.c_str()[i] == '|') { keystart.push_back(i); } } std::string sstr; for (unsigned int i = 0; i < keystart.size(); i++) { if (i < (keystart.size() - 1)) { sstr = val.substr(keystart[i] + 1, keystart[i + 1] - 1 - (keystart[i])); } else { sstr = val.substr(keystart[i] + 1); } key = sstr; // message->Print(); // fprintf(stderr,"XrdMqSharedObjectManager::ParseEnvMessage=>Deleting" // " [%s] %s\n", subject.c_str(),key.c_str()); sh->Delete(key, false); } } } // end of read mutex on HashMutex if (ftag == XRDMQSHAREDHASH_REMOVE) { for (unsigned int l = 0; l < subjectlist.size(); l++) { if (!DeleteSharedObject(subjectlist[l].c_str(), type.c_str(), false)) { error = "cannot delete subject "; error += subjectlist[l].c_str(); return false; } } } return true; } error = "unknown message: "; error += message->GetBody(); return false; } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::Clear() { RWMutexReadLock lock(HashMutex); for (auto it = mHashSubjects.begin(); it != mHashSubjects.end(); ++it) { it->second->Clear(); } for (auto it = mQueueSubjects.begin(); it != mQueueSubjects.end(); ++it) { it->second.Clear(); } } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ bool XrdMqSharedObjectManager::CloseMuxTransaction() { // Mux Transactions can only update values with the same broadcastqueue, // no deletions of subjects XrdSysMutexHelper mLock(MuxTransactionsMutex); if (MuxTransactions.size()) { XrdOucString txmessage = ""; MakeMuxUpdateEnvHeader(txmessage); AddMuxTransactionEnvString(txmessage); XrdMqMessage message("XrdMqSharedHashMessage"); message.SetBody(txmessage.c_str()); message.MarkAsMonitor(); XrdMqMessaging::gMessageClient.SendMessage(message, MuxTransactionBroadCastQueue.c_str(), false, false, true); } IsMuxTransaction = false; MuxTransactions.clear(); return true; } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::MakeMuxUpdateEnvHeader(XrdOucString& out) { std::string subjects = ""; for (auto it = MuxTransactions.begin(); it != MuxTransactions.end(); ++it) { subjects += it->first; subjects += "%"; } // remove trailing '%' if (subjects.length() > 0) { subjects.erase(subjects.length() - 1, 1); } out = XRDMQSHAREDHASH_UPDATE; out += "&"; out += XRDMQSHAREDHASH_SUBJECT; out += "="; out += subjects.c_str(); out += "&"; out += XRDMQSHAREDHASH_TYPE; out += "="; out += MuxTransactionType.c_str(); } //------------------------------------------------------------------------------ // //------------------------------------------------------------------------------ void XrdMqSharedObjectManager::AddMuxTransactionEnvString(XrdOucString& out) { // Encoding has the following format // "mysh.pairs=|~%|~%first.c_str(), MuxTransactionType.c_str()); if (hash) { RWMutexReadLock lock(*(hash->mStoreMutex)); // loop over variables for (auto it = it_subj->second.begin(); it != it_subj->second.end(); ++it) { if ((hash->mStore.count(it->c_str()))) { out += "|"; // the subject is a prefix to the key as ## out += "#"; out += sindex.c_str(); out += "#"; out += it->c_str(); out += "~"; out += hash->mStore[it->c_str()].GetValue(); out += "%"; char cid[1024]; snprintf(cid, sizeof(cid) - 1, "%llu", hash->mStore[it->c_str()].GetChangeId()); out += cid; } } } index++; } } //------------------------------------------------------------------------------- // //------------------------------------------------------------------------------- bool XrdMqSharedObjectManager::OpenMuxTransaction(const char* type, const char* broadcastqueue) { XrdSysMutexHelper lock(MuxTransactionsMutex); MuxTransactionType = type; if (MuxTransactionType != "hash") { return false; } if (!broadcastqueue) { if (AutoReplyQueue.length()) { MuxTransactionBroadCastQueue = AutoReplyQueue; } else { return false; } } else { MuxTransactionBroadCastQueue = broadcastqueue; } MuxTransactions.clear(); IsMuxTransaction = true; return true; }