//------------------------------------------------------------------------------ // @file DrainTransferJob.cc // @author Elvin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mgm/drain/DrainTransferJob.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/FsView.hh" #include "mgm/GeoTreeEngine.hh" #include "mgm/Stat.hh" #include "mgm/proc/proc_fs.hh" #include "common/SecEntity.hh" #include "common/LayoutId.hh" #include "common/StringTokenizer.hh" #include "namespace/interface/IView.hh" #include "namespace/ns_quarkdb/persistency/MetadataFetcher.hh" #include "namespace/Prefetcher.hh" EOSMGMNAMESPACE_BEGIN //------------------------------------------------------------------------------ // Save error message and set the status accordingly //------------------------------------------------------------------------------ void DrainTransferJob::ReportError(const std::string& error) { eos_err("%s", error.c_str()); mErrorString = error; mStatus = Status::Failed; } //------------------------------------------------------------------------------ // Execute a thrid-party transfer //------------------------------------------------------------------------------ void DrainTransferJob::DoIt() noexcept { using eos::common::LayoutId; eos_static_info("msg=\"running job\" fsid_src=%i fsid_dst=%i fxid=%08llx", mFsIdSource.load(), mFsIdTarget.load(), mFileId); if (mProgressHandler.ShouldCancel(0)) { ReportError(SSTR("msg=\"job cancelled before starting\" fxid=" << eos::common::FileId::Fid2Hex(mFileId))); return; } mStatus = Status::Running; FileDrainInfo fdrain; try { fdrain = GetFileInfo(); } catch (const eos::MDException& e) { // This could be a ghost fid entry still present in the file system map // and we need to also drop it from there std::string out, err; auto root_vid = eos::common::VirtualIdentity::Root(); (void) proc_fs_dropghosts(mFsIdSource, {mFileId}, root_vid, out, err); eos_info("msg=\"drain ghost entry successful\" fxid=%s", eos::common::FileId::Fid2Hex(mFileId).c_str()); mStatus = Status::OK; return; } while (true) { if ((mFsIdTarget == 0ul) && !SelectDstFs(fdrain)) { ReportError(SSTR("msg=\"failed to select destination file system\" fxid=" << eos::common::FileId::Fid2Hex(mFileId))); return; } // Special case when deadling with 0-size replica files if ((fdrain.mProto.size() == 0) && (LayoutId::GetLayoutType(fdrain.mProto.layout_id()) == LayoutId::kReplica)) { mStatus = DrainZeroSizeFile(fdrain); return; } // Prepare the TPC copy job std::string log_id = LogId::GenerateLogId(); XrdCl::URL url_src = BuildTpcSrc(fdrain, log_id); XrdCl::URL url_dst = BuildTpcDst(fdrain, log_id); // When no more sources are available the url_src/dst is empty and // mStatus is properly set already during the build step if (!url_src.IsValid() || !url_dst.IsValid()) { eos_static_err("msg=\"url invalid\" src=\"%s\" dst=\"%s\"", url_src.GetURL().c_str(), url_dst.GetURL().c_str()); return; } // If enabled use xrootd connection pool to avoid bottelnecks on the // same physical connection eos::common::XrdConnIdHelper src_id_helper(gOFS->mXrdConnPool, url_src); eos::common::XrdConnIdHelper dst_id_helper(gOFS->mXrdConnPool, url_dst); // Populate the properties map of the transfer XrdCl::PropertyList properties; properties.Set("force", true); properties.Set("posc", false); properties.Set("coerce", false); properties.Set("source", url_src); properties.Set("target", url_dst); properties.Set("sourceLimit", (uint16_t) 1); properties.Set("chunkSize", (uint32_t)(4 * 1024 * 1024)); properties.Set("parallelChunks", (uint8_t) 1); if (getenv("EOS_USE_TPC_TIMEOUT_ESTIMATE")) { properties.Set("tpcTimeout", eos::common::FileId::EstimateTpcTimeout (fdrain.mProto.size()).count()); } // Non-empty files run with TPC only if (fdrain.mProto.size()) { properties.Set("thirdParty", "only"); } // Create the process job XrdCl::PropertyList result; XrdCl::CopyProcess cpy; cpy.AddJob(properties, &result); XrdCl::XRootDStatus prepare_st = cpy.Prepare(); eos_info("[tpc]: app=%s logid=%s src_url=%s => dst_url=%s" " prepare_msg=%s", mAppTag.c_str(), log_id.c_str(), url_src.GetLocation().c_str(), url_dst.GetLocation().c_str(), prepare_st.ToStr().c_str()); if (prepare_st.IsOK()) { XrdCl::XRootDStatus tpc_st = cpy.Run(&mProgressHandler); if (!tpc_st.IsOK()) { eos_err("%s", SSTR("src=" << url_src.GetLocation().c_str() << " dst=" << url_dst.GetLocation().c_str() << " logid=" << log_id << " tpc_err=" << tpc_st.ToStr()).c_str()); // If cancellation requested no point in trying other replicas if (mProgressHandler.ShouldCancel(0)) { break; } // If file is being written then remove it from the drain tracker so // that it's retried one more time at the end if (tpc_st.errNo == EINPROGRESS) { eos_info("msg=\"skip file open in progress\" logid=%s", log_id.c_str()); break; } } else { eos_info("msg=\"%s successful\" logid=%s fxid=%s", mAppTag.c_str(), log_id.c_str(), eos::common::FileId::Fid2Hex(mFileId).c_str()); mStatus = Status::OK; return; } } else { eos_err("%s", SSTR("msg=\"prepare failed\" logid=" << log_id.c_str()).c_str()); } } mStatus = Status::Failed; return; } //------------------------------------------------------------------------------ // Get file metadata info //------------------------------------------------------------------------------ DrainTransferJob::FileDrainInfo DrainTransferJob::GetFileInfo() const { std::ostringstream oss; FileDrainInfo fdrain; eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, mFileId); try { eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex); std::shared_ptr fmd = gOFS->eosFileService->getFileMD(mFileId); fdrain.mFullPath = gOFS->eosView->getUri(fmd.get()); fdrain.mProto.set_id(fmd->getId()); fdrain.mProto.set_layout_id(fmd->getLayoutId()); fdrain.mProto.set_cont_id(fmd->getContainerId()); fdrain.mProto.set_uid(fmd->getCUid()); fdrain.mProto.set_gid(fmd->getCGid()); fdrain.mProto.set_size(fmd->getSize()); auto xs = fmd->getChecksum(); fdrain.mProto.set_checksum(xs.getDataPtr(), xs.getSize()); auto vect_locations = fmd->getLocations(); for (const auto loc : vect_locations) { fdrain.mProto.add_locations(loc); } } catch (eos::MDException& e) { eos_err("%s", SSTR("fxid=" << eos::common::FileId::Fid2Hex(mFileId) << " errno=" << e.getErrno() << " msg=\"" << e.getMessage().str() << "\"").c_str()); throw e; } return fdrain; } //------------------------------------------------------------------------------ // Build TPC source url //------------------------------------------------------------------------------ XrdCl::URL DrainTransferJob::BuildTpcSrc(const FileDrainInfo& fdrain, const std::string& log_id) { using namespace eos::common; XrdCl::URL url_src; eos::common::FileSystem::fs_snapshot_t src_snapshot; unsigned long lid = fdrain.mProto.layout_id(); unsigned long target_lid = LayoutId::SetLayoutType(lid, LayoutId::kPlain); // Mask block checksums (set to kNone) for replica layouts if (LayoutId::GetLayoutType(lid) == LayoutId::kReplica) { target_lid = LayoutId::SetBlockChecksum(target_lid, LayoutId::kNone); } if (eos::common::LayoutId::GetLayoutType(fdrain.mProto.layout_id()) <= eos::common::LayoutId::kReplica) { bool found = false; if (!mBalanceMode) { for (const auto id : fdrain.mProto.locations()) { // First try copying from a location different from the current source // file system. Make sure we also skip any EOS_TAPE_FSID (65535) // replicas. if ((id != mFsIdSource) && (id != EOS_TAPE_FSID) && (mTriedSrcs.find(id) == mTriedSrcs.end())) { mTriedSrcs.insert(id); eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(id); if (fs) { fs->SnapShotFileSystem(src_snapshot); if (src_snapshot.mConfigStatus >= eos::common::ConfigStatus::kDrain) { found = true; break; } } } } } // If nothing found and we didn't try the current file system give it a go if (!found && (mTriedSrcs.find(mFsIdSource) == mTriedSrcs.end())) { found = true; mTriedSrcs.insert(mFsIdSource); eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsIdSource); if (!fs) { ReportError(SSTR("msg=\"fsid=" << mFsIdSource << " no longer in the list")); return url_src; } fs->SnapShotFileSystem(src_snapshot); } if (!found) { ReportError(SSTR("msg=\"no more replicas available\" " << "fxid=" << eos::common::FileId::Fid2Hex(fdrain.mProto.id()))); return url_src; } } else { // For RAIN layouts we trigger an attempt only once if (mRainAttempt) { ReportError(SSTR("msg=\"fxid=" << eos::common::FileId::Fid2Hex( fdrain.mProto.id()) << " rain reconstruct already failed\"")); return url_src; } else { mRainAttempt = true; } mRainReconstruct = true; // If source/dest fses are forced we want to trigger a balance rather than // a drain reconstruct operation if (mBalanceMode) { // For RAIN layout we also need to disable checksum enforcements as the // stripe checksum and logical file checksum will not match mRainReconstruct = false; target_lid = LayoutId::SetChecksum(target_lid, LayoutId::kNone); bool found = false; eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsIdSource); if (fs) { fs->SnapShotFileSystem(src_snapshot); if (src_snapshot.mConfigStatus >= eos::common::ConfigStatus::kDrain) { found = true; } } if (!found) { ReportError(SSTR("msg=\"source stripe not available\" " << "fxid=" << eos::common::FileId::Fid2Hex(fdrain.mProto.id()) << " fsid=" << mFsIdSource)); return url_src; } } } // Construct the source URL std::ostringstream src_params; mTxFsIdSource = src_snapshot.mId; if (mRainReconstruct) { src_params << "&mgm.path=" << StringConversion::SealXrdPath(fdrain.mFullPath) << "&mgm.manager=" << gOFS->ManagerId.c_str() << "&mgm.fid=" << eos::common::FileId::Fid2Hex(mFileId) << "&mgm.sec=" << eos::common::SecEntity::ToKey(0, SSTR("eos/" << mAppTag).c_str()) << "&eos.app=" << mAppTag << "&eos.ruid=0&eos.rgid=0"; } else { src_params << "mgm.access=read" << "&mgm.lid=" << target_lid << "&mgm.cid=" << fdrain.mProto.cont_id() << "&mgm.ruid=1&mgm.rgid=1&mgm.uid=1&mgm.gid=1" << "&mgm.path=" << StringConversion::SealXrdPath(fdrain.mFullPath) << "&mgm.manager=" << gOFS->ManagerId.c_str() << "&mgm.fid=" << eos::common::FileId::Fid2Hex(mFileId) << "&mgm.sec=" << eos::common::SecEntity::ToKey(0, SSTR("eos/" << mAppTag).c_str()) << "&mgm.localprefix=" << src_snapshot.mPath.c_str() << "&mgm.fsid=" << src_snapshot.mId << "&eos.app=" << mAppTag << "&eos.ruid=0&eos.rgid=0"; } // Build the capability int caprc = 0; XrdOucEnv* output_cap = 0; XrdOucEnv input_cap(src_params.str().c_str()); SymKey* symkey = eos::common::gSymKeyStore.GetCurrentKey(); if ((caprc = SymKey::CreateCapability(&input_cap, output_cap, symkey, gOFS->mCapabilityValidity))) { ReportError(SSTR("msg=\"unable to create src capability, errno=" << caprc << "\"")); return url_src; } int cap_len = 0; std::ostringstream src_cap; if (mRainReconstruct) { url_src.SetPath(eos::common::StringConversion::curl_escaped(fdrain.mFullPath)); url_src.SetHostName(gOFS->MgmOfsAlias.c_str()); url_src.SetPort(gOFS->ManagerPort); src_cap << output_cap->Env(cap_len) << "&mgm.logid=" << log_id << "&eos.pio.action=reconstruct" << "&eos.encodepath=curl"; if (mRepairExcluded) { src_cap << "&eos.pio.recfs=" << eos::common::StringTokenizer::merge(mTriedSrcs, ','); } else { src_cap << "&eos.pio.recfs=" << mFsIdSource; } } else { std::ostringstream oss_path; oss_path << "/replicate:" << eos::common::FileId::Fid2Hex(mFileId); url_src.SetPath(oss_path.str()); url_src.SetHostName(src_snapshot.mHost.c_str()); url_src.SetPort(src_snapshot.mPort); src_cap << output_cap->Env(cap_len) << "&mgm.logid=" << log_id << "&source.url=root://" << src_snapshot.mHostPort.c_str() << "//replicate:" << eos::common::FileId::Fid2Hex(mFileId); } url_src.SetParams(src_cap.str()); url_src.SetProtocol("root"); // @todo(esindril) this should use different physical connections url_src.SetUserName("daemon"); delete output_cap; return url_src; } //------------------------------------------------------------------------------ // Build TPC destination url //------------------------------------------------------------------------------ XrdCl::URL DrainTransferJob::BuildTpcDst(const FileDrainInfo& fdrain, const std::string& log_id) { using namespace eos::common; XrdCl::URL url_dst; eos::common::FileSystem::fs_snapshot_t dst_snapshot; unsigned long lid = fdrain.mProto.layout_id(); unsigned long target_lid = LayoutId::SetLayoutType(lid, LayoutId::kPlain); // Mask block checksums (set to kNone) for replica layouts if (LayoutId::GetLayoutType(lid) == LayoutId::kReplica) { target_lid = LayoutId::SetBlockChecksum(target_lid, LayoutId::kNone); } if (LayoutId::IsRain(lid) && mBalanceMode) { target_lid = LayoutId::SetChecksum(target_lid, LayoutId::kNone); } { // Get destination fs snapshot eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsIdTarget); if (!fs) { ReportError("msg=\"target file system not found\""); return url_dst; } fs->SnapShotFileSystem(dst_snapshot); } std::ostringstream xs_info; std::ostringstream dst_params; if (mRainReconstruct) { dst_params << "mgm.access=write" << "&mgm.ruid=1&mgm.rgid=1&mgm.uid=1&mgm.gid=1&mgm.fid=0" << "&mgm.lid=" << target_lid << "&mgm.cid=" << fdrain.mProto.cont_id() << "&mgm.manager=" << gOFS->ManagerId.c_str() << "&mgm.fsid=" << dst_snapshot.mId << "&mgm.sec=" << eos::common::SecEntity::ToKey(0, SSTR("eos/" << mAppTag).c_str()) << "&eos.app=" << mAppTag; } else { dst_params << "mgm.access=write" << "&mgm.lid=" << target_lid << "&mgm.source.lid=" << lid << "&mgm.source.ruid=" << fdrain.mProto.uid() << "&mgm.source.rgid=" << fdrain.mProto.gid() << "&mgm.cid=" << fdrain.mProto.cont_id() << "&mgm.ruid=1&mgm.rgid=1&mgm.uid=1&mgm.gid=1" << "&mgm.path=" << StringConversion::SealXrdPath(fdrain.mFullPath.c_str()) << "&mgm.manager=" << gOFS->ManagerId.c_str() << "&mgm.fid=" << eos::common::FileId::Fid2Hex(mFileId) << "&mgm.sec=" << eos::common::SecEntity::ToKey(0, SSTR("eos/" << mAppTag).c_str()) << "&mgm.localprefix=" << dst_snapshot.mPath.c_str() << "&mgm.fsid=" << dst_snapshot.mId << "&mgm.bookingsize=" << LayoutId::ExpectedStripeSize(lid, fdrain.mProto.size()) << "&eos.app=" << mAppTag << "&mgm.targetsize=" << LayoutId::ExpectedStripeSize(lid, fdrain.mProto.size()); // This is true by default for drain, but when this is set to false, we get // a replication like behaviour similar to the adjustreplica command which // is useful for fsck for eg. if (mDropSrc) { dst_params << "&mgm.drainfsid=" << mFsIdSource; } // Checksum enforcement done only for non-rain layouts if (!LayoutId::IsRain(lid) && !fdrain.mProto.checksum().empty()) { xs_info << "&mgm.checksum="; uint32_t xs_len = LayoutId::GetChecksumLen(lid); uint32_t data_len = fdrain.mProto.checksum().size(); for (auto i = 0u; i < xs_len; ++i) { if (i >= data_len) { // Pad with zeros if necessary xs_info << '0'; } else { xs_info << StringConversion::char_to_hex(fdrain.mProto.checksum()[i]); } } } } // Build the capability int caprc = 0; XrdOucEnv* output_cap = 0; XrdOucEnv input_cap(dst_params.str().c_str()); SymKey* symkey = eos::common::gSymKeyStore.GetCurrentKey(); if ((caprc = SymKey::CreateCapability(&input_cap, output_cap, symkey, gOFS->mCapabilityValidity))) { std::string err = "msg=\"unable to create dst capability, errno="; err += caprc; err += "\""; ReportError(err); return url_dst; } int cap_len = 0; std::ostringstream oss_cap; oss_cap << output_cap->Env(cap_len) << "&mgm.logid=" << log_id; // @Note: the mgm.checksum info needs to be unencrypted in the URL if (!xs_info.str().empty()) { oss_cap << xs_info.str(); } url_dst.SetProtocol("root"); url_dst.SetHostName(dst_snapshot.mHost.c_str()); url_dst.SetPort(dst_snapshot.mPort); url_dst.SetUserName("daemon"); // @todo(esindril) this should use different physical connections url_dst.SetParams(oss_cap.str()); std::ostringstream oss_path; oss_path << "/replicate:" << (mRainReconstruct ? "0" : eos::common::FileId::Fid2Hex(mFileId)); url_dst.SetPath(oss_path.str()); delete output_cap; return url_dst; } //------------------------------------------------------------------------------ // Select destiantion file system for current transfer //------------------------------------------------------------------------------ bool DrainTransferJob::SelectDstFs(const FileDrainInfo& fdrain) { unsigned int nfilesystems = 1; unsigned int ncollocatedfs = 0; std::vector new_repl; eos::common::FileSystem::fs_snapshot_t source_snapshot; eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); eos::common::FileSystem* source_fs = FsView::gFsView.mIdView.lookupByID( mFsIdSource); if (source_fs == nullptr) { // In case of rain reconstruction without dropping a particular stripe // the mFsIdSource is set to the sentinel value of 0. if ((mFsIdSource == 0u) && fdrain.mProto.locations_size()) { source_fs = FsView::gFsView.mIdView.lookupByID(fdrain.mProto.locations(0)); } if (source_fs == nullptr) { return false; } } source_fs->SnapShotFileSystem(source_snapshot); FsGroup* group = FsView::gFsView.mGroupView[source_snapshot.mGroup]; // Check other replicas for the file std::vector fsid_geotags; std::vector existing_repl; for (auto elem : fdrain.mProto.locations()) { // Skip any EOS_TAPE_FSID replicas if (elem != EOS_TAPE_FSID) { existing_repl.push_back(elem); } } if (!gOFS->mGeoTreeEngine->getInfosFromFsIds(existing_repl, &fsid_geotags, 0, 0)) { eos_err("msg=\"failed to retrieve info for existing replicas\" fxid=%08llx", mFileId); return false; } bool res = gOFS->mGeoTreeEngine->placeNewReplicasOneGroup( group, nfilesystems, &new_repl, (ino64_t) fdrain.mProto.id(), NULL, // entrypoints NULL, // firewall // This methods is only called for drain functionality, for // balance we already provide the destination file system GeoTreeEngine::draining, &existing_repl, &fsid_geotags, fdrain.mProto.size(), "",// start from geotag "",// client geo tag ncollocatedfs, &mExcludeDsts, &fsid_geotags); // excludeGeoTags if (!res || new_repl.empty()) { eos_err("msg=\"fxid=%08llx could not place new replica\"", mFileId); return false; } std::ostringstream oss; for (auto elem : new_repl) { oss << " " << (unsigned long)(elem); } // Return only one fs now mFsIdTarget = new_repl[0]; mExcludeDsts.push_back(mFsIdTarget); eos_static_debug("msg=\"schedule placement retc=%d with fsids=%s\" ", (int)res, oss.str().c_str()); return true; } //------------------------------------------------------------------------------ // Drain 0-size file //------------------------------------------------------------------------------ DrainTransferJob::Status DrainTransferJob::DrainZeroSizeFile(const FileDrainInfo& fdrain) { eos::common::RWMutexWriteLock wr_lock(gOFS->eosViewRWMutex); std::shared_ptr file {nullptr}; try { file = gOFS->eosFileService->getFileMD(fdrain.mProto.id()); } catch (const eos::MDException& e) { // ignore, file will be null } if (file == nullptr) { return Status::Failed; } // We already have excess replicas just drop the current one if (file->getNumLocation() > eos::common::LayoutId::GetStripeNumber(fdrain.mProto.layout_id()) + 1) { file->unlinkLocation(mFsIdSource); } else { // Add the new location and remove the old one if requested file->addLocation(mFsIdTarget); if (mDropSrc) { file->unlinkLocation(mFsIdSource); } } gOFS->eosFileService->updateStore(file.get()); return Status::OK; } //------------------------------------------------------------------------------ // Get drain job info based on the requested tags //------------------------------------------------------------------------------ std::list DrainTransferJob::GetInfo(const std::list& tags) const { std::list info; for (const auto& tag : tags) { // We only support the following tags if (tag == "fxid") { info.push_back(eos::common::FileId::Fid2Hex(mFileId)); } else if (tag == "fs_src") { info.push_back(std::to_string(mFsIdSource)); } else if (tag == "fs_dst") { info.push_back(std::to_string(mFsIdTarget)); } else if (tag == "tx_fs_src") { info.push_back(std::to_string(mTxFsIdSource)); } else if (tag == "start_timestamp") { std::time_t start_ts {(long int)mProgressHandler.mStartTimestampSec.load()}; std::tm calendar_time; (void) localtime_r(&start_ts, &calendar_time); info.push_back(SSTR(std::put_time(&calendar_time, "%c %Z"))); } else if (tag == "progress") { info.push_back(std::to_string(mProgressHandler.mProgress.load()) + "%"); } else if (tag == "speed") { uint64_t now_sec = std::chrono::duration_cast (std::chrono::system_clock::now().time_since_epoch()).count(); if (mProgressHandler.mStartTimestampSec < now_sec) { uint64_t duration_sec = now_sec - mProgressHandler.mStartTimestampSec; float rate = (mProgressHandler.mBytesTransferred / (1024 * 1024)) / duration_sec; info.push_back(std::to_string(rate)); } else { info.push_back("N/A"); } } else if (tag == "err_msg") { info.push_back(mErrorString); } else { info.push_back("N/A"); } } return info; } //------------------------------------------------------------------------------ // Update MGM stats depending on the type of transfer //------------------------------------------------------------------------------ void DrainTransferJob::UpdateMgmStats() { std::string tag_stats; if (mAppTag == "drain") { tag_stats = "DrainCentral"; } else if (mAppTag == "balance") { tag_stats = "Balance"; } else { tag_stats = mAppTag; } if (mStatus == Status::OK) { tag_stats += "Successful"; } else if (mStatus == Status::Failed) { tag_stats += "Failed"; } else { tag_stats += "Started"; } if (gOFS) { gOFS->MgmStats.Add(tag_stats.c_str(), mVid.uid, mVid.gid, 1); } } EOSMGMNAMESPACE_END