// ---------------------------------------------------------------------- // File: GeoTreeEngine.cc // Author: Geoffray Adde - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #define HAVE_ATOMICS 1 #include "mgm/GeoTreeEngine.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/config/IConfigEngine.hh" #include "common/table_formatter/TableFormatterBase.hh" #include "common/FileSystem.hh" #include "common/IntervalStopwatch.hh" #include "common/Assert.hh" #include #include #include #include #include #include #include #include using namespace std; using namespace eos::common; using namespace eos::mgm; EOSMGMNAMESPACE_BEGIN // We assume that all the trees have the same max size, we should take the max // of all the sizes otherwise const size_t GeoTreeEngine::gGeoBufferSize = sizeof(FastPlacementTree) + FastPlacementTree::sGetMaxDataMemSize(); thread_local void* GeoTreeEngine::tlGeoBuffer = NULL; pthread_key_t GeoTreeEngine::gPthreadKey; const int GeoTreeEngine::sfgId = 1; const int GeoTreeEngine::sfgHost = 1 << 1; const int GeoTreeEngine::sfgGeotag = 1 << 2; const int GeoTreeEngine::sfgBoot = 1 << 3; const int GeoTreeEngine::sfgActive = 1 << 4; const int GeoTreeEngine::sfgConfigstatus = 1 << 5; const int GeoTreeEngine::sfgDrain = 1 << 6; const int GeoTreeEngine::sfgDrainer = 1 << 6; const int GeoTreeEngine::sfgBlkavailb = 1 << 8; const int GeoTreeEngine::sfgFsfilled = 1 << 9; const int GeoTreeEngine::sfgNomfilled = 1 << 10; const int GeoTreeEngine::sfgReadratemb = 1 << 12; const int GeoTreeEngine::sfgDiskload = 1 << 13; const int GeoTreeEngine::sfgEthmib = 1 << 14; const int GeoTreeEngine::sfgInratemib = 1 << 15; const int GeoTreeEngine::sfgOutratemib = 1 << 16; const int GeoTreeEngine::sfgErrc = 1 << 17; const int GeoTreeEngine::sfgPubTmStmp = 1 << 18; const int GeoTreeEngine::sfgWopen = 1 << 19; const int GeoTreeEngine::sfgRopen = 1 << 20; set GeoTreeEngine::gWatchedKeys; const map GeoTreeEngine::gNotifKey2EnumSched = { make_pair("id", sfgId), make_pair("host", sfgHost), make_pair("forcegeotag", sfgGeotag), make_pair("stat.geotag", sfgGeotag), make_pair("stat.boot", sfgBoot), make_pair("stat.active", sfgActive), make_pair("configstatus", sfgConfigstatus), make_pair("local.drain", sfgDrain), make_pair("stat.drainer", sfgDrainer), make_pair("stat.nominal.filled", sfgNomfilled), make_pair("stat.statfs.bavail", sfgBlkavailb), make_pair("stat.statfs.filled", sfgFsfilled), make_pair("stat.disk.readratemb", sfgReadratemb), make_pair("stat.disk.load", sfgDiskload), make_pair("stat.net.ethratemib", sfgEthmib), make_pair("stat.net.inratemib", sfgInratemib), make_pair("stat.net.outratemib", sfgOutratemib), make_pair("stat.errc", sfgErrc), make_pair("stat.publishtimestamp", sfgPubTmStmp), make_pair("stat.wopen", sfgWopen), make_pair("stat.ropen", sfgRopen), }; map GeoTreeEngine::gNotificationsBufferFs; map GeoTreeEngine::gNotificationsBufferProxy; sem_t GeoTreeEngine::gUpdaterPauseSem; bool GeoTreeEngine::gUpdaterPaused = false; bool GeoTreeEngine::gUpdaterStarted = false; const unsigned char GeoTreeEngine::sntFilesystem = 1, GeoTreeEngine::sntDataproxy = 4; std::map GeoTreeEngine::gQueue2NotifType; //------------------------------------------------------------------------------ // Get the maximum number of placement attempts //------------------------------------------------------------------------------ unsigned int GetMaxPlacementAttempts() { unsigned int attempt = 1u; const std::string env_name = "EOS_SCATTERED_PLACEMENT_MAX_ATTEMPTS"; const char* ptr = getenv(env_name.c_str()); if (ptr) { try { attempt = std::stoi(std::string(ptr)); } catch (...) { } } return attempt; } //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ GeoTreeEngine::GeoTreeEngine(mq::MessagingRealm* realm) : pSkipSaturatedAccess(true), pSkipSaturatedDrnAccess(true), pSkipSaturatedBlcAccess(true), pProxyCloseToFs(true), pPenaltyUpdateRate(1), pFillRatioLimit(80), pFillRatioCompTol(100), pSaturationThres(10), pTimeFrameDurationMs(1000), pPublishToPenaltyDelayMs(1000), pAccessGeotagMapping("accessgeotagmapping"), pAccessProxygroup("accessproxygroup"), pCircSize(30), pFrameCount(0), pPenaltySched(pCircSize), pLatencySched(pCircSize), mFsListener(nullptr) { mFsListener = realm->GetFsChangeListener("geotree-fs-listener"); // by default, disable all the placement operations for non geotagged fs addDisabledBranch("*", "plct", "nogeotag", NULL, false); addDisabledBranch("*", "accsdrain", "nogeotag", NULL, false); // set blocking mutexes for lower latencies pAddRmFsMutex.SetBlocking(true); configMutex.SetBlocking(true); pTreeMapMutex.SetBlocking(true); for (auto it = pPenaltySched.pCircFrCnt2FsPenalties.begin(); it != pPenaltySched.pCircFrCnt2FsPenalties.end(); it++) { it->reserve(100); } // create the thread local key to handle allocation/destruction of thread local geobuffers pthread_key_create(&gPthreadKey, GeoTreeEngine::tlFree); // initialize pauser semaphore if (sem_init(&gUpdaterPauseSem, 0, 1)) { throw "sem_init() failed"; } } bool GeoTreeEngine::forceRefreshSched() { // prevent any other use of the fast structures pAddRmFsMutex.LockWrite(); pTreeMapMutex.LockWrite(); // mark all fs needing a refresh for all the watched attributes // => SCHED for (auto it = pFsId2FsPtr.begin(); it != pFsId2FsPtr.end(); it++) { if (it->second) { gNotificationsBufferFs[it->second->GetQueuePath()] = (~0); } } for (auto it = pGroup2SchedTME.begin(); it != pGroup2SchedTME.end(); it++) { it->second->fastStructModified = true; it->second->slowTreeModified = true; } // mark all proxy needing a refresh for all the watched attributes // => PROXYGROUPS for (auto it = pPxyQueue2PxyId.begin(); it != pPxyQueue2PxyId.end(); it++) { gNotificationsBufferProxy[it->first] = (~0); } for (auto it = pPxyGrp2DpTME.begin(); it != pPxyGrp2DpTME.end(); it++) { it->second->fastStructModified = true; it->second->slowTreeModified = true; } // do the update pTreeMapMutex.UnLockWrite(); updateTreeInfo(gNotificationsBufferFs, gNotificationsBufferProxy); pAddRmFsMutex.UnLockWrite(); return true; } bool GeoTreeEngine::forceRefresh() { // signal a pause to the background updating PauseUpdater(); // do the refreshes bool result = forceRefreshSched(); // signal a resume to the background updating ResumeUpdater(); return result; } bool GeoTreeEngine::insertFsIntoGroup(FileSystem* fs, FsGroup* group, const common::FileSystemCoreParams& coreParams) { bool updateFastStruct = false; eos::common::RWMutexWriteLock lock(pAddRmFsMutex); FileSystem::fsid_t fsid = coreParams.getId(); SchedTME* mapEntry = 0; bool is_new_entry = false; { pTreeMapMutex.LockWrite(); // ==== check that fs is not already registered if (pFs2SchedTME.count(fsid)) { eos_err("error inserting fs %lu into group %s : fs is already part of a group", (unsigned long)fsid, group->mName.c_str()); pTreeMapMutex.UnLockWrite(); return false; } // ==== get the entry if (pGroup2SchedTME.count(group)) { mapEntry = pGroup2SchedTME[group]; } else { mapEntry = new SchedTME(group->mName.c_str()); is_new_entry = true; // Force update to be sure that the fast structures are properly created updateFastStruct = true; } mapEntry->slowTreeMutex.LockWrite(); pTreeMapMutex.UnLockWrite(); } // ==== fill the entry // create new TreeNodeInfo/TreeNodeState pair and update its data eos::common::FileSystem::fs_snapshot_t fsn; fs->SnapShotFileSystem(fsn, true); fsn.fillFromCoreParams(coreParams); // check if there is still some space for a new fs { size_t depth = 1; std::string sub("::"); for (size_t offset = fsn.mGeoTag.find(sub); offset != std::string::npos; offset = fsn.mGeoTag.find(sub, offset + sub.length())) { depth++; } if (depth + mapEntry->slowTree->getNodeCount() > SchedTreeBase::sGetMaxNodeCount() - 2) { mapEntry->slowTreeMutex.UnLockWrite(); eos_err("error inserting fs %lu into group %s : the group-tree is full", (unsigned long)fsid, group->mName.c_str()); if (is_new_entry) { delete mapEntry; } return false; } } SchedTreeBase::TreeNodeInfo info; info.geotag = fsn.mGeoTag; if (info.geotag.empty()) { char buffer[64]; snprintf(buffer, 64, "nogeotag"); info.geotag = buffer; } info.host = coreParams.getHost(); info.hostport = coreParams.getHostPort(); if (info.host.empty()) { uuid_t uuid; char buffer[64]; snprintf(buffer, 64, "nohost-"); uuid_generate_time(uuid); uuid_unparse(uuid, buffer + 7); info.host = buffer; } info.netSpeedClass = 1; // EthRateMiB not yet initialized at this point, // use placeholder value info.fsId = coreParams.getId(); if (!info.fsId) { mapEntry->slowTreeMutex.UnLockWrite(); eos_err("error inserting fs %lu into group %s : FsId is not set!", (unsigned long)fsid, group->mName.c_str()); if (is_new_entry) { delete mapEntry; } return false; } SchedTreeBase::TreeNodeStateFloat state; // try to insert the new node in the Slowtree SlowTreeNode* node = mapEntry->slowTree->insert(&info, &state); if (node == NULL) { mapEntry->slowTreeMutex.UnLockWrite(); eos_err("error inserting fs %lu into group %s : slow tree node insertion failed", (unsigned long)fsid, group->mName.c_str()); if (is_new_entry) { delete mapEntry; } return false; } // ==== update the penalties vectors if necessary if ((coreParams.getId() + 1) > pLatencySched.pFsId2LatencyStats.size()) { for (auto it = pPenaltySched.pCircFrCnt2FsPenalties.begin(); it != pPenaltySched.pCircFrCnt2FsPenalties.end(); it++) { it->resize(coreParams.getId() + 1); } pLatencySched.pFsId2LatencyStats.resize(coreParams.getId() + 1); } // ==== update the shared object notifications { if (gWatchedKeys.empty()) { for (auto it = gNotifKey2EnumSched.begin(); it != gNotifKey2EnumSched.end(); it++) { gWatchedKeys.insert(it->first); } } gQueue2NotifType[fs->GetQueuePath()] |= sntFilesystem; if (!fs->AttachFsListener(mFsListener, gWatchedKeys)) { eos_crit("error inserting fs %lu into group %s : error subscribing to " "shared object notifications", (unsigned long)fsid, group->mName.c_str()); gQueue2NotifType[fs->GetQueuePath()] &= ~sntFilesystem; if (gQueue2NotifType[fs->GetQueuePath()] == 0) { gQueue2NotifType.erase(fs->GetQueuePath()); } mapEntry->slowTreeMutex.UnLockWrite(); if (is_new_entry) { delete mapEntry; } return false; } } // update all the information about this new node if (!updateTreeInfo(mapEntry, &fsn, ~sfgGeotag & ~sfgId & ~sfgHost , 0, node)) { mapEntry->slowTreeMutex.UnLockWrite(); pTreeMapMutex.LockRead(); eos_err("error inserting fs %lu into group %s : slow tree node update failed", (unsigned long)fsid, group->mName.c_str()); pTreeMapMutex.UnLockRead(); if (is_new_entry) { delete mapEntry; } return false; } mapEntry->fs2SlowTreeNode[fsid] = node; mapEntry->slowTreeModified = true; mapEntry->group = group; // update the fast structures now if requested if (updateFastStruct) { if (!updateFastStructures(mapEntry)) { mapEntry->slowTreeMutex.UnLockWrite(); pTreeMapMutex.LockRead(); eos_err("error inserting fs %lu into group %s : fast structures update failed", fsid, group->mName.c_str(), pFs2SchedTME[fsid]->group->mName.c_str()); pTreeMapMutex.UnLockRead(); if (is_new_entry) { delete mapEntry; } return false; } else { mapEntry->slowTreeModified = false; } } // ==== update the entry in the map { pTreeMapMutex.LockWrite(); pGroup2SchedTME[group] = mapEntry; pFs2SchedTME[fsid] = mapEntry; pFsId2FsPtr[fsid] = fs; pTreeMapMutex.UnLockWrite(); mapEntry->slowTreeMutex.UnLockWrite(); } eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { stringstream ss; ss << (*mapEntry->slowTree); eos_debug("inserted fs %lu into group %s geotag is %s and fullgeotag is %s\n%s", (unsigned long)fsid, group->mName.c_str(), node->pNodeInfo.geotag.c_str(), node->pNodeInfo.fullGeotag.c_str(), ss.str().c_str()); } return true; } bool GeoTreeEngine::removeFsFromGroup(FileSystem* fs, FsGroup* group, bool updateFastStruct) { eos::common::RWMutexWriteLock lock(pAddRmFsMutex); SchedTME* mapEntry; FileSystem::fsid_t fsid = fs->GetId(); { pTreeMapMutex.LockWrite(); // ==== check that fs is registered if (!pFs2SchedTME.count(fsid)) { eos_err("error removing fs %lu from group %s : fs is not registered", (unsigned long)fsid, group->mName.c_str()); pTreeMapMutex.UnLockWrite(); return false; } mapEntry = pFs2SchedTME[fsid]; // ==== get the entry if (!pGroup2SchedTME.count(group)) { eos_err("error removing fs %lu from group %s : fs is not registered ", (unsigned long)fsid, group->mName.c_str()); pTreeMapMutex.UnLockWrite(); return false; } pTreeMapMutex.UnLockWrite(); mapEntry = pGroup2SchedTME[group]; mapEntry->slowTreeMutex.LockWrite(); } // ==== update the shared object notifications { if (!fs->DetachFsListener(mFsListener, gWatchedKeys)) { mapEntry->slowTreeMutex.UnLockWrite(); eos_crit("error removing fs %lu into group %s : error unsubscribing to " "shared object notifications", (unsigned long)fsid, group->mName.c_str()); return false; } gQueue2NotifType[fs->GetQueuePath()] &= ~sntFilesystem; if (gQueue2NotifType[fs->GetQueuePath()] == 0) { gQueue2NotifType.erase(fs->GetQueuePath()); } } // ==== discard updates about this fs // ==== clean the notifications buffer gNotificationsBufferFs.erase(fs->GetQueuePath()); // ==== update the entry const SlowTreeNode* intree = mapEntry->fs2SlowTreeNode[fsid]; // Double check that the SlowTreeNode exists EOS-4678 if (intree) { SchedTreeBase::TreeNodeInfo info = intree->pNodeInfo; info.geotag = intree->pNodeInfo.fullGeotag; eos_debug("msg=\"remove from SlowNodeTree\" fsid=%lu host=\"%s\" " "geotag=\"%s\" fullgeotag=\"%s\"", (unsigned long)intree->pNodeInfo.fsId, intree->pNodeInfo.host.c_str(), intree->pNodeInfo.geotag.c_str(), intree->pNodeInfo.fullGeotag.c_str()); // try to update the SlowTree info.fsId = 0; if (!mapEntry->slowTree->remove(&info)) { mapEntry->slowTreeMutex.UnLockWrite(); eos_err("error removing fs %lu from group %s : removing the slow tree node " "failed. geotag is %s and geotag in tree is %s and %s", (unsigned long)fsid, group->mName.c_str(), info.geotag.c_str(), intree->pNodeInfo.fullGeotag.c_str(), intree->pNodeInfo.geotag.c_str()); return false; } mapEntry->fs2SlowTreeNode.erase(fsid); } // if the tree is empty, remove the entry from the map if (!mapEntry->fs2SlowTreeNode.empty()) { // if the tree is getting empty, no need to update it mapEntry->slowTreeModified = true; } if (updateFastStruct && mapEntry->slowTreeModified) if (!updateFastStructures(mapEntry)) { mapEntry->slowTreeMutex.UnLockWrite(); pTreeMapMutex.LockRead(); eos_err("error removing fs %lu from group %s : fast structures update failed", fsid, group->mName.c_str(), pFs2SchedTME[fsid]->group->mName.c_str()); pTreeMapMutex.UnLockRead(); return false; } // ==== update the entry in the map if needed { pTreeMapMutex.LockWrite(); pFs2SchedTME.erase(fsid); pFsId2FsPtr.erase(fsid); if (mapEntry->fs2SlowTreeNode.empty()) { pGroup2SchedTME.erase(group); // prevent from access by other threads pPendingDeletionsFs.push_back(mapEntry); } mapEntry->slowTreeMutex.UnLockWrite(); pTreeMapMutex.UnLockWrite(); } return true; } uint64_t GeoTreeEngine::placementSpace(const std::string& space, const std::string& schedgroup) { RWMutexReadLock lock(pTreeMapMutex); uint64_t totalWritableSpace = 0; for (auto it = pGroup2SchedTME.begin(); it != pGroup2SchedTME.end(); it++) { std::string ispace; std::string index; eos::common::StringConversion::SplitKeyValue(it->second->group->mName, ispace, index, "."); if ((ispace == space) && ((schedgroup == "") || (schedgroup == it->second->group->mName))) { totalWritableSpace += it->second->foregroundFastStruct->placementTree->getTotalWritableSpace(); } } return totalWritableSpace; } void GeoTreeEngine::printInfo(std::string& info, bool dispTree, bool dispSnaps, bool dispParam, bool dispState, const std::string& schedgroup, const std::string& optype, bool useColors, bool monitoring) { eos::common::RWMutexWriteLock arlock; if (dispState) { arlock.Grab(pAddRmFsMutex); } RWMutexReadLock lock(pTreeMapMutex); stringstream ostr; map orderByGroupName; std::string format_s = !monitoring ? "s" : "os"; std::string format_ss = !monitoring ? "-s" : "os"; std::string format_l = !monitoring ? "l" : "ol"; std::string format_ll = !monitoring ? "-l" : "ol"; std::string format_lll = !monitoring ? "+l" : "ol"; std::string format_f = !monitoring ? "+f" : "of"; std::string unit = !monitoring ? "s" : ""; std::string na = !monitoring ? "-NA-" : "NA"; unsigned scale = !monitoring ? 1000 : 1; // miliseconds to seconds for human view if (dispParam) { ostr << "### GeoTreeEngine parameters :" << std::endl; ostr << "skipSaturatedAccess = " << pSkipSaturatedAccess << std::endl; ostr << "skipSaturatedDrnAccess = " << pSkipSaturatedDrnAccess << std::endl; ostr << "skipSaturatedBlcAccess = " << pSkipSaturatedBlcAccess << std::endl; ostr << "proxyCloseToFs = " << pProxyCloseToFs << std::endl; ostr << "penaltyUpdateRate = " << pPenaltyUpdateRate << std::endl; ostr << "plctDlScorePenalty = " << pPenaltySched.pPlctDlScorePenaltyF[0] << "(default)" << " | " << pPenaltySched.pPlctDlScorePenaltyF[1] << "(1Gbps)" << " | " << pPenaltySched.pPlctDlScorePenaltyF[2] << "(10Gbps)" << " | " << pPenaltySched.pPlctDlScorePenaltyF[3] << "(100Gbps)" << " | " << pPenaltySched.pPlctDlScorePenaltyF[4] << "(1000Gbps)" << std::endl; ostr << "plctUlScorePenalty = " << pPenaltySched.pPlctUlScorePenaltyF[0] << "(defaUlt)" << " | " << pPenaltySched.pPlctUlScorePenaltyF[1] << "(1Gbps)" << " | " << pPenaltySched.pPlctUlScorePenaltyF[2] << "(10Gbps)" << " | " << pPenaltySched.pPlctUlScorePenaltyF[3] << "(100Gbps)" << " | " << pPenaltySched.pPlctUlScorePenaltyF[4] << "(1000Gbps)" << std::endl; ostr << "accessDlScorePenalty = " << pPenaltySched.pAccessDlScorePenaltyF[0] << "(default)" << " | " << pPenaltySched.pAccessDlScorePenaltyF[1] << "(1Gbps)" << " | " << pPenaltySched.pAccessDlScorePenaltyF[2] << "(10Gbps)" << " | " << pPenaltySched.pAccessDlScorePenaltyF[3] << "(100Gbps)" << " | " << pPenaltySched.pAccessDlScorePenaltyF[4] << "(1000Gbps)" << std::endl; ostr << "accessUlScorePenalty = " << pPenaltySched.pAccessUlScorePenaltyF[0] << "(defaUlt)" << " | " << pPenaltySched.pAccessUlScorePenaltyF[1] << "(1Gbps)" << " | " << pPenaltySched.pAccessUlScorePenaltyF[2] << "(10Gbps)" << " | " << pPenaltySched.pAccessUlScorePenaltyF[3] << "(100Gbps)" << " | " << pPenaltySched.pAccessUlScorePenaltyF[4] << "(1000Gbps)" << std::endl; ostr << "fillRatioLimit = " << (int)pFillRatioLimit << std::endl; ostr << "fillRatioCompTol = " << (int)pFillRatioCompTol << std::endl; ostr << "saturationThres = " << (int)pSaturationThres << std::endl; ostr << "timeFrameDurationMs = " << (int)pTimeFrameDurationMs << std::endl; } if (dispState) { ostr << "frameCount = " << pFrameCount << std::endl; //! Added penalties for each fs over successive frames if (!monitoring) { ostr << "\nā”ā”> Added penalties for each fs over successive frames\n"; } { // to be sure that no fs in inserted removed in the meantime struct timeval curtime; gettimeofday(&curtime, 0); size_t ts = curtime.tv_sec * 1000 + curtime.tv_usec / 1000; TableFormatterBase table; TableHeader table_header; if (monitoring) { table_header.push_back(std::make_tuple("type", 4, format_ss)); } table_header.push_back(std::make_tuple("fsid", 4, format_ll)); table_header.push_back(std::make_tuple("drct", 4, format_ss)); for (size_t itcol = 0; itcol < pCircSize; itcol++) { float frame = pLatencySched.pCircFrCnt2Timestamp[ (pFrameCount + pCircSize - 1 - itcol) % pCircSize] ? (ts - pLatencySched.pCircFrCnt2Timestamp[ (pFrameCount + pCircSize - 1 - itcol) % pCircSize]) * 0.001 : 0; char header_name[24]; std::sprintf(header_name, "%.1f", frame); table_header.push_back(std::make_tuple(header_name, 4, format_l)); } table.SetHeader(table_header); FsView::gFsView.ViewMutex.LockRead(); size_t fsid_count = pPenaltySched.pCircFrCnt2FsPenalties.begin()->size(); for (size_t fsid = 1; fsid < fsid_count; fsid++) { if (!FsView::gFsView.mIdView.exists(fsid)) { continue; } table.AddSeparator(); // for Upload TableData table_data; table_data.emplace_back(); if (monitoring) { table_data.back().push_back(TableCell("AddedPenalties", format_ss)); } table_data.back().push_back(TableCell((unsigned long long)fsid, format_l)); table_data.back().push_back(TableCell("UL", format_ss)); for (size_t itcol = 0; itcol < pCircSize; itcol++) { int value = pPenaltySched.pCircFrCnt2FsPenalties[ (pFrameCount + pCircSize - 1 - itcol) % pCircSize][fsid].ulScorePenalty; table_data.back().push_back(TableCell(value, format_l)); } // for Download table_data.emplace_back(); if (monitoring) { table_data.back().push_back(TableCell("AddedPenalties", format_ss)); table_data.back().push_back(TableCell((unsigned long long)fsid, format_l)); } else { table_data.back().push_back(TableCell("", format_ss)); } table_data.back().push_back(TableCell("DL", format_ss)); for (size_t itcol = 0; itcol < pCircSize; itcol++) { int value = pPenaltySched.pCircFrCnt2FsPenalties[ (pFrameCount + pCircSize - 1 - itcol) % pCircSize][fsid].dlScorePenalty; table_data.back().push_back(TableCell(value, format_l)); } table.AddRows(table_data); } FsView::gFsView.ViewMutex.UnLockRead(); ostr << table.GenerateTable(HEADER2).c_str(); } //! fst2GeotreeEngine latency if (!monitoring) { ostr << "\nā”ā”> fst2GeotreeEngine latency\n"; } struct timeval nowtv; gettimeofday(&nowtv, NULL); size_t nowms = nowtv.tv_sec * 1000 + nowtv.tv_usec / 1000; double avAge = 0.0; size_t count = 0; std::vector> data_fst; for (auto it : pLatencySched.pFsId2LatencyStats) { if (it.getage(nowms) < 600000) { // consider only if less than a minute avAge += it.getage(nowms); count++; } } avAge /= (count ? count : 1); TableFormatterBase table_fst; if (!monitoring) table_fst.SetHeader({ std::make_tuple("fsid", 6, format_ll), std::make_tuple("minimum", 10, format_f), std::make_tuple("averge", 10, format_f), std::make_tuple("maximum", 10, format_f), std::make_tuple("age(last)", 10, format_f) }); else table_fst.SetHeader({ std::make_tuple("type", 0, format_ss), std::make_tuple("fsid", 0, format_ll), std::make_tuple("min", 0, format_f), std::make_tuple("avg", 0, format_f), std::make_tuple("max", 0, format_f), std::make_tuple("age(last)", 0, format_f) }); FsView::gFsView.ViewMutex.LockRead(); for (size_t fsid = 1; fsid < pLatencySched.pFsId2LatencyStats.size(); fsid++) { if (!FsView::gFsView.mIdView.exists(fsid)) { continue; } // more than 1 minute, something is wrong if (pLatencySched.pFsId2LatencyStats[fsid].getage(nowms) > 600000) { data_fst.push_back(std::make_tuple(fsid, 0, 0, 0, 0, false)); } else data_fst.push_back(std::make_tuple(fsid, pLatencySched.pFsId2LatencyStats[fsid].minlatency, pLatencySched.pFsId2LatencyStats[fsid].averagelatency, pLatencySched.pFsId2LatencyStats[fsid].maxlatency, pLatencySched.pFsId2LatencyStats[fsid].getage(nowms), true)); } FsView::gFsView.ViewMutex.UnLockRead(); for (auto it : data_fst) { TableData table_data; table_data.emplace_back(); if (monitoring) { table_data.back().push_back(TableCell("fst2GeotreeEngine", format_ss)); } if (std::get<0>(it) == 0) { table_data.back().push_back(TableCell("global", format_ss)); } else { table_data.back().push_back(TableCell(std::get<0>(it), format_l)); } if (std::get<5>(it)) { table_data.back().push_back(TableCell(std::get<1>(it) / scale, format_f, unit)); table_data.back().push_back(TableCell(std::get<2>(it) / scale, format_f, unit)); table_data.back().push_back(TableCell(std::get<3>(it) / scale, format_f, unit)); table_data.back().push_back(TableCell(std::get<4>(it) / scale, format_f, unit)); } else for (int i = 0; i < 4; i++) { table_data.back().push_back(TableCell(na, format_ss)); } table_fst.AddRows(table_data); if (std::get<0>(it) == 0 && data_fst.size() > 1) { table_fst.AddSeparator(); } } ostr << table_fst.GenerateTable(HEADER2).c_str(); } // ==== run through the map of file systems unsigned geo_depth_max = 0; // Set for tree: group, num of line, depth, color, prefix_1, prefix_2, // geotag[::fsid], host, leavs count, nodes count, status std::set> data_tree; // Set for snapshot: group, num of line, depth, color, prefix_1, prefix_2, // operation, operation_short, fsid, geotag/host, // free, repl, pidx, status, ulSc, dlSc, filR, totS, totW std::set> data_snapshot; for (auto it = pGroup2SchedTME.begin(); it != pGroup2SchedTME.end(); it++) { if (dispTree && (schedgroup.empty() || schedgroup == "*" || (schedgroup == it->second->group->mName))) { it->second->slowTree->display(data_tree, geo_depth_max, useColors); } if (dispSnaps && (schedgroup.empty() || schedgroup == "*" || (schedgroup == it->second->group->mName))) { if (optype.empty() || (optype == "plct")) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->placementTree->recursiveDisplay( data_snapshot, geo_depth_max_temp, "Placement", "plct", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } if (optype.empty() || (optype == "accsro")) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->rOAccessTree->recursiveDisplay( data_snapshot, geo_depth_max, "Access RO", "accsro", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } if (optype.empty() || (optype == "accsrw")) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->rWAccessTree->recursiveDisplay( data_snapshot, geo_depth_max, "Access RW", "accsrw", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } if (optype.empty() || (optype == "accsdrain")) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->drnAccessTree->recursiveDisplay( data_snapshot, geo_depth_max, "Draining Access", "accsdrain", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } if (optype.empty() || (optype == "plctdrain")) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->drnPlacementTree->recursiveDisplay( data_snapshot, geo_depth_max, "Draining Placement", "plctdrain", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } } } // ==== run through the map of file systems for (auto it = pPxyGrp2DpTME.begin(); it != pPxyGrp2DpTME.end(); it++) { if (dispTree && (schedgroup.empty() || schedgroup == "*" || (schedgroup == it->first))) { std::string group_name = it->first + "(proxy)"; it->second->slowTree->display(data_tree, geo_depth_max, useColors); } if (dispSnaps && (schedgroup.empty() || schedgroup == "*" || (schedgroup == it->first))) { unsigned geo_depth_max_temp = 0; it->second->foregroundFastStruct->proxyAccessTree->recursiveDisplay( data_snapshot, geo_depth_max, "Proxy group", "proxy", useColors); geo_depth_max = (geo_depth_max_temp > geo_depth_max) ? geo_depth_max_temp : geo_depth_max; } } // Output for "geosched show tree" TableFormatterBase table_tree; TableHeader table_header; table_header.push_back(std::make_tuple("group", 6, format_ss)); table_header.push_back(std::make_tuple("geotag", 6, format_ss)); if (!monitoring && geo_depth_max > 1) { for (unsigned i = 1; i < geo_depth_max; i++) { std::string name = "lev" + std::to_string(i); table_header.push_back(std::make_tuple(name, 4, format_ss)); } } table_header.push_back(std::make_tuple("fsid", 4, format_l)); table_header.push_back(std::make_tuple("node", 12, format_s)); table_header.push_back(std::make_tuple("branches", 5, format_l)); table_header.push_back(std::make_tuple("leavs", 5, format_l)); table_header.push_back(std::make_tuple("sum", 3, format_l)); table_header.push_back(std::make_tuple("status", 6, format_s)); table_tree.SetHeader(table_header); unsigned prefix[geo_depth_max + 1]; for (auto it : data_tree) { unsigned geo_depth = 0; std::string geotag_temp = std::get<6>(it); while (geotag_temp.find("::") != std::string::npos) { geotag_temp.erase(0, geotag_temp.find("::") + 2); geo_depth++; } TableData table_data; table_data.emplace_back(); // Print group (depth=1) if (std::get<2>(it) == 1) { for (unsigned i = 0; i < geo_depth_max + 1; i++) { prefix[i] = 0; } table_tree.AddSeparator(); table_data.back().push_back(TableCell(std::get<0>(it), format_s, "", false, std::get<3>(it))); for (unsigned i = 0; i < geo_depth_max + 2; i++) { table_data.back().push_back(TableCell("", format_s, "", true)); // blank cell after group if (monitoring && i == 2) { break; } } } // Print geotag (depth=2) else if (std::get<2>(it) == 2) { if (!monitoring) { if (geo_depth == 0) { prefix[0] = std::get<5>(it); table_data.back().push_back(TableCell(prefix[0], "t")); table_data.back().push_back(TableCell(std::get<6>(it), format_s, "", false, std::get<3>(it))); for (unsigned i = 0; i < geo_depth_max - 1; i++) { // after arrows table_data.back().push_back(TableCell("", format_s, "", true)); } } else { prefix[geo_depth - 1] = std::get<4>(it); prefix[geo_depth] = std::get<5>(it); for (unsigned i = 0; i <= geo_depth; i++) { // arrows table_data.back().push_back(TableCell(prefix[i], "t")); } std::string name = std::get<6>(it).substr(std::get<6>(it).rfind("::") + 2); table_data.back().push_back(TableCell(name, format_s, "", false, std::get<3>(it))); for (unsigned i = 1; i < geo_depth_max - geo_depth; i++) { table_data.back().push_back(TableCell("", format_s)); } } } else { table_data.back().push_back(TableCell(std::get<0>(it), format_s)); table_data.back().push_back(TableCell(std::get<6>(it), format_s)); } table_data.back().push_back(TableCell("", format_s, "", true)); table_data.back().push_back(TableCell("", format_s, "", true)); } // Print fsid and node (depth=3) else if (std::get<2>(it) == 3) { if (!monitoring) { if (geo_depth > 0) { prefix[geo_depth - 1] = std::get<4>(it); prefix[geo_depth] = std::get<5>(it); for (unsigned i = 0; i <= geo_depth; i++) { // arrows unsigned arrow = (i == geo_depth && geo_depth_max - geo_depth > 0) ? prefix[i] + 2 : prefix[i]; table_data.back().push_back(TableCell(arrow, "t")); } for (unsigned i = 0; i < geo_depth_max - geo_depth; i++) { // extended arrows unsigned arrow = (i == geo_depth_max - geo_depth - 1) ? 7 : 6; table_data.back().push_back(TableCell(arrow, "t")); } } } else { std::string geotag = std::get<6>(it).substr(0, std::get<6>(it).rfind("::")); table_data.back().push_back(TableCell(std::get<0>(it), format_s)); table_data.back().push_back(TableCell(geotag, format_s)); } unsigned fsid = std::atoi(std::get<6>(it).substr(std::get<6> (it).rfind("::") + 2).c_str()); table_data.back().push_back(TableCell(fsid, format_l, "", false, std::get<3>(it))); table_data.back().push_back(TableCell(std::get<7>(it), format_s, "", false, std::get<3>(it))); } // Print other columns table_data.back().push_back(TableCell(std::get<9>(it) - std::get<8>(it), format_l)); table_data.back().push_back(TableCell(std::get<8>(it), format_l)); table_data.back().push_back(TableCell(std::get<9>(it), format_l)); table_data.back().push_back(TableCell(std::get<10>(it), format_s, "", (std::get<2>(it) != 3))); table_tree.AddRows(table_data); } ostr << table_tree.GenerateTable(HEADER).c_str(); // Output for "geosched show snapshot" std::string geotag = ""; size_t operation_count = 0; TableFormatterBase table_snapshot; TableHeader snapshot_header; snapshot_header.push_back(std::make_tuple("group", 6, format_ss)); snapshot_header.push_back(std::make_tuple("operation", 6, format_ss)); snapshot_header.push_back(std::make_tuple("geotag", 6, format_ss)); if (!monitoring && geo_depth_max > 1) { for (unsigned i = 1; i < geo_depth_max; i++) { std::string name = "lev" + std::to_string(i); snapshot_header.push_back(std::make_tuple(name, 2, format_ss)); } } snapshot_header.push_back(std::make_tuple("fsid", 4, format_l)); snapshot_header.push_back(std::make_tuple("node", 12, format_s)); snapshot_header.push_back(std::make_tuple("free", 4, format_l)); snapshot_header.push_back(std::make_tuple("repl", 4, format_l)); snapshot_header.push_back(std::make_tuple("pidx", 4, format_l)); snapshot_header.push_back(std::make_tuple("status", 6, format_s)); snapshot_header.push_back(std::make_tuple("ulSc", 4, format_l)); snapshot_header.push_back(std::make_tuple("dlSc", 4, format_l)); snapshot_header.push_back(std::make_tuple("filR", 4, format_l)); snapshot_header.push_back(std::make_tuple("totS", 4, format_lll)); snapshot_header.push_back(std::make_tuple("totW", 4, format_lll)); table_snapshot.SetHeader(snapshot_header); set operations; for (auto it : data_snapshot) { // we need count of used operations operations.insert(std::get<6>(it)); } unsigned geo_depth = 0; for (auto it : data_snapshot) { if (std::get<2>(it) == 2) { geo_depth = 0; std::string geotag_temp = std::get<9>(it); while (geotag_temp.find("::") != std::string::npos) { geotag_temp.erase(0, geotag_temp.find("::") + 2); geo_depth++; } } TableData table_data; table_data.emplace_back(); // Print group (depth=1) if (std::get<2>(it) == 1) { for (unsigned i = 0; i < geo_depth_max + 1; i++) { prefix[i] = 0; } if (!monitoring) { if (schedgroup == "*" || std::get<6>(it) == "Placement" || std::get<1>(it) == 0) { table_snapshot.AddSeparator(); table_data.back().push_back(TableCell(std::get<0>(it), format_s, "", false, std::get<3>(it))); table_data.emplace_back(); operation_count = 0; } operation_count++; unsigned tree_arrow = (schedgroup == "*" || operation_count == operations.size()) ? 2 : 3; table_data.back().push_back(TableCell(tree_arrow, "t")); table_data.back().push_back(TableCell(std::get<6>(it), format_s, "", false, std::get<3>(it))); } else { table_data.back().push_back(TableCell(std::get<0>(it), format_s)); table_data.back().push_back(TableCell(std::get<7>(it), format_s)); } for (unsigned i = 0; i < geo_depth_max + 2; i++) { table_data.back().push_back(TableCell("", format_s, "", true)); // blank cell after group if (monitoring && i == 2) { break; } } } // Print geotag (depth=2) else if (std::get<2>(it) == 2) { geotag = std::get<9>(it); if (!monitoring) { unsigned tree_arrow = (schedgroup == "*" || operation_count == operations.size()) ? 0 : 1; table_data.back().push_back(TableCell(tree_arrow, "t")); if (geo_depth == 0) { prefix[0] = std::get<5>(it); table_data.back().push_back(TableCell(prefix[0], "t")); table_data.back().push_back(TableCell(geotag, format_s, "", false, std::get<3>(it))); for (unsigned i = 0; i < geo_depth_max - 1; i++) { // after arrows table_data.back().push_back(TableCell("", format_s, "", true)); } } else { prefix[geo_depth - 1] = std::get<4>(it); prefix[geo_depth] = std::get<5>(it); for (unsigned i = 0; i <= geo_depth; i++) { // arrows table_data.back().push_back(TableCell(prefix[i], "t")); } std::string name = geotag.substr(geotag.rfind("::") + 2); table_data.back().push_back(TableCell(name, format_s, "", false, std::get<3>(it))); for (unsigned i = 1; i < geo_depth_max - geo_depth; i++) { table_data.back().push_back(TableCell("", format_s)); } } } else { table_data.back().push_back(TableCell(std::get<0>(it), format_s)); table_data.back().push_back(TableCell(std::get<7>(it), format_s)); table_data.back().push_back(TableCell(geotag, format_s)); } table_data.back().push_back(TableCell("", format_s, "", true)); table_data.back().push_back(TableCell("", format_s, "", true)); } // Print fsid and node (depth=3) else if (std::get<2>(it) == 3) { if (!monitoring) { unsigned tree_arrow = (schedgroup == "*" || operation_count == operations.size()) ? 0 : 1; table_data.back().push_back(TableCell(tree_arrow, "t")); prefix[geo_depth] = std::get<4>(it); prefix[geo_depth + 1] = std::get<5>(it); for (unsigned i = 0; i <= geo_depth + 1; i++) { // arrows unsigned arrow = (i == geo_depth + 1 && geo_depth_max - geo_depth - 1 > 0) ? prefix[i] + 2 : prefix[i]; table_data.back().push_back(TableCell(arrow, "t")); } for (unsigned i = 0; i < geo_depth_max - geo_depth - 1; i++) { // extended arrow unsigned arrow = (i == geo_depth_max - geo_depth - 2) ? 7 : 6; table_data.back().push_back(TableCell(arrow, "t")); } } else { table_data.back().push_back(TableCell(std::get<0>(it), format_s)); table_data.back().push_back(TableCell(std::get<7>(it), format_s)); table_data.back().push_back(TableCell(geotag, format_s)); } table_data.back().push_back(TableCell(std::get<8>(it), format_l, "", false, std::get<3>(it))); table_data.back().push_back(TableCell(std::get<9>(it), format_s, "", false, std::get<3>(it))); } // Print other columns table_data.back().push_back(TableCell(std::get<10>(it), format_l)); table_data.back().push_back(TableCell(std::get<11>(it), format_l)); table_data.back().push_back(TableCell(std::get<12>(it), format_l)); table_data.back().push_back(TableCell(std::get<13>(it), format_s)); table_data.back().push_back(TableCell(std::get<14>(it), format_l)); table_data.back().push_back(TableCell(std::get<15>(it), format_l)); table_data.back().push_back(TableCell(std::get<16>(it), format_l)); table_data.back().push_back(TableCell(std::get<17>(it), format_lll)); if ((std::get<13>(it).find("RW") != std::string::npos) || (std::get<13>(it).find("OK") != std::string::npos)) { table_data.back().push_back(TableCell(std::get<18>(it), format_lll)); } else { table_data.back().push_back(TableCell(0, format_lll)); } table_snapshot.AddRows(table_data); } ostr << table_snapshot.GenerateTable(HEADER).c_str(); info = ostr.str(); } bool GeoTreeEngine::placeNewReplicasOneGroup(FsGroup* group, const size_t& nNewReplicas, vector* newReplicas, ino64_t inode, std::vector* dataProxys, std::vector* firewallEntryPoint, SchedType type, vector* existingReplicas, std::vector* fsidsgeotags, unsigned long long bookingSize, const std::string& startFromGeoTag, const std::string& clientGeoTag, const size_t& nCollocatedReplicas, vector* excludeFs, vector* excludeGeoTags) { assert(nNewReplicas); assert(newReplicas); std::vector entries; // find the entry in the map SchedTME* entry; { RWMutexReadLock lock(this->pTreeMapMutex); if (!pGroup2SchedTME.count(group)) { eos_err("could not find the requested placement group in the map"); return false; } entry = pGroup2SchedTME[group]; AtomicInc(entry->fastStructLockWaitersCount); } // readlock the original fast structure entry->doubleBufferMutex.LockRead(); // locate the existing replicas and the excluded fs in the tree vector newReplicasIdx(nNewReplicas), *existingReplicasIdx = NULL, *excludeFsIdx = NULL; newReplicasIdx.resize(0); if (existingReplicas) { existingReplicasIdx = new vector (existingReplicas->size()); existingReplicasIdx->resize(0); int count = 0; for (auto it = existingReplicas->begin(); it != existingReplicas->end(); ++it , ++count) { const SchedTreeBase::tFastTreeIdx* idx = static_cast(0); if (!entry->foregroundFastStruct->fs2TreeIdx->get(*it, idx) && !(*fsidsgeotags)[count].empty()) { // the fs is not in that group. // this could happen because the former file scheduler // could place replicas across multiple groups // with the new geoscheduler, it should not happen // in that case, we try to match a filesystem having the same geotag SchedTreeBase::tFastTreeIdx idx = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode(( *fsidsgeotags)[count].c_str()); if (idx && (*entry->foregroundFastStruct->treeInfo)[idx].nodeType == SchedTreeBase::TreeNodeInfo::fs) { if ((std::find(existingReplicasIdx->begin(), existingReplicasIdx->end(), idx) == existingReplicasIdx->end())) { existingReplicasIdx->push_back(idx); } } // if we can't find any such filesystem, the information is not taken into account // (and then can lead to unoptimal placement else { eos_debug("could not place preexisting replica on the fast tree"); } continue; } if (idx) { existingReplicasIdx->push_back(*idx); } } } if (excludeFs) { excludeFsIdx = new vector(excludeFs->size()); excludeFsIdx->resize(0); for (auto it = excludeFs->begin(); it != excludeFs->end(); ++it) { const SchedTreeBase::tFastTreeIdx* idx; if (!entry->foregroundFastStruct->fs2TreeIdx->get(*it, idx)) { // the excluded fs might belong to another group // so it's not an error condition // eos_warning("could not place excluded fs on the fast tree"); continue; } excludeFsIdx->push_back(*idx); } } if (excludeGeoTags) { if (!excludeFsIdx) { excludeFsIdx = new vector(excludeGeoTags->size()); excludeFsIdx->resize(0); } for (auto it = excludeGeoTags->begin(); it != excludeGeoTags->end(); ++it) { SchedTreeBase::tFastTreeIdx idx; idx = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( it->c_str()); excludeFsIdx->push_back(idx); } } SchedTreeBase::tFastTreeIdx startFromNode = 0; if (!startFromGeoTag.empty()) { startFromNode = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( startFromGeoTag.c_str()); } else if (!clientGeoTag.empty()) { startFromNode = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( clientGeoTag.c_str()); } // actually do the job bool success = false; switch (type) { case regularRO: case regularRW: success = placeNewReplicas(entry, nNewReplicas, &newReplicasIdx, entry->foregroundFastStruct->placementTree, existingReplicasIdx, bookingSize, startFromNode, nCollocatedReplicas, excludeFsIdx); break; case draining: success = placeNewReplicas(entry, nNewReplicas, &newReplicasIdx, entry->foregroundFastStruct->drnPlacementTree, existingReplicasIdx, bookingSize, startFromNode, nCollocatedReplicas, excludeFsIdx); break; default: break; } if (!success) { goto cleanup; } // fill the resulting vector and // update the fastTree UlScore and DlScore by applying the penalties newReplicas->resize(0); for (auto it = newReplicasIdx.begin(); it != newReplicasIdx.end(); ++it) { const SchedTreeBase::tFastTreeIdx* idx = NULL; const unsigned int fsid = (*entry->foregroundFastStruct->treeInfo)[*it].fsId; if (!entry->foregroundFastStruct->fs2TreeIdx->get(fsid, idx)) { eos_crit("inconsistency : cannot retrieve index of selected fs though " "it should be in the tree"); success = false; goto cleanup; } const char netSpeedClass = (*entry->foregroundFastStruct->treeInfo)[*idx].netSpeedClass; newReplicas->push_back(fsid); // Apply the penalties if (entry->foregroundFastStruct->placementTree->pNodes[*idx].fsData.dlScore > 0) { applyDlScorePenalty(entry, *idx, pPenaltySched.pPlctDlScorePenalty[netSpeedClass]); } if (entry->foregroundFastStruct->placementTree->pNodes[*idx].fsData.ulScore > 0) { applyUlScorePenalty(entry, *idx, pPenaltySched.pPlctUlScorePenalty[netSpeedClass]); } } if (dataProxys || firewallEntryPoint) { entries.assign(newReplicasIdx.size(), entry); } // find proxy for filesticky scheduling if (dataProxys) { if (!findProxy(newReplicasIdx, entries, inode, dataProxys, NULL, pProxyCloseToFs ? "" : clientGeoTag, filesticky)) { success = false; goto cleanup; } } // find the firewall entry point if needed if (firewallEntryPoint) { std::vector firewallProxyGroups(newReplicasIdx.size()); // if there are some access geotag mapping rules, use them if (pAccessGeotagMapping.inuse && pAccessProxygroup.inuse) for (size_t i = 0; i < newReplicasIdx.size(); i++) { if (clientGeoTag.empty() || accessReqFwEP(( *entries[i]->foregroundFastStruct->treeInfo)[newReplicasIdx[i]].fullGeotag , clientGeoTag)) { firewallProxyGroups[i] = accessGetProxygroup(( *entries[i]->foregroundFastStruct->treeInfo)[newReplicasIdx[i]].fullGeotag); } } // Use the dataproxys as entrypoints if possible if (dataProxys) { *firewallEntryPoint = *dataProxys; } if (!findProxy(newReplicasIdx, entries, inode, firewallEntryPoint, &firewallProxyGroups, pProxyCloseToFs ? "" : clientGeoTag, any)) { success = false; goto cleanup; } } // find proxy in the right proxygroup if any if (dataProxys) { // If we already have some firewall entry points, pass them to the findProxy // procedure to check if it's needed to find a distinct data proxy // use the entrypoints as dataproxy if possible if (firewallEntryPoint) { *dataProxys = *firewallEntryPoint; } if (!findProxy(newReplicasIdx, entries, inode, dataProxys, NULL, pProxyCloseToFs ? "" : clientGeoTag, regular)) { success = false; goto cleanup; } } // Unlock, cleanup cleanup: if (!success) { newReplicas->clear(); } entry->doubleBufferMutex.UnLockRead(); AtomicDec(entry->fastStructLockWaitersCount); if (existingReplicasIdx) { delete existingReplicasIdx; } if (excludeFsIdx) { delete excludeFsIdx; } return success; } // Would be better as defined locally in find Proxy // but it is not supported by gcc 4.4 struct TreeInfoFsIdComparator { SchedTreeBase::FastTreeInfo* nodesinfo; TreeInfoFsIdComparator(SchedTreeBase::FastTreeInfo* infos) { nodesinfo = infos; } bool operator()(const SchedTreeBase::tFastTreeIdx& a, const SchedTreeBase::tFastTreeIdx& b) const { return (*nodesinfo)[a].fsId < (*nodesinfo)[b].fsId; } }; bool GeoTreeEngine::findProxy(const std::vector& fsIdxs, std::vector entries, ino64_t inode, std::vector* dataProxys, std::vector* proxyGroups, const std::string& clientgeotag, tProxySchedType proxyschedtype) { // re initialize result vector dataProxys->resize(fsIdxs.size()); const std::string* fsproxygroup = 0; DataProxyTME* pxyentry = NULL; FastGatewayAccessTree* tree = NULL; std::string sgeotag; for (size_t i = 0; i < fsIdxs.size(); i++) { const std::string* geotag = NULL; // get the proxygroup // WARNING: entries[i]->doubleBufferMutex should be locked by the caller of findProxy if (!(*dataProxys)[i].empty() && (*dataProxys)[i] != "") { if (pPxyHost2DpTMEs.count((*dataProxys)[i])) { const auto& TMEs = pPxyHost2DpTMEs[(*dataProxys)[i]]; // If dataProxys already contains proxy hostnames, check first if they // already do the job for the given proxygroup. bool isInRightPxyGrp = false; if (proxyGroups) { for (auto it = TMEs.begin(); it != TMEs.end(); it++) { if ((*it)->slowTree->getName() == (*proxyGroups)[i]) { isInRightPxyGrp = true; break; } } } if (isInRightPxyGrp) { continue; } { auto entry = (*TMEs.begin()); // we don't want to lock the pxyentry which is already locked if (entry != pxyentry) { AtomicInc(entry->fastStructLockWaitersCount); entry->doubleBufferMutex.LockRead(); } // if they don't, take their geotag as a staring point sgeotag = (*TMEs.begin())->host2SlowTreeNode[(*dataProxys)[i]]->pNodeInfo.fullGeotag; geotag = &sgeotag; if (entry != pxyentry) { entry->doubleBufferMutex.UnLockRead(); AtomicDec(entry->fastStructLockWaitersCount); } } } } if (proxyGroups) { fsproxygroup = &((*proxyGroups)[i]); } else { fsproxygroup = & (*entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].proxygroup; } if (fsproxygroup->empty() || (*fsproxygroup) == "") { // No proxygroup, nothing to do, there will be an entry with an empty string (*dataProxys)[i].clear(); continue; } // If we don't have a proxy to match, if a client geotag is given then use // it else use the file system client bool trimlastlevel = geotag || clientgeotag.empty(); if (!geotag) { geotag = (clientgeotag.empty() ? & ((*(entries[i]->foregroundFastStruct->treeInfo))[fsIdxs[i]].fullGeotag) : &clientgeotag); } // The deepest intermediate node is a numeric id for both scheduling and GW // trees and they are unrelated. We don't want to keep this to project the // fst location on the gw tree as it would not make sense lock it for each // new fs. RWMutexReadLock lock(this->pPxyTreeMapMutex); if (!pPxyGrp2DpTME.count(*fsproxygroup)) { eos_err("could not find the requested proxy group %s in the map", fsproxygroup->c_str()); return false; } pxyentry = pPxyGrp2DpTME[*fsproxygroup]; AtomicInc(pxyentry->fastStructLockWaitersCount); // readlock the original fast structure pxyentry->doubleBufferMutex.LockRead(); // copy the fasttree if (pxyentry->foregroundFastStruct->proxyAccessTree->copyToBuffer(( char*)tlGeoBuffer, gGeoBufferSize)) { eos_crit("could not make a working copy of the fast tree for proxygroup %s", fsproxygroup->c_str()); pxyentry->doubleBufferMutex.UnLockRead(); AtomicDec(pxyentry->fastStructLockWaitersCount); return false; } tree = (FastGatewayAccessTree*)tlGeoBuffer; // get the closest node from the filesystem SchedTreeBase::tFastTreeIdx idx; idx = pxyentry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( trimlastlevel ? std::string(*geotag, 0, geotag->rfind("::")).c_str() : geotag->c_str()); bool schedsuccess = false; eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (proxyschedtype == filesticky) { // scheduling should consistently go through the same (firewallentrypoint,proxy) // this is to do the caching of the file only on one proxy // serving a same file from two proxies is not optimal but it is not mendatory neither if ((*entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].fileStickyProxyDepth < 0) { schedsuccess = true; } // first find the best proxy else { // then consider all the possible proxy in the same proxygroup // within the subtree starting at the best proxy and going uproot by // (*pxyentry->foregroundFastStruct->treeInfo)[idx].fileStickyProxyDepth // allocate a vectors to get the proxies auto s = pxyentry->foregroundFastStruct->treeInfo->size(); std::vector proxiesIdxs(s), upRootLevels(s), upRootLevelsIdxs(s); SchedTreeBase::tFastTreeIdx upRootLevelsCount = 0; SchedTreeBase::tFastTreeIdx np = 0; // get all the proxies if ((np = tree->findFreeSlotsAll(&proxiesIdxs[0], proxiesIdxs.size(), idx, true, SchedTreeBase::None, &upRootLevelsCount, &upRootLevelsIdxs[0], &upRootLevels[0]))) { schedsuccess = (np != 0); if (schedsuccess) { if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { stringstream ss; ss << " all proxys are:"; for (auto it = proxiesIdxs.begin(); it != proxiesIdxs.end(); it++) { ss << (*pxyentry->foregroundFastStruct->treeInfo)[*it].hostport; ss << "(" << (*pxyentry->foregroundFastStruct->treeInfo)[*it].fullGeotag << ")"; if (it != proxiesIdxs.end() - 1) { ss << ","; } } ss << " upRootLevels are:"; for (auto it = upRootLevels.begin(); it != upRootLevels.end(); it++) { ss << (int)*it; if (it != upRootLevels.end() - 1) { ss << ","; } } ss << " upRootLevelsIdxs are:"; for (auto it = upRootLevelsIdxs.begin(); it != upRootLevelsIdxs.end(); it++) { ss << (int)*it; if (it != upRootLevelsIdxs.end() - 1) { ss << ","; } } ss << " taken from idx:" << idx << "(" << *geotag << ")"; eos_debug("%s", ss.str().c_str()); } // keep only the proxies within the allowed uproot level, if any int uprlev = 0; while ( uprlev < upRootLevelsCount && upRootLevels[uprlev] <= (*entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].fileStickyProxyDepth ) { uprlev++; } if (uprlev == 0) { // no proxy with a right uproot level schedsuccess = false; } else { int resize = (uprlev == upRootLevelsCount) ? -1 : upRootLevelsIdxs[uprlev]; if (resize > 0) { proxiesIdxs.resize(resize); } else { proxiesIdxs.resize(np); } // sort the proxies by fsid TreeInfoFsIdComparator cmp(pxyentry->foregroundFastStruct->treeInfo); std::sort(proxiesIdxs.begin(), proxiesIdxs.end(), cmp); // take the proxy idx = proxiesIdxs[inode % proxiesIdxs.size()]; // if it succeeds, feel the corresponding element of the return vector (*dataProxys)[i] = (*pxyentry->foregroundFastStruct->treeInfo)[idx].hostport; if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { stringstream ss; ss << "file sticky proxy scheduling fs:" << (*entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].fsId; ss << " | fileStickyProxyDepth:" << (int)( *entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].fileStickyProxyDepth; ss << " | possible proxys are:"; for (auto it = proxiesIdxs.begin(); it != proxiesIdxs.end(); it++) { ss << (*pxyentry->foregroundFastStruct->treeInfo)[*it].hostport; ss << "(" << (*pxyentry->foregroundFastStruct->treeInfo)[*it].fullGeotag << ")"; if (it != proxiesIdxs.end() - 1) { ss << ","; } } ss << " | inode:" << inode; ss << " | selected host is:" << (*pxyentry->foregroundFastStruct->treeInfo)[idx].hostport; eos_debug("%s", ss.str().c_str()); } } } } } } else { if (proxyschedtype == any || ((*entries[i]->foregroundFastStruct->treeInfo)[fsIdxs[i]].fileStickyProxyDepth < 0 && proxyschedtype == regular)) { // get the proxy if (!(schedsuccess = tree->findFreeSlot(idx, idx, true /*allow uproot if necessary*/, false, true /*skipSaturated*/))) { (*dataProxys)[i] = (*pxyentry->foregroundFastStruct->treeInfo)[idx].hostport; } else { if ((schedsuccess = tree->findFreeSlot(idx, idx, true /*allow uproot if necessary*/, false, false /*skipSaturated*/))) // if it succeeds, feel the corresponding element of the return vector { (*dataProxys)[i] = (*pxyentry->foregroundFastStruct->treeInfo)[idx].hostport; } } } else { schedsuccess = true; // nothing to do } } // if the scheduling failed, throw an error if (!schedsuccess) { eos_err("could not find a proxy for proxygroup %s", fsproxygroup->c_str()); std::stringstream ss; ss << "tree is as follow\n" << (*tree); eos_err(ss.str().c_str()); pxyentry->doubleBufferMutex.UnLockRead(); AtomicDec(pxyentry->fastStructLockWaitersCount); return false; } // unlock it for each new fs pxyentry->doubleBufferMutex.UnLockRead(); AtomicDec(pxyentry->fastStructLockWaitersCount); } return true; } int GeoTreeEngine::accessHeadReplicaMultipleGroup(const size_t& nAccessReplicas, unsigned long& fsIndex, std::vector* existingReplicas, ino64_t inode, std::vector* dataProxys, std::vector* firewallEntryPoint, SchedType type, const std::string& accesserGeotag, const eos::common::FileSystem::fsid_t& forcedFsId, std::vector* unavailableFs) { int returnCode = ENODATA; assert(nAccessReplicas); assert(existingReplicas); // Check that enough replicas exist already if (nAccessReplicas > existingReplicas->size()) { eos_debug("msg=\"not enough replicas\" current=%d required=%d", (int)existingReplicas->size(), (int)nAccessReplicas); return EROFS; } // Check if the forced replicas (if any) are among the existing replicas if (forcedFsId > 0 && (std::find(existingReplicas->begin(), existingReplicas->end(), forcedFsId) == existingReplicas->end())) { return ENODATA; } // Find the group holdings the fs of the existing replicas and check that the // replicas are available size_t availFsCount = 0; eos::mgm::SchedTreeBase::TreeNodeSlots freeSlot; freeSlot.freeSlotsCount = 1; std::vector::iterator it; std::vector ERIdx; ERIdx.reserve(existingReplicas->size()); std::vector entries; entries.reserve(existingReplicas->size()); // Maps tree maps entries (i.e. scheduling groups) to fs ids containing an // available replica and the corresponding fastTreeIndex map > > entry2FsId; SchedTME* entry = NULL; { // Lock the scheduling group -> trees map so that the a map entry cannot // be delete while processing it. RWMutexReadLock lock(this->pTreeMapMutex); for (auto exrepIt = existingReplicas->begin(); exrepIt != existingReplicas->end(); exrepIt++) { auto mentry = pFs2SchedTME.find(*exrepIt); // If we cannot find the fs in any group, there is an inconsistency somewhere if (mentry == pFs2SchedTME.end()) { eos_warning("%s", "msg=\"cannot find the existing replica in any " "scheduling group\" fsid=%lu", *exrepIt); continue; } entry = mentry->second; // lock the double buffering to make sure all the fast trees are not modified if (!entry2FsId.count(entry)) { // if the entry is already there, it was locked already entry->doubleBufferMutex.LockRead(); // to prevent the destruction of the entry AtomicInc(entry->fastStructLockWaitersCount); } const SchedTreeBase::tFastTreeIdx* idx; if (!entry->foregroundFastStruct->fs2TreeIdx->get(*exrepIt, idx)) { eos_warning("msg=\"cannot find fs in the scheduling group in the 2nd " "pass\" fsid=%lu", *exrepIt); if (!entry2FsId.count(entry)) { entry->doubleBufferMutex.UnLockRead(); AtomicDec(entry->fastStructLockWaitersCount); } continue; } // take the fastindex of each existing replica ERIdx.push_back(*idx); entries.push_back(entry); // check if the fs is available bool isValid = false; std::string msg; if (std::find(unavailableFs->begin(), unavailableFs->end(), *exrepIt) == unavailableFs->end()) { switch (type) { case regularRO: isValid = entry->foregroundFastStruct->rOAccessTree->pBranchComp.isValidSlot( &entry->foregroundFastStruct->rOAccessTree->pNodes[*idx].fsData, &freeSlot); if (!isValid) { msg = "file system not readable"; } break; case regularRW: isValid = entry->foregroundFastStruct->rWAccessTree->pBranchComp.isValidSlot( &entry->foregroundFastStruct->rWAccessTree->pNodes[*idx].fsData, &freeSlot); if (!isValid) { msg = "file system not writable"; } break; case draining: isValid = entry->foregroundFastStruct->drnAccessTree->pBranchComp.isValidSlot( &entry->foregroundFastStruct->drnAccessTree->pNodes[*idx].fsData, &freeSlot); if (!isValid) { msg = "file system not readable for drain"; } break; default: break; } } else { msg = "file system marked as unavailable"; } if (isValid) { entry2FsId[entry].push_back(make_pair(*exrepIt, *idx)); availFsCount++; } else { // create an empty entry in the map if needed if (!entry2FsId.count(entry)) { entry2FsId[entry] = vector< pair >(); } // update the unavailable fs unavailableFs->push_back(*exrepIt); eos_warning("msg=\"%s\" fsid=%lu", msg.c_str(), (unsigned long)*exrepIt); } } } // Check if there are enough available replicas if (availFsCount < nAccessReplicas) { returnCode = ENETUNREACH; goto cleanup; } // Check if the forced replica (if any) is available if (forcedFsId > 0 && (std::find(unavailableFs->begin(), unavailableFs->end(), forcedFsId) != unavailableFs->end())) { returnCode = ENETUNREACH; goto cleanup; } // We have multiple groups - compute their geolocation scores to the the // available fsids (+things) having a replica { SchedTreeBase::tFastTreeIdx accesserNode = 0; FileSystem::fsid_t selectedFsId = 0; eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); { // maps a geolocation scores (int) to all the file system having this geolocation scores map< unsigned , std::vector< FileSystem::fsid_t > > geoScore2Fs; vector accessedReplicasIdx(1); for (auto entryIt = entry2FsId.begin(); entryIt != entry2FsId.end(); entryIt ++) { if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { char buffer[1024]; buffer[0] = 0; char* buf = buffer; for (auto it = entryIt->second.begin(); it != entryIt->second.end(); ++it) { buf += sprintf(buf, "%lu ", (unsigned long)(it->second)); } eos_debug("existing replicas indices in geotree -> %s", buffer); buffer[0] = 0; buf = buffer; for (auto it = entryIt->second.begin(); it != entryIt->second.end(); ++it) { buf += sprintf(buf, "%s ", (*entryIt->first->foregroundFastStruct->treeInfo)[it->second].fullGeotag.c_str()); } eos_debug("existing replicas geotags in geotree -> %s", buffer); } // If there is no replica here (might happen if it's spotted as unavailable // after the first pass) if (entryIt->second.empty()) { continue; } entry = entryIt->first; // find the closest tree node to the accesser accesserNode = entry->foregroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( accesserGeotag.c_str());; // fill a vector with the indices of the replicas vector existingReplicasIdx(entryIt->second.size()); for (size_t i = 0; i < entryIt->second.size(); i++) { existingReplicasIdx[i] = entryIt->second[i].second; } // pickup an access slot is this scheduling group accessedReplicasIdx.clear(); unsigned char retCode = 0; switch (type) { case regularRO: retCode = accessReplicas(entryIt->first, 1, &accessedReplicasIdx, accesserNode, &existingReplicasIdx, entry->foregroundFastStruct->rOAccessTree, pSkipSaturatedAccess); break; case regularRW: retCode = accessReplicas(entryIt->first, 1, &accessedReplicasIdx, accesserNode, &existingReplicasIdx, entry->foregroundFastStruct->rWAccessTree, pSkipSaturatedAccess); break; case draining: retCode = accessReplicas(entryIt->first, 1, &accessedReplicasIdx, accesserNode, &existingReplicasIdx, entry->foregroundFastStruct->drnAccessTree, pSkipSaturatedDrnAccess); break; default: break; } if (!retCode) { goto cleanup; } const string& fsGeotag = (*entryIt->first->foregroundFastStruct->treeInfo)[*accessedReplicasIdx.begin()].fullGeotag; unsigned geoScore = 0; size_t kmax = min(accesserGeotag.length(), fsGeotag.length()); for (size_t k = 0; k < kmax; k++) { if (accesserGeotag[k] != fsGeotag[k]) { break; } if (accesserGeotag[k] == ':' && k + 1 < kmax && accesserGeotag[k + 1] == ':') { geoScore++; } } // if the box is unsaturated, give an advantage to this FS if (retCode == 2) { geoScore += 100; eos_debug("found unsaturated fs"); } geoScore2Fs[geoScore].push_back( (*entryIt->first->foregroundFastStruct->treeInfo)[*accessedReplicasIdx.begin()].fsId); } // randomly choose a fs among the highest scored ones selectedFsId = geoScore2Fs.rbegin()->second[rand() % geoScore2Fs.rbegin()->second.size()]; // return the corresponding index for (it = existingReplicas->begin(); it != existingReplicas->end(); it++) { if (*it == selectedFsId) { fsIndex = (eos::common::FileSystem::fsid_t)(it - existingReplicas->begin()); break; } } // check we found it if (it == existingReplicas->end()) { eos_err("inconsistency : unable to find the selected fs but it should be there"); returnCode = EIO; goto cleanup; } } if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) { char buffer[1024]; buffer[0] = 0; char* buf = buffer; for (auto it = existingReplicas->begin(); it != existingReplicas->end(); ++it) { buf += sprintf(buf, "%lu ", (unsigned long)(*it)); } eos_debug("existing replicas fs id's -> %s", buffer); if (entry) { eos_debug("accesser closest node to %s index -> %d / %s", accesserGeotag.c_str(), (int)accesserNode, (*entry->foregroundFastStruct->treeInfo)[accesserNode].fullGeotag.c_str()); } eos_debug("selected FsId -> %d / idx %d", (int)selectedFsId, (int)fsIndex); } } // Apply penalties if needed if (true) { std::set setunav(unavailableFs->begin(), unavailableFs->end()); for (size_t i = 0; i < existingReplicas->size(); i++) { size_t j = (fsIndex + i) % existingReplicas->size(); auto& fs = (*existingReplicas)[j]; // If this one is unavailable, skip it if (setunav.count(fs)) { continue; } if (!pFs2SchedTME.count(fs)) { continue; } entry = pFs2SchedTME[fs]; const SchedTreeBase::tFastTreeIdx* idx; if (entry->foregroundFastStruct->fs2TreeIdx->get(fs, idx)) { const char netSpeedClass = (*entry->foregroundFastStruct->treeInfo)[*idx].netSpeedClass; // every available box will push data if (entry->foregroundFastStruct->placementTree->pNodes[*idx].fsData.ulScore >= pPenaltySched.pAccessUlScorePenalty[netSpeedClass]) { applyUlScorePenalty(entry, *idx, pPenaltySched.pAccessUlScorePenalty[netSpeedClass]); } // every available box will have to pull data if it's a RW access (or if it's a gateway) if ((type == regularRW) || (j == fsIndex && nAccessReplicas > 1)) { if (entry->foregroundFastStruct->placementTree->pNodes[*idx].fsData.dlScore >= pPenaltySched.pAccessDlScorePenalty[netSpeedClass]) { applyDlScorePenalty(entry, *idx, pPenaltySched.pAccessDlScorePenalty[netSpeedClass]); } } } else { eos_err("could not find fs on the fast tree to apply penalties"); } // The gateway will also have to pull data from the if (j == fsIndex && nAccessReplicas == 1) { // mainly replica layout RO case break; } } } if (dataProxys) { if (!findProxy(ERIdx, entries, inode, dataProxys, NULL, pProxyCloseToFs ? "" : accesserGeotag, filesticky)) { returnCode = ENETUNREACH; goto cleanup; } } if (firewallEntryPoint) { std::vector firewallProxyGroups(ERIdx.size()); // if there are some access geotag mapping rules, use them if (pAccessGeotagMapping.inuse && pAccessProxygroup.inuse) for (size_t i = 0; i < ERIdx.size(); i++) { if (accesserGeotag.empty() || accessReqFwEP((*entries[i]->foregroundFastStruct->treeInfo)[ERIdx[i]].fullGeotag , accesserGeotag)) { firewallProxyGroups[i] = accessGetProxygroup(( *entries[i]->foregroundFastStruct->treeInfo)[ERIdx[i]].fullGeotag); } } if (dataProxys) { *firewallEntryPoint = *dataProxys; } if (!findProxy(ERIdx, entries, inode, firewallEntryPoint, &firewallProxyGroups, pProxyCloseToFs ? "" : accesserGeotag, any)) { returnCode = ENETUNREACH; goto cleanup; } } if (dataProxys) { if (firewallEntryPoint) { *dataProxys = *firewallEntryPoint; } if (!findProxy(ERIdx, entries, inode, dataProxys, NULL, pProxyCloseToFs ? "" : accesserGeotag, regular)) { returnCode = ENETUNREACH; goto cleanup; } } // If we get here, everything is fine returnCode = 0; // cleanup and exit cleanup: for (auto cit = entry2FsId.begin(); cit != entry2FsId.end(); cit++) { cit->first->doubleBufferMutex.UnLockRead(); AtomicDec(cit->first->fastStructLockWaitersCount); } return returnCode; } void GeoTreeEngine::StartUpdater() { updaterThread.reset(&GeoTreeEngine::listenFsChange, this); } void GeoTreeEngine::StopUpdater() { updaterThread.join(); gUpdaterStarted = false; } void GeoTreeEngine::listenFsChange(ThreadAssistant& assistant) { gUpdaterStarted = true; if (!mFsListener->startListening()) { eos_crit("error starting shared objects change notifications"); } else { eos_info("GeoTreeEngine updater is starting..."); } std::chrono::seconds timeout {1}; mq::FsChangeListener::Event event; while (!assistant.terminationRequested()) { while (sem_wait(&gUpdaterPauseSem)) { if (EINTR != errno) { throw "sem_wait() failed"; } } while (mFsListener->fetch(assistant, event, timeout)) { if (event.isDeletion()) { eos_debug("received deletion on subject %s : the fs was removed from " "the GeoTreeEngine, skipping this update", event.fileSystemQueue.c_str()); continue; } eos::common::RWMutexWriteLock wr_lock(pAddRmFsMutex); auto notifTypeIt = gQueue2NotifType.find(event.fileSystemQueue); if (notifTypeIt == gQueue2NotifType.end()) { eos_err("could not determine the type of notification associated to queue ", event.fileSystemQueue.c_str()); } else { // A machine might have several roles at the same time (DataProxy and // Gateway), so an update might end in multiple update maps if (notifTypeIt->second & sntFilesystem) { if (gNotificationsBufferFs.count(event.fileSystemQueue)) { (gNotificationsBufferFs)[event.fileSystemQueue] |= gNotifKey2EnumSched.at( event.key); } else { (gNotificationsBufferFs)[event.fileSystemQueue] = gNotifKey2EnumSched.at( event.key); } } } } // Do the processing common::IntervalStopwatch stopwatch((std::chrono::milliseconds( pTimeFrameDurationMs))); { // Do it before tree info to leave some time to the other threads checkPendingDeletionsFs(); checkPendingDeletionsDp(); { eos::common::RWMutexWriteLock lock(pAddRmFsMutex); if (!gNotificationsBufferFs.empty() || !gNotificationsBufferProxy.empty()) { updateTreeInfo(gNotificationsBufferFs, gNotificationsBufferProxy); } gNotificationsBufferFs.clear(); gNotificationsBufferProxy.clear(); } } pFrameCount++; if (sem_post(&gUpdaterPauseSem)) { throw "sem_post() failed"; } assistant.wait_for(stopwatch.timeRemainingInCycle()); } } bool GeoTreeEngine::updateTreeInfo(SchedTME* entry, eos::common::FileSystem::fs_snapshot_t* fs, int keys, SchedTreeBase::tFastTreeIdx ftIdx , SlowTreeNode* stn) { // We get a consistent set of configuration parameters per refresh of the state eos::common::RWMutexReadLock lock(configMutex); // Nothing to update if ((!ftIdx && !stn) || !keys) { return true; } #define setOneStateVarInAllFastTrees(variable,value) \ { \ entry->backgroundFastStruct->rOAccessTree->pNodes[ftIdx].fsData.variable = value; \ entry->backgroundFastStruct->rWAccessTree->pNodes[ftIdx].fsData.variable = value; \ entry->backgroundFastStruct->placementTree->pNodes[ftIdx].fsData.variable = value; \ entry->backgroundFastStruct->drnAccessTree->pNodes[ftIdx].fsData.variable = value; \ entry->backgroundFastStruct->drnPlacementTree->pNodes[ftIdx].fsData.variable = value; \ } #define setOneStateVarStatusInAllFastTrees(flag) \ { \ entry->backgroundFastStruct->rOAccessTree->pNodes[ftIdx].fsData.mStatus |= flag; \ entry->backgroundFastStruct->rWAccessTree->pNodes[ftIdx].fsData.mStatus |= flag; \ entry->backgroundFastStruct->placementTree->pNodes[ftIdx].fsData.mStatus |= flag; \ entry->backgroundFastStruct->drnAccessTree->pNodes[ftIdx].fsData.mStatus |= flag; \ entry->backgroundFastStruct->drnPlacementTree->pNodes[ftIdx].fsData.mStatus |= flag; \ } #define unsetOneStateVarStatusInAllFastTrees(flag) \ { \ entry->backgroundFastStruct->rOAccessTree->pNodes[ftIdx].fsData.mStatus &= ~flag; \ entry->backgroundFastStruct->rWAccessTree->pNodes[ftIdx].fsData.mStatus &= ~flag; \ entry->backgroundFastStruct->placementTree->pNodes[ftIdx].fsData.mStatus &= ~flag; \ entry->backgroundFastStruct->drnAccessTree->pNodes[ftIdx].fsData.mStatus &= ~flag; \ entry->backgroundFastStruct->drnPlacementTree->pNodes[ftIdx].fsData.mStatus &= ~flag; \ } if (keys & sfgGeotag) { // update the treenodeinfo string newGeoTag = fs->mGeoTag; if (newGeoTag.empty()) { newGeoTag = "nogeotag"; } FileSystem::fsid_t fsid = fs->mId; if (!fsid) { eos_err("%s", "msg=\"skip update for fsid=0\""); return false; } entry->slowTreeMutex.LockWrite(); if (!entry->fs2SlowTreeNode.count(fsid)) { eos_err("msg=\"no such slowtree node fsid=%lu\"", fsid); entry->slowTreeMutex.UnLockWrite(); return false; } SlowTreeNode* oldNode = entry->fs2SlowTreeNode[fsid]; //const string &oldGeoTag = oldNode->pNodeInfo.fullGeotag; string oldGeoTag = oldNode->pNodeInfo.fullGeotag; oldGeoTag = (oldGeoTag.rfind("::") != std::string::npos) ? oldGeoTag.substr(0, oldGeoTag.rfind("::")) : std::string(""); //CHECK IF CHANGE ACTUALLY HAPPENED BEFORE ACTUALLY CHANGING SOMETHING if (oldGeoTag != newGeoTag) { // do the change only if there is one SlowTreeNode* newNode = NULL; newNode = entry->slowTree->moveToNewGeoTag(oldNode, newGeoTag); if (!newNode) { stringstream ss; ss << (*entry->slowTree); eos_err("error changing geotag in slowtree : move is \"%s\" => \"%s\" " "and slowtree is \n%s\n", oldGeoTag.c_str(), newGeoTag.c_str(), ss.str().c_str()); entry->slowTreeMutex.UnLockWrite(); return false; } eos_debug("geotag change detected : old geotag is \"%s\" new geotag is \"%s\"", oldGeoTag.c_str(), newGeoTag.c_str()); entry->slowTreeModified = true; entry->fs2SlowTreeNode[fsid] = newNode; // !!! change the argument too stn = newNode; } entry->slowTreeMutex.UnLockWrite(); } if (keys & sfgId) { // should not happen // eos_crit("the FsId should not change once it's created: new value // is %lu",(unsigned long)fs->mId); // .... unless it is the first change to give to the id it's initial // value. It happens after it's been created so it's seen as a change. } if (keys & (sfgBoot | sfgActive | sfgErrc)) { BootStatus statboot = fs->mStatus; unsigned int errc = fs->mErrCode; ActiveStatus statactive = fs->mActiveStatus; eos_debug("fs %lu available recompute boot=%s errcode=%d active=%s", (unsigned long) fs->mId, eos::common::FileSystem::GetStatusAsString(statboot), errc, (statactive == eos::common::ActiveStatus::kOnline) ? "online" : "offline"); if ((statboot == BootStatus::kBooted) && (errc == 0) && // this we probably don't need // This checks the heartbeat and the group & node are enabled (statactive == ActiveStatus::kOnline)) { // the fs is available eos_debug("fs %lu is getting available ftidx=%d stn=%p", (unsigned long) fs->mId, (int)ftIdx, stn); if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Available); } if (stn) { stn->pNodeState.mStatus |= SchedTreeBase::Available; } } else { // the fs is unavailable eos_debug("fs %lu is getting unavailable ftidx=%d stn=%p", (unsigned long) fs->mId, (int)ftIdx, stn); if (ftIdx) { unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Available); } if (stn) { stn->pNodeState.mStatus &= ~SchedTreeBase::Available; } } } if (keys & sfgConfigstatus) { common::ConfigStatus status = fs->mConfigStatus; if (status == common::ConfigStatus::kRW) { if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Readable | SchedTreeBase::Writable); } if (stn) { stn->pNodeState.mStatus |= (SchedTreeBase::Readable | SchedTreeBase::Writable); } } else if (status == common::ConfigStatus::kRO || status == common::ConfigStatus::kDrain) { if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Readable); unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Writable); } if (stn) { stn->pNodeState.mStatus |= SchedTreeBase::Readable; stn->pNodeState.mStatus &= ~SchedTreeBase::Writable; } } else if (status == common::ConfigStatus::kWO) { if (ftIdx) { unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Readable); setOneStateVarStatusInAllFastTrees(SchedTreeBase::Writable); } if (stn) { stn->pNodeState.mStatus &= ~SchedTreeBase::Readable; stn->pNodeState.mStatus |= SchedTreeBase::Writable; } } else { if (ftIdx) { unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Readable); unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Writable); } if (stn) { stn->pNodeState.mStatus &= ~SchedTreeBase::Readable; stn->pNodeState.mStatus &= ~SchedTreeBase::Writable; } } } if (keys & sfgDrain) { DrainStatus drainStatus = fs->mDrainStatus; if (fs->mConfigStatus == common::ConfigStatus::kDrain && drainStatus == DrainStatus::kDraining) { // mark as draining if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Draining); } if (stn) { stn->pNodeState.mStatus |= SchedTreeBase::Draining; } } else { // This covers the following cases // case common::ConfigStatus::kNoDrain: // case common::ConfigStatus::kDrainPrepare: // case common::ConfigStatus::kDrainWait: // case common::ConfigStatus::kDrainStalling: // case common::ConfigStatus::kDrained: // case common::ConfigStatus::kDrainExpired: if (ftIdx) { unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Draining); } if (stn) { stn->pNodeState.mStatus &= ~SchedTreeBase::Draining; } } } if (keys & sfgDrainer) { if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Drainer); } if (stn) { stn->pNodeState.mStatus |= SchedTreeBase::Drainer; } } if (keys & (sfgFsfilled | sfgNomfilled)) { auto nominal = fs->mNominalFilled; auto filled = fs->mDiskFilled; bool balancing = false; if (nominal && (filled >= nominal)) { balancing = true; } if (balancing) { if (ftIdx) { setOneStateVarStatusInAllFastTrees(SchedTreeBase::Balancing); } if (stn) { stn->pNodeState.mStatus |= SchedTreeBase::Balancing; } } else { if (ftIdx) { unsetOneStateVarStatusInAllFastTrees(SchedTreeBase::Balancing); } if (stn) { stn->pNodeState.mStatus &= ~SchedTreeBase::Balancing; } } } if (keys & sfgBlkavailb) { float ts = float(fs->mDiskBfree * (double)fs->mDiskBsize); // Account also for the headroom on the fst ts = ts - fs->mHeadRoom; if (ts < 0) { ts = 0; } if (ftIdx) { setOneStateVarInAllFastTrees(totalSpace, ts); setOneStateVarInAllFastTrees(totalWritableSpace, ts); } if (stn) { stn->pNodeState.totalSpace = ts; stn->pNodeState.totalWritableSpace = ts; } } // <1Gb/s -> 0 ; 1Gb/s -> 1; 10Gb/s->2 ; 100Gb/s->...etc size_t netSpeedClass = 0; if ((keys & sfgPubTmStmp) && fs->mPublishTimestamp) { // update the latency of this fs tLatencyStats* lstat = NULL; if (ftIdx) { if (((int)((*entry->backgroundFastStruct->treeInfo)[ftIdx].fsId)) < (( int)pLatencySched.pFsId2LatencyStats.size())) { lstat = &pLatencySched.pFsId2LatencyStats[(*entry->backgroundFastStruct->treeInfo)[ftIdx].fsId]; } else { eos_crit("trying to update latency for fs %d but latency stats vector " "size is %d : something is wrong", (int)(*entry->backgroundFastStruct->treeInfo)[ftIdx].fsId, (int)pLatencySched.pFsId2LatencyStats.size()); } } else if (stn) { if ((int)(stn->pNodeInfo.fsId) < ((int) pLatencySched.pFsId2LatencyStats.size())) { lstat = &pLatencySched.pFsId2LatencyStats[stn->pNodeInfo.fsId]; } else { eos_err("trying to update latency for fs %d but latency stats vector " "size is %d : something is wrong", (int)(stn->pNodeInfo.fsId), (int)pLatencySched.pFsId2LatencyStats.size()); } } if (lstat) { lstat->lastupdate = fs->mPublishTimestamp; lstat->update(); } } if (keys & (sfgDiskload | sfgInratemib | sfgWopen)) { // update the upload score double ulScore = (1 - fs->mDiskUtilization); double netoutweight = (1.0 - ((fs->mNetEthRateMiB) ? (fs->mNetOutRateMiB / fs->mNetEthRateMiB) : 0.0)); ulScore *= ((netoutweight > 0) ? sqrt(netoutweight) : 0); if (fs->mMaxDiskWopen && (fs->mDiskWopen >= fs->mMaxDiskWopen)) { ulScore = 0; } if (ftIdx) { setOneStateVarInAllFastTrees(ulScore, (char)(ulScore * 100)); } if (stn) { stn->pNodeState.ulScore = ulScore * 100; } } if (keys & (sfgOutratemib | sfgDiskload | sfgReadratemb | sfgRopen)) { double dlScore = (1 - fs->mDiskUtilization); double netinweight = (1.0 - ((fs->mNetEthRateMiB) ? (fs->mNetInRateMiB / fs->mNetEthRateMiB) : 0.0)); dlScore *= ((netinweight > 0) ? sqrt(netinweight) : 0); if (fs->mMaxDiskRopen && (fs->mDiskRopen >= fs->mMaxDiskRopen)) { dlScore = 0; } if (ftIdx) { setOneStateVarInAllFastTrees(dlScore, (char)(dlScore * 100)); } if (stn) { stn->pNodeState.dlScore = dlScore * 100; } } if (keys & (sfgDiskload | sfgInratemib | sfgOutratemib | sfgEthmib)) { netSpeedClass = round(log10(fs->mNetEthRateMiB * 8 * 1024 * 1024 + 1)); // netSpeedClass 1 means 1Gbps netSpeedClass = netSpeedClass > 8 ? netSpeedClass - 8 : 0; // check if netspeed class needs an update if (entry->backgroundFastStruct->treeInfo->size() >= netSpeedClass + 1 && (*entry->backgroundFastStruct->treeInfo)[ftIdx].netSpeedClass != (unsigned char)netSpeedClass) { if (ftIdx) { (*entry->backgroundFastStruct->treeInfo)[ftIdx].netSpeedClass = netSpeedClass; } if (stn) { stn->pNodeInfo.netSpeedClass = netSpeedClass; } } // This one will create the entry if it doesnt exists already nodeAgreg& na = pPenaltySched.pUpdatingNodes[fs->mHostPort]; na.fsCount++; if (!na.saturated) { if (na.fsCount == 1) { na.netSpeedClass = netSpeedClass; pPenaltySched.pMaxNetSpeedClass = std::max(pPenaltySched.pMaxNetSpeedClass , netSpeedClass); na.netOutWeight += (1.0 - ((fs->mNetEthRateMiB) ? (fs->mNetOutRateMiB / fs->mNetEthRateMiB) : 0.0)); na.netInWeight += (1.0 - ((fs->mNetEthRateMiB) ? (fs->mNetInRateMiB / fs->mNetEthRateMiB) : 0.0)); if (na.netOutWeight < 0.1 || na.netInWeight < 0.1) { na.saturated = true; // network of the box is saturated } } na.rOpen += fs->mDiskRopen; na.wOpen += fs->mDiskWopen; na.diskUtilSum += fs->mDiskUtilization; if (fs->mDiskUtilization > 0.9) { na.saturated = true; // one of the disks of the box is saturated } } // apply penalties that are still valid on fast trees if (ftIdx) { recallScorePenalty(entry, ftIdx); } // in case the fs in not in the fast trees , it has not been // used recently to schedule , so there is no penalty to recall! // so there is nothing like if(stn) recallScorePenalty(entry, stn); } if (keys & sfgFsfilled) { if (ftIdx) { setOneStateVarInAllFastTrees(fillRatio, (char)fs->mDiskFilled); } if (stn) { stn->pNodeState.fillRatio = (char)fs->mDiskFilled; } } // SHOULD WE TAKE THE NOMINAL FILLING AS SET BY THE BALANCING? // if(keys&(sfgNomfilled)) { // fs-> // } return true; #undef setOneStateVarInAllFastTrees #undef setOneStateVarStatusInAllFastTrees #undef unsetOneStateVarStatusInAllFastTrees } bool GeoTreeEngine::updateTreeInfo(const map& updatesFs, const map& updatesDp) { // copy the foreground FastStructures to the BackGround FastStructures // so that the penalties applied after the placement/access are kept by defaut // (and overwritten if a new state is received from the fs) // => SCHEDULING pTreeMapMutex.LockRead(); for (auto it = pGroup2SchedTME.begin(); it != pGroup2SchedTME.end(); it++) { SchedTME* entry = it->second; RWMutexReadLock lock(entry->slowTreeMutex); if (!entry->foregroundFastStruct->DeepCopyTo(entry->backgroundFastStruct)) { eos_crit("error deep copying in double buffering"); pTreeMapMutex.UnLockRead(); return false; } // Copy the penalties of the last frame from each group and reset the // penalties counter in the fast trees. auto& pVec = pPenaltySched.pCircFrCnt2FsPenalties[pFrameCount % pCircSize]; for (auto it2 = entry->foregroundFastStruct->fs2TreeIdx->begin(); it2 != entry->foregroundFastStruct->fs2TreeIdx->end(); it2++) { auto cur = *it2; pVec[cur.first] = (*entry->foregroundFastStruct->penalties)[cur.second]; AtomicCAS((*entry->foregroundFastStruct->penalties)[cur.second].dlScorePenalty, (*entry->foregroundFastStruct->penalties)[cur.second].dlScorePenalty, (char)0); AtomicCAS((*entry->foregroundFastStruct->penalties)[cur.second].ulScorePenalty, (*entry->foregroundFastStruct->penalties)[cur.second].ulScorePenalty, (char)0); } } pTreeMapMutex.UnLockRead(); // => PROXYGROUPS pPxyTreeMapMutex.LockRead(); for (auto it = pPxyGrp2DpTME.begin(); it != pPxyGrp2DpTME.end(); it++) { DataProxyTME* entry = it->second; RWMutexReadLock lock(entry->slowTreeMutex); if (!entry->foregroundFastStruct->DeepCopyTo(entry->backgroundFastStruct)) { eos_crit("error deep copying in double buffering"); pPxyTreeMapMutex.UnLockRead(); return false; } // Copy the penalties of the last frame from each group and reset the // penalties counter in the fast trees. auto& pMap = pPenaltySched.pCircFrCnt2HostPenalties[pFrameCount % pCircSize]; for (auto it2 = entry->foregroundFastStruct->host2TreeIdx->begin(); it2 != entry->foregroundFastStruct->host2TreeIdx->end(); it2++) { auto cur = *it2; pMap[cur.first] = (*entry->foregroundFastStruct->penalties)[cur.second]; AtomicCAS((*entry->foregroundFastStruct->penalties)[cur.second].dlScorePenalty, (*entry->foregroundFastStruct->penalties)[cur.second].dlScorePenalty, (char)0); AtomicCAS((*entry->foregroundFastStruct->penalties)[cur.second].ulScorePenalty, (*entry->foregroundFastStruct->penalties)[cur.second].ulScorePenalty, (char)0); } } pPxyTreeMapMutex.UnLockRead(); // timestamp the current frame { struct timeval curtime; gettimeofday(&curtime, 0); pLatencySched.pCircFrCnt2Timestamp[pFrameCount % pCircSize] = (( size_t)curtime.tv_sec) * 1000 + ((size_t)curtime.tv_usec) / 1000; } pPenaltySched.pUpdatingNodes.clear(); pPenaltySched.pMaxNetSpeedClass = 0; // => SCHED for (auto it = updatesFs.begin(); it != updatesFs.end(); ++it) { pTreeMapMutex.LockRead(); eos::common::FileSystem* filesystem = FsView::gFsView.mIdView.lookupByQueuePath( it->first); if (!filesystem) { eos_err("update : Invalid FileSystem Entry, skipping this update"); pTreeMapMutex.UnLockRead(); continue; } eos::common::FileSystem::fs_snapshot_t fs; filesystem->SnapShotFileSystem(fs, true); FileSystem::fsid_t fsid = fs.mId; if (!pFs2SchedTME.count(fsid)) { eos_err("update : TreeEntryMap has been removed, skipping this update"); pTreeMapMutex.UnLockRead(); continue; } SchedTME* entry = pFs2SchedTME[fsid]; AtomicInc(entry->fastStructLockWaitersCount); pTreeMapMutex.UnLockRead(); eos_debug("CHANGE BITFIELD %s => %x", it->first.c_str(), it->second); // Update only the fast structures because even if a fast structure rebuild // is needed from the slow tree. Its information and state is updated from // the fast structures. entry->doubleBufferMutex.LockRead(); const SchedTreeBase::tFastTreeIdx* idx = NULL; SlowTreeNode* node = NULL; if (!entry->backgroundFastStruct->fs2TreeIdx->get(fsid, idx)) { auto nodeit = entry->fs2SlowTreeNode.find(fsid); if (nodeit == entry->fs2SlowTreeNode.end()) { eos_crit("Inconsistency : cannot locate an fs %lu supposed to be in " "the fast structures", (unsigned long)fsid); entry->doubleBufferMutex.UnLockRead(); AtomicDec(entry->fastStructLockWaitersCount); return false; } node = nodeit->second; eos_debug("no fast tree for fs %lu : updating slowtree", (unsigned long)fsid); } else { eos_debug("fast tree available for fs %lu : not updating slowtree", (unsigned long)fsid); } updateTreeInfo(entry, &fs, it->second, idx ? *idx : 0 , node); if (idx) { entry->fastStructModified = true; } if (node) { entry->slowTreeModified = true; } // if we update the slowtree, then a fast tree generation is already pending entry->doubleBufferMutex.UnLockRead(); AtomicDec(entry->fastStructLockWaitersCount); } // Update the atomic penalties updateAtomicPenalties(); // Update the trees that need to be updated (could maybe optimized by // updating only the branch needing, might be worth it if only 1 or 2 // branches are updated). Self update for the fast structure if update // from slow tree is not needed. If convert from slowtree is needed, // update the slowtree from the fast for the info and for the state // => SCHED pTreeMapMutex.LockRead(); for (auto it = pGroup2SchedTME.begin(); it != pGroup2SchedTME.end(); it++) { SchedTME* entry = it->second; RWMutexReadLock lock(entry->slowTreeMutex); if (!updateFastStructures(entry)) { pTreeMapMutex.UnLockRead(); eos_err("error updating the tree"); return false; } } pTreeMapMutex.UnLockRead(); return true; } bool GeoTreeEngine::getInfosFromFsIds(const std::vector& fsids, std::vector* fsgeotags, std::vector* hosts, std::vector* sortedgroups) { bool result = true; if (fsgeotags) { fsgeotags->reserve(fsids.size()); } if (sortedgroups) { sortedgroups->reserve(fsids.size()); } std::map group2idx; std::vector > groupcount; groupcount.reserve(fsids.size()); { RWMutexReadLock lock(this->pTreeMapMutex); for (auto it = fsids.begin(); it != fsids.end(); ++ it) { if (pFs2SchedTME.count(*it)) { FsGroup* group = pFs2SchedTME[*it]->group; if (fsgeotags || hosts) { const SchedTreeBase::tFastTreeIdx* idx = NULL; if (pFs2SchedTME[*it]->foregroundFastStruct->fs2TreeIdx->get(*it, idx)) { if (fsgeotags) fsgeotags->push_back( (*pFs2SchedTME[*it]->foregroundFastStruct->treeInfo)[*idx].fullGeotag ); if (hosts) hosts->push_back( (*pFs2SchedTME[*it]->foregroundFastStruct->treeInfo)[*idx].host ); } else { if (fsgeotags) { fsgeotags->push_back(""); } if (hosts) { hosts->push_back(""); } } } if (sortedgroups) { if (!group2idx.count(group)) { group2idx[group] = group2idx.size(); sortedgroups->push_back(group); groupcount.push_back(make_pair(1, groupcount.size())); } else { size_t idx = group2idx[group]; groupcount[idx].first++; } } } else { // put an empty entry in the result vector to preserve the indexing if (fsgeotags) { fsgeotags->push_back(""); } if (hosts) { hosts->push_back(""); } // to signal that one of the fsids was not mapped to a group result = false; } } } if (sortedgroups) { // sort the count vector in ascending order to get the permutation std::sort(groupcount.begin(), groupcount.end(), std::greater>()); // apply the permutation std::vector final(groupcount.size()); size_t count = 0; for (auto it = groupcount.begin(); it != groupcount.end(); it++) { final[count++] = (*sortedgroups)[it->second]; } *sortedgroups = final; } return result; } void GeoTreeEngine::updateAtomicPenalties() { // In this function, we compute a rough a simplified version // of the penalties applied to selected fs for placement and access. // there is only one penalty and it's copied to ulplct, dlplct, ulaccess and dlaccess // variants. // if the update is enabled if (pPenaltyUpdateRate) { if (pPenaltySched.pUpdatingNodes.empty()) { //eos_debug("updatingNodes is empty!"); } else { // each networking speed class has its own variables std::vector ropen(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), wopen(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), ulload(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), dlload(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), fscount(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), hostcount(pPenaltySched.pMaxNetSpeedClass + 1, 0.0), diskutil(pPenaltySched.pMaxNetSpeedClass + 1, 0.0); // we use the view to check that we have all the fs in a node // could be removed if we were sure to run a single on fst daemon / box // WARNING: see below / FsView::gFsView.ViewMutex.LockRead(); for (auto it = pPenaltySched.pUpdatingNodes.begin(); it != pPenaltySched.pUpdatingNodes.end(); it++) { const std::string& nodestr = it->first; // =============== // WARNING: the following part is commented out because it can create a // deadlock with FsViewMutex/pAddRmFsMutex in the above FsViewMutex lock // when inserting/removing a filesystem. It can be fixed but it's not // trivial. Because it's not needed in operation, we don't fix it for now. // When using several fst daemons on the same host, it could give // overestimated atomic penalties when they are selfestimated // =============== /* FsNode *node = NULL; if(FsView::gFsView.mNodeView.count(nodestr)) node = FsView::gFsView.mNodeView[nodestr]; else { std::stringstream ss; ss.str(""); for (auto it2 = FsView::gFsView.mNodeView.begin(); it2 != FsView::gFsView.mNodeView.end(); it2++) { ss << it2->first << " "; } eos_err("Inconsistency : cannot find updating node %s in %s", nodestr.c_str(),ss.str().c_str()); continue; } if((!it->second.saturated) && it->second.fsCount == node->size()) */ // =============== if ((!it->second.saturated)) { // eos_debug("aggregated opened files for %s: wopen %d, ropen %d, // outweight %lf, inweight %lf", it->first.c_str(), // it->second.wOpen, it->second.rOpen, // it->second.netOutWeight, it->second.netInWeight); // Update aggregated informations for the right networking class // (take into account only unsaturated boxes) ropen[it->second.netSpeedClass] += (it->second.rOpen); wopen[it->second.netSpeedClass] += (it->second.wOpen); ulload[it->second.netSpeedClass] += (1.0 - it->second.netOutWeight); dlload[it->second.netSpeedClass] += (1.0 - it->second.netInWeight); diskutil[it->second.netSpeedClass] += it->second.diskUtilSum; fscount[it->second.netSpeedClass] += it->second.fsCount; hostcount[it->second.netSpeedClass]++; } else { // The fs/host is saturated, we don't use the whole host in the estimate eos_debug("fs update in node %s : box is saturated", nodestr.c_str()); continue; // Could force to get everything // long long wopen = node->SumLongLong("stat.wopen",false); // long long ropen = node->SumLongLong("stat.ropen",false); } } // WARNING: see above / FsView::gFsView.ViewMutex.UnLockRead(); for (size_t netSpeedClass = 0; netSpeedClass <= pPenaltySched.pMaxNetSpeedClass; netSpeedClass++) { if (ropen[netSpeedClass] + wopen[netSpeedClass] > 4) { eos_debug("UPDATE netSpeedClass=%d, ulload=%lf, dlload=%lf, " "diskutil=%lf, ropen=%lf, wopen=%lf fscount=%lf, " "hostcount=%lf", (int)netSpeedClass, ulload[netSpeedClass], dlload[netSpeedClass], diskutil[netSpeedClass], ropen[netSpeedClass], wopen[netSpeedClass], fscount[netSpeedClass], hostcount[netSpeedClass]); // The penalty aims at knowing roughly how many concurrent file // operations can be done on a single fs before sturating a ressource // (disk or network) // network penalty per file = the multiplication by the number of fs // is to take into account that the bw is shared between multiple fs double avgnetload = 0.5 * (ulload[netSpeedClass] + dlload[netSpeedClass]) / (ropen[netSpeedClass] + wopen[netSpeedClass]); double networkpenSched = avgnetload * (fscount[netSpeedClass] / hostcount[netSpeedClass]); double networkpenGw = avgnetload; // double networkpen = // 0.5*(ulload[netSpeedClass]+dlload[netSpeedClass])/(ropen[netSpeedClass]+wopen[netSpeedClass]) // *(fscount[netSpeedClass]/hostcount[netSpeedClass]); // there is factor to take into account the read cache // TODO use a realistic value for this factor double diskpen = diskutil[netSpeedClass] / (0.4 * ropen[netSpeedClass] + wopen[netSpeedClass]); eos_debug("penalties updates for scheduling are network %lf disk %lf", networkpenSched, diskpen); eos_debug("penalties updates for gateway/dataproxy are network %lf", networkpenGw, diskpen); double updateSched = 100 * std::max(diskpen, networkpenSched); double updateGw = 100 * networkpenGw; if (updateSched < 1 || updateSched > 99) { // could be more restrictive eos_debug("weird value for accessDlScorePenalty update : %lf. Not " "using this one.", updateSched); } else { eos_debug("netSpeedClass %d : using update values %lf for penalties " "with weight %f%%", netSpeedClass, pPenaltyUpdateRate); eos_debug("netSpeedClass %d : values before update are " "accessDlScorePenalty=%f, plctDlScorePenalty=%f, " "accessUlScorePenalty=%f, plctUlScorePenalty=%f", netSpeedClass, pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass], pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass], pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass], pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass]); union { float f; uint32_t u; } uf; // Atomic change, no need to lock anything uf.f = 0.01 * ((100 - pPenaltyUpdateRate) * pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass] + pPenaltyUpdateRate * updateSched); AtomicCAS(reinterpret_cast (pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass]) , reinterpret_cast(pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass]) , uf.u); uf.f = 0.01 * ((100 - pPenaltyUpdateRate) * pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass] + pPenaltyUpdateRate * updateSched); AtomicCAS(reinterpret_cast (pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass]) , reinterpret_cast(pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass]) , uf.u); uf.f = 0.01 * ((100 - pPenaltyUpdateRate) * pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass] + pPenaltyUpdateRate * updateSched); AtomicCAS(reinterpret_cast (pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass]) , reinterpret_cast(pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass]) , uf.u); uf.f = 0.01 * ((100 - pPenaltyUpdateRate) * pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass] + pPenaltyUpdateRate * updateSched); AtomicCAS(reinterpret_cast (pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass]) , reinterpret_cast(pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass]) , uf.u); uf.f = 0.01 * ((100 - pPenaltyUpdateRate) * pPenaltySched.pProxyScorePenaltyF[netSpeedClass] + pPenaltyUpdateRate * updateGw); AtomicCAS(reinterpret_cast (pPenaltySched.pProxyScorePenaltyF[netSpeedClass]) , reinterpret_cast(pPenaltySched.pProxyScorePenaltyF[netSpeedClass]) , uf.u); eos_debug("netSpeedClass %d : values after update are " "accessDlScorePenalty=%f, plctDlScorePenalty=%f, " "accessUlScorePenalty=%f, plctUlScorePenalty=%f, " "gwScorePenalty=%f", netSpeedClass, pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass], pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass], pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass], pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass], pPenaltySched.pProxyScorePenaltyF[netSpeedClass]); // Update the casted versions too AtomicCAS(pPenaltySched.pPlctUlScorePenalty[netSpeedClass], pPenaltySched.pPlctUlScorePenalty[netSpeedClass], (SchedTreeBase::tFastTreeIdx) pPenaltySched.pPlctUlScorePenaltyF[netSpeedClass]); AtomicCAS(pPenaltySched.pPlctDlScorePenalty[netSpeedClass], pPenaltySched.pPlctDlScorePenalty[netSpeedClass], (SchedTreeBase::tFastTreeIdx) pPenaltySched.pPlctDlScorePenaltyF[netSpeedClass]); AtomicCAS(pPenaltySched.pAccessDlScorePenalty[netSpeedClass], pPenaltySched.pAccessDlScorePenalty[netSpeedClass], (SchedTreeBase::tFastTreeIdx) pPenaltySched.pAccessDlScorePenaltyF[netSpeedClass]); AtomicCAS(pPenaltySched.pAccessUlScorePenalty[netSpeedClass], pPenaltySched.pAccessUlScorePenalty[netSpeedClass], (SchedTreeBase::tFastTreeIdx) pPenaltySched.pAccessUlScorePenaltyF[netSpeedClass]); AtomicCAS(pPenaltySched.pProxyScorePenalty[netSpeedClass], pPenaltySched.pProxyScorePenalty[netSpeedClass], (SchedTreeBase::tFastTreeIdx) pPenaltySched.pProxyScorePenaltyF[netSpeedClass]); } } else { eos_debug("not enough file opened to get reliable statistics %d", (int)(ropen[netSpeedClass] + wopen[netSpeedClass])); } } } } } bool GeoTreeEngine::setSkipSaturatedAccess(bool value, bool setconfig) { return setInternalParam(pSkipSaturatedAccess, (int)value, false, setconfig ? "skipsaturatedaccess" : ""); } bool GeoTreeEngine::setSkipSaturatedDrnAccess(bool value, bool setconfig) { return setInternalParam(pSkipSaturatedDrnAccess, (int)value, false, setconfig ? "skipsaturateddrnaccess" : ""); } bool GeoTreeEngine::setSkipSaturatedBlcAccess(bool value, bool setconfig) { return setInternalParam(pSkipSaturatedBlcAccess, (int)value, false, setconfig ? "skipsaturatedblcaccess" : ""); } bool GeoTreeEngine::setProxyCloseToFs(bool value, bool setconfig) { return setInternalParam(pProxyCloseToFs, (int)value, false, setconfig ? "proxyclosetofs" : ""); } bool GeoTreeEngine::setScorePenalty(std::vector& fvector, std::vector& cvector, const std::vector& vvalue, const std::string& configentry) { if (vvalue.size() != 8) { return false; } std::vector valuef(8); for (int i = 0; i < 8; i++) { valuef[i] = vvalue[i]; } return setInternalParam(fvector, valuef, false, "") && setInternalParam(cvector, vvalue, false, configentry); } bool GeoTreeEngine::setScorePenalty(std::vector& fvector, std::vector& cvector, const char* svalue, const std::string& configentry) { std::vector dvvalue(8); std::vector vvalue(8); if (sscanf(svalue, "[%lf,%lf,%lf,%lf,%lf,%lf,%lf,%lf]", &dvvalue[0], &dvvalue[1], &dvvalue[2], &dvvalue[3], &dvvalue[4], &dvvalue[5], &dvvalue[6], &dvvalue[7]) != 8) { return false; } for (int i = 0; i < 8; i++) { vvalue[i] = (char)dvvalue[i]; } return setScorePenalty(fvector, cvector, vvalue, configentry); } bool GeoTreeEngine::setScorePenalty(std::vector& fvector, std::vector& cvector, char value, int netSpeedClass, const std::string& configentry) { if (netSpeedClass >= 0) { if (netSpeedClass >= (int)fvector.size()) { return false; } // return setInternalParam(fvector[netSpeedClass],(float)value,false,"") // && setInternalParam(cvector[netSpeedClass],value,false,configentry); std::vector vvalue(cvector); vvalue[netSpeedClass] = value; return setScorePenalty(fvector, cvector, vvalue, configentry); } else if (netSpeedClass == -1) { std::vector vvalue(8, value); return setScorePenalty(fvector, cvector, vvalue, configentry); } return false; } bool GeoTreeEngine::setPlctDlScorePenalty(char value, int netSpeedClass, bool setconfig) { return setScorePenalty(pPenaltySched.pPlctDlScorePenaltyF, pPenaltySched.pPlctDlScorePenalty, value, netSpeedClass, setconfig ? "plctdlscorepenalty" : ""); } bool GeoTreeEngine::setPlctUlScorePenalty(char value, int netSpeedClass, bool setconfig) { return setScorePenalty(pPenaltySched.pPlctUlScorePenaltyF, pPenaltySched.pPlctUlScorePenalty, value, netSpeedClass, setconfig ? "plctulscorepenalty" : ""); } bool GeoTreeEngine::setAccessDlScorePenalty(char value, int netSpeedClass, bool setconfig) { return setScorePenalty(pPenaltySched.pAccessDlScorePenaltyF, pPenaltySched.pAccessDlScorePenalty, value, netSpeedClass, setconfig ? "accessdlscorepenalty" : ""); } bool GeoTreeEngine::setAccessUlScorePenalty(char value, int netSpeedClass, bool setconfig) { return setScorePenalty(pPenaltySched.pAccessUlScorePenaltyF, pPenaltySched.pAccessUlScorePenalty, value, netSpeedClass, setconfig ? "accessulscorepenalty" : ""); } bool GeoTreeEngine::setProxyScorePenalty(char value, int netSpeedClass, bool setconfig) { return setScorePenalty(pPenaltySched.pProxyScorePenaltyF, pPenaltySched.pProxyScorePenalty, value, netSpeedClass, setconfig ? "gwscorepenalty" : ""); } bool GeoTreeEngine::setPlctDlScorePenalty(const char* value, bool setconfig) { return setScorePenalty(pPenaltySched.pPlctDlScorePenaltyF, pPenaltySched.pPlctDlScorePenalty, value, setconfig ? "plctdlscorepenalty" : ""); } bool GeoTreeEngine::setPlctUlScorePenalty(const char* value, bool setconfig) { return setScorePenalty(pPenaltySched.pPlctUlScorePenaltyF, pPenaltySched.pPlctUlScorePenalty, value, setconfig ? "plctulscorepenalty" : ""); } bool GeoTreeEngine::setAccessDlScorePenalty(const char* value, bool setconfig) { return setScorePenalty(pPenaltySched.pAccessDlScorePenaltyF, pPenaltySched.pAccessDlScorePenalty, value, setconfig ? "accessdlscorepenalty" : ""); } bool GeoTreeEngine::setAccessUlScorePenalty(const char* value, bool setconfig) { return setScorePenalty(pPenaltySched.pAccessUlScorePenaltyF, pPenaltySched.pAccessUlScorePenalty, value, setconfig ? "accessulscorepenalty" : ""); } bool GeoTreeEngine::setProxyScorePenalty(const char* value, bool setconfig) { return setScorePenalty(pPenaltySched.pProxyScorePenaltyF, pPenaltySched.pProxyScorePenalty, value, setconfig ? "gwscorepenalty" : ""); } bool GeoTreeEngine::setFillRatioLimit(char value, bool setconfig) { return setInternalParam(pFillRatioLimit, value, true, setconfig ? "fillratiolimit" : ""); } bool GeoTreeEngine::setFillRatioCompTol(char value, bool setconfig) { return setInternalParam(pFillRatioCompTol, value, true, setconfig ? "fillratiocomptol" : ""); } bool GeoTreeEngine::setSaturationThres(char value, bool setconfig) { return setInternalParam(pSaturationThres, value, true, setconfig ? "saturationthres" : ""); } bool GeoTreeEngine::setTimeFrameDurationMs(int value, bool setconfig) { return setInternalParam(pTimeFrameDurationMs, value, false, setconfig ? "timeframedurationms" : ""); } bool GeoTreeEngine::setPenaltyUpdateRate(float value, bool setconfig) { return setInternalParam(pPenaltyUpdateRate, value, false, setconfig ? "penaltyupdaterate" : ""); } bool GeoTreeEngine::setParameter(std::string param, const std::string& value, int iparamidx, bool setconfig) { std::transform(param.begin(), param.end(), param.begin(), ::tolower); double dval = 0.0; (void) sscanf(value.c_str(), "%lf", &dval); int ival = (int)dval; bool ok = false; #define readParamVFromString(PARAM,VALUE) { \ std::string q; \ if(sscanf(VALUE.c_str(),"[%f,%f,%f,%f,%f,%f,%f,%f]", \ &PARAM##F[0],&PARAM##F[1],&PARAM##F[2],&PARAM##F[3],&PARAM##F[4],\ &PARAM##F[5],&PARAM##F[6],&PARAM##F[7])!=8) return false; \ for(int i=0;i<8;i++) \ PARAM[i]=(char)PARAM##F[i]; \ ok = true;} if (param == "timeframedurationms") { ok = this->setTimeFrameDurationMs(ival, setconfig); } else if (param == "saturationthres") { ok = this->setSaturationThres((char)ival, setconfig); } else if (param == "fillratiocomptol") { ok = this->setFillRatioCompTol((char)ival, setconfig); } else if (param == "fillratiolimit") { ok = this->setFillRatioLimit((char)ival, setconfig); } else if (param == "accessulscorepenalty") { if (iparamidx > -2) { ok = this->setAccessUlScorePenalty((char)ival, iparamidx, setconfig); } else { readParamVFromString(pPenaltySched.pAccessUlScorePenalty, value); } } else if (param == "accessdlscorepenalty") { if (iparamidx > -2) { ok = this->setAccessDlScorePenalty((char)ival, iparamidx, setconfig); } else { readParamVFromString(pPenaltySched.pAccessDlScorePenalty, value); } } else if (param == "plctulscorepenalty") { if (iparamidx > -2) { ok = this->setPlctUlScorePenalty((char)ival, iparamidx, setconfig); } else { readParamVFromString(pPenaltySched.pPlctUlScorePenalty, value); } } else if (param == "plctdlscorepenalty") { if (iparamidx > -2) { ok = this->setPlctDlScorePenalty((char)ival, iparamidx, setconfig); } else { readParamVFromString(pPenaltySched.pPlctDlScorePenalty, value); } } else if (param == "gwscorepenalty") { if (iparamidx > -2) { ok = this->setProxyScorePenalty((char)ival, iparamidx, setconfig); } else { readParamVFromString(pPenaltySched.pProxyScorePenalty, value); } } else if (param == "skipsaturatedblcaccess") { ok = this->setSkipSaturatedBlcAccess((bool)ival, setconfig); } else if (param == "skipsaturateddrnaccess") { ok = this->setSkipSaturatedDrnAccess((bool)ival, setconfig); } else if (param == "skipsaturatedaccess") { ok = this->setSkipSaturatedAccess((bool)ival, setconfig); } else if (param == "penaltyupdaterate") { ok = this->setPenaltyUpdateRate((float)dval, setconfig); } else if (param == "disabledbranches") { ok = true; if (value.size() > 4) { // first, clear the list of disabled branches this->rmDisabledBranch("*", "*", "*", NULL); // remove leading and trailing square brackets string list(value.substr(2, value.size() - 4)); // from the end to avoid reallocation of the string size_t idxl, idxr; while ((idxr = list.rfind(')')) != std::string::npos && ok) { idxl = list.rfind('('); auto comidx = list.find(',', idxl); string geotag(list.substr(idxl + 1, comidx - idxl - 1)); auto comidx2 = list.find(',', comidx + 1); string optype(list.substr(comidx + 1, comidx2 - comidx - 1)); string group(list.substr(comidx2 + 1, idxr - comidx2 - 1)); ok = ok && this->addDisabledBranch(group, optype, geotag, NULL, setconfig); list.erase(idxl, std::string::npos); } } } else if (param == "proxyclosetofs") { ok = this->setProxyCloseToFs((bool)ival, setconfig); } else if (param == "accessgeotagmapping") { ok = this->setAccessGeotagMapping(value, setconfig); } else if (param == "accessproxygroup") { ok = this->setAccessProxygroup(value, setconfig); } return ok; } void GeoTreeEngine::setConfigValue(const char* prefix, const char* key, const char* val) { gOFS->ConfEngine->SetConfigValue(prefix, key, val); } bool GeoTreeEngine::markPendingBranchDisablings(const std::string& group, const std::string& optype, const std::string& geotag) { for (auto git = pGroup2SchedTME.begin(); git != pGroup2SchedTME.end(); git++) { RWMutexReadLock lock(git->second->doubleBufferMutex); if (group == "*" || git->first->mName == group) { git->second->slowTreeModified = true; } } return true; } bool GeoTreeEngine::applyBranchDisablings(const SchedTME& entry) { for (auto mit = pDisabledBranches.begin(); mit != pDisabledBranches.end(); mit++) { // should I lock configMutex or is it already locked? const std::string& group(mit->first); if (group != "*" && entry.group->mName != group) { continue; } for (auto oit = mit->second.begin(); oit != mit->second.end(); oit++) { const std::string& optype(oit->first); for (auto geoit = oit->second.begin(); geoit != oit->second.end(); geoit++) { const std::string& geotag(*geoit); auto idx = entry.backgroundFastStruct->tag2NodeIdx->getClosestFastTreeNode( geotag.c_str()); // check there is an exact geotag match if ((*entry.backgroundFastStruct->treeInfo)[idx].fullGeotag != geotag) { continue; } if (optype == "*" || optype == "plct") { entry.backgroundFastStruct->placementTree->disableSubTree(idx); } if (optype == "*" || optype == "accsro") { entry.backgroundFastStruct->rOAccessTree->disableSubTree(idx); } if (optype == "*" || optype == "accsrw") { entry.backgroundFastStruct->rWAccessTree->disableSubTree(idx); } if (optype == "*" || optype == "plctdrain") { entry.backgroundFastStruct->drnPlacementTree->disableSubTree(idx); } if (optype == "*" || optype == "accsdrain") { entry.backgroundFastStruct->drnAccessTree->disableSubTree(idx); } } } } return true; } bool GeoTreeEngine::addDisabledBranch(const std::string& group, const std::string& optype, const std::string& geotag, XrdOucString* output, bool toConfig) { eos::common::RWMutexWriteLock lock(pAddRmFsMutex); eos::common::RWMutexWriteLock lock2(pTreeMapMutex); eos::common::RWMutexWriteLock lock3(configMutex); std::vector intersection; // Do checks - go through the potentially intersecting groups auto git_begin = group == "*" ? pDisabledBranches.begin() : pDisabledBranches.find(group); auto git_end = (group == "*" ? pDisabledBranches.end() : pDisabledBranches.find(group)); if (git_end != pDisabledBranches.end()) { git_end++; } for (auto git = git_begin; git != git_end; git++) { // go through the potentially intersecting optypes auto oit_begin = (optype == "*" ? git->second.begin() : git->second.find( group)); auto oit_end = (optype == "*" ? git->second.end() : git->second.find(group)); if (oit_end != git->second.end()) { oit_end++; } for (auto oit = oit_begin; oit != oit_end; oit++) { XrdOucString toinsert(geotag.c_str()); // Check that none of the disabled geotag is a prefix of the current one // and the other way around. for (auto geoit = oit->second.begin(); geoit != oit->second.end(); geoit++) { XrdOucString alreadyThere(geoit->c_str()); if (alreadyThere.beginswith(toinsert) || toinsert.beginswith(alreadyThere)) { intersection.push_back(std::string("(") + geotag.c_str() + std::string(",") + oit->first + std::string(",") + git->first + std::string(")") + std::string(alreadyThere.c_str())); } } } } if (intersection.size()) { if (output) { output->append((std::string("unable to add disabled branch : ") + std::string("(") + geotag + std::string(",") + optype + std::string(",") + geotag + std::string(") clashes with : ")).c_str()); for (auto iit = intersection.begin(); iit != intersection.end(); iit++) { output->append((*iit + " , ").c_str()); } } return false; } // Update the internal value pDisabledBranches[group][optype].insert(geotag); // To apply the new set of rules, mark the involved slow trees as modified to force a refresh markPendingBranchDisablings(group, optype, geotag); // update the config if (toConfig) { XrdOucString outStr("[ "); showDisabledBranches("*", "*", "*", &outStr, false); outStr.replace(")\n(", ") , ("); outStr.replace(")\n", ")"); outStr += " ]"; setConfigValue("geosched", "disabledbranches" , outStr.c_str()); } return true; } bool GeoTreeEngine::rmDisabledBranch(const std::string& group, const std::string& optype, const std::string& geotag, XrdOucString* output, bool toConfig) { eos::common::RWMutexWriteLock lock(pAddRmFsMutex); eos::common::RWMutexWriteLock lock2(pTreeMapMutex); eos::common::RWMutexWriteLock lock3(configMutex); bool found = false; if (group == "*" && optype == "*" && geotag == "*") { found = true; eos_notice("clearing disabled branch list in GeoTreeEngine"); pDisabledBranches.clear(); } else if (pDisabledBranches.count(group)) { if (pDisabledBranches[group].count(optype)) { found = (bool)pDisabledBranches[group][optype].erase(geotag); } } if (!found) { if (output) output->append((std::string("could not find disabled branch : ") + std::string("(") + group + std::string(" , ") + optype + std::string(") -> ") + geotag).c_str()); } else { // To apply the new set of rules, mark the involved slow trees as modified // to force a refresh. markPendingBranchDisablings(group, optype, geotag); if (toConfig) { // Update the config XrdOucString outStr("[ "); showDisabledBranches("*", "*", "*", &outStr, false); outStr.replace(")\n(", ") , ("); outStr.replace(")\n", ")"); outStr += " ]"; setConfigValue("geosched", "disabledbranches" , outStr.c_str()); } } return found; } bool GeoTreeEngine::showDisabledBranches(const std::string& group, const std::string& optype, const std::string& geotag, XrdOucString* output, bool lock) { if (lock) { configMutex.LockRead(); } for (auto git = pDisabledBranches.begin(); git != pDisabledBranches.end(); git++) { if (group == "*" || group == git->first) for (auto oit = git->second.begin(); oit != git->second.end(); oit++) { if (optype == "*" || optype == oit->first) for (auto geoit = oit->second.begin(); geoit != oit->second.end(); geoit++) { if (geotag == "*" || geotag == *geoit) if (output) { output->append((std::string("(") + *geoit + std::string(",") + oit->first + std::string(",") + git->first + std::string(")\n")).c_str()); } } } } if (lock) { configMutex.UnLockRead(); } return true; } std::string GeoTreeEngine::AccessStruct::getMappingStr() const { std::string ret; for (auto it = accessGeotagMap.begin(); it != accessGeotagMap.end() ; it++) { if (it != accessGeotagMap.begin()) { ret.append(";"); } ret.append(it->first); ret.append("=>"); ret.append(it->second); } return ret; } bool GeoTreeEngine::AccessStruct::setMapping(const std::string& mapping, bool setconfig) { std::string mappingelement, geotag, geotaglist; std::stringstream ss(mapping); while (std::getline(ss, mappingelement, ';')) { auto idx = mappingelement.find("=>"); if (idx == std::string::npos) { eos_static_err("error parsing config entry while restoring config : %s", mappingelement.c_str()); return false; } geotag = mappingelement.substr(0, idx); geotaglist = mappingelement.substr(idx + 2, std::string::npos); setMapping(geotag, geotaglist, false, false); } if (!geotag.empty()) { return setMapping(geotag, geotaglist, true, setconfig); // to rebuild the tree and set the config } else { return true; } } bool GeoTreeEngine::AccessStruct::setMapping(const std::string& geotag, const std::string& geotaglist, bool updateFastStruct, bool setconfig) { RWMutexWriteLock lock(accessMutex); if (!inuse) { accessST = new SlowTree("AccessGeotagMapping"); accessFT = new FastGatewayAccessTree(); accessFT->selfAllocate(FastGatewayAccessTree::sGetMaxNodeCount()); accessFTI = new SchedTreeBase::FastTreeInfo(); accessFTI->reserve(FastGatewayAccessTree::sGetMaxNodeCount()); accessHost2Idx = new Host2TreeIdxMap(); accessHost2Idx->selfAllocate(FastGatewayAccessTree::sGetMaxNodeCount()); accessTag2Idx = new GeoTag2NodeIdxMap(); accessTag2Idx->selfAllocate(FastGatewayAccessTree::sGetMaxNodeCount());; inuse = true; } SlowTree::TreeNodeInfo tni; SlowTree::TreeNodeStateFloat tns; tni.geotag = geotag; tni.proxygroup = geotaglist; accessST->insert(&tni, &tns, false, true); accessGeotagMap[geotag] = geotaglist; if (updateFastStruct) { accessST->buildFastStrcturesAccess(accessFT, accessHost2Idx, accessFTI, accessTag2Idx); } if (setconfig) { setConfigValue("geosched", configkey.c_str(), getMappingStr().c_str()); } return true; } bool GeoTreeEngine::AccessStruct::clearMapping(const std::string& geotag, bool updateFastStruct, bool setconfig) { RWMutexWriteLock lock(accessMutex); if (inuse) { SlowTree::TreeNodeInfo tni; tni.geotag = geotag; // if we have a geotag, we remove that geotag if (!geotag.empty() && !accessST->remove(&tni, false)) { return false; } if (!geotag.empty()) { accessGeotagMap.erase(geotag); } // if we don't have a geotag or if the tree is now empty, remove everything if (geotag.empty() || accessST->getNodeCount() == 1) { delete accessST; delete accessFT; delete accessFTI; delete accessHost2Idx; delete accessTag2Idx; accessGeotagMap.clear(); inuse = false; } else if (updateFastStruct) { accessST->buildFastStrcturesAccess(accessFT, accessHost2Idx, accessFTI, accessTag2Idx); } } if (setconfig) { setConfigValue("geosched", configkey.c_str(), getMappingStr().c_str()); } return true; } bool GeoTreeEngine::AccessStruct::showMapping(XrdOucString* output, std::string operation, bool monitoring) { RWMutexReadLock lock(accessMutex); if (inuse) { unsigned geo_depth_max = 0; std::string format_s = !monitoring ? "s" : "os"; std::string format_ss = !monitoring ? "-s" : "os"; // Set for tree: num of line, depth, prefix_1, prefix_2, fullGeotag, proxygroup/direct std::set> data_access; accessST->displayAccess(data_access, geo_depth_max); TableFormatterBase table_access; TableHeader table_header; table_header.push_back(std::make_tuple("operation", 6, format_ss)); table_header.push_back(std::make_tuple("geotag", 6, format_ss)); if (!monitoring) { if (geo_depth_max > 1) { for (unsigned i = 1; i < geo_depth_max; i++) { std::string name = "lev" + std::to_string(i); table_header.push_back(std::make_tuple(name, 4, format_ss)); } } table_header.push_back(std::make_tuple("fullGeotag", 6, format_s)); } table_header.push_back(std::make_tuple("mapping", 6, format_s)); table_access.SetHeader(table_header); unsigned prefix[geo_depth_max + 1]; for (auto it : data_access) { if (!monitoring) { unsigned geo_depth = 0; std::string geotag_temp = std::get<4>(it); while (geotag_temp.find("::") != std::string::npos) { geotag_temp.erase(0, geotag_temp.find("::") + 2); geo_depth++; } TableData table_data; table_data.emplace_back(); // Print operation (depth=1) if (std::get<1>(it) == 1) { table_data.back().push_back(TableCell(operation, "s")); } // Print geotag (depth=2 or 3) else if (std::get<1>(it) == 2 || std::get<1>(it) == 3) { if (geo_depth > 0) { prefix[geo_depth - 1] = std::get<2>(it); } prefix[geo_depth] = std::get<3>(it); for (unsigned i = 0; i <= geo_depth; i++) { // arrows table_data.back().push_back(TableCell(prefix[i], "t")); } std::string geotag = std::get<4>(it); geotag = (geo_depth > 0) ? geotag.substr(geotag.rfind("::") + 2) : geotag; table_data.back().push_back(TableCell(geotag, "s")); for (unsigned i = 0; i < geo_depth_max - geo_depth - 1; i++) { // blank cell after geotag table_data.back().push_back(TableCell("", "s")); } } // Print other columns if (!std::get<5>(it).empty()) { table_data.back().push_back(TableCell(std::get<4>(it), "s")); table_data.back().push_back(TableCell(std::get<5>(it), "s")); } table_access.AddRows(table_data); } // Monitoring else if (!std::get<5>(it).empty()) { TableData table_data; table_data.emplace_back(); table_data.back().push_back(TableCell(operation, "s")); table_data.back().push_back(TableCell(std::get<4>(it), "s")); table_data.back().push_back(TableCell(std::get<5>(it), "s")); table_access.AddRows(table_data); } } output->append(table_access.GenerateTable(HEADER).c_str()); return true; } return false; } bool GeoTreeEngine::accessReqFwEP(const std::string& targetGeotag, const std::string& accesserGeotag) const { // if no direct access geotag mapping is defined, all accesses are direct if (!pAccessGeotagMapping.inuse) { return false; } // first get the parent node giving the access rule auto idx = pAccessGeotagMapping.accessTag2Idx->getClosestFastTreeNode( accesserGeotag.c_str()); SchedTreeBase::tFastTreeIdx idx2 = 0; pAccessGeotagMapping.accessFT->findFreeSlotFirstHitBack(idx2, idx); // parse the geotag list and check the access auto accessible = (*pAccessGeotagMapping.accessFTI)[idx2].proxygroup; size_t beg = std::numeric_limits::max(), end = std::numeric_limits::max(); for (size_t i = 0; i < accessible.size(); i++) { if (accessible[i] == ',') { if (beg == std::numeric_limits::max()) { continue; } end = i; // if we have a new token if (end > beg) { if (((end - beg) <= targetGeotag.size() && ((end - beg) == targetGeotag.size() || targetGeotag[end - beg] == ':')) && !strncmp(targetGeotag.c_str(), accessible.c_str() + beg, end - beg)) { return false; } beg = end + 1; } } else if (beg == std::numeric_limits::max()) { beg = i; } } // the end of the string is also the end of the last token if (beg < accessible.size()) { end = accessible.size(); } if (end > beg) { if (((end - beg) <= targetGeotag.size() && ((end - beg) == targetGeotag.size() || targetGeotag[end - beg] == ':')) && !strncmp(targetGeotag.c_str(), accessible.c_str() + beg, end - beg)) { return false; } } return true; } std::string GeoTreeEngine::accessGetProxygroup(const std::string& toAccess) const { // if no access proxygroup mapping is defined, there is no proxygroup to return if (!pAccessProxygroup.inuse) { return ""; } // first get the parent node giving the proxygroup auto idx = pAccessProxygroup.accessTag2Idx->getClosestFastTreeNode( toAccess.c_str()); SchedTreeBase::tFastTreeIdx idx2 = 0; pAccessProxygroup.accessFT->findFreeSlotFirstHitBack(idx2, idx); return (*pAccessProxygroup.accessFTI)[idx2].proxygroup; } void GeoTreeEngine::tlFree(void* arg) { eos_static_debug("destroying thread specific geobuffer"); // delete the buffer delete[](char*)arg; } char* GeoTreeEngine::tlAlloc(size_t size) { eos_static_debug("allocating thread specific geobuffer"); char* buf = new char[size]; if (pthread_setspecific(gPthreadKey, buf)) { eos_static_crit("error registering thread-local buffer located at %p for " "cleaning up : memory will be leaked when thread is " "terminated", buf); } return buf; } EOSMGMNAMESPACE_END