// ---------------------------------------------------------------------- // File: WFE.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "common/Path.hh" #include "common/Logging.hh" #include "common/LayoutId.hh" #include "common/ShellCmd.hh" #include "common/StringTokenizer.hh" #include "common/Constants.hh" #include "mgm/Quota.hh" #include "common/CtaCommon.hh" #include "common/eos_cta_pb/EosCtaAlertHandler.hh" #include "mgm/XattrSet.hh" #include "mgm/WFE.hh" #include "mgm/Stat.hh" #include "mgm/XrdMgmOfsDirectory.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/EosCtaReporter.hh" #include "mgm/CtaUtils.hh" #include "mgm/proc/admin/EvictCmd.hh" #include "namespace/interface/IView.hh" #include "namespace/Prefetcher.hh" #include "namespace/utils/Checksum.hh" #include "Xrd/XrdScheduler.hh" #define EOS_WFE_BASH_PREFIX "/var/eos/wfe/bash/" XrdSysMutex eos::mgm::WFE::gSchedulerMutex; XrdScheduler* eos::mgm::WFE::gScheduler; /*----------------------------------------------------------------------------*/ extern XrdSysError gMgmOfsEroute; extern XrdOucTrace gMgmOfsTrace; EOSMGMNAMESPACE_BEGIN using namespace eos::common; /*----------------------------------------------------------------------------*/ /** * @brief Constructor of the work flow engine */ /*----------------------------------------------------------------------------*/ WFE::WFE() { mMs = 0; mActiveJobs = 0; mRootVid = eos::common::VirtualIdentity::Root(); XrdSysMutexHelper sLock(gSchedulerMutex); gScheduler = new XrdScheduler(&gMgmOfsEroute, &gMgmOfsTrace, 10, 500, 100); gScheduler->Start(); } /*----------------------------------------------------------------------------*/ /** * @brief asynchronous WFE thread startup function */ /*----------------------------------------------------------------------------*/ bool WFE::Start() { try { mThread.reset(&WFE::WFEr, this); } catch (const std::system_error& e) { return false; } return true; } /*----------------------------------------------------------------------------*/ /** * @brief asynchronous WFE thread stop function */ /*----------------------------------------------------------------------------*/ void WFE::Stop() { mThread.join(); } //------------------------------------------------------------------------------ // @brief WFE method doing the actual workflow // // This thread method loops in regular intervals over all workflow jobs in the // workflow directory /eos//proc/workflow/ //------------------------------------------------------------------------------/ void WFE::WFEr(ThreadAssistant& assistant) noexcept { // Eternal thread doing WFE scans time_t snoozetime = 10; size_t lWFEntx = 0; time_t cleanuptime = 0; gOFS->WaitUntilNamespaceIsBooted(assistant); if (assistant.terminationRequested()) { return; } assistant.wait_for(std::chrono::seconds(10)); eos_static_info("msg=\"async WFE thread started\""); while (!assistant.terminationRequested()) { bool IsEnabledWFE; time_t lWFEInterval; time_t lStartTime = time(NULL); time_t lStopTime; time_t lKeepTime = 7 * 86400; std::map > wfedirs; XrdOucString stdErr; { eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex); if (FsView::gFsView.mSpaceView.count("default") && (FsView::gFsView.mSpaceView["default"]->GetConfigMember("wfe") == "on")) { IsEnabledWFE = true; } else { IsEnabledWFE = false; } if (FsView::gFsView.mSpaceView.count("default")) { lWFEInterval = atoi(FsView::gFsView.mSpaceView["default"]->GetConfigMember("wfe.interval").c_str()); lWFEntx = atoi(FsView::gFsView.mSpaceView["default"]->GetConfigMember("wfe.ntx").c_str()); lKeepTime = atoi( FsView::gFsView.mSpaceView["default"]->GetConfigMember("wfe.keepTIME").c_str()); if (!lKeepTime) { lKeepTime = 7 * 86400; } } else { lWFEInterval = 0; lWFEntx = 0; } } // Only a master needs to run WFE if (gOFS->mMaster->IsMaster() && IsEnabledWFE) { eos_static_debug("msg=\"start WFE scan\""); // Find all directories defining an WFE policy gOFS->MgmStats.Add("WFEFind", 0, 0, 1); EXEC_TIMING_BEGIN("WFEFind"); // prepare four queries today, yesterday for queued and error jobs std::string queries[4]; for (size_t i = 0; i < 4; ++i) { queries[i] = gOFS->MgmProcWorkflowPath.c_str(); queries[i] += "/"; } { // today time_t when = time(NULL); std::string day = eos::common::Timing::UnixTimestamp_to_Day(when); queries[0] += day; queries[0] += "/q/"; queries[1] += day; queries[1] += "/e/"; //yesterday when -= (24 * 3600); day = eos::common::Timing::UnixTimestamp_to_Day(when); queries[2] += day; queries[2] += "/q/"; queries[3] += day; queries[3] += "/e/"; } for (size_t i = 0; i < 4; ++i) { eos_static_debug("query-path=%s", queries[i].c_str()); gOFS->_find(queries[i].c_str(), mError, stdErr, mRootVid, wfedirs, 0, 0, false, 0, false, 0); } { eos_static_debug("msg=\"finished WFE find\" WFE-dirs=%llu %s", wfedirs.size(), stdErr.c_str() ); time_t now = time(NULL); for (auto it = wfedirs.begin(); it != wfedirs.end(); it++) { // ------------------------------------------------------------------- // get workflows // ------------------------------------------------------------------- if (it->second.size()) { for (auto wit = it->second.begin(); wit != it->second.end(); ++wit) { eos_static_debug("wfe-dir=\"%s\" wfe-job=\"%s\"", it->first.c_str(), wit->c_str()); std::string f = it->first; f += *wit; Job* job = new Job(); if (!job || job->Load(f)) { eos_static_err("msg=\"cannot load workflow entry\" value=\"%s\"", f.c_str()); if (job) { delete job; } } else { // don't schedule jobs for the future if ((!job->mActions.size()) || (now < job->mActions[0].mTime)) { delete job; continue; } // stop scheduling if there are too many jobs running if (lWFEntx <= GetActiveJobs()) { if (lWFEntx > 0) { mDoneSignal.WaitMS(100); if (lWFEntx <= GetActiveJobs()) { delete job; break; } } } if (!job->IsSync()) { // use the shared scheduler for asynchronous jobs XrdSysMutexHelper sLock(gSchedulerMutex); time_t storetime = 0; // move job into the scheduled queue job->Move(job->mActions[0].mQueue, "r", storetime); job->mActions[0].mQueue = "r"; job->mActions[0].mTime = storetime; XrdOucString tst; job->mActions[0].mWhen = eos::common::StringConversion::GetSizeString(tst, (unsigned long long) storetime); gScheduler->Schedule((XrdJob*) job); IncActiveJobs(); eos_static_info("msg=\"scheduled workflow\" job=\"%s\"", job->mDescription.c_str()); } else { delete job; } } } } } } EXEC_TIMING_END("WFEFind"); eos_static_debug("msg=\"finished WFE application\" WFE-dirs=%llu", wfedirs.size()); } lStopTime = time(NULL); if ((lStopTime - lStartTime) < lWFEInterval) { snoozetime = lWFEInterval - (lStopTime - lStartTime); } if (!IsEnabledWFE) { snoozetime = 6000; } eos_static_debug("snooze-time=%llu enabled=%d", snoozetime, IsEnabledWFE); size_t snoozeloop = snoozetime * 10; for (size_t i = 0; i < snoozeloop; i++) { assistant.wait_for(std::chrono::milliseconds(100)); if (assistant.terminationRequested()) { break; } // Check if the setting changed eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex); if (FsView::gFsView.mSpaceView.count("default") && (FsView::gFsView.mSpaceView["default"]->GetConfigMember("wfe") == "on")) { if (!IsEnabledWFE) { break; } } else { if (IsEnabledWFE) { break; } } } if (gOFS->mMaster->IsMaster() && (!cleanuptime || (cleanuptime < time(NULL)))) { time_t now = time(NULL); eos_static_info("msg=\"clean old workflows\""); XrdMgmOfsDirectory dir; if (dir.open(gOFS->MgmProcWorkflowPath.c_str(), mRootVid, "") != SFS_OK) { eos_static_err("msg=\"failed to open proc workflow directory\""); continue; } const char* entry; while ((entry = dir.nextEntry())) { std::string when = entry; if ((when == ".") || (when == "..")) { continue; } time_t tst = eos::common::Timing::Day_to_UnixTimestamp(when); if (!tst || (tst < (now - lKeepTime))) { eos_static_info("msg=\"cleaning\" dir=\"%s\"", entry); ProcCommand Cmd; XrdOucString info; XrdOucString out; XrdOucString err; info = "mgm.cmd=rm&eos.ruid=0&eos.rgid=0&mgm.deletion=deep&mgm.option=r&mgm.path="; info += gOFS->MgmProcWorkflowPath; info += "/"; info += entry; Cmd.open("/proc/user", info.c_str(), mRootVid, &mError); Cmd.AddOutput(out, err); if (err.length()) { eos_static_err("msg=\"cleaning failed\" errmsg=\"%s\"", err.c_str()); } else { eos_static_info("msg=\"cleaned\" dri=\"%s\""); } Cmd.close(); } } cleanuptime = now + 3600; } } } /*----------------------------------------------------------------------------*/ /** * @brief store a workflow jobs in the workflow queue * @return SFS_OK if success */ /*----------------------------------------------------------------------------*/ int WFE::Job::Save(std::string queue, time_t& when, int action, int retry) { if (mActions.size() != 1) { return -1; } std::string workflowdir = gOFS->MgmProcWorkflowPath.c_str(); workflowdir += "/"; workflowdir += mActions[action].mDay; workflowdir += "/"; workflowdir += queue; workflowdir += "/"; workflowdir += mActions[action].mWorkflow; workflowdir += "/"; std::string entry = eos::common::FileId::Fid2Hex(mFid); eos_static_info("workflowdir=\"%s\" retry=%d when=%u job-time=%s", workflowdir.c_str(), retry, when, mActions[action].mWhen.c_str()); XrdOucErrInfo lError; eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); // check that the workflow directory exists struct stat buf; if (gOFS->_stat(workflowdir.c_str(), &buf, lError, rootvid, "")) { // create the workflow sub directory if (gOFS->_mkdir(workflowdir.c_str(), S_IRWXU | SFS_O_MKPTH, lError, rootvid, "")) { // check if it has been created in the meanwhile as the stat and mkdir are not atomic together if (gOFS->_stat(workflowdir.c_str(), &buf, lError, rootvid, "")) { eos_static_err("msg=\"failed to create workflow directory\" path=\"%s\"", workflowdir.c_str()); return -1; } } } // write a workflow file std::string workflowpath = workflowdir; // evt. store with the current time if (!when) { when = time(nullptr); } XrdOucString tst; workflowpath += eos::common::StringConversion::GetSizeString(tst, (unsigned long long) when); workflowpath += ":"; workflowpath += entry; workflowpath += ":"; workflowpath += mActions[action].mEvent; mWorkflowPath = workflowpath; //Store which day it is stored for mActions[action].mSavedOnDay = mActions[action].mDay; std::string vids = eos::common::Mapping::VidToString(mVid); try { // The point of prefetching here is to get the chunks preceeding the final // one, so that createFile is guaranteed not to wait on network requests. eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView, workflowpath); eos::common::RWMutexWriteLock wLock {gOFS->eosViewRWMutex}; auto fmd = gOFS->eosView->createFile(workflowpath, 0, 0); auto cid = fmd->getContainerId(); auto cmd = gOFS->eosDirectoryService->getContainerMD(cid); cmd->setMTimeNow(); cmd->notifyMTimeChange(gOFS->eosDirectoryService); gOFS->eosView->updateContainerStore(cmd.get()); fmd->setAttribute("sys.action", mActions[0].mAction); fmd->setAttribute("sys.vid", vids); fmd->setAttribute("sys.wfe.errmsg", mErrorMesssage); fmd->setAttribute("sys.wfe.retry", std::to_string(retry)); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { eos_static_err("msg=\"failed to save workflow entry\" path=\"%s\" error=\"%s\"", workflowpath.c_str(), ex.what()); return -1; } return SFS_OK; } /*----------------------------------------------------------------------------*/ int WFE::Job::Load(std::string path2entry) /*----------------------------------------------------------------------------*/ /** * @brief load a workflow job from the given path * @return SFS_OK if success */ /*----------------------------------------------------------------------------*/ { XrdOucErrInfo lError; eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); std::string f = path2entry; f.erase(0, path2entry.rfind('/') + 1); std::string workflow = path2entry; workflow.erase(path2entry.rfind('/')); workflow.erase(0, workflow.rfind('/') + 1); std::string q = path2entry; q.erase(q.rfind('/')); q.erase(q.rfind('/')); q.erase(0, q.rfind('/') + 1); std::string savedAtDay = path2entry; savedAtDay.erase(savedAtDay.rfind('/')); savedAtDay.erase(savedAtDay.rfind('/')); savedAtDay.erase(savedAtDay.rfind('/')); savedAtDay.erase(0, savedAtDay.rfind('/') + 1); std::string when; std::string idevent; std::string id; std::string event; bool s1 = eos::common::StringConversion::SplitKeyValue(f, when, idevent, ":"); bool s2 = eos::common::StringConversion::SplitKeyValue(idevent, id, event, ":"); mWorkflowPath = path2entry; if (s1 && s2) { mFid = eos::common::FileId::Hex2Fid(id.c_str()); eos_static_info("workflow=\"%s\" fxid=%08llx", workflow.c_str(), mFid); { eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, path2entry); eos::common::RWMutexReadLock rLock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosView->getFile(path2entry); try { time_t t_when = strtoull(when.c_str(), 0, 10); AddAction(fmd->getAttribute("sys.action"), event, t_when, savedAtDay, workflow, q); } catch (eos::MDException& ex) { eos_static_err("msg=\"no action stored\" path=\"%s\"", f.c_str()); } try { auto vidstring = fmd->getAttribute("sys.vid").c_str(); if (!eos::common::Mapping::VidFromString(mVid, vidstring)) { eos_static_crit("parsing of %s failed - setting nobody\n", vidstring); mVid = eos::common::VirtualIdentity::Nobody(); } } catch (eos::MDException& ex) { mVid = eos::common::VirtualIdentity::Nobody(); eos_static_err("msg=\"no vid stored\" path=\"%s\"", f.c_str()); } try { mRetry = (int)strtoul(fmd->getAttribute("sys.wfe.retry").c_str(), nullptr, 10); } catch (eos::MDException& ex) { eos_static_err("msg=\"no retry stored\" path=\"%s\"", f.c_str()); } try { mErrorMesssage = fmd->getAttribute("sys.wfe.errmsg"); } catch (eos::MDException& ex) { eos_static_info("msg=\"no error message stored\" path=\"%s\"", f.c_str()); } } } else { eos_static_err("msg=\"illegal workflow entry\" key=\"%s\"", f.c_str()); return SFS_ERROR; } return SFS_OK; } /*----------------------------------------------------------------------------*/ int WFE::Job::Move(std::string from_queue, std::string to_queue, time_t& when, int retry) /*----------------------------------------------------------------------------*/ /** * @brief move workflow jobs between queues * @return SFS_OK if success */ /*----------------------------------------------------------------------------*/ { auto fromDay = mActions[0].mSavedOnDay; if (Save(to_queue, when, 0, retry) == SFS_OK) { mActions[0].mQueue = to_queue; if ((from_queue != to_queue) && (Delete(from_queue, fromDay) == SFS_ERROR)) { eos_static_err("msg=\"failed to remove for move from queue=\"%s\" to queue=\"%s\"", from_queue.c_str(), to_queue.c_str()); } } else { eos_static_err("msg=\"failed to save for move to queue\" queue=\"%s\"", to_queue.c_str()); return SFS_ERROR; } return SFS_OK; } /*----------------------------------------------------------------------------*/ int /*----------------------------------------------------------------------------*/ WFE::Job::Results(std::string queue, int retc, XrdOucString log, time_t when) /*----------------------------------------------------------------------------*/ { std::string workflowdir = gOFS->MgmProcWorkflowPath.c_str(); workflowdir += "/"; workflowdir += mActions[0].mDay; workflowdir += "/"; workflowdir += queue; workflowdir += "/"; workflowdir += mActions[0].mWorkflow; workflowdir += "/"; std::string entry = eos::common::FileId::Fid2Hex(mFid); eos_static_info("workflowdir=\"%s\" entry=%s", workflowdir.c_str(), entry.c_str()); XrdOucErrInfo lError; eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); // check that the workflow directory exists struct stat buf; if (gOFS->_stat(workflowdir.c_str(), &buf, lError, rootvid, "")) { eos_static_err("msg=\"failed to find the workflow dir\" path=\"%s\"", workflowdir.c_str()); return -1; } // write a workflow file std::string workflowpath = workflowdir; XrdOucString tst; workflowpath += eos::common::StringConversion::GetSizeString(tst, (unsigned long long) when); workflowpath += ":"; workflowpath += entry; workflowpath += ":"; workflowpath += mActions[0].mEvent; mWorkflowPath = workflowpath; XrdOucString sretc; sretc += retc; if (gOFS->_attr_set(workflowpath.c_str(), lError, rootvid, nullptr, "sys.wfe.retc", sretc.c_str())) { eos_static_err("msg=\"failed to store workflow return code\" path=\"%s\" retc=\"%s\"", workflowpath.c_str(), sretc.c_str()); return -1; } if (gOFS->_attr_set(workflowpath.c_str(), lError, rootvid, nullptr, "sys.wfe.log", log.c_str())) { eos_static_err("msg=\"failed to store workflow log\" path=\"%s\" log=\"%s\"", workflowpath.c_str(), log.c_str()); return -1; } return SFS_OK; } /*----------------------------------------------------------------------------*/ int WFE::Job::Delete(std::string queue, std::string fromDay) /*----------------------------------------------------------------------------*/ /** * @brief delete a workflow job from a queue * @return SFS_OK if success */ /*----------------------------------------------------------------------------*/ { if (mActions.size() != 1) { return SFS_ERROR; } std::string workflowdir = gOFS->MgmProcWorkflowPath.c_str(); workflowdir += "/"; // We have to remove from the day when it was saved workflowdir += fromDay; workflowdir += "/"; workflowdir += queue; workflowdir += "/"; workflowdir += mActions[0].mWorkflow; workflowdir += "/"; std::string entry = eos::common::FileId::Fid2Hex(mFid); eos_static_info("workflowdir=\"%s\"", workflowdir.c_str()); XrdOucErrInfo lError; eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); // write a workflow file std::string workflowpath = workflowdir; workflowpath += mActions[0].mWhen; workflowpath += ":"; workflowpath += entry; workflowpath += ":"; workflowpath += mActions[0].mEvent; if (!gOFS->_rem(workflowpath.c_str(), lError, rootvid, "", false, false, true)) { return SFS_OK; } else { eos_static_err("msg=\"failed to delete job\" job=\"%s\"", mDescription.c_str()); return SFS_ERROR; } } /*----------------------------------------------------------------------------*/ int WFE::Job::DoIt(bool issync, std::string& errorMsg, const char* const ininfo) /*----------------------------------------------------------------------------*/ /** * @brief execute a workflow * @return * */ /*----------------------------------------------------------------------------*/ { // RAII: Async jobs reduce counter on all paths auto decrementJobs = [this](void*) { if (!IsSync()) { gOFS->WFEd.DecActiveJobs(); gOFS->WFEd.GetSignal()->Signal(); } }; std::unique_ptr activeJobsGuard { static_cast(this), decrementJobs }; std::string method; std::string args; eos::common::VirtualIdentity lRootVid = eos::common::VirtualIdentity::Root(); XrdOucErrInfo lError; int retc = 0; time_t storetime = 0; if (mActions[0].mQueue == "r" || mActions[0].mQueue == "e") { bool actionParsed = false; if (mActions[0].mAction.find(':') == std::string::npos) { method = mActions[0].mAction; actionParsed = true; } else { actionParsed = eos::common::StringConversion::SplitKeyValue(mActions[0].mAction, method, args, ":"); } if (actionParsed) { if (method == "mail") { std::string recipient; std::string freetext; if (!eos::common::StringConversion::SplitKeyValue(args, recipient, freetext, ":")) { recipient = args; freetext = "EOS workflow notification"; } std::string topic = gOFS->MgmOfsInstanceName.c_str(); topic += " ( "; topic += gOFS->HostName; topic += " ) "; topic += " "; topic += " event="; topic += mActions[0].mEvent; topic += " fid="; topic += eos::common::FileId::Fid2Hex(mFid).c_str(); std::string do_mail = "echo "; do_mail += "\""; do_mail += freetext; do_mail += "\""; do_mail += "| mail -s \""; do_mail += topic; do_mail += "\" "; do_mail += recipient; eos_static_info("shell-cmd=\"%s\"", do_mail.c_str()); eos::common::ShellCmd cmd(do_mail); eos::common::cmd_status rc = cmd.wait(5); if (rc.exit_code) { eos_static_err("msg=\"failed to send workflow notification mail\" job=\"%s\"", mDescription.c_str()); storetime = 0; Move(mActions[0].mQueue, "f", storetime); XrdOucString log = "failed to send workflow notification mail"; Results("f", -1, log, storetime); } else { eos_static_info("msg=\"done notification\" job=\"%s\"", mDescription.c_str()); storetime = 0; Move(mActions[0].mQueue, "d", storetime); XrdOucString log = "notified by email"; Results("d", 0, log, storetime); } } else if (method == "bash") { std::string executable; std::string executableargs; if (!eos::common::StringConversion::SplitKeyValue(args, executable, executableargs, ":")) { executable = args; executableargs = ""; } XrdOucString execargs = executableargs.c_str(); std::string fullpath; bool format_error = false; if (executable.find('/') == std::string::npos) { std::shared_ptr fmd ; std::shared_ptr cmd ; // do meta replacement eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, mFid); eos::common::RWMutexReadLock viewReadLock(gOFS->eosViewRWMutex); try { fmd = gOFS->eosFileService->getFileMD(mFid); cmd = gOFS->eosDirectoryService->getContainerMD(fmd->getContainerId()); fullpath = gOFS->eosView->getUri(fmd.get()); } catch (eos::MDException& e) { eos_static_debug("caught exception %d %s\n", e.getErrno(), e.getMessage().str().c_str()); } if (fmd.get() && cmd.get()) { std::shared_ptr cfmd = fmd; std::shared_ptr ccmd = cmd; viewReadLock.Release(); std::string cv; eos::IFileMD::ctime_t ctime; eos::IFileMD::ctime_t mtime; cfmd->getCTime(ctime); cfmd->getMTime(mtime); std::string checksum; eos::appendChecksumOnStringAsHex(cfmd.get(), checksum); // translate uid/gid to username/groupname std::string user_name; std::string group_name; int errc; errc = 0; user_name = Mapping::UidToUserName(cfmd->getCUid(), errc); if (errc) { user_name = "nobody"; } errc = 0; group_name = Mapping::GidToGroupName(cfmd->getCGid(), errc); if (errc) { group_name = "nobody"; } XrdOucString unbase64; XrdOucString base64; unbase64 = fullpath.c_str(); eos::common::SymKey::Base64(unbase64, base64); int cnt = 0; while (execargs.replace("", unbase64.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", base64.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) cfmd->getCUid()))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) cfmd->getCGid()))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) mVid.uid))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) mVid.gid))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", user_name.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", group_name.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.uid_string.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.gid_string.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.host.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.app.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.name.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.prot.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mVid.grps.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", gOFS->MgmOfsInstanceName)) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_sec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_sec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_nsec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_nsec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_sec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) ctime.tv_sec))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) cfmd->getSize()))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) cfmd->getContainerId()))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) mFid))) { if (++cnt > 16) { break; } } const std::string hex_fid = eos::common::FileId::Fid2Hex(mFid); cnt = 0; while (execargs.replace("", hex_fid.c_str())) { if (++cnt > 16) { break; } } std::string turl = "root://"; turl += gOFS->MgmOfsAlias.c_str(); turl += "/"; turl += fullpath; turl += "?eos.lfn=fxid:"; turl += hex_fid.c_str(); cnt = 0; while (execargs.replace("", turl.c_str())) { if (++cnt > 16) { break; } } unbase64 = cfmd->getName().c_str(); eos::common::SymKey::Base64(unbase64, base64); cnt = 0; while (execargs.replace("", unbase64.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", base64.c_str())) { if (++cnt > 16) { break; } } unbase64 = cfmd->getLink().c_str(); eos::common::SymKey::Base64(unbase64, base64); cnt = 0; while (execargs.replace("", unbase64.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", base64.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", checksum.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", eos::common::LayoutId::GetChecksumString(cfmd->getLayoutId()))) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mActions[0].mEvent.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mActions[0].mQueue.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mActions[0].mWorkflow.c_str())) { if (++cnt > 16) { break; } } cnt = 0; while (execargs.replace("", mWorkflowPath.c_str())) { if (++cnt > 16) { break; } } time_t now = time(NULL); cnt = 0; while (execargs.replace("", eos::common::StringConversion::GetSizeString(cv, (unsigned long long) now))) { if (++cnt > 16) { break; } } int xstart = 0; cnt = 0; while ((xstart = execargs.find(" 256) { break; } int xend = execargs.find(">", xstart); if (xend == STR_NPOS) { format_error = true; break; } else { bool b64encode = false; std::string key; std::string value; key.assign(execargs.c_str() + xstart + 18, xend - xstart - 18); execargs.erase(xstart, xend + 1 - xstart); XrdOucString skey = key.c_str(); if (skey.beginswith("base64:")) { key.erase(0, 7); b64encode = true; } if (gOFS->_attr_get(*cfmd, key, value)) { if (b64encode) { unbase64 = value.c_str(); eos::common::SymKey::Base64(unbase64, base64); value = base64.c_str(); } if (xstart == execargs.length()) { execargs += value.c_str(); } else { execargs.insert(value.c_str(), xstart); } } else { execargs.insert("UNDEF", xstart); } } } xstart = 0; cnt = 0; while ((xstart = execargs.find(" 256) { break; } int xend = execargs.find(">", xstart); if (xend == STR_NPOS) { format_error = true; break; } else { bool b64encode = false; std::string key; std::string value; key.assign(execargs.c_str() + xstart + 18, xend - xstart - 18); execargs.erase(xstart, xend + 1 - xstart); XrdOucString skey = key.c_str(); if (skey.beginswith("base64:")) { key.erase(0, 7); b64encode = true; } if (gOFS->_attr_get(*ccmd, key, value)) { if (b64encode) { unbase64 = value.c_str(); eos::common::SymKey::Base64(unbase64, base64); value = base64.c_str(); } if (xstart == execargs.length()) { execargs += value.c_str(); } else { execargs.insert(value.c_str(), xstart); } } else { execargs.insert("UNDEF", xstart); } } } if (execargs.find("") != STR_NPOS) { XrdOucString out = ""; XrdOucString err = ""; // --------------------------------------------------------------------------------- // run file info to get file md // --------------------------------------------------------------------------------- XrdOucString file_metadata; ProcCommand Cmd; XrdOucString info; info = "mgm.cmd=fileinfo&mgm.path=fid:"; info += eos::common::StringConversion::GetSizeString(cv, (unsigned long long) mFid); info += "&mgm.file.info.option=-m"; Cmd.open("/proc/user", info.c_str(), lRootVid, &lError); Cmd.AddOutput(out, err); Cmd.close(); file_metadata = out; if (err.length()) { eos_static_err("msg=\"file info returned error\" err=\"%s\"", err.c_str()); } cnt = 0; while (file_metadata.replace("\"", "'")) { if (++cnt > 16) { break; } } out = err = ""; // --------------------------------------------------------------------------------- // run container info to get container md // --------------------------------------------------------------------------------- XrdOucString container_metadata; info = "mgm.cmd=fileinfo&mgm.path=pid:"; info += eos::common::StringConversion::GetSizeString(cv, (unsigned long long) cfmd->getContainerId()); info += "&mgm.file.info.option=-m"; Cmd.open("/proc/user", info.c_str(), lRootVid, &lError); Cmd.AddOutput(out, err); Cmd.close(); container_metadata = out; if (err.length()) { eos_static_err("msg=\"container info returned error\" err=\"%s\"", err.c_str()); } cnt = 0; while (container_metadata.replace("\"", "'")) { if (++cnt > 16) { break; } } std::string metadata = "\"fmd={ "; metadata += file_metadata.c_str(); metadata += "} dmd={ "; metadata += container_metadata.c_str(); metadata += "}\""; unbase64 = metadata.c_str(); eos::common::SymKey::Base64(unbase64, base64); execargs.replace("", base64.c_str()); } execargs.replace("", mActions[0].mAction.c_str()); std::string bashcmd = EOS_WFE_BASH_PREFIX + executable + " " + execargs.c_str(); if (!format_error) { eos::common::ShellCmd cmd(bashcmd); eos_static_info("shell-cmd=\"%s\"", bashcmd.c_str()); eos::common::cmd_status rc = cmd.wait(1800); // retrieve the stderr of this command XrdOucString outerr; char buff[65536]; int end; memset(buff, 0, sizeof(buff)); while ((end = ::read(cmd.errfd, buff, sizeof(buff))) > 0) { outerr += buff; memset(buff, 0, sizeof(buff)); } eos_static_info("shell-cmd-stderr=%s", outerr.c_str()); // scan for result tags referencing the trigger path xstart = 0; cnt = 0; while ((xstart = outerr.find(" 256) { break; } int xend = outerr.find(">", xstart); if (xend == STR_NPOS) { eos_static_err("malformed shell stderr tag"); break; } else { std::string key; std::string value; key.assign(outerr.c_str() + xstart + 24, xend - xstart - 24); int vend = outerr.find(" ", xend + 1); if (vend > 0) { value.assign(outerr.c_str(), xend + 1, vend - (xend + 1)); } else { value.assign(outerr.c_str(), xend + 1, string::npos); } // remove a possible line feed from the value while (value.length() && (value[value.length() - 1] == '\n')) { value.erase(value.length() - 1); } eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, mFid); eos::common::RWMutexWriteLock nsLock(gOFS->eosViewRWMutex); try { fmd = gOFS->eosFileService->getFileMD(mFid); base64 = value.c_str(); eos::common::SymKey::DeBase64(base64, unbase64); fmd->setAttribute(key, unbase64.c_str()); fmd->setMTimeNow(); gOFS->eosView->updateFileStore(fmd.get()); errno = 0; eos_static_info("msg=\"stored extended attribute\" key=%s value=%s", key.c_str(), value.c_str()); } catch (eos::MDException& e) { eos_static_err("msg=\"failed set extended attribute\" key=%s value=%s", key.c_str(), value.c_str()); } } xstart++; } retc = rc.exit_code; if (rc.exit_code) { eos_static_err("msg=\"failed to run bash workflow\" job=\"%s\" retc=%d", mDescription.c_str(), rc.exit_code); int retry = 0; time_t delay = 0; if (rc.exit_code == EAGAIN) { try { std::string retryattr = "sys.workflow." + mActions[0].mEvent + "." + mActions[0].mWorkflow + ".retry.max"; std::string delayattr = "sys.workflow." + mActions[0].mEvent + "." + mActions[0].mWorkflow + ".retry.delay"; eos_static_info("%s %s", retryattr.c_str(), delayattr.c_str()); std::string value = ccmd->getAttribute(retryattr); retry = (int)strtoul(value.c_str(), 0, 10); value = ccmd->getAttribute(delayattr); delay = (int)strtoul(value.c_str(), 0, 10); } catch (eos::MDException& e) { execargs.insert("UNDEF", xstart); } if (!IsSync() && (mRetry < retry)) { storetime = (time_t) mActions[0].mTime + delay; // can retry Move("r", "e", storetime, ++mRetry); XrdOucString log = "scheduled for retry"; Results("e", EAGAIN, log, storetime); } else { storetime = (time_t) mActions[0].mTime; // can not retry Move("r", "f", storetime, mRetry); XrdOucString log = "workflow failed without possibility to retry"; Results("f", rc.exit_code, log, storetime); } } else { storetime = 0; // can not retry Move("r", "f", storetime); XrdOucString log = "workflow failed without possibility to retry"; Results("f", rc.exit_code, log, storetime); } } else { eos_static_info("msg=\"done bash workflow\" job=\"%s\"", mDescription.c_str()); storetime = 0; Move("r", "d", storetime); XrdOucString log = "workflow succeeded"; Results("d", rc.exit_code, log, storetime); } // scan for result tags referencing the workflow path xstart = 0; cnt = 0; while ((xstart = outerr.find(" 256) { break; } int xend = outerr.find(">", xstart); if (xend == STR_NPOS) { eos_static_err("malformed shell stderr tag"); break; } else { std::string key; std::string value; key.assign(outerr.c_str() + xstart + 25, xend - xstart - 25); int vend = outerr.find(" ", xend + 1); if (vend > 0) { value.assign(outerr.c_str(), xend + 1, vend - (xend + 1)); } else { value.assign(outerr.c_str(), xend + 1, string::npos); } eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, mWorkflowPath); eos::common::RWMutexWriteLock nsLock(gOFS->eosViewRWMutex); try { fmd = gOFS->eosView->getFile(mWorkflowPath); base64 = value.c_str(); eos::common::SymKey::DeBase64(base64, unbase64); fmd->setAttribute(key, unbase64.c_str()); fmd->setMTimeNow(); gOFS->eosView->updateFileStore(fmd.get()); errno = 0; eos_static_info("msg=\"stored extended attribute on vpath\" vpath=%s key=%s value=%s", mWorkflowPath.c_str(), key.c_str(), value.c_str()); } catch (eos::MDException& e) { eos_static_err("msg=\"failed set extended attribute\" key=%s value=%s", key.c_str(), value.c_str()); } } xstart++; } } else { retc = EINVAL; storetime = 0; // cannot retry Move(mActions[0].mQueue, "f", storetime); XrdOucString log = "workflow failed to invalid arguments"; Results("f", retc, log, storetime); } } else { storetime = 0; retc = EINVAL; viewReadLock.Release(); eos_static_err("msg=\"failed to run bash workflow - file gone\" job=\"%s\"", mDescription.c_str()); Move(mActions[0].mQueue, "g", storetime); XrdOucString log = "workflow failed to invalid arguments - file is gone"; Results("g", retc, log, storetime); } } else { storetime = 0; retc = EINVAL; eos_static_err("msg=\"failed to run bash workflow - executable name modifies path\" job=\"%s\"", mDescription.c_str()); Move(mActions[0].mQueue, "g", storetime); } } else if (gOFS->mTapeEnabled && method == "proto") { return HandleProtoMethodEvents(errorMsg, ininfo); } else { storetime = 0; eos_static_err("msg=\"moving unknown workflow\" job=\"%s\"", mDescription.c_str()); Move(mActions[0].mQueue, "g", storetime); XrdOucString log = "workflow is not known"; Results("g", EINVAL, log, storetime); } } else { storetime = 0; retc = EINVAL; eos_static_err("msg=\"moving illegal workflow\" job=\"%s\"", mDescription.c_str()); Move(mActions[0].mQueue, "g", storetime); XrdOucString log = "workflow illegal"; Results("g", retc, log, storetime); } } else { //Delete(mActions[0].mQueue); } return retc; } /*----------------------------------------------------------------------------*/ /** * @brief Handles "proto" method events * @param errorMsg out parameter giving the text of any error * @return SFS_OK if success */ /*----------------------------------------------------------------------------*/ int WFE::Job::HandleProtoMethodEvents(std::string& errorMsg, const char* const ininfo) { const auto event = mActions[0].mEvent; std::string fullPath; try { eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, mFid); eos::common::RWMutexReadLock rlock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); fullPath = gOFS->eosView->getUri(fmd.get()); } catch (eos::MDException& e) { eos_static_err("Could not get metadata for file %u. Reason: %s", mFid, e.getMessage().str().c_str()); MoveWithResults(ENOENT); return ENOENT; } { std::string opaqueRequestIdStr; if (nullptr != ininfo) { XrdOucEnv opaque(ininfo); const char* const opaqueRequestId = opaque.Get("mgm.reqid"); if (nullptr != opaqueRequestId) { opaqueRequestIdStr = opaqueRequestId; } } auto eventUpperCase = event; std::transform(eventUpperCase.begin(), eventUpperCase.end(), eventUpperCase.begin(), [](unsigned char c) { return std::toupper(c); } ); eos_static_info("%s %s %s %s fxid=%08llx mgm.reqid=\"%s\"", mActions[0].mWorkflow.c_str(), eventUpperCase.c_str(), fullPath.c_str(), gOFS->ProtoWFEndPoint.c_str(), mFid, opaqueRequestIdStr.c_str()); } if (event == "sync::prepare" || event == "prepare") { return HandleProtoMethodPrepareEvent(fullPath, ininfo, errorMsg); } else if (event == "sync::abort_prepare" || event == "abort_prepare") { return HandleProtoMethodAbortPrepareEvent(fullPath, ininfo, errorMsg); } else if (event == "sync::evict_prepare" || event == "evict_prepare") { return HandleProtoMethodEvictPrepareEvent(fullPath, ininfo, errorMsg); } else if (event == "sync::create" || event == "create") { return HandleProtoMethodCreateEvent(fullPath, errorMsg); } else if (event == "sync::delete" || event == "delete") { return HandleProtoMethodDeleteEvent(fullPath, errorMsg); } else if (event == "sync::closew" || event == "closew") { return HandleProtoMethodCloseEvent(event, fullPath, ininfo); } else if (event == "sync::archived" || event == "archived") { return HandleProtoMethodArchivedEvent(event, fullPath, ininfo); } else if (event == RETRIEVE_FAILED_WORKFLOW_NAME) { return HandleProtoMethodRetrieveFailedEvent(fullPath); } else if (event == ARCHIVE_FAILED_WORKFLOW_NAME) { return HandleProtoMethodArchiveFailedEvent(fullPath); } else if (event == "sync::offline" || event == "offline") { return HandleProtoMethodOfflineEvent(fullPath, ininfo, errorMsg); } else if (event == "sync::update_fid" || event == "update_fid") { return HandleProtoMethodUpdateFidEvent(fullPath, errorMsg); } else { eos_static_err("Unknown event %s for proto workflow", event.c_str()); MoveWithResults(SFS_ERROR); return SFS_ERROR; } } int WFE::Job::HandleProtoMethodPrepareEvent(const std::string& fullPath, const char* const ininfo, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::Prepare"); gOFS->MgmStats.Add("Proto::Prepare", 0, 0, 1); const std::string prepareRequestId = GetPrepareRequestIdFromOpaqueData(ininfo); const std::string prepareActivity = GetPrepareActivityFromOpaqueData(ininfo); const int prepareRc = IdempotentPrepare(fullPath, prepareRequestId, prepareActivity, errorMsg); EXEC_TIMING_END("Proto::Prepare"); return prepareRc; } std::string WFE::Job::GetPrepareRequestIdFromOpaqueData(const char* const ininfo) { // Get the new request ID from the opaque data and add it to the list XrdOucEnv opaque(ininfo); const char* const prepareRequestId = opaque.Get("mgm.reqid"); if (prepareRequestId == nullptr) { throw_mdexception(EINVAL, "mgm.reqid does not exist in opaque data."); } else if (*prepareRequestId == '\0') { throw_mdexception(EINVAL, "mgm.reqid has no value set in opaque data."); } return prepareRequestId; } std::string WFE::Job::GetPrepareActivityFromOpaqueData(const char* const ininfo) { // Get the activity from the opaque data and add it to the list XrdOucEnv opaque(ininfo); const char* const prepareActivity = opaque.Get("activity"); if (prepareActivity == nullptr) { return ""; } return prepareActivity; } int WFE::Job::IdempotentPrepare(const std::string& fullPath, const std::string& prepareRequestId, const std::string& prepareActivity, std::string& errorMsg) { using namespace std::chrono; struct stat buf; XrdOucErrInfo errInfo; bool onDisk; bool onTape; struct timespec ts_now; eos::common::Timing::GetTimeSpec(ts_now); EosCtaReporterPrepareWfe eosLog; eosLog .addParam(EosCtaReportParam::SEC_APP, "tape_wfe") .addParam(EosCtaReportParam::LOG, std::string(gOFS->logId)) .addParam(EosCtaReportParam::PATH, fullPath) .addParam(EosCtaReportParam::RUID, mVid.uid) .addParam(EosCtaReportParam::RGID, mVid.gid) .addParam(EosCtaReportParam::TD, mVid.tident.c_str()) .addParam(EosCtaReportParam::PREP_WFE_EVENT, "stage") .addParam(EosCtaReportParam::PREP_WFE_ACTIVITY, prepareActivity) .addParam(EosCtaReportParam::TS, ts_now.tv_sec) .addParam(EosCtaReportParam::TNS, ts_now.tv_nsec); // Check if we have a disk replica and if not, whether it's on tape if (gOFS->_stat(fullPath.c_str(), &buf, errInfo, mVid, nullptr, nullptr, false) == 0) { // Note that buf.st_mode is an unsigned integer onDisk = (buf.st_mode & EOS_TAPE_MODE_T) ? buf.st_nlink > 1 : buf.st_nlink > 0; onTape = (buf.st_mode & EOS_TAPE_MODE_T) != 0; eosLog .addParam(EosCtaReportParam::PREP_WFE_ONDISK, onDisk) .addParam(EosCtaReportParam::PREP_WFE_ONTAPE, onTape); } else { std::stringstream err_message; err_message << "Cannot determine file and disk replicas, not doing the prepare. Reason: " << errInfo.getErrText(); eos_static_err(err_message.str().c_str()); eosLog .addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false) .addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(EAGAIN); return EAGAIN; } uid_t cuid = 99; gid_t cgid = 99; if (onDisk) { eos_static_info("File %s is already on disk, nothing to prepare. Eviction counter will be incremented.", fullPath.c_str()); try { int evictionCounter = 0; eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); if (fmd->hasAttribute(RETRIEVE_EVICT_COUNTER_NAME)) { evictionCounter = std::stoi(fmd->getAttribute(RETRIEVE_EVICT_COUNTER_NAME)); } fmd->setAttribute(RETRIEVE_EVICT_COUNTER_NAME, std::to_string(++evictionCounter)); gOFS->eosView->updateFileStore(fmd.get()); eosLog.addParam(EosCtaReportParam::PREP_WFE_EVICTCOUNTER, evictionCounter); } catch (eos::MDException& ex) { std::stringstream err_message; err_message << "msg=\"could not update eviction counter for file " << fullPath; eos_static_err(err_message.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); } eosLog.addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false); MoveWithResults(SFS_OK); return SFS_OK; } else if (!onTape) { std::stringstream err_message; err_message << "File " << fullPath << " is not on disk nor on tape, cannot prepare it."; eos_static_err(err_message.str().c_str()); eosLog .addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false) .addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(ENODATA); return ENODATA; } else { eos::common::RWMutexWriteLock lock; lock.Grab(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); // Get the list of in-flight Prepare requests for this file (if any) XattrSet prepareReqIds; if (fmd->hasAttribute(RETRIEVE_REQID_ATTR_NAME)) { prepareReqIds.deserialize(fmd->getAttribute(RETRIEVE_REQID_ATTR_NAME)); } bool isFirstPrepare = prepareReqIds.values.empty(); prepareReqIds.values.insert(prepareRequestId.c_str()); eosLog .addParam(EosCtaReportParam::PREP_WFE_FIRSTPREPARE, isFirstPrepare) .addParam(EosCtaReportParam::PREP_WFE_REQID, prepareRequestId) .addParam(EosCtaReportParam::PREP_WFE_REQCOUNT, prepareReqIds.values.size()) .addParam(EosCtaReportParam::PREP_WFE_REQLIST, prepareReqIds.serialize()); try { fmd->setAttribute(RETRIEVE_REQID_ATTR_NAME, prepareReqIds.serialize()); // if we are the first to retrieve the file if (isFirstPrepare) { fmd->setAttribute(RETRIEVE_ERROR_ATTR_NAME, ""); // Read these attributes here to optimize locking cuid = fmd->getCUid(); cgid = fmd->getCGid(); gOFS->eosView->updateFileStore(fmd.get()); } else { eos_static_info("File %s is already being retrieved by %u clients.", fullPath.c_str(), prepareReqIds.values.size() - 1); eosLog.addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false); MoveWithResults(SFS_OK); return SFS_OK; } } catch (eos::MDException& ex) { lock.Release(); std::stringstream err_message; err_message << "Could not write attributes " << RETRIEVE_REQID_ATTR_NAME << " and " << RETRIEVE_ERROR_ATTR_NAME << " for file " << fullPath.c_str() << ". Not doing the retrieve."; eos_static_err(err_message.str().c_str()); eosLog .addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false) .addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(EAGAIN); return EAGAIN; } } // If we reached this point: the file is not on disk, it is on tape, and this is the first Prepare // request for this file. Proceed with issuing the Prepare request to the tape back-end. cta::xrd::Request request; auto notification = request.mutable_notification(); notification->mutable_cli()->mutable_user()->set_username(GetUserName( mVid.uid)); notification->mutable_cli()->mutable_user()->set_groupname(GetGroupName( mVid.gid)); auto xAttrs = CollectAttributes(fullPath); for (const auto& attribute : xAttrs) { google::protobuf::MapPair attr(attribute.first, attribute.second); notification->mutable_file()->mutable_xattr()->insert(attr); } if (prepareActivity.length()) { google::protobuf::MapPair attr("activity", prepareActivity); notification->mutable_file()->mutable_xattr()->insert(attr); } notification->mutable_wf()->set_event(cta::eos::Workflow::PREPARE); notification->mutable_file()->set_lpath(fullPath); notification->mutable_wf()->mutable_instance()->set_name( gOFS->MgmOfsInstanceName.c_str()); notification->mutable_file()->set_fid(mFid); notification->mutable_file()->mutable_owner()->set_uid(cuid); notification->mutable_file()->mutable_owner()->set_gid(cgid); notification->mutable_file()->set_disk_file_id(std::to_string(mFid)); if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { notification->mutable_file()->set_archive_file_id (CtaUtils::toUint64(xAttrs[ARCHIVE_FILE_ID_ATTR_NAME])); } if (xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME)) { notification->mutable_file()->set_storage_class( xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME]); } if (xAttrs.count(EOS_BTIME)) { eos::IFileMD::ctime_t btime {0, 0}; Timing::Timespec_from_TimespecStr(xAttrs[EOS_BTIME], btime); notification->mutable_file()->mutable_btime()->set_sec(btime.tv_sec); notification->mutable_file()->mutable_btime()->set_nsec(btime.tv_nsec); } auto fxidString = StringConversion::FastUnsignedToAsciiHex(mFid); std::ostringstream destStream; std::string mgmHostName; if (gOFS->MgmOfsAlias.length()) { mgmHostName = gOFS->MgmOfsAlias.c_str(); } else if (gOFS->HostName != nullptr) { mgmHostName = gOFS->HostName; } else { std::stringstream err_message; err_message << "IdempotentPrepare() failed: Could not determine the value of mgmHostName"; eos_static_err(err_message.str().c_str()); eosLog .addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false) .addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(ENODATA); return ENODATA; } eosLog.addParam(EosCtaReportParam::HOST, mgmHostName); destStream << "root://" << mgmHostName << "/" << fullPath << "?eos.lfn=fxid:" << fxidString; destStream << "&eos.ruid=0&eos.rgid=0&eos.injection=1&eos.workflow=" << RETRIEVE_WRITTEN_WORKFLOW_NAME << "&eos.space=" << gOFS->mPrepareDestSpace; notification->mutable_transport()->set_dst_url(destStream.str()); std::ostringstream errorReportStream; errorReportStream << "eosQuery://" << mgmHostName << "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString << "&mgm.logid=cta&mgm.event=" << RETRIEVE_FAILED_WORKFLOW_NAME << "&mgm.workflow=default&mgm.path=/dummy_path&mgm.ruid=0&mgm.rgid=0&mgm.errmsg="; notification->mutable_transport()->set_error_report_url( errorReportStream.str()); auto sendResult = SendProtoWFRequest(this, fullPath, request, errorMsg); // Create timestamp std::string ctimestr(std::to_string(system_clock::to_time_t( system_clock::now()))); if (sendResult == 0) { // Update the timestamp of the last Prepare request that was successfully sent eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); try { fmd->setAttribute(RETRIEVE_REQTIME_ATTR_NAME, ctimestr); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { // fail silently if we couldn't update the timestamp } } else { if (errorMsg.empty()) { errorMsg = "Prepare handshake failed"; } std::string errorMsgAttr = ctimestr + " -> " + errorMsg; eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); try { // Delete the request ID from the extended attributes so it can be retried fmd->setAttribute(RETRIEVE_REQID_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_REQTIME_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_ERROR_ATTR_NAME, errorMsgAttr); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) {} } eosLog.addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, true); return sendResult; } int WFE::Job::HandleProtoMethodAbortPrepareEvent(const std::string& fullPath, const char* const ininfo, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::Prepare::Abort"); gOFS->MgmStats.Add("Proto::Prepare::Abort", 0, 0, 1); EosCtaReporterPrepareWfe eosLog; eosLog .addParam(EosCtaReportParam::SEC_APP, "tape_wfe") .addParam(EosCtaReportParam::LOG, std::string(gOFS->logId)) .addParam(EosCtaReportParam::PATH, fullPath) .addParam(EosCtaReportParam::RUID, mVid.uid) .addParam(EosCtaReportParam::RGID, mVid.gid) .addParam(EosCtaReportParam::TD, mVid.tident.c_str()) .addParam(EosCtaReportParam::PREP_WFE_EVENT, "abort"); XattrSet prepareReqIds; { eos::common::RWMutexWriteLock lock; lock.Grab(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); try { if (fmd->hasAttribute(RETRIEVE_REQID_ATTR_NAME)) { prepareReqIds.deserialize(fmd->getAttribute(RETRIEVE_REQID_ATTR_NAME)); } eosLog .addParam(EosCtaReportParam::PREP_WFE_REQCOUNT, prepareReqIds.values.size()) .addParam(EosCtaReportParam::PREP_WFE_REQLIST, prepareReqIds.serialize()); } catch (...) { lock.Release(); std::stringstream err_message; err_message << "Could not determine ongoing retrieves for file " << fullPath.c_str() << ". Check the " << RETRIEVE_REQID_ATTR_NAME << " extended attribute"; eos_static_err(err_message.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(EAGAIN); return EAGAIN; } std::string opaqueRequestIdStr; try { // Remove the request ID from the list in the Xattr XrdOucEnv opaque(ininfo); const char* const opaqueRequestId = opaque.Get("mgm.reqid"); if (nullptr != opaqueRequestId) { opaqueRequestIdStr = opaqueRequestId; } if (opaqueRequestId == nullptr) { throw_mdexception(EINVAL, "mgm.reqid does not exist in opaque data."); } else if (*opaqueRequestId == '\0') { throw_mdexception(EINVAL, "mgm.reqid has no value set in opaque data."); } eosLog.addParam(EosCtaReportParam::PREP_WFE_REQID, opaqueRequestId); if (prepareReqIds.values.erase(opaqueRequestId) != 1) { throw_mdexception(EINVAL, "Request ID not found in extended attributes"); } fmd->setAttribute(RETRIEVE_REQID_ATTR_NAME, prepareReqIds.serialize()); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { lock.Release(); std::stringstream err_message; err_message << "Error accessing attribute " << RETRIEVE_REQID_ATTR_NAME << " for file " << fullPath.c_str() << ". " << "Not doing the abort retrieve."; eos_static_err(err_message.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, err_message.str()); MoveWithResults(EAGAIN); return EAGAIN; } } if (!prepareReqIds.values.empty()) { // There are other pending Prepare requests on this file, just return OK eosLog.addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, false); MoveWithResults(SFS_OK); return SFS_OK; } // optimization for reduced memory IO during write lock cta::xrd::Request request; auto notification = request.mutable_notification(); notification->mutable_cli()->mutable_user()->set_username(GetUserName( mVid.uid)); notification->mutable_cli()->mutable_user()->set_groupname(GetGroupName( mVid.gid)); auto xAttrs = CollectAttributes(fullPath); for (const auto& attribute : xAttrs) { google::protobuf::MapPair attr(attribute.first, attribute.second); notification->mutable_file()->mutable_xattr()->insert(attr); } uid_t cuid = 99; gid_t cgid = 99; { eos::common::RWMutexReadLock rlock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); cuid = fmd->getCUid(); cgid = fmd->getCGid(); } notification->mutable_file()->mutable_owner()->set_uid(cuid); notification->mutable_file()->mutable_owner()->set_gid(cgid); notification->mutable_wf()->set_event(cta::eos::Workflow::ABORT_PREPARE); notification->mutable_file()->set_lpath(fullPath); notification->mutable_wf()->mutable_instance()->set_name( gOFS->MgmOfsInstanceName.c_str()); notification->mutable_file()->set_fid(mFid); notification->mutable_file()->set_disk_file_id(std::to_string(mFid)); if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { notification->mutable_file()->set_archive_file_id (CtaUtils::toUint64(xAttrs[ARCHIVE_FILE_ID_ATTR_NAME])); } if (xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME)) { notification->mutable_file()->set_storage_class( xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME]); } if (xAttrs.count(EOS_BTIME)) { eos::IFileMD::ctime_t btime {0, 0}; Timing::Timespec_from_TimespecStr(xAttrs[EOS_BTIME], btime); notification->mutable_file()->mutable_btime()->set_sec(btime.tv_sec); notification->mutable_file()->mutable_btime()->set_nsec(btime.tv_nsec); } auto s_ret = SendProtoWFRequest(this, fullPath, request, errorMsg); if (s_ret == 0) { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); try { // All Prepare requests cancelled by the user: // Delete the request time and error message from the extended attributes fmd->setAttribute(RETRIEVE_REQTIME_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_ERROR_ATTR_NAME, ""); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) {} } eosLog.addParam(EosCtaReportParam::PREP_WFE_SENTTOCTA, true); EXEC_TIMING_END("Proto::Prepare::Abort"); return s_ret; } int WFE::Job::HandleProtoMethodEvictPrepareEvent(const std::string& fullPath, const char* const ininfo, std::string& errorMsg) { using namespace std::chrono; struct stat buf; XrdOucErrInfo errInfo; bool onDisk; bool onTape; EosCtaReporterPrepareWfe eosLog; eosLog .addParam(EosCtaReportParam::SEC_APP, "tape_wfe") .addParam(EosCtaReportParam::LOG, std::string(gOFS->logId)) .addParam(EosCtaReportParam::PATH, fullPath) .addParam(EosCtaReportParam::RUID, mVid.uid) .addParam(EosCtaReportParam::RGID, mVid.gid) .addParam(EosCtaReportParam::TD, mVid.tident.c_str()) .addParam(EosCtaReportParam::PREP_WFE_EVENT, "evict"); EXEC_TIMING_BEGIN("Proto::EvictPrepare"); gOFS->MgmStats.Add("Proto::EvictPrepare", 0, 0, 1); std::ostringstream preamble; preamble << "fxid=" << std::hex << mFid << " file=" << fullPath; // Check if we have a disk replica and if not, whether it's on tape if (gOFS->_stat(fullPath.c_str(), &buf, errInfo, mVid, nullptr, nullptr, false) == 0) { onDisk = ((buf.st_mode & EOS_TAPE_MODE_T) ? buf.st_nlink - 1 : buf.st_nlink) > 0; onTape = (buf.st_mode & EOS_TAPE_MODE_T) != 0; eosLog .addParam(EosCtaReportParam::PREP_WFE_ONDISK, onDisk) .addParam(EosCtaReportParam::PREP_WFE_ONTAPE, onTape); } else { std::ostringstream msg; msg << preamble.str() << " msg=\"Cannot determine file and disk replicas, not doing the evict. Reason: " << errInfo.getErrText() << "\""; eos_static_err(msg.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, msg.str()); MoveWithResults(EAGAIN); return EAGAIN; } if (!onDisk) { std::ostringstream msg; msg << preamble.str() << " msg=\"File is not on disk, nothing to evict.\""; eos_static_info(msg.str().c_str()); } else if (!onTape) { std::ostringstream msg; msg << preamble.str() << " msg=\"File is not on tape, cannot evict it.\""; eos_static_err(msg.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, msg.str()); MoveWithResults(ENODATA); return ENODATA; } else { const auto result = EvictAsRoot(mFid); if (0 == result.retc()) { std::ostringstream msg; msg << preamble.str() << " msg=\"Successfully issued evict for evict_prepare event\""; eos_static_info(msg.str().c_str()); } else { std::ostringstream msg; msg << preamble.str() << " msg=\"Failed to issue evict for evict_prepare event\""; eos_static_info(msg.str().c_str()); eosLog.addParam(EosCtaReportParam::PREP_WFE_ERROR, msg.str()); MoveWithResults(EAGAIN); return EAGAIN; } } MoveWithResults(SFS_OK); EXEC_TIMING_END("Proto::EvictPrepare"); return SFS_OK; } int WFE::Job::HandleProtoMethodCreateEvent(const std::string& fullPath, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::Create"); gOFS->MgmStats.Add("Proto::Create", 0, 0, 1); cta::xrd::Request request; auto notification = request.mutable_notification(); notification->mutable_cli()->mutable_user()->set_username(GetUserName( mVid.uid)); notification->mutable_cli()->mutable_user()->set_groupname(GetGroupName( mVid.gid)); auto xAttrs = CollectAttributes(fullPath); for (const auto& attribute : xAttrs) { google::protobuf::MapPair attr(attribute.first, attribute.second); notification->mutable_file()->mutable_xattr()->insert(attr); } uid_t cuid = 99; gid_t cgid = 99; { eos::common::RWMutexReadLock rlock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); cuid = fmd->getCUid(); cgid = fmd->getCGid(); } notification->mutable_file()->mutable_owner()->set_uid(cuid); notification->mutable_file()->mutable_owner()->set_gid(cgid); notification->mutable_wf()->set_event(cta::eos::Workflow::CREATE); notification->mutable_wf()->mutable_instance()->set_name( gOFS->MgmOfsInstanceName.c_str()); notification->mutable_file()->set_lpath(fullPath); notification->mutable_file()->set_fid(mFid); notification->mutable_file()->set_disk_file_id(std::to_string(mFid)); if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { notification->mutable_file()->set_archive_file_id (CtaUtils::toUint64(xAttrs[ARCHIVE_FILE_ID_ATTR_NAME])); } if (xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME)) { notification->mutable_file()->set_storage_class( xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME]); } if (xAttrs.count(EOS_BTIME)) { eos::IFileMD::ctime_t btime {0, 0}; Timing::Timespec_from_TimespecStr(xAttrs[EOS_BTIME], btime); notification->mutable_file()->mutable_btime()->set_sec(btime.tv_sec); notification->mutable_file()->mutable_btime()->set_nsec(btime.tv_nsec); } auto s_ret = SendProtoWFRequest(this, fullPath, request, errorMsg); EXEC_TIMING_END("Proto::Create"); return s_ret; } int WFE::Job::HandleProtoMethodDeleteEvent(const std::string& fullPath, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::Delete"); gOFS->MgmStats.Add("Proto::Delete", 0, 0, 1); cta::xrd::Request request; auto notification = request.mutable_notification(); notification->mutable_cli()->mutable_user()->set_username(GetUserName( mVid.uid)); notification->mutable_cli()->mutable_user()->set_groupname(GetGroupName( mVid.gid)); auto xAttrs = CollectAttributes(fullPath); for (const auto& attribute : xAttrs) { google::protobuf::MapPair attr(attribute.first, attribute.second); notification->mutable_file()->mutable_xattr()->insert(attr); } notification->mutable_wf()->set_event(cta::eos::Workflow::DELETE); notification->mutable_wf()->mutable_instance()->set_name( gOFS->MgmOfsInstanceName.c_str()); notification->mutable_file()->set_lpath(fullPath); notification->mutable_file()->set_fid(mFid); notification->mutable_file()->set_disk_file_id(std::to_string(mFid)); if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { notification->mutable_file()->set_archive_file_id (CtaUtils::toUint64(xAttrs[ARCHIVE_FILE_ID_ATTR_NAME])); } if (xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME)) { notification->mutable_file()->set_storage_class( xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME]); } if (xAttrs.count(EOS_BTIME)) { eos::IFileMD::ctime_t btime {0, 0}; Timing::Timespec_from_TimespecStr(xAttrs[EOS_BTIME], btime); notification->mutable_file()->mutable_btime()->set_sec(btime.tv_sec); notification->mutable_file()->mutable_btime()->set_nsec(btime.tv_nsec); } // IMPORTANT // Remove the tape location from the EOS namespace before actually deleting // the tape file(s). Doing these operations the other way around could result // in failing to remove the tape location of the file from the EOS namespace // after successfully deleting the actual tape files. This would give the end // user a false sense of security that their tape file still exists when they // list it. This would be considered as data loss by the end user. // However, before deleting the file from the EOS namespace, we should log its contents. // This provides a way to recover the namespace after an unintended deletion of a file on tape. uint64_t file_size; { struct timespec ts_now; eos::common::Timing::GetTimeSpec(ts_now); EosCtaReporterFileDeletion eosLog; auto fmd = gOFS->eosFileService->getFileMD(mFid); auto locations = fmd->getLocations(); std::ostringstream locationsOStream; bool streamEmpty = true; for (auto& location : fmd->getLocations()) { locationsOStream << (streamEmpty ? "" : ",") << location; streamEmpty = false; } file_size = fmd->getSize(); // Add File Size to notification notification->mutable_file()->set_size(file_size); std::string checksum; eos::appendChecksumOnStringAsHex(fmd.get(), checksum); eosLog.addParam(EosCtaReportParam::SEC_APP, "tape_delete") .addParam(EosCtaReportParam::LOG, std::string(gOFS->logId)) .addParam(EosCtaReportParam::PATH, fullPath) .addParam(EosCtaReportParam::RUID, mVid.uid) .addParam(EosCtaReportParam::RGID, mVid.gid) .addParam(EosCtaReportParam::TD, mVid.tident.c_str()) .addParam(EosCtaReportParam::TS, ts_now.tv_sec) .addParam(EosCtaReportParam::TNS, ts_now.tv_nsec) .addParam(EosCtaReportParam::FILE_DEL_FID, fmd->getId()) .addParam(EosCtaReportParam::FILE_DEL_FXID, eos::common::FileId::Fid2Hex(fmd->getId()).c_str()) .addParam(EosCtaReportParam::FILE_DEL_EOS_BTIME, xAttrs.count(EOS_BTIME) ? xAttrs[EOS_BTIME] : "") .addParam(EosCtaReportParam::FILE_DEL_ARCHIVE_FILE_ID, xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME) ? xAttrs[ARCHIVE_FILE_ID_ATTR_NAME] : "") .addParam(EosCtaReportParam::FILE_DEL_ARCHIVE_STORAGE_CLASS, xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME) ? xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME] : "") .addParam(EosCtaReportParam::FILE_DEL_LOCATIONS, locationsOStream.str()) .addParam(EosCtaReportParam::FILE_DEL_CHECKSUMTYPE, eos::common::LayoutId::GetChecksumString(fmd->getLayoutId())) .addParam(EosCtaReportParam::FILE_DEL_CHECKSUMVALUE, checksum) .addParam(EosCtaReportParam::FILE_DEL_SIZE, fmd->getSize()); // Add checksum to the notification CtaCommon::SetChecksum(notification->mutable_file()->mutable_csb()->add_cs(), fmd->getLayoutId(), checksum); } bool tapeLocationWasRemoved = false; try { // remove tape location eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); fmd->unlinkLocation(TAPE_FS_ID); fmd->removeLocation(TAPE_FS_ID); gOFS->eosView->updateFileStore(fmd.get()); tapeLocationWasRemoved = true; } catch (eos::MDException& ex) { eos_static_err("msg=\"Failed to unlink tape location for file %s", fullPath.c_str()); } if (tapeLocationWasRemoved) { if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { const int sendRc = SendProtoWFRequest(this, fullPath, request, errorMsg); if (SFS_OK != sendRc) { // The EOS namespace can ignore the failed deletion of the tape file(s) as this only generates dark data tape which will be picked up later eos_static_err("msg=\"Failed to notify protocol buffer endpoint about the deletion of file %s: %s\" sendRc=%d", fullPath.c_str(), errorMsg.c_str(), sendRc); } } else { if (file_size == 0) { eos_static_warning( "msg=\"File size is zero and attribute sys.archive.file_id not found. Not sending deletion request to CTA.\""); } else { eos_static_err( "msg=\"File size is %d but attribute sys.archive.file_id not found. Not sending deletion request to CTA.\"", file_size); } } } EXEC_TIMING_END("Proto::Delete"); return SFS_OK; // Ignore any failure in notifying the protocol buffer endpoint } int WFE::Job::HandleProtoMethodCloseEvent(const std::string& event, const std::string& fullPath, const char* const ininfo) { EXEC_TIMING_BEGIN("Proto::Close"); gOFS->MgmStats.Add("Proto::Close", 0, 0, 1); if (mActions[0].mWorkflow == RETRIEVE_WRITTEN_WORKFLOW_NAME) { resetRetrieveIdListAndErrorMsg(fullPath); } { XrdOucEnv opaque(ininfo); const char* const archive_req_id = opaque.Get("mgm.archive_req_id"); if (archive_req_id != nullptr && *archive_req_id != '\0') { try { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); fmd->setAttribute(CTA_OBJECTSTORE_ARCHIVE_REQ_ID_NAME, archive_req_id); gOFS->eosView->updateFileStore(fmd.get()); } catch (std::exception& se) { eos_static_err("msg=\"Failed to set xattr: %s\" path=\"%s\" xattr_name=\"%s\" xattr_value=\"%s\"", se.what(), fullPath.c_str(), CTA_OBJECTSTORE_ARCHIVE_REQ_ID_NAME, archive_req_id); } catch (...) { eos_static_err("msg=\"Failed to set xattr: Caught an unknown exception\" path=\"%s\" xattr_name=\"%s\"" " xattr_value=\"%s\"", fullPath.c_str(), CTA_OBJECTSTORE_ARCHIVE_REQ_ID_NAME, archive_req_id); } } } MoveWithResults(SFS_OK); EXEC_TIMING_END("Proto::Close"); return SFS_OK; } void WFE::Job::resetRetrieveIdListAndErrorMsg(const std::string& fullPath) { std::string errorMsg; try { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); XattrSet prepareReqIds; auto fmd = gOFS->eosFileService->getFileMD(mFid); if (fmd->hasAttribute(RETRIEVE_REQID_ATTR_NAME)) { prepareReqIds.deserialize(fmd->getAttribute(RETRIEVE_REQID_ATTR_NAME)); } int evictionCounter = prepareReqIds.values.size(); fmd->setAttribute(RETRIEVE_REQID_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_REQTIME_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_ERROR_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_EVICT_COUNTER_NAME, std::to_string(evictionCounter)); fmd->removeAttribute(CTA_OBJECTSTORE_REQ_ID_NAME); gOFS->eosView->updateFileStore(fmd.get()); return; } catch (std::exception& se) { errorMsg = se.what(); } catch (...) { errorMsg = "Caught an unknown exception"; } // Reaching this point means an exception was thrown and caught eos_static_err("Could not reset retrieves counter and error attribute for file %s: %s", fullPath.c_str(), errorMsg.c_str()); } int WFE::Job::HandleProtoMethodArchivedEvent(const std::string& event, const std::string& fullPath, const char* const ininfo) { EXEC_TIMING_BEGIN("Proto::Archive"); gOFS->MgmStats.Add("Proto::Archive", 0, 0, 1); std::string xattrCtaArchiveFileId; bool hasXattrCtaArchiveFileId = false; { eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); hasXattrCtaArchiveFileId = fmd->hasAttribute(ARCHIVE_FILE_ID_ATTR_NAME); if (hasXattrCtaArchiveFileId) { xattrCtaArchiveFileId = fmd->getAttribute(ARCHIVE_FILE_ID_ATTR_NAME); } } bool onlyTapeCopy = false; { eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); onlyTapeCopy = fmd->hasLocation(TAPE_FS_ID) && fmd->getLocations().size() == 1; } XrdOucEnv opaque(ininfo); const char* const opaqueCtaArchiveFileId = opaque.Get("cta_archive_file_id"); if (event == "archived") { eos_static_err("The archived message for file %s is asynchronous when it should be synchronous." " Ignoring request", fullPath.c_str()); } else if (onlyTapeCopy) { eos_static_info("File %s already has a tape copy. Ignoring request.", fullPath.c_str()); } else if (!hasXattrCtaArchiveFileId) { eos_static_err("File %s does not have the %s attribute. Ignoring request.", fullPath.c_str(), ARCHIVE_FILE_ID_ATTR_NAME); } else if (xattrCtaArchiveFileId.empty()) { eos_static_err("The %s attribute of file %s is an empty string. Ignoring request.", ARCHIVE_FILE_ID_ATTR_NAME, fullPath.c_str()); } else if (nullptr == opaqueCtaArchiveFileId) { eos_static_err("The opaque data of the archived message for file %s does not contain cta_archive_file_id." " Ignoring request.", fullPath.c_str()); } else if (xattrCtaArchiveFileId != opaqueCtaArchiveFileId) { eos_static_err("The %s attribute of file %s does not match cta_archive_file_id in the" " opaque data of the archived message. xattrCtaArchiveFileId=%s opaqueCtaArchiveFileId=%s." " Ignoring request.", ARCHIVE_FILE_ID_ATTR_NAME, fullPath.c_str(), xattrCtaArchiveFileId.c_str(), opaqueCtaArchiveFileId); } else { eos::common::VirtualIdentity root_vid = eos::common::VirtualIdentity::Root(); { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); try { fmd->addLocation(TAPE_FS_ID); // Reset the error message fmd->setAttribute(ARCHIVE_ERROR_ATTR_NAME, ""); //Reset the CTA archive request objectstore ID fmd->setAttribute(CTA_OBJECTSTORE_ARCHIVE_REQ_ID_NAME, ""); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { // fail silently - file could have been removed already } } std::string tgcSpace = "default"; { eos::common::RWMutexReadLock fsLock(FsView::gFsView.ViewMutex); eos::common::RWMutexReadLock eosLock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); unsigned int firstDiskLocation = 0; for (unsigned int i = 0; i < fmd->getNumLocation(); i++) { const auto loc = fmd->getLocation(i); if (loc != 0 && loc != eos::common::TAPE_FS_ID) { firstDiskLocation = loc; break; } } if (0 != firstDiskLocation) { tgcSpace = FsView::gFsView.mIdView.lookupSpaceByID(firstDiskLocation); } } if (GetFileArchivedGCEnabled(tgcSpace)) { bool dropAllStripes = true; IContainerMD::XAttrMap parentDirAttributes; XrdOucErrInfo errInfo; if (gOFS->_attr_ls(eos::common::Path{fullPath.c_str()} .GetParentPath(), errInfo, root_vid, nullptr, parentDirAttributes, true) == 0) { for (const auto& attrPair : parentDirAttributes) { if (attrPair.first == "sys.wfe.archived.dropdiskreplicas" && attrPair.second == "0") { dropAllStripes = false; } } } errInfo.clear(); if (dropAllStripes && gOFS->_dropallstripes(fullPath.c_str(), errInfo, root_vid) != 0) { eos_static_err("Could not delete all file replicas of %s. Reason: %s", fullPath.c_str(), errInfo.getErrText()); MoveToRetry(fullPath); return EAGAIN; } } } MoveWithResults(SFS_OK); EXEC_TIMING_END("Proto::Archive"); return SFS_OK; } bool WFE::Job::GetFileArchivedGCEnabled(const std::string& space) { const bool defaultValue = false; // The default value of filearchivedgc is 'false' std::string valueStr; try { eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex); const auto spaceItor = FsView::gFsView.mSpaceView.find(space); if (FsView::gFsView.mSpaceView.end() == spaceItor) { return defaultValue; } if (nullptr == spaceItor->second) { return defaultValue; } const auto& space = *(spaceItor->second); valueStr = space.GetConfigMember("filearchivedgc"); } catch (...) { return defaultValue; } if (valueStr.empty()) { return defaultValue; } else { return valueStr == "on"; } } int WFE::Job::HandleProtoMethodRetrieveFailedEvent(const std::string& fullPath) { EXEC_TIMING_BEGIN("Proto::Retrieve::Failed"); gOFS->MgmStats.Add("Proto::Retrieve::Failed", 0, 0, 1); try { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); fmd->setAttribute(RETRIEVE_REQID_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_REQTIME_ATTR_NAME, ""); fmd->setAttribute(RETRIEVE_ERROR_ATTR_NAME, mErrorMesssage); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { eos_static_err("Could not reset retrieves counter and set retrieve error attribute for file %s.", fullPath.c_str()); MoveWithResults(SFS_ERROR); return SFS_ERROR; } MoveWithResults(SFS_OK); EXEC_TIMING_END("Proto::Retrieve::Failed"); return SFS_OK; } int WFE::Job::HandleProtoMethodArchiveFailedEvent(const std::string& fullPath) { EXEC_TIMING_BEGIN("Proto::Archive::Failed"); gOFS->MgmStats.Add("Proto::Archive::Failed", 0, 0, 1); try { eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex); auto fmd = gOFS->eosFileService->getFileMD(mFid); fmd->setAttribute(ARCHIVE_ERROR_ATTR_NAME, mErrorMesssage); gOFS->eosView->updateFileStore(fmd.get()); } catch (eos::MDException& ex) { eos_static_err("Could not set archive error attribute for file %s.", fullPath.c_str()); MoveWithResults(SFS_ERROR); return SFS_ERROR; } MoveWithResults(SFS_OK); EXEC_TIMING_END("Proto::Archive::Failed"); return SFS_OK; } int WFE::Job::HandleProtoMethodOfflineEvent(const std::string& fullPath, const char* const ininfo, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::Offline"); gOFS->MgmStats.Add("Proto::Offline", 0, 0, 1); const std::string prepareRequestId = "eos:implicit-prepare"; const std::string prepareActivity = ""; const int prepareRc = IdempotentPrepare(fullPath, prepareRequestId, prepareActivity, errorMsg); EXEC_TIMING_END("Proto::Offline"); return prepareRc; } int WFE::Job::HandleProtoMethodUpdateFidEvent(const std::string& fullPath, std::string& errorMsg) { EXEC_TIMING_BEGIN("Proto::UpdateFid"); gOFS->MgmStats.Add("Proto::UpdateFid", 0, 0, 1); cta::xrd::Request request; auto notification = request.mutable_notification(); notification->mutable_cli()->mutable_user()->set_username(GetUserName( mVid.uid)); notification->mutable_cli()->mutable_user()->set_groupname(GetGroupName( mVid.gid)); auto xAttrs = CollectAttributes(fullPath); for (const auto& attribute : xAttrs) { google::protobuf::MapPair attr(attribute.first, attribute.second); notification->mutable_file()->mutable_xattr()->insert(attr); } notification->mutable_wf()->set_event(cta::eos::Workflow::UPDATE_FID); notification->mutable_wf()->mutable_instance()->set_name( gOFS->MgmOfsInstanceName.c_str()); notification->mutable_file()->set_lpath(fullPath); notification->mutable_file()->set_fid(mFid); notification->mutable_file()->set_disk_file_id(std::to_string(mFid)); if (xAttrs.count(ARCHIVE_FILE_ID_ATTR_NAME)) { notification->mutable_file()->set_archive_file_id (CtaUtils::toUint64(xAttrs[ARCHIVE_FILE_ID_ATTR_NAME])); } if (xAttrs.count(ARCHIVE_STORAGE_CLASS_ATTR_NAME)) { notification->mutable_file()->set_storage_class( xAttrs[ARCHIVE_STORAGE_CLASS_ATTR_NAME]); } if (xAttrs.count(EOS_BTIME)) { eos::IFileMD::ctime_t btime {0, 0}; Timing::Timespec_from_TimespecStr(xAttrs[EOS_BTIME], btime); notification->mutable_file()->mutable_btime()->set_sec(btime.tv_sec); notification->mutable_file()->mutable_btime()->set_nsec(btime.tv_nsec); } const int sendRc = SendProtoWFRequest(this, fullPath, request, errorMsg); if (SFS_OK != sendRc) { eos_static_err("msg=\"Failed to notify protocol buffer endpoint about file ID update %s: %s\" sendRc=%d " "newFxid=%08llx", fullPath.c_str(), errorMsg.c_str(), sendRc, mFid); } EXEC_TIMING_END("Proto::UpdateFid"); return sendRc; } int WFE::Job::SendProtoWFRequest(Job* jobPtr, const std::string& fullPath, const cta::xrd::Request& request, std::string& errorMsg, bool retry) { const std::string& event = jobPtr->mActions[0].mEvent; std::string exec_tag = "Proto::Send::"; exec_tag += event; EXEC_TIMING_BEGIN(exec_tag.c_str()); gOFS->MgmStats.Add(exec_tag.c_str(), 0, 0, 1); if (gOFS->ProtoWFEndPoint.empty() || gOFS->ProtoWFResource.empty()) { eos_static_err("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" " "msg=\"You are running proto wf jobs without specifying mgmofs.protowfendpoint or " "mgmofs.protowfresource in the MGM config file.\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str()); jobPtr->MoveWithResults(ENOTCONN); return ENOTCONN; } XrdSsiPb::Config config; if (getenv("XRDDEBUG")) { config.set("log", "all"); } else { config.set("log", "info"); } config.set("request_timeout", "120"); // Instantiate service object only once, static is thread-safe static XrdSsiPbServiceType service(gOFS->ProtoWFEndPoint, gOFS->ProtoWFResource, config); cta::xrd::Response response; // Send the request try { const auto sentAt = std::chrono::steady_clock::now(); service.Send(request, response); const auto receivedAt = std::chrono::steady_clock::now(); const auto timeSpentMilliseconds = std::chrono::duration_cast (receivedAt - sentAt); eos_static_info("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" timeSpentMs=%ld " "msg=\"Sent SSI protocol buffer request\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str(), timeSpentMilliseconds.count()); } catch (std::runtime_error& error) { eos_static_err("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" " "msg=\"Could not send SSI protocol buffer request to outside service.\" reason=\"%s\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str(), error.what()); errorMsg = error.what(); retry ? jobPtr->MoveToRetry(fullPath) : jobPtr->MoveWithResults(ENOTCONN); return ENOTCONN; } // Handle the response int retval = EPROTO; switch (response.type()) { case cta::xrd::Response::RSP_SUCCESS: { // Set all attributes for file from response eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); XrdOucErrInfo errInfo; for (const auto& attrPair : response.xattr()) { errInfo.clear(); if (gOFS->_attr_set(fullPath.c_str(), errInfo, rootvid, nullptr, attrPair.first.c_str(), attrPair.second.c_str()) != 0) { eos_static_err("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" " "msg=\"Could not set attribute\" attrName=\"%s\" attrValue=\"%s\" reason=\"%s\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str(), attrPair.first.c_str(), attrPair.second.c_str(), errInfo.getErrText()); } } jobPtr->MoveWithResults(SFS_OK); EXEC_TIMING_END(exec_tag.c_str()); return SFS_OK; } // CTA Frontend error; return operation cancelled case cta::xrd::Response::RSP_ERR_CTA: retval = ECANCELED; break; // User error; return operation not permitted case cta::xrd::Response::RSP_ERR_USER: retval = EPERM; break; // Error in Google Protocol buffers; return protocol error case cta::xrd::Response::RSP_ERR_PROTOBUF: retval = EPROTO; break; // Response type was not set or was set to an unknown value; return not a data message case cta::xrd::Response::RSP_INVALID: default: retval = EBADMSG; eos_static_err("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" " "msg=\"Invalid or unknown response\" response=\"%s\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str(), response.DebugString().c_str()); } eos_static_err("protoWFEndPoint=\"%s\" protoWFResource=\"%s\" fullPath=\"%s\" event=\"%s\" " "msg=\"Received an error response\" response=\"%s\" reason=\"%s\"", gOFS->ProtoWFEndPoint.c_str(), gOFS->ProtoWFResource.c_str(), fullPath.c_str(), event.c_str(), CtaCommon::ctaResponseCodeToString(response.type()).c_str(), response.message_txt().c_str()); retry ? jobPtr->MoveToRetry(fullPath) : jobPtr->MoveWithResults(retval); errorMsg = response.message_txt(); return retval; } console::ReplyProto WFE::Job::EvictAsRoot(const eos::IFileMD::id_t fid) { eos::common::VirtualIdentity rootVid = eos::common::VirtualIdentity::Root(); eos::console::RequestProto req; eos::console::EvictProto* evict = req.mutable_evict(); auto file = evict->add_file(); file->set_fid(fid); EvictCmd cmd(std::move(req), rootVid); return cmd.ProcessRequest(); } void WFE::Job::MoveToRetry(const std::string& filePath) { if (!IsSync()) { int retry = 0, delay = 0; std::string retryattr = "sys.workflow." + mActions[0].mEvent + "." + mActions[0].mWorkflow + ".retry.max"; std::string delayattr = "sys.workflow." + mActions[0].mEvent + "." + mActions[0].mWorkflow + ".retry.delay"; eos_static_info("%s %s", retryattr.c_str(), delayattr.c_str()); { eos::common::Path cPath(filePath.c_str()); auto parentPath = cPath.GetParentPath(); eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView, parentPath); eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex); auto cmd = gOFS->eosView->getContainer(parentPath); try { retry = std::stoi(cmd->getAttribute(retryattr)); } catch (...) { // retry 25 times by default retry = 25; } try { delay = std::stoi(cmd->getAttribute(delayattr)); } catch (...) { // retry after 1 hour by default and one final longer wait delay = mRetry == retry - 1 ? 7200 : 3600; } } if (mRetry < retry) { time_t storetime = (time_t) mActions[0].mTime + delay; Move("r", "e", storetime, ++mRetry); Results("e", EAGAIN, "scheduled for retry", storetime); } else { eos_static_err("WF event finally failed for %s event of %s file after %d retries.", mActions[0].mEvent.c_str(), filePath.c_str(), mRetry); MoveWithResults(SFS_ERROR, "e"); } } } void WFE::Job::MoveWithResults(int rcode, std::string fromQueue) { if (!IsSync()) { time_t storetime = 0; if (rcode == 0) { Move(fromQueue, "d", storetime); Results("d", rcode, "moved to done", storetime); } else { Move(fromQueue, "f", storetime); Results("f", rcode, "moved to failed", storetime); } } } std::string WFE::GetGroupName(gid_t gid) { int errc = 0; auto group_name = Mapping::GidToGroupName(gid, errc); if (errc) { group_name = "nobody"; } return group_name; } std::string WFE::GetUserName(uid_t uid) { int errc = 0; auto user_name = Mapping::UidToUserName(uid, errc); if (errc) { user_name = "nobody"; } return user_name; } /*----------------------------------------------------------------------------*/ void WFE::PublishActiveJobs() /*----------------------------------------------------------------------------*/ /** * @brief publish the active job number in the space view * */ /*----------------------------------------------------------------------------*/ { eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex); char sactive[256]; snprintf(sactive, sizeof(sactive) - 1, "%u", GetActiveJobs()); FsView::gFsView.mSpaceView["default"]->SetConfigMember("stat.wfe.active", sactive, true); } IContainerMD::XAttrMap WFE::CollectAttributes(const std::string& fullPath) { eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); XrdOucErrInfo errInfo; IContainerMD::XAttrMap fileAttributes, parentDirAttributes, result; if (gOFS->_attr_ls(fullPath.c_str(), errInfo, rootvid, nullptr, fileAttributes, true) == 0) { for (const auto& fileAttrPair : fileAttributes) { // sys.archive.file_id and sys.archive.storage_class are set in the MGM and need to be // communicated to the CTA Frontend. // sys.cta.* is set by the CTA Frontend for internal use, e.g. tracking requests in the // objectstore. if (fileAttrPair.first.find(EOS_BTIME) == 0 || fileAttrPair.first.find("sys.archive.") == 0 || fileAttrPair.first.find("sys.cta.") == 0 || fileAttrPair.first.find("CTA_") == 0) { result.insert(fileAttrPair); } } } errInfo.clear(); if (gOFS->_attr_ls(eos::common::Path{fullPath.c_str()} .GetParentPath(), errInfo, rootvid, nullptr, parentDirAttributes, true) == 0) { for (const auto& dirAttrPair : parentDirAttributes) { if (dirAttrPair.first.find("sys.archive.") == 0 || dirAttrPair.first.find("CTA_") == 0) { result.insert(dirAttrPair); } } } return result; } void WFE::MoveFromRBackToQ() { std::string queries[2]; for (auto& query : queries) { query = gOFS->MgmProcWorkflowPath.c_str(); query += "/"; } { // today time_t when = time(nullptr); std::string day = eos::common::Timing::UnixTimestamp_to_Day(when); queries[0] += day; queries[0] += "/r/"; //yesterday when -= (24 * 3600); day = eos::common::Timing::UnixTimestamp_to_Day(when); queries[1] += day; queries[1] += "/r/"; } std::map> wfedirs; XrdOucErrInfo errInfo; XrdOucString stdErr; eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root(); for (const auto& query : queries) { gOFS->_find(query.c_str(), errInfo, stdErr, rootvid, wfedirs, nullptr, nullptr, false, 0, false, 0 ); } for (const auto& wfedir : wfedirs) { auto wfEntry = wfedir.first; for (const auto& entry : wfedir.second) { wfEntry += entry; Job job; if (job.Load(wfEntry) == 0) { if (!job.IsSync()) { job.Move("r", "q", job.mActions[0].mTime); } } else { eos_static_err("msg=\"cannot load workflow entry during recycling from r queue\" value=\"%s\"", wfEntry.c_str()); } } } } EOSMGMNAMESPACE_END