// ---------------------------------------------------------------------- // File: XrdMgmOfs.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "common/CommentLog.hh" #include "common/Constants.hh" #include "common/Mapping.hh" #include "common/FileId.hh" #include "common/LayoutId.hh" #include "common/Path.hh" #include "common/SecEntity.hh" #include "common/StackTrace.hh" #include "common/SymKeys.hh" #include "common/ParseUtils.hh" #include "common/http/OwnCloud.hh" #include "common/JeMallocHandler.hh" #include "common/plugin_manager/Plugin.hh" #include "common/plugin_manager/DynamicLibrary.hh" #include "common/plugin_manager/PluginManager.hh" #include "common/Strerror_r_wrapper.hh" #include "common/BufferManager.hh" #include "namespace/Constants.hh" #include "namespace/interface/ContainerIterators.hh" #include "namespace/utils/Attributes.hh" #include "namespace/utils/Checksum.hh" #include "namespace/utils/RenameSafetyCheck.hh" #include "namespace/utils/Stat.hh" #include "namespace/utils/Etag.hh" #include "grpc/GrpcServer.hh" #include "grpc/GrpcWncServer.hh" #include "grpc/GrpcRestGwServer.hh" #include "mgm/AdminSocket.hh" #include "mgm/Stat.hh" #include "mgm/Access.hh" #include "mgm/FileSystem.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/XrdMgmAuthz.hh" #include "mgm/XrdMgmOfsDirectory.hh" #include "mgm/XrdMgmOfsFile.hh" #include "mgm/XrdMgmOfsTrace.hh" #include "mgm/XrdMgmOfsSecurity.hh" #include "mgm/CommandMap.hh" #include "mgm/Policy.hh" #include "mgm/Quota.hh" #include "mgm/Acl.hh" #include "mgm/Workflow.hh" #include "mgm/proc/ProcInterface.hh" #include "mgm/Recycle.hh" #include "mgm/Devices.hh" #include "mgm/PathRouting.hh" #include "mgm/Macros.hh" #include "mgm/GeoTreeEngine.hh" #include "mgm/Egroup.hh" #include "mgm/http/HttpServer.hh" #include "mgm/ZMQ.hh" #include "mgm/Iostat.hh" #include "mgm/LRU.hh" #include "mgm/WFE.hh" #include "mgm/fsck/Fsck.hh" #include "mgm/IMaster.hh" #include "mgm/convert/ConverterDriver.hh" #include "mgm/FuseServer/FusexCastBatch.hh" #include "mgm/tgc/RealTapeGcMgm.hh" #include "mgm/tgc/MultiSpaceTapeGc.hh" #include "mgm/tracker/ReplicationTracker.hh" #include "mgm/XrdMgmOfs/fsctl/CommitHelper.hh" #include "mgm/XattrLock.hh" #include "mgm/auth/AccessChecker.hh" #include "mgm/config/IConfigEngine.hh" #include "mgm/bulk-request/prepare/manager/PrepareManager.hh" #include "mgm/placement/FsScheduler.hh" #include "mq/SharedHashWrapper.hh" #include "mq/FsChangeListener.hh" #include "mq/GlobalConfigChangeListener.hh" #include "mq/MessagingRealm.hh" #include "mq/QdbListener.hh" #include "namespace/interface/IFsView.hh" #include "namespace/Prefetcher.hh" #include "XrdVersion.hh" #include "XrdOss/XrdOss.hh" #include "XrdOuc/XrdOucBuffer.hh" #include "XrdOuc/XrdOucEnv.hh" #include "XrdOuc/XrdOucTokenizer.hh" #include "XrdOuc/XrdOucTList.hh" #include "XrdOuc/XrdOucTrace.hh" #include "XrdSys/XrdSysError.hh" #include "XrdSys/XrdSysLogger.hh" #include "XrdSys/XrdSysPthread.hh" #include "XrdSec/XrdSecInterface.hh" #include "XrdSfs/XrdSfsAio.hh" #include "XrdSfs/XrdSfsFlags.hh" #include "google/protobuf/io/zero_copy_stream_impl.h" #include "mgm/bulk-request/dao/factories/AbstractDAOFactory.hh" #include "mgm/bulk-request/dao/factories/ProcDirectoryDAOFactory.hh" #include "mgm/bulk-request/business/BulkRequestBusiness.hh" #include "mgm/bulk-request/interface/RealMgmFileSystemInterface.hh" #include "mgm/bulk-request/prepare/manager/BulkRequestPrepareManager.hh" #include "mgm/bulk-request/dao/proc/ProcDirectoryBulkRequestLocations.hh" #include "mgm/bulk-request/response/QueryPrepareResponse.hh" #include "mgm/bulk-request/prepare/query-prepare/QueryPrepareResult.hh" #include "mgm/bulk-request/dao/proc/cleaner/BulkRequestProcCleaner.hh" #include "mgm/bulk-request/utils/json/QueryPrepareResponseJson.hh" #include "mgm/http/rest-api/handler/tape/TapeRestHandler.hh" #include "mgm/http/rest-api/manager/RestApiManager.hh" #include "mgm/utils/AttrHelper.hh" #ifdef __APPLE__ #define ECOMM 70 #endif #ifndef S_IAMB #define S_IAMB 0x1FF #endif // Initialize static variables XrdSysError* XrdMgmOfs::eDest; thread_local eos::common::LogId XrdMgmOfs::tlLogId; XrdSysError gMgmOfsEroute(0); XrdOucTrace gMgmOfsTrace(&gMgmOfsEroute); XrdMgmOfs* gOFS = 0; const char* k_mdino = "sys.eos.mdino"; const char* k_nlink = "sys.eos.nlink"; // Set the version information XrdVERSIONINFO(XrdSfsGetFileSystem, MgmOfs); XrdVERSIONINFO(XrdSfsGetFileSystem2, MgmOfs); //------------------------------------------------------------------------------ // Convert NamespaceState to string //------------------------------------------------------------------------------ std::string namespaceStateToString(NamespaceState st) { switch (st) { case NamespaceState::kDown: { return "down"; } case NamespaceState::kBooting: { return "booting"; } case NamespaceState::kBooted: { return "booted"; } case NamespaceState::kFailed: { return "failed"; } case NamespaceState::kCompacting: { return "compacting"; } } return "(invalid)"; } extern "C" { #ifdef COVERAGE_BUILD // Forward declaration of gcov flush API extern "C" void __gcov_flush(); #endif //------------------------------------------------------------------------------ // XrdAccAuthorizeObject() is called to obtain an instance of the auth object // that will be used for all subsequent authorization decisions. If it returns // a null pointer; initialization fails and the program exits. The args are: // // lp -> XrdSysLogger to be tied to an XrdSysError object for messages // cfn -> The name of the configuration file // parm -> Paramexters specified on the authlib directive. If none it is zero. //------------------------------------------------------------------------------ XrdAccAuthorize* XrdAccAuthorizeObject(XrdSysLogger* lp, const char* cfn, const char* parm); //------------------------------------------------------------------------------ //! Filesystem Plugin factory function //! //! @param native_fs (not used) //! @param lp the logger object //! @param configfn the configuration file name //! //! @returns configures and returns our MgmOfs object //------------------------------------------------------------------------------ XrdSfsFileSystem* XrdSfsGetFileSystem(XrdSfsFileSystem* native_fs, XrdSysLogger* lp, const char* configfn) { if (gOFS) { // File system object already initalized return gOFS; } gMgmOfsEroute.SetPrefix("MgmOfs_"); gMgmOfsEroute.logger(lp); static XrdMgmOfs myFS(&gMgmOfsEroute); XrdOucString vs = "MgmOfs (meta data redirector) "; vs += VERSION; gMgmOfsEroute.Say("++++++ (c) 2015 CERN/IT-DSS ", vs.c_str()); // Initialize the subsystems if (!myFS.Init(gMgmOfsEroute)) { return nullptr; } // Disable XRootd log rotation lp->setRotate(0); gOFS = &myFS; // Did we pass the "-2" option to the plugin in the config file? gOFS->IsFileSystem2 = false; // By default enable stalling and redirection gOFS->IsStall = true; gOFS->IsRedirect = true; myFS.ConfigFN = (configfn && *configfn ? strdup(configfn) : nullptr); if (myFS.Configure(gMgmOfsEroute)) { return nullptr; } // Initialize authorization plugin XrdMgmAuthz gOFS->mMgmAuthz = (XrdMgmAuthz*) XrdAccAuthorizeObject(lp, configfn, nullptr); if (!gOFS->mMgmAuthz) { return nullptr; } return gOFS; } //------------------------------------------------------------------------------ //! Filesystem Plugin factory function //! //! @description FileSystem2 version, to allow passing configuration info back //! to XRootD. Configure with: xrootd.fslib -2 libXrdEosMgm.so //! //! @param native_fs (not used) //! @param lp the logger object //! @param configfn the configuration file name //! @param envP pass configuration information back to XrdXrootd //! //! @returns configures and returns our MgmOfs object //------------------------------------------------------------------------------ XrdSfsFileSystem* XrdSfsGetFileSystem2(XrdSfsFileSystem* native_fs, XrdSysLogger* lp, const char* configfn, XrdOucEnv* envP) { // Initialise gOFS XrdSfsGetFileSystem(native_fs, lp, configfn); gOFS->IsFileSystem2 = true; // Tell XRootD that MgmOfs implements the Prepare plugin if (envP != nullptr) { envP->Put("XRD_PrepHandler", "1"); } return gOFS; } } // extern "C" /******************************************************************************/ /* MGM Meta Data Interface */ /******************************************************************************/ //------------------------------------------------------------------------------ // Constructor MGM Ofs //------------------------------------------------------------------------------ XrdMgmOfs::XrdMgmOfs(XrdSysError* ep): ConfigFN(0), ConfEngine(0), mCapabilityValidity(3600), mMgmMessaging(nullptr), ManagerPort(1094), LinuxStatsStartup{0}, HostName(0), HostPref(0), mNamespaceState(NamespaceState::kDown), mFileInitTime(0), mTotalInitTime(time(nullptr)), mStartTime(time(nullptr)), Shutdown(false), mBootFileId(0), mBootContainerId(0), IsRedirect(true), IsStall(true), mAuthorize(false), mAuthLib(""), mTapeEnabled(false), MgmRedirector(false), mErrLogEnabled(true), eosDirectoryService(0), eosFileService(0), eosView(0), eosFsView(0), eosContainerAccounting(0), eosSyncTimeAccounting(0), mFrontendPort(0), mNumAuthThreads(0), zMQ(nullptr), mExtAuthz(nullptr), MgmStatsPtr(new eos::mgm::Stat()), MgmStats(*MgmStatsPtr), mFsckEngine(new Fsck()), mMaster(nullptr), mRouting(new eos::mgm::PathRouting()), mConverterDriver(), mHttpd(nullptr), GRPCd(nullptr), WNCd(nullptr), mRestGrpcSrv(nullptr), mLRUEngine(new eos::mgm::LRU()), WFEPtr(new eos::mgm::WFE()), WFEd(*WFEPtr), UTF8(false), mFstGwHost(""), mFstGwPort(0), mQdbCluster(""), mHttpdPort(8000), mFusexPort(1100), mGRPCPort(50051), mWncPort(50052), mRestGrpcPort(50054), mFidTracker(std::chrono::seconds(600), std::chrono::seconds(3600)), mDoneOrderlyShutdown(false), mXrdBuffPool(2 * eos::common::KB, 2 * eos::common::MB, 8, 64), mJeMallocHandler(new eos::common::JeMallocHandler()) { eDest = ep; ConfigFN = 0; enforceRecycleBin = false; if (getenv("EOS_MGM_HTTP_PORT")) { mHttpdPort = strtol(getenv("EOS_MGM_HTTP_PORT"), 0, 10); } if (getenv("EOS_MGM_FUSEX_PORT")) { mFusexPort = strtol(getenv("EOS_MGM_FUSEX_PORT"), 0, 10); } if (getenv("EOS_MGM_GRPC_PORT")) { mGRPCPort = strtol(getenv("EOS_MGM_GRPC_PORT"), 0, 10); } if (getenv("EOS_MGM_WNC_PORT")) { mWncPort = strtol(getenv("EOS_MGM_WNC_PORT"), 0, 10); } if (getenv("EOS_MGM_FUSE_BOOKING_SIZE")) { mFusePlacementBooking = strtol(getenv("EOS_MGM_FUSE_BOOKING_SIZE"), 0, 10); } else { mFusePlacementBooking = 5 * 1024 * 1024 * 1024ll; } mRestApiManager = std::make_unique(); { // Run a dummy command so that the ShellExecutor is forked before any XrdCl // is initialized. Otherwise it might segv due to the following bug: // https://github.com/xrootd/xrootd/issues/1515 // eos::common::ShellCmd dummy_cmd("uname -a"); } eos::common::LogId::SetSingleShotLogId(); mZmqContext = new zmq::context_t(1); IoStats.reset(new eos::mgm::Iostat()); mHttpd.reset(new eos::mgm::HttpServer(mHttpdPort)); if (mGRPCPort) { GRPCd.reset(new eos::mgm::GrpcServer(mGRPCPort)); } if (mWncPort) { WNCd.reset(new eos::mgm::GrpcWncServer(mWncPort)); } if (getenv("EOS_MGM_REST_GRPC_PORT")) { mRestGrpcPort = strtol(getenv("EOS_MGM_REST_GRPC_PORT"), 0, 10); } if (mRestGrpcPort) { const char* ptr = getenv("EOS_MGM_ENABLE_REST_API"); if (ptr && strncmp(ptr, "1", 1) == 0) { mRestGrpcSrv.reset(new eos::mgm::GrpcRestGwServer(mRestGrpcPort)); } } EgroupRefresh.reset(new eos::mgm::Egroup()); Recycler.reset(new eos::mgm::Recycle()); DeviceTracker.reset(new eos::mgm::Devices()); mTapeGcMgm.reset(new tgc::RealTapeGcMgm(*this)); mTapeGc.reset(new tgc::MultiSpaceTapeGc(*mTapeGcMgm)); mFsScheduler.reset(new eos::mgm::placement::FSScheduler()); } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdMgmOfs::~XrdMgmOfs() { OrderlyShutdown(); eos_warning("%s", "msg=\"finished destructor\""); if (HostName) { free(HostName); } } //------------------------------------------------------------------------------ // Destroy member objects and clean up threads //------------------------------------------------------------------------------ void XrdMgmOfs::OrderlyShutdown() { if (mDoneOrderlyShutdown) { eos_warning("%s", "msg=\"skipping already done shutdown procedure\""); return; } auto start_ts = std::chrono::steady_clock::now(); mDoneOrderlyShutdown = true; { eos_warning("%s", "msg=\"set stall rule of all ns operations\""); eos::common::RWMutexWriteLock lock(Access::gAccessMutex); Access::gStallRules[std::string("*")] = "300"; } gOFS->mTracker.SetAcceptingRequests(false); gOFS->mTracker.SpinUntilNoRequestsInFlight(true, std::chrono::milliseconds(100)); eos_warning("%s", "msg=\"stopping fs listener thread\""); auto stop_fsconfiglistener = std::thread([&]() { mFsConfigTid.join(); }); // We now need to signal to the FsConfigListener thread to unblock it XrdMqSharedObjectChangeNotifier::Subscriber* subscriber = ObjectNotifier.GetSubscriberFromCatalog("fsconfiglistener", false); if (subscriber) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); XrdSysMutexHelper lock(subscriber->mSubjMtx); subscriber->mSubjSem.Post(); } stop_fsconfiglistener.join(); eos_warning("%s", "msg=\"disable configuration engine autosave\""); ConfEngine->SetAutoSave(false); FsView::gFsView.SetConfigEngine(nullptr); eos_warning("%s", "msg=\"stop routing\""); if (mRouting) { mRouting.reset(); } eos_warning("%s", "msg=\"stopping the stats collecting thread\""); eos_warning("%s", "msg=\"stopping archive submitter\""); mSubmitterTid.join(); if (mZmqContext) { eos_warning("%s", "msg=\"closing the ZMQ context\""); // TODO: for now removing this since it breaks Centos8/9 shutdown // mZmqContext->close(); eos_warning("%s", "msg=\"joining the master and worker auth threads\""); mAuthMasterTid.join(); for (const auto& auth_tid : mVectTid) { XrdSysThread::Join(auth_tid, nullptr); } mVectTid.clear(); eos_warning("%s", "msg=\"deleting the ZMQ context\""); delete mZmqContext; } eos_warning("%s", "msg=\"stopping and deleting the Converter engine\""); mConverterDriver.reset(); eos_warning("%s", "msg=\"stopping central drainning\""); mDrainEngine.Stop(); eos_warning("%s", "msg=\"stopping geotree engine updater\""); mGeoTreeEngine->StopUpdater(); if (IoStats) { eos_warning("%s", "msg=\"stopping and deleting IoStats\""); IoStats.reset(); } eos_warning("%s", "msg=\"stopping fusex server\""); zMQ->gFuseServer.shutdown(); // TODO: for now removing this since it breaks Centos8/9 shutdown /* if (zMQ) { delete zMQ; zMQ = nullptr; } */ eos_warning("%s", "msg=\"stopping FSCK service\""); mFsckEngine->Stop(); eos_warning("%s", "msg=\"stopping messaging\""); if (mMgmMessaging) { delete mMgmMessaging; mMgmMessaging = nullptr; } if (Recycler) { eos_warning("%s", "msg=\"stopping and deleting recycler server\""); Recycler.reset(); } if (WFEPtr) { eos_warning("%s", "msg=\"stopping and deleting the WFE engine\""); WFEPtr.reset(); } eos_warning("%s", "msg=\"stopping and deleting the LRU engine\""); mLRUEngine.reset(); if (EgroupRefresh) { eos_warning("%s", "msg=\"stopping and deleting egroup refresh thread\""); EgroupRefresh.reset(); } if (mHttpd) { eos_warning("%s", "msg=\"stopping and deleting HTTP daemon\""); mHttpd.reset(); } if (WNCd) { eos_warning("%s", "msg=\"stopping gRPC server for EOS-wnc\""); WNCd.reset(); } eos_warning("%s", "msg=\"stopping the shared object notifier thread\""); ObjectNotifier.Stop(); eos_warning("%s", "msg=\"cleanup quota information\""); (void) Quota::CleanUp(); eos_warning("%s", "msg=\"graceful shutdown of the FsView\""); FsView::gFsView.StopHeartBeat(); FsView::gFsView.Clear(); if (mErrLogEnabled) { eos_warning("%s", "msg=\"error log kill\""); std::string errorlogkillline = "pkill -9 -f \"eos -b console log _MGMID_\""; int rrc = system(errorlogkillline.c_str()); if (WEXITSTATUS(rrc)) { eos_static_info("%s returned %d", errorlogkillline.c_str(), rrc); } } if (gOFS->mNamespaceState == NamespaceState::kBooted) { eos_warning("%s", "msg=\"finalizing namespace views\""); try { gOFS->eosDirectoryService = nullptr; gOFS->eosFileService = nullptr; gOFS->eosView = nullptr; gOFS->eosFsView = nullptr; gOFS->eosContainerAccounting = nullptr; gOFS->eosSyncTimeAccounting = nullptr; gOFS->namespaceGroup.reset(); } catch (eos::MDException& e) { // we don't really care about any exception here! } } eos_warning("%s", "msg=\"stopping master-slave supervisor thread\""); if (mMaster) { mMaster.reset(); } auto end_ts = std::chrono::steady_clock::now(); eos_warning("msg=\"finished orderly shutdown in %llu seconds\"", std::chrono::duration_cast (end_ts - start_ts).count()); } //------------------------------------------------------------------------------ // This is just kept to be compatible with standard OFS plugins, but it is not // used for the moment. //------------------------------------------------------------------------------ bool XrdMgmOfs::Init(XrdSysError& ep) { return true; } //------------------------------------------------------------------------------ // Return a MGM directory object //------------------------------------------------------------------------------ XrdSfsDirectory* XrdMgmOfs::newDir(char* user, int MonID) { return (XrdSfsDirectory*)new XrdMgmOfsDirectory(user, MonID); } //------------------------------------------------------------------------------ // Return MGM file object //------------------------------------------------------------------------------ XrdSfsFile* XrdMgmOfs::newFile(char* user, int MonID) { return (XrdSfsFile*)new XrdMgmOfsFile(user, MonID); } //------------------------------------------------------------------------------ // Notify filesystem that a client has disconnected //------------------------------------------------------------------------------ void XrdMgmOfs::Disc(const XrdSecEntity* client) { if (client) { ProcInterface::DropSubmittedCmd(client->tident); } } //------------------------------------------------------------------------------ // Implementation Source Code Includes //------------------------------------------------------------------------------ #include "XrdMgmOfs/Access.cc" #include "XrdMgmOfs/Attr.cc" #include "XrdMgmOfs/Auth.cc" #include "XrdMgmOfs/Chksum.cc" #include "XrdMgmOfs/Chmod.cc" #include "XrdMgmOfs/Chown.cc" #include "XrdMgmOfs/Coverage.cc" #include "XrdMgmOfs/DeleteExternal.cc" #include "XrdMgmOfs/Exists.cc" #include "XrdMgmOfs/Find.cc" #include "XrdMgmOfs/FsConfigListener.cc" #include "XrdMgmOfs/ErrorLogListener.cc" #include "XrdMgmOfs/Fsctl.cc" #include "XrdMgmOfs/Link.cc" #include "XrdMgmOfs/Mkdir.cc" #include "XrdMgmOfs/PathMap.cc" #include "XrdMgmOfs/QoS.cc" #include "XrdMgmOfs/Remdir.cc" #include "XrdMgmOfs/Rename.cc" #include "XrdMgmOfs/Rm.cc" #include "XrdMgmOfs/SharedPath.cc" #include "XrdMgmOfs/ShouldRedirect.cc" #include "XrdMgmOfs/ShouldRoute.cc" #include "XrdMgmOfs/ShouldStall.cc" #include "XrdMgmOfs/Shutdown.cc" #include "XrdMgmOfs/Stacktrace.cc" #include "XrdMgmOfs/Stat.cc" #include "XrdMgmOfs/Stripes.cc" #include "XrdMgmOfs/Touch.cc" #include "XrdMgmOfs/Utimes.cc" #include "XrdMgmOfs/Version.cc" //------------------------------------------------------------------------------ // Test for stall rule //------------------------------------------------------------------------------ bool XrdMgmOfs::HasStall(const char* path, const char* rule, int& stalltime, XrdOucString& stallmsg) { if (!rule) { return false; } eos::common::RWMutexReadLock lock(Access::gAccessMutex); if (Access::gStallRules.count(std::string(rule))) { stalltime = atoi(Access::gStallRules[std::string(rule)].c_str()); stallmsg = "Attention: you are currently hold in this instance and each request is stalled for "; stallmsg += (int) stalltime; stallmsg += " seconds after an errno of type: "; stallmsg += rule; eos_static_info("info=\"stalling\" path=\"%s\" errno=\"%s\"", path, rule); return true; } else { return false; } } //------------------------------------------------------------------------------ // Test for redirection rule //------------------------------------------------------------------------------ bool XrdMgmOfs::HasRedirect(const char* path, const char* rule, std::string& host, int& port) { if (!rule) { return false; } std::string srule = rule; eos::common::RWMutexReadLock lock(Access::gAccessMutex); if (Access::gRedirectionRules.count(srule)) { std::string delimiter = ":"; std::vector tokens; eos::common::StringConversion::Tokenize(Access::gRedirectionRules[srule], tokens, delimiter); if (tokens.size() == 1) { host = tokens[0].c_str(); port = 1094; } else { host = tokens[0].c_str(); port = atoi(tokens[1].c_str()); if (port == 0) { port = 1094; } } eos_static_info("info=\"redirect\" path=\"%s\" host=%s port=%d errno=%s", path, host.c_str(), port, rule); if (srule == "ENONET") { gOFS->MgmStats.Add("RedirectENONET", 0, 0, 1); } else if (srule == "ENOENT") { gOFS->MgmStats.Add("RedirectENOENT", 0, 0, 1); } else if (srule == "ENETUNREACH") { gOFS->MgmStats.Add("RedirectENETUNREACH", 0, 0, 1); } return true; } else { return false; } } //------------------------------------------------------------------------------ // Return the version of the MGM software //------------------------------------------------------------------------------ const char* XrdMgmOfs::getVersion() { static XrdOucString FullVersion = XrdVERSION; FullVersion += " MgmOfs "; FullVersion += VERSION; return FullVersion.c_str(); } //------------------------------------------------------------------------------------- // Prepare a file or query the status of a previous prepare request //------------------------------------------------------------------------------------- int XrdMgmOfs::prepare(XrdSfsPrep& pargs, XrdOucErrInfo& error, const XrdSecEntity* client) { if (pargs.opts & Prep_QUERY) { return _prepare_query(pargs, error, client); } else { return _prepare(pargs, error, client); } } //-------------------------------------------------------------------------------------- // Prepare a file // // EOS will call a prepare workflow if defined //-------------------------------------------------------------------------------------- int XrdMgmOfs::_prepare(XrdSfsPrep& pargs, XrdOucErrInfo& error, const XrdSecEntity* client) { USE_EOSBULKNAMESPACE; PrepareManager pm(std::make_unique(gOFS)); int prepareRetCode = pm.prepare(pargs, error, client); return prepareRetCode; } //------------------------------------------------------------------------------------------- // Query the status of a previous prepare request //------------------------------------------------------------------------------------------- int XrdMgmOfs::_prepare_query(XrdSfsPrep& pargs, XrdOucErrInfo& error, const XrdSecEntity* client) { USE_EOSBULKNAMESPACE; RealMgmFileSystemInterface mgmFsInterface(gOFS); PrepareManager pm(std::make_unique(gOFS)); std::unique_ptr result = pm.queryPrepare(pargs, error, client); if (result->hasQueryPrepareFinished()) { //Create the JSON response bulk::QueryPrepareResponseJson jsonQueryPrepareResponse; std::stringstream json_ss; auto queryPrepareResponse = result->getResponse(); queryPrepareResponse->setJsonifier( std::make_shared()); jsonQueryPrepareResponse.jsonify(queryPrepareResponse.get(), json_ss); // Send the reply. XRootD requires that we put it into a buffer that can be released with free(). auto json_len = json_ss.str().length(); char* json_buf = reinterpret_cast(malloc(json_len)); strncpy(json_buf, json_ss.str().c_str(), json_len); // Ownership of this buffer is passed to xrd_buff which has a Recycle() method. XrdOucBuffer* xrd_buff = new XrdOucBuffer(json_buf, json_len); // Ownership of xrd_buff is passed to error. Note that as we are returning SFS_DATA, the first // parameter is the buffer length rather than an error code. error.setErrInfo(xrd_buff->BuffSize(), xrd_buff); } return result->getReturnCode(); } //------------------------------------------------------------------------------ //! Truncate a file (not supported in EOS, only via the file interface) //------------------------------------------------------------------------------ int XrdMgmOfs::truncate(const char*, XrdSfsFileOffset, XrdOucErrInfo& error, const XrdSecEntity* client, const char* path) { static const char* epname = "truncate"; const char* tident = error.getErrUser(); // use a thread private vid eos::common::VirtualIdentity vid; EXEC_TIMING_BEGIN("IdMap"); eos::common::Mapping::IdMap(client, 0, tident, vid); EXEC_TIMING_END("IdMap"); gOFS->MgmStats.Add("IdMap", vid.uid, vid.gid, 1); ACCESSMODE_W; MAYSTALL; { const char* ininfo = ""; MAYREDIRECT; } gOFS->MgmStats.Add("Truncate", vid.uid, vid.gid, 1); return Emsg(epname, error, EOPNOTSUPP, "truncate", path); } //------------------------------------------------------------------------------ // Return error message //------------------------------------------------------------------------------ int XrdMgmOfs::Emsg(const char* pfx, XrdOucErrInfo& einfo, int ecode, const char* op, const char* target) { char etext[128], buffer[4096]; // Get the reason for the error if (ecode < 0) { ecode = -ecode; } if (eos::common::strerror_r(ecode, etext, sizeof(etext))) { sprintf(etext, "reason unknown (%d)", ecode); } // Format the error message snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, target, etext); if ((ecode == EIDRM) || (ecode == ENODATA)) { eos_debug("Unable to %s %s; %s", op, target, etext); } else { if ((!strcmp(op, "stat")) || (((!strcmp(pfx, "attr_get")) || (!strcmp(pfx, "attr_ls")) || (!strcmp(pfx, "FuseX"))) && (ecode == ENOENT))) { eos_debug("Unable to %s %s; %s", op, target, etext); } else { eos_err("Unable to %s %s; %s", op, target, etext); } } // Print it out if debugging is enabled #ifndef NODEBUG // XrdMgmOfs::eDest->Emsg(pfx, buffer); #endif // Place the error message in the error object and return einfo.setErrInfo(ecode, buffer); return SFS_ERROR; } //------------------------------------------------------------------------------ // Create stall response //------------------------------------------------------------------------------ int XrdMgmOfs::Stall(XrdOucErrInfo& error, int stime, const char* msg) { XrdOucString smessage = msg; smessage += "; come back in "; smessage += stime; smessage += " seconds!"; EPNAME("Stall"); const char* tident = error.getErrUser(); ZTRACE(delay, "Stall " << stime << ": " << smessage.c_str()); // Place the error message in the error object and return error.setErrInfo(0, smessage.c_str()); return stime; } //------------------------------------------------------------------------------ // Create redirect response //------------------------------------------------------------------------------ int XrdMgmOfs::Redirect(XrdOucErrInfo& error, const char* host, int& port, const char* path, bool collapse) { EPNAME("Redirect"); const char* tident = error.getErrUser(); ZTRACE(delay, "Redirect " << host << ":" << port); if (collapse && strlen(path)) { std::string url = "root://"; url += host; url += ":"; url += std::to_string(port); url += "/"; url += path; error.setErrInfo(~(~(-1) | kXR_collapseRedir), url.c_str()); } else { // Place the error message in the error object and return error.setErrInfo(port, host); } return SFS_REDIRECT; } //------------------------------------------------------------------------------ // Start a thread that will queue, build and submit backup operations to the // archiver daemon. //------------------------------------------------------------------------------ void XrdMgmOfs::ArchiveSubmitterThread(ThreadAssistant& assistant) noexcept { ProcCommand pcmd; std::string job_opaque; XrdOucString std_out, std_err; int max, running, pending; eos::common::VirtualIdentity root_vid = eos::common::VirtualIdentity::Root(); eos_debug("msg=\"starting archive/backup submitter thread\""); std::ostringstream cmd_json; cmd_json << "{\"cmd\": \"stats\", " << "\"opt\": \"\", " << "\"uid\": \"0\", " << "\"gid\": \"0\" }"; while (!assistant.terminationRequested()) { { XrdSysMutexHelper lock(mJobsQMutex); if (!mPendingBkps.empty()) { // Check if archiver has slots available if (!pcmd.ArchiveExecuteCmd(cmd_json.str())) { std_out.resize(0); std_err.resize(0); pcmd.AddOutput(std_out, std_err); if ((sscanf(std_out.c_str(), "max=%i running=%i pending=%i", &max, &running, &pending) == 3)) { while ((running + pending < max) && !mPendingBkps.empty()) { running++; job_opaque = mPendingBkps.back(); mPendingBkps.pop_back(); job_opaque += "&mgm.backup.create=1"; if (pcmd.open("/proc/admin", job_opaque.c_str(), root_vid, 0)) { pcmd.AddOutput(std_out, std_err); eos_err("failed backup, msg=\"%s\"", std_err.c_str()); } } } } else { eos_err("failed to send stats command to archive daemon"); } } } assistant.wait_for(std::chrono::seconds(5)); } eos_warning("%s", "msg=\"shutdown archive submitter\""); } //------------------------------------------------------------------------------ // Submit backup job //------------------------------------------------------------------------------ bool XrdMgmOfs::SubmitBackupJob(const std::string& job_opaque) { XrdSysMutexHelper lock(mJobsQMutex); auto it = std::find(mPendingBkps.begin(), mPendingBkps.end(), job_opaque); if (it == mPendingBkps.end()) { mPendingBkps.push_front(job_opaque); return true; } return false; } //------------------------------------------------------------------------------ // Get vector of pending backups //------------------------------------------------------------------------------ std::vector XrdMgmOfs::GetPendingBkps() { std::vector bkps; XrdSysMutexHelper lock(mJobsQMutex); for (auto it = mPendingBkps.begin(); it != mPendingBkps.end(); ++it) { XrdOucEnv opaque(it->c_str()); bkps.emplace_back("N/A", "N/A", opaque.Get("mgm.backup.dst"), "backup", "pending at MGM"); } return bkps; } //------------------------------------------------------------------------------ // Discover/search for a service provided to the plugins by the platform //------------------------------------------------------------------------------ int32_t XrdMgmOfs::DiscoverPlatformServices(const char* svc_name, void* opaque) { std::string sname = svc_name; if (sname == "NsViewMutex") { PF_Discovery_Service* ns_lock = (PF_Discovery_Service*)(opaque); // TODO (esindril): Use this code when we drop SLC6 support @todo //std::string htype = std::to_string(typeid(&gOFS->eosViewRWMutex).hash_code()); std::string htype = "eos::common::RWMutex*"; ns_lock->objType = (char*)calloc(htype.length() + 1, sizeof(char)); (void) strcpy(ns_lock->objType, htype.c_str()); ns_lock->ptrService = static_cast(&gOFS->eosViewRWMutex); } else { return EINVAL; } return 0; } //------------------------------------------------------------------------------ // Cast a change message to all fusex clients about a deletion of an entry //------------------------------------------------------------------------------ void XrdMgmOfs::FuseXCastDeletion(eos::ContainerIdentifier id, const std::string& name) { struct timespec pt_mtime { 0, 0 }; gOFS->zMQ->gFuseServer.Cap().BroadcastDeletionFromExternal( id.getUnderlyingUInt64(), name, pt_mtime); } //------------------------------------------------------------------------------ void XrdMgmOfs::FuseXCastRefresh(eos::ContainerIdentifier id, eos::ContainerIdentifier parentid) { gOFS->zMQ->gFuseServer.Cap().BroadcastRefreshFromExternal( id.getUnderlyingUInt64(), parentid.getUnderlyingUInt64()); } void XrdMgmOfs::FuseXCastRefresh(eos::FileIdentifier id, eos::ContainerIdentifier parentid) { gOFS->zMQ->gFuseServer.Cap().BroadcastRefreshFromExternal( eos::common::FileId::FidToInode(id.getUnderlyingUInt64()), parentid.getUnderlyingUInt64()); } //------------------------------------------------------------------------------ // Cast a MD object to clients //------------------------------------------------------------------------------ void XrdMgmOfs::FuseXCastMD(eos::ContainerIdentifier id, eos::ContainerIdentifier parentid, struct timespec& pt_mtime, bool lock) { eos::fusex::md dir; static eos::common::VirtualIdentity root_vid = eos::common::VirtualIdentity::Root(); if (!gOFS->zMQ->gFuseServer.FillContainerMD(id.getUnderlyingUInt64(), dir, root_vid, lock)) { gOFS->zMQ->gFuseServer.Cap().BroadcastMD(dir, dir.md_ino(), dir.md_pino(), dir.clock(), pt_mtime); } } void XrdMgmOfs::FuseXCastMD(eos::FileIdentifier id, eos::ContainerIdentifier parentid, struct timespec& pt_mtime, bool lock ) { eos::fusex::md file; static eos::common::VirtualIdentity root_vid = eos::common::VirtualIdentity::Root(); if (gOFS->zMQ->gFuseServer.FillFileMD(eos::common::FileId::FidToInode( id.getUnderlyingUInt64()), file, root_vid, lock)) { gOFS->zMQ->gFuseServer.Cap().BroadcastMD(file, file.md_ino(), file.md_pino(), file.clock(), pt_mtime); } } //------------------------------------------------------------------------------ // Check if namespace is booted //------------------------------------------------------------------------------ bool XrdMgmOfs::IsNsBooted() const { return ((mNamespaceState == NamespaceState::kBooted) || (mNamespaceState == NamespaceState::kCompacting)); } //------------------------------------------------------------------------------ // Convert error code to string representation //------------------------------------------------------------------------------ std::string XrdMgmOfs::MacroStringError(int errcode) { if (errcode == ENOTCONN) { return "ENOTCONN"; } else if (errcode == EPROTO) { return "EPROTO"; } else if (errcode == EAGAIN) { return "EAGAIN"; } else { return "EINVAL"; } } //------------------------------------------------------------------------------ // Write report record for final deletion (to IoStats) //------------------------------------------------------------------------------ void XrdMgmOfs::WriteRmRecord(const std::shared_ptr& fmd) { struct timespec ts_now; char report[16384]; eos::IFileMD::ctime_t ctime; eos::IFileMD::ctime_t mtime; fmd->getCTime(ctime); fmd->getMTime(mtime); eos::common::Timing::GetTimeSpec(ts_now); snprintf(report, sizeof(report) - 1, "log=%s&" "host=%s&fid=%llu&fxid=%08llx&" "ruid=%u&rgid=%u&" "del_ts=%lu&del_tns=%lu&" "dc_ts=%lu&dc_tns=%lu&" "dm_ts=%lu&dm_tns=%lu&" "dsize=%lu&sec.app=rm", this->logId, gOFS->ManagerId.c_str(), (unsigned long long) fmd->getId(), (unsigned long long) fmd->getId(), fmd->getCUid(), fmd->getCGid(), ts_now.tv_sec, ts_now.tv_nsec, ctime.tv_sec, ctime.tv_nsec, mtime.tv_sec, mtime.tv_nsec, fmd->getSize()); std::string record = report; if (IoStats) { IoStats->WriteRecord(record); } } //------------------------------------------------------------------------------ // Write report record for recycle bin deletion (to IoStats) //------------------------------------------------------------------------------ void XrdMgmOfs::WriteRecycleRecord(const std::shared_ptr& fmd) { struct timespec ts_now; char report[16384]; eos::IFileMD::ctime_t ctime; eos::IFileMD::ctime_t mtime; fmd->getCTime(ctime); fmd->getMTime(mtime); eos::common::Timing::GetTimeSpec(ts_now); snprintf(report, sizeof(report) - 1, "log=%s&" "host=%s&fid=%llu&fxid=%08llx&" "ruid=%u&rgid=%u&" "del_ts=%lu&del_tns=%lu&" "dc_ts=%lu&dc_tns=%lu&" "dm_ts=%lu&dm_tns=%lu&" "dsize=%lu&sec.app=recycle", this->logId, gOFS->ManagerId.c_str(), (unsigned long long) fmd->getId(), (unsigned long long) fmd->getId(), fmd->getCUid(), fmd->getCGid(), ts_now.tv_sec, ts_now.tv_nsec, ctime.tv_sec, ctime.tv_nsec, mtime.tv_sec, mtime.tv_nsec, fmd->getSize()); std::string record = report; if (IoStats) { IoStats->WriteRecord(record); } } //------------------------------------------------------------------------------ // Check if a host was tried already in a given URL with a given error //------------------------------------------------------------------------------ bool XrdMgmOfs::Tried(XrdCl::URL& url, std::string& host, const char* terr) { XrdCl::URL::ParamsMap params = url.GetParams(); std::string tried_hosts = params["tried"]; std::string tried_rc = params["triedrc"]; std::vector v_hosts; std::vector v_rc; eos::common::StringConversion::Tokenize(tried_hosts, v_hosts, ","); eos::common::StringConversion::Tokenize(tried_rc, v_rc, ","); for (size_t i = 0; i < v_hosts.size(); ++i) { if ((v_hosts[i] == host) && (i < v_rc.size()) && ((v_rc[i] == std::string(terr)) || (std::string(terr) == "*"))) { return true; } } return false; } //------------------------------------------------------------------------------ // Wait until namespace is booted - thread cancellation point //------------------------------------------------------------------------------ void XrdMgmOfs::WaitUntilNamespaceIsBooted() { XrdSysThread::SetCancelDeferred(); while (gOFS->mNamespaceState != NamespaceState::kBooted) { std::this_thread::sleep_for(std::chrono::seconds(1)); XrdSysThread::CancelPoint(); } } //------------------------------------------------------------------------------ // Wait until namespace is booted //------------------------------------------------------------------------------ void XrdMgmOfs::WaitUntilNamespaceIsBooted(ThreadAssistant& assistant) { while (gOFS->mNamespaceState != NamespaceState::kBooted) { assistant.wait_for(std::chrono::seconds(1)); if (assistant.terminationRequested()) { break; } } } //------------------------------------------------------------------------------ // Return string representation of prepare options //------------------------------------------------------------------------------ std::string XrdMgmOfs::prepareOptsToString(const int opts) { std::ostringstream result; const int priority = opts & Prep_PMASK; switch (priority) { case Prep_PRTY0: result << "PRTY0"; break; case Prep_PRTY1: result << "PRTY1"; break; case Prep_PRTY2: result << "PRTY2"; break; case Prep_PRTY3: result << "PRTY3"; break; default: result << "PRTYUNKNOWN"; } const int send_mask = 12; const int send = opts & send_mask; switch (send) { case 0: break; case Prep_SENDAOK: result << ",SENDAOK"; break; case Prep_SENDERR: result << ",SENDERR"; break; case Prep_SENDACK: result << ",SENDACK"; break; default: result << ",SENDUNKNOWN"; } if (opts & Prep_WMODE) { result << ",WMODE"; } if (opts & Prep_STAGE) { result << ",STAGE"; } if (opts & Prep_COLOC) { result << ",COLOC"; } if (opts & Prep_FRESH) { result << ",FRESH"; } #if (XrdMajorVNUM(XrdVNUMBER) == 4 && XrdMinorVNUM(XrdVNUMBER) >= 10) || XrdMajorVNUM(XrdVNUMBER) >= 5 if (opts & Prep_CANCEL) { result << ",CANCEL"; } if (opts & Prep_QUERY) { result << ",QUERY"; } if (opts & Prep_EVICT) { result << ",EVICT"; } #endif return result.str(); } //------------------------------------------------------------------------------ // Populate file error object with redirection information that can be // longer than 2kb. For this we need to use the XrdOucBuffer interface. //------------------------------------------------------------------------------ bool XrdMgmOfs::SetRedirectionInfo(XrdOucErrInfo& err_obj, const std::string& rdr_info, int rdr_port) { if (rdr_info.empty() || (rdr_port == 0)) { err_obj.setErrInfo(EINVAL, "no redirection info available"); return false; } // If size < 2kb just set it directly if (rdr_info.length() < 2 * 1024) { err_obj.setErrInfo(rdr_port, rdr_info.c_str()); return true; } // Otherwise use the XrdOucBuffPool to manage XrdOucBuffer objects that // can hold redirection info >= 2kb const uint32_t aligned_sz = eos::common::GetPowerCeil(rdr_info.length() + 1, 2 * eos::common::KB); XrdOucBuffer* buff = mXrdBuffPool.Alloc(aligned_sz); if (buff == nullptr) { eos_static_err("msg=\"requested redirection buffer allocation size too " "big\" req_sz=%llu max_sz=%i", rdr_info.length(), mXrdBuffPool.MaxSize()); err_obj.setErrInfo(EINVAL, "redirection buffer too big"); return false; } (void) strcpy(buff->Buffer(), rdr_info.c_str()); buff->SetLen(rdr_info.length() + 1); err_obj.setErrInfo(rdr_port, buff); return true; } //------------------------------------------------------------------------------ // Send query (XrdFileSystem::Query) to the given endpoint and collect the // repsonse //------------------------------------------------------------------------------ int XrdMgmOfs::SendQuery(const std::string& hostname, int port, const std::string& request, std::string& response, uint16_t timeout) { std::ostringstream oss; oss << "root://" << hostname << ":" << port << "/?xrd.wantprot=sss"; XrdCl::URL url(oss.str()); if (!url.IsValid()) { eos_static_err("msg=\"invalid url\" url=\"%s\"", oss.str().c_str()); return EINVAL; } XrdCl::Buffer arg; XrdCl::Buffer* raw_resp {nullptr}; XrdCl::FileSystem fs {url}; arg.FromString(request); XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::OpaqueFile, arg, raw_resp, timeout); std::unique_ptr resp(raw_resp); raw_resp = nullptr; if (!status.IsOK()) { eos_static_err("msg=\"failed query request\" request=\"%s\" status=\"%s\"", request.c_str(), status.ToStr().c_str()); return -1; } if (resp && resp->GetBuffer()) { response = resp->GetBuffer(); } return 0; } //---------------------------------------------------------------------------- // Broadcast query (XrdFileSystem::Query) to the given endpoints and collect // the responses //---------------------------------------------------------------------------- int XrdMgmOfs::BroadcastQuery(const std::string& request, std::set& endpoints, std::map>& responses, uint16_t timeout) { std::atomic g_retc = 0; // overall return code class QueryRespHandler: public XrdCl::ResponseHandler { public: //------------------------------------------------------------------------ //! Constructor //------------------------------------------------------------------------ QueryRespHandler(const std::string& endpoint, std::map>& responses, std::mutex& mutex, std::condition_variable& cv, std::atomic& retc): mEndpoint(endpoint), mRespMap(responses), mMutexMap(mutex), mCv(cv), mRetc(retc) {} //------------------------------------------------------------------------ //! Called when a response to associated request arrives or an error //! occurs //! //! @param status status of the request //! @param response an object associated with the response //------------------------------------------------------------------------ void HandleResponse(XrdCl::XRootDStatus* status, XrdCl::AnyObject* response) { int retc = 0; std::string resp; if (status->IsOK()) { if (response) { XrdCl::Buffer* buffer {nullptr}; response->Get(buffer); resp = buffer->GetBuffer(); } } else { retc = (status->errNo ? status->errNo : ENOMSG); resp = status->GetErrorMessage(); mRetc = 1; } if (response) { delete response; } delete status; { // Add info to the global map and notify main thread std::unique_lock lock(mMutexMap); mRespMap.emplace(mEndpoint, std::make_pair(retc, std::move(resp))); } mCv.notify_one(); } private: std::string mEndpoint; std::map>& mRespMap; std::mutex& mMutexMap; std::condition_variable& mCv; std::atomic& mRetc; }; // Collect all the FST endpoints if nothing specified if (endpoints.empty()) { endpoints = FsView::gFsView.CollectEndpoints("*"); } size_t num_resp = endpoints.size(); std::mutex mutex; std::condition_variable cv; std::map queries; for (const auto& ep : endpoints) { std::ostringstream oss; oss << "root://" << ep << "/?xrd.wantprot=sss"; XrdCl::URL url(oss.str()); if (!url.IsValid()) { eos_static_err("msg=\"invalid url\" url=\"%s\"", oss.str().c_str()); std::unique_lock lock(mutex); responses.emplace(ep, std::make_pair(EINVAL, "invalid url")); continue; } auto pair = queries.emplace(new XrdCl::FileSystem(url), new QueryRespHandler(ep, responses, mutex, cv, g_retc)); if (!pair.second) { eos_static_err("msg=\"failed to insert query\" endpoint=\"%s\"", ep.c_str()); std::unique_lock lock(mutex); responses.emplace(ep, std::make_pair(EINVAL, "failed query insert")); continue; } //! const_cast auto* fs = pair.first->first; auto* handler = pair.first->second; XrdCl::Buffer arg; arg.FromString(request); XrdCl::XRootDStatus status = fs->Query(XrdCl::QueryCode::OpaqueFile, arg, handler, timeout); if (!status.IsOK()) { eos_static_err("msg=\"failed to send query\" endpoint=\"%s\"", ep.c_str()); std::unique_lock lock(mutex); responses.emplace(ep, std::make_pair(EINVAL, "failed query send")); continue; } } { // Wait for all the responses to be received std::unique_lock lock(mutex); cv.wait(lock, [&] {return (num_resp == responses.size());}); } // Clean up memory for (const auto& elem : queries) { delete elem.first; delete elem.second; } return g_retc; } //------------------------------------------------------------------------------ // Send a resync command for a file identified by id and filesystem //------------------------------------------------------------------------------ int XrdMgmOfs::QueryResync(eos::common::FileId::fileid_t fid, eos::common::FileSystem::fsid_t fsid, bool force) { int fst_port; std::string fst_host; std::string fst_queue; { eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); eos::mgm::FileSystem* fs = FsView::gFsView.mIdView.lookupByID(fsid); if (!fs) { eos_err("msg=\"no resync msg sent, no such file system\" fsid=%lu", fsid); return -1; } fst_host = fs->GetHost(); fst_queue = fs->GetQueue(); fst_port = fs->getCoreParams().getLocator().getPort(); } EXEC_TIMING_BEGIN("QueryResync"); gOFS->MgmStats.Add("QueryResync", vid.uid, vid.gid, 1); std::string request = SSTR("/?fst.pcmd=resync" << "&fst.resync.fsid=" << fsid << "&fst.resync.fxid=" << eos::common::FileId::Fid2Hex(fid) << "&fst.resync.force=" << force); std::string response; int query_retc = gOFS->SendQuery(fst_host, fst_port, request, response); (void) response; EXEC_TIMING_END("QueryResync"); return query_retc; } //------------------------------------------------------------------------------ // Remove file/container metadata object that was already deleted before // but now it's still in the namespace detached from any parent //------------------------------------------------------------------------------ bool XrdMgmOfs::RemoveDetached(uint64_t id, bool is_dir, bool force, std::string& msg) const { errno = 0; if (is_dir) { try { eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex); std::shared_ptr cont = gOFS->eosDirectoryService->getContainerMD(id); if (cont->getParentId()) { gOFS->eosDirectoryService->removeContainer(cont.get()); return true; } else { msg = "error: container is attached id=" + std::to_string(id); return false; } } catch (eos::MDException& e) { errno = e.getErrno(); eos_debug("msg=\"caught exception\" errno=%d msg=\"%s\"", e.getErrno(), e.getMessage().str().c_str()); msg = "error: " + e.getMessage().str() + '\n'; return false; } } else { try { eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); eos::common::RWMutexWriteLock ns_wr_lock(gOFS->eosViewRWMutex); std::shared_ptr file = gOFS->eosFileService->getFileMD(id); if (file->getContainerId()) { // Double check if the parent container really exists. It could be // that the file is attached to a container which is already deleted. try { (void) gOFS->eosDirectoryService->getContainerMD(file->getContainerId()); msg = "error: file fxid=" + eos::common::FileId::Fid2Hex(id) + " is attached to cid=" + std::to_string(file->getContainerId()); return false; } catch (const eos::MDException& e) { // This means the parent container does not exist so we can safely // remove this file entry. } } // If any of the unlink locations is a file systems that doesn't exist // anymore then just remove it auto unlink_locs = file->getUnlinkedLocations(); for (const auto& uloc : unlink_locs) { if (FsView::gFsView.mIdView.lookupByID(uloc) == nullptr) { file->removeLocation(uloc); } } // If there are no more locations we can also delete the file object if (file->getUnlinkedLocations().empty()) { gOFS->eosFileService->removeFile(file.get()); msg = "info: file object removed from namespace"; } else { // Move the unlinked locations to the locations list and back so // that we notify the listener for disk deletion unlink_locs = file->getUnlinkedLocations(); for (const auto& uloc : unlink_locs) { file->addLocation(uloc); } file->unlinkAllLocations(); if (force) { gOFS->eosFileService->removeFile(file.get()); msg = "info: file force removed from namespace, best-effort disk " "deletion(s)"; } else { msg = "info: file locations unlinked, waiting for disk deletion(s)"; } } return true; } catch (eos::MDException& e) { errno = e.getErrno(); eos_debug("msg=\"caught exception\" errno=%d msg=\"%s\"", e.getErrno(), e.getMessage().str().c_str()); msg = "error: " + e.getMessage().str() + '\n'; return false; } } } //---------------------------------------------------------------------------- // Query to determine if current node is acting as master //---------------------------------------------------------------------------- int XrdMgmOfs::IsMaster(const char* path, const char* ininfo, XrdOucEnv& env, XrdOucErrInfo& error, eos::common::VirtualIdentity& vid, const XrdSecEntity* client) { static const char* epname = "IsMaster"; // TODO (esindril): maybe enable SSS at some point // REQUIRE_SSS_OR_LOCAL_AUTH; if (!gOFS->mMaster->IsMaster()) { return Emsg(epname, error, ENOENT, "find master file [ENOENT]", ""); } const char* ok = "OK"; error.setErrInfo(strlen(ok) + 1, ok); return SFS_DATA; }