// ---------------------------------------------------------------------- // File: Communicator.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "fst/storage/Storage.hh" #include "fst/XrdFstOfs.hh" #include "fst/Config.hh" #include "fst/storage/FileSystem.hh" #include "common/SymKeys.hh" #include "common/Assert.hh" #include "common/Constants.hh" #include "common/StringTokenizer.hh" #include "mq/SharedHashWrapper.hh" #include "qclient/structures/QScanner.hh" #include "qclient/shared/SharedHashSubscription.hh" EOSFSTNAMESPACE_BEGIN // Set of keys updates to be tracked at the node level std::set Storage::sNodeUpdateKeys { "stat.refresh_fs", "manager", "symkey", "publish.interval", "debug.level", "error.simulation" }; //------------------------------------------------------------------------------ // Get configuration value from global FST config //------------------------------------------------------------------------------ bool Storage::GetFstConfigValue(const std::string& key, std::string& value) const { common::SharedHashLocator locator = gConfig.getNodeHashLocator("getConfigValue", false); if (locator.empty()) { return false; } mq::SharedHashWrapper hash(gOFS.mMessagingRealm.get(), locator, true, false); return hash.get(key, value); } //------------------------------------------------------------------------------ // Get configuration value from global FST config //------------------------------------------------------------------------------ bool Storage::GetFstConfigValue(const std::string& key, unsigned long long& value) const { std::string strVal; if (!GetFstConfigValue(key, strVal)) { return false; } value = atoi(strVal.c_str()); return true; } //------------------------------------------------------------------------------ // Unregister file system given a queue path //------------------------------------------------------------------------------ void Storage::UnregisterFileSystem(const std::string& queuepath) { eos::common::RWMutexWriteLock fs_wr_lock(mFsMutex); auto it = std::find_if(mFsVect.begin(), mFsVect.end(), [&](FileSystem * fs) { return (fs->GetQueuePath() == queuepath); }); if (it == mFsVect.end()) { eos_static_warning("msg=\"file system is already removed\" qpath=%s", queuepath.c_str()); return; } auto fs = *it; mFsVect.erase(it); auto it_map = std::find_if(mFsMap.begin(), mFsMap.end(), [&](const auto & pair) { return (pair.second->GetQueuePath() == queuepath); }); if (it_map == mFsMap.end()) { eos_static_warning("msg=\"file system missing from map\" qpath=%s", queuepath.c_str()); } else { mFsMap.erase(it_map); } eos_static_info("msg=\"deleting file system\" qpath=%s", fs->GetQueuePath().c_str()); delete fs; } //------------------------------------------------------------------------------ // Register file system //------------------------------------------------------------------------------ Storage::FsRegisterStatus Storage::RegisterFileSystem(const std::string& queuepath) { eos::common::RWMutexWriteLock fs_wr_lock(mFsMutex); auto it = std::find_if(mFsVect.begin(), mFsVect.end(), [&](FileSystem * fs) { return (fs->GetQueuePath() == queuepath); }); if (it != mFsVect.end()) { eos_static_warning("msg=\"file system is already registered\" qpath=%s", queuepath.c_str()); return FsRegisterStatus::kNoAction; } common::FileSystemLocator locator; if (!common::FileSystemLocator::fromQueuePath(queuepath, locator)) { eos_static_crit("msg=\"failed to parse locator\" qpath=%s", queuepath.c_str()); return FsRegisterStatus::kNoAction; } fst::FileSystem* fs = new fst::FileSystem(locator, gOFS.mMessagingRealm.get()); fs->SetLocalId(); fs->SetLocalUuid(); mFsVect.push_back(fs); eos_static_info("msg=\"attempt file system registration\" qpath=\"%s\" " "fsid=%u uuid=\"%s\"", queuepath.c_str(), fs->GetLocalId(), fs->GetLocalUuid().c_str()); if ((fs->GetLocalId() == 0ul) || fs->GetLocalUuid().empty()) { eos_static_info("msg=\"partially register file system\" qpath=\"%s\"", queuepath.c_str()); return FsRegisterStatus::kPartial; } if (mFsMap.find(fs->GetLocalId()) != mFsMap.end()) { eos_static_crit("msg=\"trying to register an already existing file system\" " "fsid=%u uuid=\"%s\"", fs->GetLocalId(), fs->GetLocalUuid().c_str()); std::abort(); } mFsMap[fs->GetLocalId()] = fs; if (gOFS.mMessagingRealm->haveQDB()) { if (gConfig.autoBoot && (fs->GetConfigStatus() > eos::common::ConfigStatus::kOff)) { RunBootThread(fs, ""); } } return FsRegisterStatus::kRegistered; } //------------------------------------------------------------------------------ // Process incoming configuration change //------------------------------------------------------------------------------ void Storage::ProcessFstConfigChange(const std::string& key, const std::string& value) { static std::string last_refresh_ts = ""; eos_static_info("msg=\"FST node configuration change\" key=\"%s\" " "value=\"%s\"", key.c_str(), value.c_str()); // Refresh the list of FS'es registered from QDB shared hashes if (key == "stat.refresh_fs") { if (last_refresh_ts != value) { last_refresh_ts = value; SignalRegisterThread(); } return; } if (key == "manager") { XrdSysMutexHelper lock(gConfig.Mutex); gConfig.Manager = value.c_str(); return; } if (key == "symkey") { eos::common::gSymKeyStore.SetKey64(value.c_str(), 0); return; } if (key == "publish.interval") { eos_static_info("publish.interval=%s", value.c_str()); XrdSysMutexHelper lock(gConfig.Mutex); gConfig.PublishInterval = atoi(value.c_str()); return; } if (key == "debug.level") { std::string debuglevel = value; eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); int debugval = g_logging.GetPriorityByString(debuglevel.c_str()); if (debugval < 0) { eos_static_err("msg=\"unknown debug level\" level=\"%s\"", debuglevel.c_str()); } else { // we set the shared hash debug for the lowest 'debug' level if (debuglevel == "debug") { gOFS.ObjectManager.SetDebug(true); } else { gOFS.ObjectManager.SetDebug(false); } g_logging.SetLogPriority(debugval); } return; } if (key == "error.simulation") { gOFS.SetSimulationError(value.c_str()); return; } } //------------------------------------------------------------------------------ // Process incoming filesystem-level configuration change //------------------------------------------------------------------------------ void Storage::ProcessFsConfigChange(fst::FileSystem* targetFs, const std::string& key, const std::string& value) { if ((key == "id") || (key == "uuid") || (key == "bootsenttime")) { RunBootThread(targetFs, key); } else { if ((key == eos::common::SCAN_IO_RATE_NAME) || (key == eos::common::SCAN_ENTRY_INTERVAL_NAME) || (key == eos::common::SCAN_RAIN_ENTRY_INTERVAL_NAME) || (key == eos::common::SCAN_DISK_INTERVAL_NAME) || (key == eos::common::SCAN_NS_INTERVAL_NAME) || (key == eos::common::SCAN_NS_RATE_NAME)) { try { long long val = std::stoll(value); if (val >= 0) { targetFs->ConfigScanner(&mFstLoad, key.c_str(), val); } } catch (...) { eos_static_err("msg=\"failed to convert value\" key=\"%s\" val=\"%s\"", key.c_str(), value.c_str()); } } } } //------------------------------------------------------------------------------ // Process incoming filesystem-level configuration change //------------------------------------------------------------------------------ void Storage::ProcessFsConfigChange(const std::string& queuepath, const std::string& key) { eos::common::RWMutexReadLock fs_rd_lock(mFsMutex); auto it = std::find_if(mFsMap.begin(), mFsMap.end(), [&](const auto & pair) { return (pair.second->GetQueuePath() == queuepath); }); if (it == mFsMap.end()) { // If file system does not exist in the map and this an "id" info then // it could be that we have a partially registered file system if ((key == "id") || (key == "uuid")) { // Switch to a write lock as we might add the new fs to the map fs_rd_lock.Release(); eos::common::RWMutexWriteLock fs_wr_lock(mFsMutex); auto itv = std::find_if(mFsVect.begin(), mFsVect.end(), [&](fst::FileSystem * fs) { return (fs->GetQueuePath() == queuepath); }); if (itv == mFsVect.end()) { eos_static_err("msg=\"no file system for id modification\" " "qpath=\"%s\" key=\"%s\"", queuepath.c_str(), key.c_str()); return; } fst::FileSystem* fs = *itv; fs->SetLocalId(); fs->SetLocalUuid(); eos_static_info("msg=\"attempt file system registration\" qpath=\"%s\" " "fsid=%u uuid=\"%s\"", queuepath.c_str(), fs->GetLocalId(), fs->GetLocalUuid().c_str()); if ((fs->GetLocalId() == 0ul) || fs->GetLocalUuid().empty()) { eos_static_info("msg=\"defer file system registration\" qpath=\"%s\"", queuepath.c_str()); return; } eos::common::FileSystem::fsid_t fsid = fs->GetLocalId(); it = mFsMap.emplace(fsid, fs).first; eos_static_info("msg=\"fully register file system\" qpath=%s fsid=%u " "uuid=\"%s\"", queuepath.c_str(), fs->GetLocalId(), fs->GetLocalUuid().c_str()); // Switch back to read lock and update the iterator fs_wr_lock.Release(); fs_rd_lock.Grab(mFsMutex); it = mFsMap.find(fsid); } else { eos_static_err("msg=\"no file system for modification\" qpath=\"%s\" " "key=\"%s\"", queuepath.c_str(), key.c_str()); return; } } eos_static_info("msg=\"process modification\" qpath=\"%s\" key=\"%s\"", queuepath.c_str(), key.c_str()); fst::FileSystem* fs = it->second; mq::SharedHashWrapper hash(gOFS.mMessagingRealm.get(), fs->getHashLocator()); std::string value; if (!hash.get(key, value)) { eos_static_err("msg=\"no such key in hash\" qpath=\"%s\" key=\"%s\"", queuepath.c_str(), key.c_str()); return; } hash.releaseLocks(); return ProcessFsConfigChange(fs, key, value); } //------------------------------------------------------------------------------ // Communicator //------------------------------------------------------------------------------ void Storage::Communicator(ThreadAssistant& assistant) noexcept { eos_static_info("%s", "msg=\"starting communicator thread\""); std::set watch_modification_keys { "id", "uuid", "bootsenttime", eos::common::SCAN_IO_RATE_NAME, eos::common::SCAN_ENTRY_INTERVAL_NAME, eos::common::SCAN_RAIN_ENTRY_INTERVAL_NAME, eos::common::SCAN_DISK_INTERVAL_NAME, eos::common::SCAN_NS_INTERVAL_NAME, eos::common::SCAN_NS_RATE_NAME, "symkey", "manager", "publish.interval", "debug.level", "error.simulation"}; bool ok = true; for (const auto& key : watch_modification_keys) { ok &= gOFS.ObjectNotifier.SubscribesToKey("communicator", key, XrdMqSharedObjectChangeNotifier::kMqSubjectModification); } std::string watch_regex = ".*"; ok &= gOFS.ObjectNotifier.SubscribesToSubjectRegex("communicator", watch_regex, XrdMqSharedObjectChangeNotifier::kMqSubjectCreation); ok &= gOFS.ObjectNotifier.SubscribesToSubjectRegex("communicator", watch_regex, XrdMqSharedObjectChangeNotifier::kMqSubjectDeletion); if (!ok) { eos_crit("%s", "msg=\"error subscribing to shared object change " "notifications\""); } gOFS.ObjectNotifier.BindCurrentThread("communicator"); if (!gOFS.ObjectNotifier.StartNotifyCurrentThread()) { eos_crit("%s", "msg=\"error starting shared object change notifier\""); } while (!assistant.terminationRequested()) { // Wait for new filesystem definitions gOFS.ObjectNotifier.tlSubscriber->mSubjSem.Wait(); do { if (assistant.terminationRequested()) { break; } XrdMqSharedObjectManager::Notification event; { // Take an event from the queue under lock XrdSysMutexHelper lock(gOFS.ObjectNotifier.tlSubscriber->mSubjMtx); if (gOFS.ObjectNotifier.tlSubscriber->NotificationSubjects.size() == 0) { break; } else { event = gOFS.ObjectNotifier.tlSubscriber->NotificationSubjects.front(); gOFS.ObjectNotifier.tlSubscriber->NotificationSubjects.pop_front(); } } eos_static_info("msg=\"shared object notification\" type=%i subject=\"%s\"", event.mType, event.mSubject.c_str()); XrdOucString queue = event.mSubject.c_str(); if (event.mType == XrdMqSharedObjectManager::kMqSubjectCreation) { // Skip fst wildcard queue creation if (queue == gConfig.FstQueueWildcard) { continue; } if (!queue.beginswith(gConfig.FstQueue)) { // ! endswith seems to be buggy if the comparable suffix is longer than the string ! if (queue.beginswith("/config/") && (queue.length() > gConfig.FstHostPort.length()) && queue.endswith(gConfig.FstHostPort)) { // This is the configuration entry and we should store it to have // access to it since its name depends on the instance name and // we don't know (yet) gConfig.setFstNodeConfigQueue(queue.c_str()); eos_static_info("msg=\"storing config queue name\" qpath=\"%s\"", queue.c_str()); } else { eos_static_info("msg=\"no action on subject creation\" qpath=\"%s\" " "own_id=\"%s\"", queue.c_str(), gConfig.FstQueue.c_str()); } continue; } (void) RegisterFileSystem(queue.c_str()); } else if (event.mType == XrdMqSharedObjectManager::kMqSubjectDeletion) { // Skip deletions that don't concern us if (queue.beginswith(gConfig.FstQueue) == false) { continue; } else { UnregisterFileSystem(event.mSubject); } } else if (event.mType == XrdMqSharedObjectManager::kMqSubjectModification) { // Handle subject modification, seperate from std::string key = queue.c_str(); int dpos = 0; if ((dpos = queue.find(";")) != STR_NPOS) { key.erase(0, dpos + 1); queue.erase(dpos); } if (queue == gConfig.getFstNodeConfigQueue("communicator", false)) { std::string value; if (GetFstConfigValue(key, value)) { ProcessFstConfigChange(key, value); } } else { ProcessFsConfigChange(queue.c_str(), key); } } } while (true); } } //------------------------------------------------------------------------------ // Extract filesystem path from QDB hash key - helper function //------------------------------------------------------------------------------ static std::string ExtractFsPath(const std::string& key) { std::vector parts = common::StringTokenizer::split>(key, '|'); return parts[parts.size() - 1]; } //------------------------------------------------------------------------------ // Update file system list given the QDB shared hash configuration -this // update is done in a separate thread handling the trigger event otherwise // we deadlock in the QClient code. //------------------------------------------------------------------------------ void Storage::UpdateRegisteredFs(ThreadAssistant& assistant) noexcept { while (!assistant.terminationRequested()) { { // Reduce scope of mutex to avoid coupling the the SignalRegisterThread // which is called from the QClient event loop with other QClient requests // like the QScanner listing below - this will lead to a deadlock!!! std::unique_lock lock(mMutexRegisterFs); mCvRegisterFs.wait(lock, [&] {return mTriggerRegisterFs;}); eos_static_info("%s", "msg=\"update registered file systems\""); mTriggerRegisterFs = false; } qclient::QScanner scanner(*gOFS.mMessagingRealm->getQSom()->getQClient(), SSTR("eos-hash||fs||" << gConfig.FstHostPort << "||*")); std::set new_filesystems; for (; scanner.valid(); scanner.next()) { std::string queuePath = SSTR("/eos/" << gConfig.FstHostPort << "/fst" << ExtractFsPath(scanner.getValue())); new_filesystems.insert(queuePath); } // Filesystems added? std::set partial_filesystems; for (auto it = new_filesystems.begin(); it != new_filesystems.end(); ++it) { if (mLastRoundFilesystems.find(*it) == mLastRoundFilesystems.end()) { if (RegisterFileSystem(*it) == FsRegisterStatus::kPartial) { partial_filesystems.insert(*it); } } } // Filesystems removed? for (auto it = mLastRoundFilesystems.begin(); it != mLastRoundFilesystems.end(); ++it) { if (new_filesystems.find(*it) == new_filesystems.end()) { UnregisterFileSystem(*it); } } if (!partial_filesystems.empty()) { // Reset register trigger flag and remove partial file systems so // that we properly register them in them next loop. { std::unique_lock lock(mMutexRegisterFs); mTriggerRegisterFs = true; } for (const auto& elem : partial_filesystems) { UnregisterFileSystem(elem); auto it_del = new_filesystems.find(elem); new_filesystems.erase(it_del); } eos_static_info("%s", "msg=\"re-trigger file system registration " "in 5 seconds\""); std::this_thread::sleep_for(std::chrono::seconds(5)); } mLastRoundFilesystems = std::move(new_filesystems); } } //------------------------------------------------------------------------------ // FST node update callback - this is triggered whenever the underlying // qclient::SharedHash corresponding to the node is modified. //------------------------------------------------------------------------------ void Storage::NodeUpdateCb(qclient::SharedHashUpdate&& upd) { if (sNodeUpdateKeys.find(upd.key) != sNodeUpdateKeys.end()) { ProcessFstConfigChange(upd.key, upd.value); } } //------------------------------------------------------------------------------ // QdbCommunicator //------------------------------------------------------------------------------ void Storage::QdbCommunicator(ThreadAssistant& assistant) noexcept { using namespace std::placeholders; if (!gOFS.mMessagingRealm->haveQDB()) { eos_static_info("%s", "msg=\"disable QDB communicator\""); return; } eos_static_info("%s", "msg=\"starting QDB communicator thread\""); // Process initial FST configuration ... discover instance name std::string instance_name; for (size_t i = 0; i < 10; i++) { if (gOFS.mMessagingRealm->getInstanceName(instance_name)) { break; } std::this_thread::sleep_for(std::chrono::seconds(1)); } if (instance_name.empty()) { eos_static_crit("%s", "msg=\"unable to obtain instance name from QDB\""); exit(1); } std::string cfg_queue = SSTR("/config/" << instance_name << "/node/" << gConfig.FstHostPort); gConfig.setFstNodeConfigQueue(cfg_queue); // Discover node-specific configuration mq::SharedHashWrapper node_hash(gOFS.mMessagingRealm.get(), gConfig.getNodeHashLocator(), false, false); // Discover MGM name std::string mgm_host; for (size_t i = 0; i < 10; i++) { if (node_hash.get("manager", mgm_host)) { break; } std::this_thread::sleep_for(std::chrono::seconds(5)); } if (mgm_host.empty()) { eos_static_crit("%s", "msg=\"unable to obtain manager info for node\""); exit(1); } ProcessFstConfigChange("manager", mgm_host); // Discover the rest of the FST node configuration options for (const auto& node_key : sNodeUpdateKeys) { std::string value; if (node_hash.get(node_key, value)) { ProcessFstConfigChange(node_key, value); } } // One-off collect all configured file systems for this node SignalRegisterThread(); // Attach callback for node configuration updates auto node_subscription = node_hash.subscribe(); node_subscription->attachCallback(std::bind(&Storage::NodeUpdateCb, this, _1)); // Broadcast FST node hearbeat while (!assistant.terminationRequested()) { node_hash.set(eos::common::FST_HEARTBEAT_KEY, std::to_string(time(0))); assistant.wait_for(std::chrono::seconds(1)); } } //------------------------------------------------------------------------------ // Signal the thread responsible with registered file systems //------------------------------------------------------------------------------ void Storage::SignalRegisterThread() { { std::unique_lock lock(mMutexRegisterFs); mTriggerRegisterFs = true; } mCvRegisterFs.notify_one(); } EOSFSTNAMESPACE_END