// ----------------------------------------------------------------------
// File: FsConfigListener.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2011 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

// -----------------------------------------------------------------------
// This file is included source code in XrdMgmOfs.cc to make the code more
// transparent without slowing down the compilation time.
// ----------------------------------------------------------------------- //------------------------------------------------------------------------------ // Get key from MGM config queue //------------------------------------------------------------------------------ bool XrdMgmOfs::getMGMConfigValue(const std::string& key, std::string& value) { return eos::mq::SharedHashWrapper::makeGlobalMgmHash(mMessagingRealm.get()).get( key, value); } //------------------------------------------------------------------------------ // Process incoming MGM configuration change //------------------------------------------------------------------------------ void XrdMgmOfs::processIncomingMgmConfigurationChange(const std::string& key) { std::string tmpValue; if (!getMGMConfigValue(key, tmpValue)) { return; } XrdOucString err; XrdOucString value = tmpValue.c_str(); if (value.c_str()) { // Here we might get a change without the namespace, in this // case we add the global namespace if ((key.substr(0, 4) != "map:") && (key.substr(0, 3) != "fs:") && (key.substr(0, 6) != "quota:") && (key.substr(0, 4) != "vid:") && (key.substr(0, 7) != "policy:")) { XrdOucString skey = key.c_str(); eos_info("msg=\"apply access config\" key=\"%s\" val=\"%s\"", key.c_str(), value.c_str()); Access::ApplyAccessConfig(false); if (skey.beginswith("iostat:")) { gOFS->IoStats->ApplyIostatConfig(&FsView::gFsView); } } else { eos_info("msg=\"set config value\" key=\"%s\" val=\"%s\"", key.c_str(), value.c_str()); gOFS->ConfEngine->SetConfigValue(0, key.c_str(), value.c_str(), false); // For fs modification we need to lock for write the FsView::ViewMutex if (key.find("fs:") == 0) { std::string fs_key = key; fs_key.erase(0, 3); eos::common::RWMutexWriteLock wr_view_lock(FsView::gFsView.ViewMutex); // To avoid issues when applying config changes in the slave we need to // unregister the file system first and then apply the new configuration const bool first_unregister = true; 
FsView::gFsView.ApplyFsConfig(fs_key.c_str(), value.c_str(), first_unregister); } else if (key.find("quota:") == 0) { eos_info("%s", "msg=\"skip quota update as it might mess with " "the namespace, will reload once we become master\""); } else { gOFS->ConfEngine->ApplyEachConfig(key.c_str(), &value, &err); } } } } //------------------------------------------------------------------------------ // Process geotag change on the specified filesystem //------------------------------------------------------------------------------ void XrdMgmOfs::ProcessGeotagChange(const std::string& queue) { std::string newgeotag; eos::common::FileSystem::fsid_t fsid = 0; eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByQueuePath(queue); if (fs == nullptr) { return; } fsid = (eos::common::FileSystem::fsid_t) fs->GetLongLong("id"); newgeotag = fs->GetString("stat.geotag"); if (fsid == 0 && newgeotag.empty()) { return; } std::string oldgeotag = newgeotag; if (FsView::gFsView.mNodeView.count(fs->GetQueue())) { // Check if the change notification is an actual change in the geotag FsNode* node = FsView::gFsView.mNodeView[fs->GetQueue()]; static_cast(node)->getGeoTagInTree(fsid , oldgeotag); oldgeotag.erase(0, 8); // to get rid of the "::" prefix } if (oldgeotag != newgeotag) { eos_warning("msg=\"received geotag change\" fsid=%lu old_geotag=\"%s\" " "new_geotag=\"%s\"", (unsigned long)fsid, oldgeotag.c_str(), newgeotag.c_str()); // Release read lock and take write lock fs_rd_lock.Release(); eos::common::RWMutexWriteLock fs_rw_lock(FsView::gFsView.ViewMutex); eos::common::FileSystem::fs_snapshot_t snapshot; fs->SnapShotFileSystem(snapshot); // Update node view tree structure if (FsView::gFsView.mNodeView.count(snapshot.mQueue)) { FsNode* node = FsView::gFsView.mNodeView[snapshot.mQueue]; eos_debug("msg=\"update geotag of fsid=%lu in node=%s", (unsigned long)fsid, node->mName.c_str()); if (!static_cast(node)->erase(fsid)) { 
eos_err("msg=\"error removing fsid=%lu from node=%s\"", (unsigned long)fsid, node->mName.c_str()); } if (!static_cast(node)->insert(fsid)) { eos_err("msg=\"error inserting fsid=%lu into node=%s\"", (unsigned long)fsid, node->mName.c_str()); } } // Update group view tree structure if (FsView::gFsView.mGroupView.count(snapshot.mGroup)) { FsGroup* group = FsView::gFsView.mGroupView[snapshot.mGroup]; eos_debug("msg=\"updating geotag of fsid=%lu in group=%s\"", (unsigned long)fsid, group->mName.c_str()); if (!static_cast(group)->erase(fsid)) { eos_err("msg=\"error removing fsid=%lu from group=%s\"", (unsigned long)fsid, group->mName.c_str()); } if (!static_cast(group)->insert(fsid)) { eos_err("msg=\"error inserting fsid=%lu into group=%s\"", (unsigned long)fsid, group->mName.c_str()); } } // Update space view tree structure if (FsView::gFsView.mSpaceView.count(snapshot.mSpace)) { FsSpace* space = FsView::gFsView.mSpaceView[snapshot.mSpace]; eos_debug("msg=\"updating geotag of fsid=%lu in space=%s\"", (unsigned long)fsid, space->mName.c_str()); if (!static_cast(space)->erase(fsid)) { eos_err("msg=\"error removing fsid=%lu from space=%s\"", (unsigned long)fsid, space->mName.c_str()); } if (!static_cast(space)->insert(fsid)) { eos_err("msg=\"error inserting fsid=%lu into space=%s\"", (unsigned long)fsid, space->mName.c_str()); } } } } //------------------------------------------------------------------------------ // A thread monitoring for important key-value changes in filesystems //------------------------------------------------------------------------------ void XrdMgmOfs::FileSystemMonitorThread(ThreadAssistant& assistant) noexcept { std::shared_ptr fs_listener = mMessagingRealm->GetFsChangeListener("filesystem-listener-thread"); FsView::gFsView.AddFsChangeListener(fs_listener, {"stat.errc", "stat.geotag"}); if (!fs_listener->startListening()) { eos_static_crit("%s", "msg=\"unspecified problem when attempting to " "subscribe to filesystem key changes\""); } while 
(!assistant.terminationRequested()) { eos::mq::FsChangeListener::Event event; if (fs_listener->fetch(assistant, event) && !event.isDeletion()) { if (event.key == "stat.geotag") { ProcessGeotagChange(event.fileSystemQueue); } else { // This is a filesystem status error if (gOFS->mMaster->IsMaster()) { // only an MGM master needs to initiate draining eos::common::FileSystem::fsid_t fsid = 0; long long errc = 0; std::string configstatus = ""; std::string bootstatus = ""; eos::common::ConfigStatus cfgstatus = eos::common::ConfigStatus::kOff; eos::common::BootStatus bstatus = eos::common::BootStatus::kDown; // read the id from the hash and the current error value eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByQueuePath( event.fileSystemQueue); if (fs) { fsid = (eos::common::FileSystem::fsid_t) fs->GetLongLong("id"); errc = (int) fs->GetLongLong("stat.errc"); configstatus = fs->GetString("configstatus"); bootstatus = fs->GetString("stat.boot"); cfgstatus = eos::common::FileSystem::GetConfigStatusFromString( configstatus.c_str()); bstatus = eos::common::FileSystem::GetStatusFromString(bootstatus.c_str()); } if (fs && fsid && errc && (cfgstatus >= eos::common::ConfigStatus::kRO) && (bstatus == eos::common::BootStatus::kOpsError)) { // Case when we take action and explicitly ask to start a drain job fs->SetConfigStatus(eos::common::ConfigStatus::kDrain); bool status = gOFS->mFsScheduler->setDiskStatus(fs->getCoreParams().getSpace(), fsid, eos::common::ConfigStatus::kDrain); if (!status) { eos_static_err("msg=\"Failed to set Disk Status in FsScheduler for disk\" %llu", fsid); } } } } } } } /*----------------------------------------------------------------------------*/ /* * @brief file system listener agent starting drain jobs when receving opserror * and applying remote master configuration changes to the local configuration * object. 
* * This thread agent catches 'opserror' states on filesystems and executes the * drain job start routine on the referenced filesystem. If a filesystem * is removing the error code it also run's a stop drain job routine. * Additionally it applies changes in the MGM configuration which have been * broadcasted by a remote master MGM. */ /*----------------------------------------------------------------------------*/ void XrdMgmOfs::FsConfigListener(ThreadAssistant& assistant) noexcept { eos::mq::GlobalConfigChangeListener changeListener(mMessagingRealm.get(), "fs-config-listener-thread", MgmConfigQueue.c_str()); // Thread listening on filesystem errors and configuration changes while (!assistant.terminationRequested()) { eos::mq::GlobalConfigChangeListener::Event event; if (changeListener.fetch(assistant, event)) { if (!event.isDeletion() && !gOFS->mMaster->IsMaster()) { // This is an MGM configuration modification - only an MGM // slave needs to apply this. processIncomingMgmConfigurationChange(event.key); } else if (event.isDeletion()) { gOFS->ConfEngine->DeleteConfigValue(0, event.key.c_str(), false); gOFS->ConfEngine->ApplyKeyDeletion(event.key.c_str()); } } } }