// ---------------------------------------------------------------------- // Fileq: XrdFstOfs.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "fst/XrdFstOfs.hh" #include "fst/XrdFstOss.hh" #include "fst/Config.hh" #include "fst/filemd/FmdAttr.hh" #include "fst/checksum/ChecksumPlugins.hh" #include "fst/http/HttpServer.hh" #include "fst/storage/FileSystem.hh" #include "fst/storage/Storage.hh" #include "fst/Messaging.hh" #include "fst/Deletion.hh" #include "fst/Verify.hh" #include "fst/utils/XrdOfsPathHandler.hh" #include "common/PasswordHandler.hh" #include "common/FileId.hh" #include "common/FileSystem.hh" #include "common/Path.hh" #include "common/Statfs.hh" #include "common/SyncAll.hh" #include "common/StackTrace.hh" #include "common/Timing.hh" #include "common/eos_cta_pb/EosCtaAlertHandler.hh" #include "common/Constants.hh" #include "common/StringConversion.hh" #include "common/StringTokenizer.hh" #include "common/StringUtils.hh" #include "common/SymKeys.hh" #include "common/XattrCompat.hh" #include "common/ParseUtils.hh" #include "common/ShellCmd.hh" #include "common/BufferManager.hh" #include "common/async/ExecutorMgr.hh" #include "namespace/interface/IFileMD.hh" #include "XrdNet/XrdNetOpts.hh" #include "XrdNet/XrdNetUtils.hh" #include "XrdOfs/XrdOfs.hh" #include "XrdOuc/XrdOucHash.hh" #include "XrdOuc/XrdOucTrace.hh" #include "XrdSfs/XrdSfsAio.hh" #include "Xrd/XrdScheduler.hh" #include "XrdCl/XrdClFileSystem.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdVersion.hh" #include "qclient/Members.hh" #include "qclient/QClient.hh" #include "qclient/shared/SharedManager.hh" #include "proto/Delete.pb.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include // The global OFS handle eos::fst::XrdFstOfs eos::fst::gOFS; extern XrdSysError OfsEroute; extern XrdOss* XrdOfsOss; extern XrdOfs* XrdOfsFS; extern XrdSysTrace OfsTrace; // Set the version information XrdVERSIONINFO(XrdSfsGetFileSystem2, FstOfs); #ifdef COVERAGE_BUILD // Forward declaration of gcov flush API extern "C" void __gcov_flush(); #endif //------------------------------------------------------------------------------ // XRootD OFS interface implementation //------------------------------------------------------------------------------ extern "C" { XrdSfsFileSystem* XrdSfsGetFileSystem2(XrdSfsFileSystem* nativeFS, XrdSysLogger* Logger, const char* configFn, XrdOucEnv* envP) { if (XrdOfsFS) { return XrdOfsFS; } OfsEroute.SetPrefix("FstOfs_"); OfsEroute.logger(Logger); // Disable XRootD log rotation Logger->setRotate(0); std::ostringstream oss; oss << "FstOfs (Object Storage File System) " << VERSION; XrdOucString version = "FstOfs (Object Storage File System) "; OfsEroute.Say("++++++ (c) 2010 CERN/IT-DSS ", oss.str().c_str()); // Initialize the subsystems eos::fst::gOFS.ConfigFN = (configFn && *configFn ? strdup(configFn) : 0); fprintf(stderr, "pcrctl= %d\n", prctl(PR_SET_KEEPCAPS, 1, 0, 0, 0)); if (eos::fst::gOFS.Configure(OfsEroute, envP)) { return 0; } XrdOfsFS = &eos::fst::gOFS; return &eos::fst::gOFS; } } EOSFSTNAMESPACE_BEGIN //------------------------------------------------------------------------------ // Get stacktrace from crashing process //------------------------------------------------------------------------------ void XrdFstOfs::xrdfstofs_stacktrace(int sig) { (void) signal(SIGINT, SIG_IGN); (void) signal(SIGTERM, SIG_IGN); (void) signal(SIGQUIT, SIG_IGN); void* array[10]; size_t size; // Get void*'s for all entries on the stack size = backtrace(array, 10); // Print out all the frames to stderr fprintf(stderr, "error: received signal %d:\n", sig); backtrace_symbols_fd(array, size, 2); eos::common::StackTrace::GdbTrace(0 , getpid(), "thread apply all bt"); if (getenv("EOS_CORE_DUMP")) { eos::common::StackTrace::GdbTrace(0 , getpid(), "generate-core-file"); } // Now we put back the initial handler and send the signal again signal(sig, SIG_DFL); kill(getpid(), sig); int wstatus = 0; wait(&wstatus); } //------------------------------------------------------------------------------ // Print coverage data //------------------------------------------------------------------------------ void XrdFstOfs::xrdfstofs_coverage(int sig) { #ifdef COVERAGE_BUILD eos_static_notice("msg=\"printing coverage data\""); __gcov_flush(); return; #endif eos_static_notice("msg=\"compiled without coverage support\""); } //------------------------------------------------------------------------------ // FST shutdown procedure //------------------------------------------------------------------------------ void XrdFstOfs::xrdfstofs_shutdown(int sig) { static XrdSysMutex ShutDownMutex; ShutDownMutex.Lock(); // this handler goes only one-shot .. sorry ! gOFS.sShutdown = true; pid_t watchdog; pid_t ppid = getpid(); if (!(watchdog = fork())) { eos::common::SyncAll::AllandClose(); // Sleep for an amount of time proportional to the number of filesystems // on the current machine auto timeout = std::chrono::seconds(gOFS.Storage->GetFSCount() * 5); std::this_thread::sleep_for(timeout); fprintf(stderr, "@@@@@@ 00:00:00 op=shutdown msg=\"shutdown timedout after " "%li seconds, signal=%i\n", timeout.count(), sig); if (ppid > 1) { kill(ppid, 9); } fprintf(stderr, "@@@@@@ 00:00:00 %s", "op=shutdown status=forced-complete\n"); kill(getpid(), 9); } // Handler to shutdown the daemon for valgrinding and clean server stop // (e.g. let time to finish write operations) if (gOFS.mFstMessaging) { gOFS.mFstMessaging->StopListener(); // stop any communication delete gOFS.mFstMessaging; gOFS.mFstMessaging = nullptr; } eos_static_warning("%s", "op=shutdown msg=\"stopped messaging\""); gOFS.Storage->Shutdown(); eos_static_warning("%s", "op=shutdown msg=\"stopped storage activities\""); if (watchdog > 1) { kill(watchdog, 9); } int wstatus = 0; wait(&wstatus); // Close all file descriptors we can sync or are sockets eos::common::SyncAll::AllandCloseFileSocks(); eos_static_warning("%s", "op=shutdown status=completed"); // harakiri - yes! (void) signal(SIGABRT, SIG_IGN); (void) signal(SIGINT, SIG_IGN); (void) signal(SIGTERM, SIG_IGN); (void) signal(SIGQUIT, SIG_IGN); kill(getpid(), 9); } //------------------------------------------------------------------------------ // FST "graceful" shutdown procedure //------------------------------------------------------------------------------ void XrdFstOfs::xrdfstofs_graceful_shutdown(int sig) { using namespace eos::common; eos_static_info("entering the \"graceful\" shutdown procedure"); pid_t watchdog; static XrdSysMutex grace_shutdown_mtx; grace_shutdown_mtx.Lock(); gOFS.sShutdown = true; const char* swait = getenv("EOS_GRACEFUL_SHUTDOWN_TIMEOUT"); std::int64_t wait = (swait ? std::strtol(swait, nullptr, 10) : 390); pid_t ppid = getpid(); if (!(watchdog = fork())) { std::this_thread::sleep_for(std::chrono::seconds(wait)); SyncAll::AllandClose(); std::this_thread::sleep_for(std::chrono::seconds(15)); fprintf(stderr, "@@@@@@ 00:00:00 %s %li seconds\"\n", "op=shutdown msg=\"shutdown timedout after ", wait); if (ppid > 1) { kill(ppid, 9); } fprintf(stderr, "@@@@@@ 00:00:00 %s", "op=shutdown status=forced-complete"); kill(getpid(), 9); } // Stop any communication - this will also stop scheduling to this node eos_static_warning("op=shutdown msg=\"stop messaging\""); if (gOFS.mFstMessaging) { gOFS.mFstMessaging->StopListener(); // stop any communication delete gOFS.mFstMessaging; gOFS.mFstMessaging = nullptr; } // Wait for 60 seconds heartbeat timeout (see mgm/FsView) + 30 seconds // for in-flight redirections eos_static_warning("op=shutdown msg=\"wait 90 seconds for configuration " "propagation\""); std::chrono::seconds config_timeout(60 + 30); std::this_thread::sleep_for(config_timeout); std::chrono::seconds io_timeout((std::int64_t)(wait * 0.9)); if (gOFS.WaitForOngoingIO(io_timeout)) { eos_static_warning("%s", "op=shutdown msg=\"successful graceful IO shutdown\""); } else { eos_static_err("%s", "op=shutdown msg=\"failed graceful IO shutdown\""); } eos_static_warning("%s", "op=shutdown msg=\"storage object shutdown\""); gOFS.Storage->Shutdown(); if (watchdog > 1) { kill(watchdog, 9); } int wstatus = 0; ::wait(&wstatus); // Close all file descriptors we can sync or are sockets SyncAll::AllandCloseFileSocks(); eos_static_warning("%s", "op=shutdown status=completed"); // harakiri - yes! (void) signal(SIGABRT, SIG_IGN); (void) signal(SIGINT, SIG_IGN); (void) signal(SIGTERM, SIG_IGN); (void) signal(SIGQUIT, SIG_IGN); (void) signal(SIGUSR1, SIG_IGN); kill(getpid(), 9); } //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdFstOfs::XrdFstOfs() : eos::common::LogId(), mFstMessaging(nullptr), Storage(nullptr), mHostName(NULL), mMqOnQdb(false), mHttpd(nullptr), mGeoTag("nogeotag"), mXrdBuffPool(eos::common::KB, 32 * eos::common::MB), mCloseThreadPool(8, 64, 5, 6, 5, "async_close"), mMgmXrdPool(nullptr), mSimIoReadErr(false), mSimIoWriteErr(false), mSimXsReadErr(false), mSimXsWriteErr(false), mSimFmdOpenErr(false), mSimErrIoReadOff(0ull), mSimErrIoWriteOff(0ull), mSimDiskWriting(false), mSimCloseErr(false), mSimUnresponsive(false) { Eroute = 0; TransferScheduler = 0; TpcMap.resize(2); TpcMap[0].set_deleted_key(""); // readers TpcMap[1].set_deleted_key(""); // writers if (!getenv("EOS_NO_SHUTDOWN")) { // Add shutdown handler (void) signal(SIGINT, xrdfstofs_shutdown); (void) signal(SIGTERM, xrdfstofs_shutdown); (void) signal(SIGQUIT, xrdfstofs_shutdown); // Add graceful shutdown handler (void) signal(SIGUSR1, xrdfstofs_graceful_shutdown); } if (getenv("EOS_COVERAGE_REPORT")) { // Add coverage report handler (void) signal(SIGPROF, xrdfstofs_coverage); } if (getenv("EOS_FST_ENABLE_STACKTRACE")) { // Add stacktrace handler - this is useful for crashes inside containers // where abrtd is not configured (void) signal(SIGSEGV, xrdfstofs_stacktrace); (void) signal(SIGABRT, xrdfstofs_stacktrace); (void) signal(SIGBUS, xrdfstofs_stacktrace); } if (getenv("EOS_MGM_ALIAS")) { // Use MGM alias if available mMgmAlias = getenv("EOS_MGM_ALIAS"); } if (getenv("EOS_FST_ALIAS")) { gConfig.HostAlias = getenv("EOS_FST_ALIAS"); fprintf(stderr, "Setting host alias to %s\n", gConfig.HostAlias.c_str()); } if (getenv("EOS_FST_PORT_ALIAS")) { gConfig.PortAlias = getenv("EOS_FST_PORT_ALIAS"); } // Initialize the google sparse hash maps gOFS.WNoDeleteOnCloseFid.clear_deleted_key(); gOFS.WNoDeleteOnCloseFid.set_deleted_key(0); setenv("EOSFSTOFS", std::to_string((unsigned long long)this).c_str(), 1); if (getenv("EOS_FST_CALL_MANAGER_XRD_POOL")) { int max_size = 10; const char* csize {nullptr}; if ((csize = getenv("EOS_FST_CALL_MANAGER_XRD_POOL_SIZE"))) { try { max_size = std::stoi(csize); if (max_size < 1) { max_size = 1; } if (max_size > 32) { max_size = 32; } } catch (...) { // ignore } } mMgmXrdPool.reset(new eos::common::XrdConnPool(true, max_size)); fprintf(stderr, "Config Enabled CallManager xrootd connection pool with " "size=%i\n", max_size); } if (getenv("EOS_FST_FSCK_DELETE_BY_MOVE")) { mEnvFsckDeleteByMove = true; } UpdateTpcKeyValidity(); } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdFstOfs::~XrdFstOfs() { if (mHostName) { free(const_cast(mHostName)); } } //------------------------------------------------------------------------------ // Get a new OFS directory object - not implemented //------------------------------------------------------------------------------ XrdSfsDirectory* XrdFstOfs::newDir(char* user, int MonID) { return (XrdSfsDirectory*)(0); } //------------------------------------------------------------------------------ // Get a new OFS file object //----------------------------------------------------------------------------- XrdSfsFile* XrdFstOfs::newFile(char* user, int MonID) { return static_cast(new XrdFstOfsFile(user, MonID)); } //------------------------------------------------------------------------------ // OFS layer configuration //------------------------------------------------------------------------------ int XrdFstOfs::Configure(XrdSysError& Eroute, XrdOucEnv* envP) { char* var; const char* val; int cfgFD; int NoGo = 0; eos::common::StringConversion::InitLookupTables(); { // Run a dummy command so that the ShellExecutor is forked before any XrdCl // is initialized. Otherwise it might segv due to the following bug: // https://github.com/xrootd/xrootd/issues/1515 eos::common::ShellCmd dummy_cmd("uname -a"); } if (XrdOfs::Configure(Eroute, envP)) { Eroute.Emsg("Config", "default OFS configuration failed"); return SFS_ERROR; } // Enforcing 'sss' authentication for all communications if (!getenv("EOS_FST_NO_SSS_ENFORCEMENT")) { setenv("XrdSecPROTOCOL", "sss", 1); Eroute.Say("=====> fstofs enforces SSS authentication for XROOT clients"); } else { Eroute.Say("=====> fstofs does not enforce SSS authentication for XROOT" " clients - make sure MGM enforces sss for this FST!"); } // Get the hostname const char* errtext = 0; mHostName = XrdNetUtils::MyHostName(0, &errtext); if (!mHostName) { Eroute.Emsg("Config", "hostname is invalid : %s", mHostName); return 1; } TransferScheduler = new XrdScheduler(&Eroute, &OfsTrace, 8, 128, 60); TransferScheduler->Start(); gConfig.autoBoot = false; gConfig.FstOfsBrokerUrl = "root://localhost:1097//eos/"; if (getenv("EOS_BROKER_URL")) { gConfig.FstOfsBrokerUrl = getenv("EOS_BROKER_URL"); } // Handle geotag configuration char* ptr_geotag = getenv("EOS_GEOTAG"); if (ptr_geotag) { mGeoTag = eos::common::SanitizeGeoTag(ptr_geotag); if (mGeoTag != ptr_geotag) { Eroute.Emsg("Config", mGeoTag.c_str()); return 1; } } { // set the start date as string XrdOucString out = ""; time_t t = time(NULL); struct tm* timeinfo; timeinfo = localtime(&t); out = asctime(timeinfo); out.erase(out.length() - 1); gConfig.StartDate = out.c_str(); } gConfig.FstMetaLogDir = "/var/tmp/eos/md/"; gConfig.FstAuthDir = "/var/eos/auth/"; setenv("XrdClientEUSER", "daemon", 1); SetXrdClConfig(); // Extract the manager from the config file XrdOucStream Config(&Eroute, getenv("XRDINSTANCE")); if (!ConfigFN || !*ConfigFN) { // this error will be reported by XrdOfsFS.Configure } else { // Try to open the configuration file. if ((cfgFD = open(ConfigFN, O_RDONLY, 0)) < 0) { return Eroute.Emsg("Config", errno, "open config file fn=", ConfigFN); } Config.Attach(cfgFD); // Now start reading records until eof. while ((var = Config.GetMyFirstWord())) { if (!strncmp(var, "fstofs.", 7)) { var += 7; // we parse config variables here if (!strcmp("broker", var)) { if (!(val = Config.GetWord())) { Eroute.Emsg("Config", "argument 2 for broker missing. Should be " "URL like root:////"); NoGo = 1; } else { if (getenv("EOS_BROKER_URL")) { gConfig.FstOfsBrokerUrl = getenv("EOS_BROKER_URL"); } else { gConfig.FstOfsBrokerUrl = val; } } } if (!strcmp("trace", var)) { if (!(val = Config.GetWord())) { Eroute.Emsg("Config", "argument 2 for trace missing. Can be 'client'"); NoGo = 1; } else { //EnvPutInt( NAME_DEBUG, 3); } } if (!strcmp("autoboot", var)) { if ((!(val = Config.GetWord())) || (strcmp("true", val) && strcmp("false", val) && strcmp("1", val) && strcmp("0", val))) { Eroute.Emsg("Config", "argument 2 for autobootillegal or missing. " "Must be ,,<1> or <0>!"); NoGo = 1; } else { if ((!strcmp("true", val) || (!strcmp("1", val)))) { gConfig.autoBoot = true; } } } if (!strcmp("metalog", var)) { if (!(val = Config.GetWord())) { Eroute.Emsg("Config", "argument 2 for metalog missing"); NoGo = 1; } else { if (strlen(val)) { gConfig.FstMetaLogDir = val; if (val[strlen(val) - 1] != '/') { gConfig.FstMetaLogDir += '/'; } } } } if (!strcmp("authdir", var)) { if (!(val = Config.GetWord())) { Eroute.Emsg("Config", "argument 2 for authdir missing"); NoGo = 1; } else { if (strlen(val)) { gConfig.FstAuthDir = val; if (val[strlen(val) - 1] != '/') { gConfig.FstAuthDir += '/'; } } } } if (!strcmp("protowfendpoint", var)) { if ((val = Config.GetWord())) { gConfig.ProtoWFEndpoint = val; } } if (!strcmp("protowfresource", var)) { if ((val = Config.GetWord())) { gConfig.ProtoWFResource = val; } } if (!strcmp("qdbcluster", var)) { std::string qdb_cluster; while ((val = Config.GetWord())) { qdb_cluster += val; qdb_cluster += " "; } Eroute.Say("=====> fstofs.qdbcluster : ", qdb_cluster.c_str()); if (!qdb_cluster.empty()) { if (!mQdbContactDetails.members.parse(qdb_cluster)) { Eroute.Emsg("Config", "failed to parse qdbcluster members"); NoGo = 1; } } } if (!strcmp("qdbpassword", var)) { while ((val = Config.GetWord())) { mQdbContactDetails.password += val; } // Trim whitespace at the end common::PasswordHandler::rightTrimWhitespace(mQdbContactDetails.password); std::string pwlen = std::to_string(mQdbContactDetails.password.size()); Eroute.Say("=====> fstofs.qdbpassword length : ", pwlen.c_str()); } if (!strcmp("qdbpassword_file", var)) { std::string path; while ((val = Config.GetWord())) { path += val; } if (!common::PasswordHandler::readPasswordFile(path, mQdbContactDetails.password)) { Eroute.Emsg("Config", "failed to open path pointed to by qdbpassword_file"); NoGo = 1; } std::string pwlen = std::to_string(mQdbContactDetails.password.size()); Eroute.Say("=====> fstofs.qdbpassword length : ", pwlen.c_str()); } if (!strcmp("mq_implementation", var)) { std::string value; while ((val = Config.GetWord())) { value += val; } if (value == "qdb") { mMqOnQdb = true; } else { Eroute.Emsg("Config", "unrecognized value for mq_implementation"); NoGo = 1; } Eroute.Say("=====> fstofs.mq_implementation : ", value.c_str()); } } } Config.Close(); close(cfgFD); } if (NoGo) { return 1; } if (!mQdbContactDetails.members.empty() && mQdbContactDetails.password.empty()) { Eroute.Say("=====> Configuration error: Found QDB cluster members, but no password." " EOS will only connect to password-protected QDB instances. (fstofs.qdbpassword / fstofs.qdbpassword_file missing)"); return 1; } if (gConfig.autoBoot) { Eroute.Say("=====> fstofs.autoboot : true"); } else { Eroute.Say("=====> fstofs.autoboot : false"); } // Create the qclient shared by all file systems for doing the ns scan for // the fsck consistency checks if (!mQdbContactDetails.empty()) { mFsckQcl.reset(new qclient::QClient(mQdbContactDetails.members, mQdbContactDetails.constructOptions())); } if (!gConfig.FstOfsBrokerUrl.endswith("/")) { gConfig.FstOfsBrokerUrl += "/"; } if (!mFmdHandler) { mFmdHandler.reset(new FmdAttrHandler(makeFSPathHandler(this))); Eroute.Say("=====> fstofs.filemd_handler : attr"); } gConfig.FstDefaultReceiverQueue = gConfig.FstOfsBrokerUrl; gConfig.FstOfsBrokerUrl += mHostName; gConfig.FstOfsBrokerUrl += ":"; gConfig.FstOfsBrokerUrl += myPort; gConfig.FstOfsBrokerUrl += "/fst"; gConfig.FstHostPort = mHostName; gConfig.FstHostPort += ":"; gConfig.FstHostPort += myPort; gConfig.KernelVersion = GetKernelRelease().c_str(); Eroute.Say("=====> fstofs.broker : ", gConfig.FstOfsBrokerUrl.c_str(), ""); // Extract our queue name gConfig.FstQueue = gConfig.FstOfsBrokerUrl; { int pos1 = gConfig.FstQueue.find("//"); int pos2 = gConfig.FstQueue.find("//", pos1 + 2); if (pos2 != STR_NPOS) { gConfig.FstQueue.erase(0, pos2 + 1); } else { Eroute.Emsg("Config", "cannot determine my queue name: ", gConfig.FstQueue.c_str()); return 1; } } // Create our wildcard broadcast name gConfig.FstQueueWildcard = gConfig.FstQueue; gConfig.FstQueueWildcard += "/*"; // Create our wildcard config broadcast name gConfig.FstConfigQueueWildcard = "*/"; gConfig.FstConfigQueueWildcard += mHostName; gConfig.FstConfigQueueWildcard += ":"; gConfig.FstConfigQueueWildcard += myPort; // Set logging parameters XrdOucString unit = "fst@"; unit += mHostName; unit += ":"; unit += myPort; // Setup the circular in-memory log buffer eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); g_logging.SetLogPriority(LOG_INFO); g_logging.SetUnit(unit.c_str()); // Get the XRootD log directory char* logdir = 0; XrdOucEnv::Import("XRDLOGDIR", logdir); if (logdir) { eoscpTransferLog = logdir; eoscpTransferLog += "eoscp.log"; } Eroute.Say("=====> eoscp-log : ", eoscpTransferLog.c_str()); // Compute checksum of the keytab file std::string kt_cks = GetKeytabChecksum("/etc/eos.keytab"); gConfig.KeyTabAdler = kt_cks.c_str(); // Create the messaging object(recv thread) gConfig.FstDefaultReceiverQueue += "*/mgm"; int pos1 = gConfig.FstDefaultReceiverQueue.find("//"); int pos2 = gConfig.FstDefaultReceiverQueue.find("//", pos1 + 2); if (pos2 != STR_NPOS) { gConfig.FstDefaultReceiverQueue.erase(0, pos2 + 1); } Eroute.Say("=====> fstofs.defaultreceiverqueue : ", gConfig.FstDefaultReceiverQueue.c_str(), ""); // Set our Eroute for XrdMqMessage XrdMqMessage::Eroute = OfsEroute; { // Setup auth dir XrdOucString scmd = "mkdir -p "; scmd += gConfig.FstAuthDir; scmd += " ; chown -R daemon "; scmd += gConfig.FstAuthDir; scmd += " ; chmod 700 "; scmd += gConfig.FstAuthDir; int src = system(scmd.c_str()); if (src) { eos_err("%s returned %d", scmd.c_str(), src); } if (access(gConfig.FstAuthDir.c_str(), R_OK | W_OK | X_OK)) { Eroute.Emsg("Config", "cannot access the auth directory for r/w: ", gConfig.FstAuthDir.c_str()); return 1; } Eroute.Say("=====> fstofs.authdir : ", gConfig.FstAuthDir.c_str()); } // Enable the shared object notification queue ObjectManager.mEnableQueue = true; ObjectManager.SetAutoReplyQueue("/eos/*/mgm"); ObjectManager.SetDebug(false); // Enable experimental MQ on QDB? Note that any functionality not supported // will fallback to regular MQ, which is still required. // Shared object manager to be used qclient::SharedManager* qsm = nullptr; if (getenv("EOS_USE_MQ_ON_QDB")) { eos_static_notice("%s", "msg=\"running SharedManager via QDB i.e NO-MQ\""); qsm = new qclient::SharedManager(mQdbContactDetails.members, mQdbContactDetails.constructSubscriptionOptions()); mMessagingRealm.reset(new mq::MessagingRealm(nullptr, nullptr, nullptr, qsm)); } else { eos_static_notice("%s", "msg=\"running SharedManager via MQ\""); mMessagingRealm.reset(new mq::MessagingRealm(&ObjectManager, &ObjectNotifier, &XrdMqMessaging::gMessageClient, qsm)); } if (!mMessagingRealm->haveQDB()) { ObjectNotifier.SetShareObjectManager(&ObjectManager); if (!ObjectNotifier.Start()) { eos_crit("%s", "msg=\"error starting the shared object change notifier\""); return 1; } // Create the specific listener class when running with MQ mFstMessaging = new eos::fst::Messaging(gConfig.FstOfsBrokerUrl.c_str(), gConfig.FstDefaultReceiverQueue.c_str(), false, false, &ObjectManager); mFstMessaging->SetLogId("FstOfsMessaging", ""); if (!mFstMessaging->StartListenerThread() || mFstMessaging->IsZombie()) { eos_static_crit("%s", "msg=\"failed to start messaging\""); Eroute.Emsg("Config", "cannot create messaging object(thread)"); return 1; } } // Attach Storage to the meta log dir Storage = eos::fst::Storage::Create( gConfig.FstMetaLogDir.c_str()); Eroute.Say("=====> fstofs.metalogdir : ", gConfig.FstMetaLogDir.c_str()); if (!Storage) { Eroute.Emsg("Config", "cannot setup meta data storage using directory: ", gConfig.FstMetaLogDir.c_str()); return 1; } // Request broadcasts after the Communicator thread is started inside the // Storage class otherwise we might miss the updates. Practice show this is // not enough and we might need to sleep for a couple of seconds to have the // communicator thread up and running. std::this_thread::sleep_for(std::chrono::seconds(2)); if (!mMessagingRealm->haveQDB()) { RequestBroadcasts(); // Start dumper thread XrdOucString dumperfile = gConfig.FstMetaLogDir; dumperfile += "so.fst.dump."; dumperfile += gConfig.FstHostPort; ObjectManager.StartDumper(dumperfile.c_str()); } // Start the embedded HTTP server mHttpdPort = 8001; if (getenv("EOS_FST_HTTP_PORT")) { try { mHttpdPort = std::stol(getenv("EOS_FST_HTTP_PORT")); } catch (...) { // no change } } mHttpd.reset(new eos::fst::HttpServer(mHttpdPort)); if (mHttpd) { const char* ptr = getenv("EOS_FST_ENABLE_LIBMICROHTTPD"); if (ptr && (strncmp(ptr, "1", 1) == 0)) { if (!mHttpd->Start()) { eos_static_warning("%s", "msg=\"failed to start libmicrohttpd\""); } else { eos_static_notice("%s", "msg=\"successfully started libmicrohttpd\""); } } else { eos_static_notice("%s", "msg=\"libmicrohttpd is disabled\""); } } else { eos_static_crit("%s", "msg=\"failed to allocate HttpServer object\""); NoGo = 1; } eos_notice("FST_HOST=%s FST_PORT=%ld FST_HTTP_PORT=%d VERSION=%s RELEASE=%s " "KEYTABADLER=%s", mHostName, myPort, mHttpdPort, VERSION, RELEASE, kt_cks.c_str()); return NoGo; } //------------------------------------------------------------------------------ // Define error bool variables to en-/disable error simulation in the OFS layer //------------------------------------------------------------------------------ void XrdFstOfs::SetSimulationError(const std::string& input) { mSimIoReadErr = mSimIoWriteErr = mSimXsReadErr = mSimXsWriteErr = mSimFmdOpenErr = false; mSimErrIoReadOff = mSimErrIoWriteOff = 0ull; mSimDiskWriting = mSimCloseErr = mSimUnresponsive = false; if (input.find("io_read") == 0) { mSimIoReadErr = true; mSimErrIoReadOff = GetSimulationErrorOffset(input); } if (input.find("io_write") == 0) { mSimIoWriteErr = true; mSimErrIoWriteOff = GetSimulationErrorOffset(input); } if (input.find("xs_read") == 0) { mSimXsReadErr = true; } if (input.find("xs_write") == 0) { mSimXsWriteErr = true; } if (input.find("fmd_open") == 0) { mSimFmdOpenErr = true; } if (input.find("fake_write") == 0) { mSimDiskWriting = true; } if (input.find("close") == 0) { mSimCloseErr = true; } if (input.find("unresponsive") == 0) { mSimUnresponsive = true; } } //------------------------------------------------------------------------------ // Get simulation error offset. Parse the last characters and return the // desired offset e.g. io_read_8M should return 8MB //----------------------------------------------------------------------------- uint64_t XrdFstOfs::GetSimulationErrorOffset(const std::string& input) const { uint64_t offset {0ull}; size_t num = std::count(input.begin(), input.end(), '_'); if ((num < 2) || (*input.rbegin() == '_')) { return offset; } size_t pos = input.rfind('_'); std::string soff = input.substr(pos + 1); offset = eos::common::StringConversion::GetDataSizeFromString(soff.c_str()); return offset; } //------------------------------------------------------------------------------ // Stat path //------------------------------------------------------------------------------ int XrdFstOfs::stat(const char* path, struct stat* buf, XrdOucErrInfo& out_error, const XrdSecEntity* client, const char* opaque) { EPNAME("stat"); memset(buf, 0, sizeof(struct stat)); XrdOucString url = path; if (url.beginswith("/#/")) { url.replace("/#/", ""); XrdOucString url64; eos::common::SymKey::DeBase64(url, url64); fprintf(stderr, "doing stat for %s\n", url64.c_str()); // use an IO object to stat this ... std::unique_ptr io(eos::fst::FileIoPlugin::GetIoObject(url64.c_str())); if (io) { if (io->fileStat(buf)) { return gOFS.Emsg(epname, out_error, errno, "stat file", url64.c_str()); } else { return SFS_OK; } } else { return gOFS.Emsg(epname, out_error, EINVAL, "stat file - IO object not supported", url64.c_str()); } } if (!XrdOfsOss->Stat(path, buf)) { // we store the mtime.ns time in st_dev ... sigh@Xrootd ... #ifdef __APPLE__ unsigned long nsec = buf->st_mtimespec.tv_nsec; #else unsigned long nsec = buf->st_mtim.tv_nsec; #endif // mask for 10^9 nsec &= 0x7fffffff; // enable bit 32 as indicator nsec |= 0x80000000; // overwrite st_dev buf->st_dev = nsec; return SFS_OK; } else { return gOFS.Emsg(epname, out_error, errno, "stat file", path); } } //------------------------------------------------------------------------------ // CallManager function //------------------------------------------------------------------------------ int XrdFstOfs::CallManager(XrdOucErrInfo* error, const char* path, const char* manager, XrdOucString& capOpaqueFile, XrdOucString* return_result, unsigned short timeout, bool use_xrd_conn_pool, bool retry) { EPNAME("CallManager"); int rc = SFS_OK; XrdOucString msg = ""; XrdCl::Buffer arg; XrdCl::XRootDStatus status; XrdOucString address = "root://"; XrdOucString lManager; size_t tried = 0; if (!manager) { // use the broadcasted manager name XrdSysMutexHelper lock(gConfig.Mutex); lManager = gConfig.Manager.c_str(); address += lManager.c_str(); } else { address += manager; } address += "//dummy?xrd.wantprot=sss"; XrdCl::URL url(address.c_str()); if (!url.IsValid()) { eos_err("error=URL is not valid: %s", address.c_str()); return EINVAL; } // Use xrd connection pool if is requested by the caller and this is // allowed globally. std::unique_ptr conn_helper; if (use_xrd_conn_pool) { if (getenv("EOS_FST_CALL_MANAGER_XRD_POOL")) { conn_helper.reset(new eos::common::XrdConnIdHelper(*mMgmXrdPool, url)); if (conn_helper->HasNewConnection()) { eos_info("msg=\"using url=%s\"", url.GetURL().c_str()); } } } // Request sss authentication on the MGM side std::string opaque = capOpaqueFile.c_str(); // Get XrdCl::FileSystem object // !!! WATCH OUT: GOTO ANCHOR !!! std::unique_ptr fs; std::unique_ptr response; XrdCl::Buffer* responseRaw = nullptr; again: fs.reset(new XrdCl::FileSystem(url)); if (!fs) { eos_err("error=failed to get new FS object"); if (error) { gOFS.Emsg(epname, *error, ENOMEM, "allocate FS object calling the manager node for fn=", path); } return EINVAL; } arg.FromString(opaque); status = fs->Query(XrdCl::QueryCode::OpaqueFile, arg, responseRaw, timeout); response.reset(responseRaw); responseRaw = nullptr; if (status.IsOK()) { eos_static_debug("msg=\"MGM query succeeded\" opaque=\"%s\"", opaque.c_str()); rc = SFS_OK; } else { eos_static_err("msg=\"MGM query failed\" opaque=\"%s\"", opaque.c_str()); msg = (status.GetErrorMessage().c_str()); rc = SFS_ERROR; if (msg.find("[EIDRM]") != STR_NPOS) { rc = EIDRM; } if (msg.find("[EBADE]") != STR_NPOS) { rc = EBADE; } if (msg.find("[EBADR]") != STR_NPOS) { rc = EBADR; } if (msg.find("[EINVAL]") != STR_NPOS) { rc = EINVAL; } if (msg.find("[EADV]") != STR_NPOS) { rc = EADV; } if (msg.find("[EAGAIN]") != STR_NPOS) { rc = EAGAIN; } if (msg.find("[ENOTCONN]") != STR_NPOS) { rc = ENOTCONN; } if (msg.find("[EPROTO]") != STR_NPOS) { rc = EPROTO; } if (msg.find("[EREMCHG]") != STR_NPOS) { rc = EREMCHG; } if (rc != SFS_ERROR) { return gOFS.Emsg(epname, *error, rc, msg.c_str(), path); } else { eos_static_err("msg=\"query error\" status=%d code=%d", status.status, status.code); if (retry && (status.code >= 100) && (status.code <= 300) && (!timeout)) { // implement automatic retry - network errors will be cured at some point std::this_thread::sleep_for(std::chrono::seconds(1)); tried++; eos_static_info("msg=\"retry query\" query=\"%s\"", opaque.c_str()); if (!manager || (tried > 60)) { // use the broadcasted manager name in the repeated try XrdSysMutexHelper lock(gConfig.Mutex); lManager = gConfig.Manager.c_str(); address = "root://"; address += lManager.c_str(); address += "//dummy"; url.Clear(); url.FromString((address.c_str())); } goto again; } return gOFS.Emsg(epname, *error, ECOMM, msg.c_str(), path); } } if (response && return_result) { *return_result = response->GetBuffer(); } return rc; } //------------------------------------------------------------------------------ // Remove entry - interface function //------------------------------------------------------------------------------ int XrdFstOfs::rem(const char* path, XrdOucErrInfo& error, const XrdSecEntity* client, const char* opaque) { EPNAME("rem"); XrdOucString stringOpaque = opaque; stringOpaque.replace("?", "&"); stringOpaque.replace("&&", "&"); XrdOucEnv openOpaque(stringOpaque.c_str()); XrdOucEnv* capOpaque = 0; int caprc = 0; if ((caprc = eos::common::SymKey::ExtractCapability(&openOpaque, capOpaque))) { // No capability - go away! if (capOpaque) { delete capOpaque; capOpaque = 0; } return gOFS.Emsg(epname, error, caprc, "remove - capability illegal", path); } int envlen; if (capOpaque) { eos_info("path=%s info=%s capability=%s", path, opaque, capOpaque->Env(envlen)); } else { eos_info("path=%s info=%s", path, opaque); return gOFS.Emsg(epname, error, caprc, "remove - empty capability", path); } int rc = _rem(path, error, client, capOpaque); if (capOpaque) { delete capOpaque; capOpaque = 0; } return rc; } //------------------------------------------------------------------------------ // Remove entry - low level function //------------------------------------------------------------------------------ int XrdFstOfs::_rem(const char* path, XrdOucErrInfo& error, const XrdSecEntity* client, XrdOucEnv* capOpaque, const char* fstpath, unsigned long long fid, unsigned long fsid, bool ignoreifnotexist, std::string* deletion_report) { EPNAME("rem"); std::string fstPath = ""; const char* localprefix = 0; const char* hexfid = 0; const char* sfsid = 0; if ((!fstpath) && (!fsid) && (!fid)) { // Standard deletion brings all information via the opaque info if (!(localprefix = capOpaque->Get("mgm.localprefix"))) { return gOFS.Emsg(epname, error, EINVAL, "open - no local prefix in capability", path); } if (!(hexfid = capOpaque->Get("mgm.fid"))) { return gOFS.Emsg(epname, error, EINVAL, "open - no file id in capability", path); } if (!(sfsid = capOpaque->Get("mgm.fsid"))) { return gOFS.Emsg(epname, error, EINVAL, "open - no file system id in capability", path); } fstPath = eos::common::FileId::FidPrefix2FullPath(hexfid, localprefix); fid = eos::common::FileId::Hex2Fid(hexfid); fsid = atoi(sfsid); } else { // Deletion during close provides the local storage path, fid & fsid fstPath = fstpath; } eos_info("fstpath=%s", fstPath.c_str()); int rc = 0; errno = 0; // If file not found this will be ENOENT struct stat sbd; sbd.st_size = 0; // Unlink file and possible blockxs file - for local files we need to go // through XrdOfs::rem to also clean up any potential blockxs files if (eos::common::LayoutId::GetIoType(fstPath.c_str()) == eos::common::LayoutId::kLocal) { // get the size before deletion XrdOfs::stat(fstPath.c_str(), &sbd, error, client, 0); rc = XrdOfs::rem(fstPath.c_str(), error, client, 0); if (rc) { eos_info("rc=%i, errno=%i", rc, errno); } } else { // Check for additional opaque info to create remote IO object std::string sFstPath = fstPath.c_str(); std::string s3credentials = gOFS.Storage->GetFileSystemConfig(fsid, "s3credentials"); if (!s3credentials.empty()) { sFstPath += "?s3credentials=" + s3credentials; } std::unique_ptr io(eos::fst::FileIoPlugin::GetIoObject( sFstPath.c_str())); if (!io) { return gOFS.Emsg(epname, error, EINVAL, "open - no IO plug-in avaialble", sFstPath.c_str()); } // get the size before deletion io->fileStat(&sbd); rc = io->fileRemove(); } if (rc) { if (errno == ENOENT) { // Ignore error if a file to be deleted doesn't exist if (ignoreifnotexist) { rc = 0; } else { eos_notice("msg=\"file already deleted\" path=\%s\" fst_path=\"%s\" " "fsid=%lu fid=%", path, fstPath.c_str(), fsid, fid); } } } if (rc) { return gOFS.Emsg(epname, error, errno, "delete file", fstPath.c_str()); } else { // make a deletion report entry if (deletion_report) { // just return the report to the caller e.g. storage::Remover *deletion_report = MakeDeletionReport(fsid, fid, sbd, false); } else { // send the report via MQ MakeDeletionReport(fsid, fid, sbd, true); } } mFmdHandler->LocalDeleteFmd(fid, fsid); return SFS_OK; } //------------------------------------------------------------------------------ // Query file system information //------------------------------------------------------------------------------ int XrdFstOfs::fsctl(const int cmd, const char* args, XrdOucErrInfo& error, const XrdSecEntity* client) { static const char* epname = "fsctl"; const char* tident = error.getErrUser(); if ((cmd == SFS_FSCTL_LOCATE)) { char locResp[4096]; char rType[3], *Resp[] = {rType, locResp}; rType[0] = 'S'; rType[1] = 'r'; //(fstat.st_mode & S_IWUSR ? 'w' : 'r'); rType[2] = '\0'; sprintf(locResp, "[::%s:%d] ", mHostName, myPort); error.setErrInfo(strlen(locResp) + 3, (const char**) Resp, 2); ZTRACE(fsctl, "located at headnode: " << locResp); return SFS_DATA; } return gOFS.Emsg(epname, error, EPERM, "execute fsctl function", ""); } //------------------------------------------------------------------------------ // Function dealing with plugin calls //------------------------------------------------------------------------------ int XrdFstOfs::FSctl(const int cmd, XrdSfsFSctl& args, XrdOucErrInfo& error, const XrdSecEntity* client) { using eos::common::FileId; char ipath[16384]; char iopaque[16384]; static const char* epname = "FSctl"; const char* tident = error.getErrUser(); if ((cmd == SFS_FSCTL_LOCATE)) { char locResp[4096]; char rType[3], *Resp[] = {rType, locResp}; rType[0] = 'S'; rType[1] = 'r'; //(fstat.st_mode & S_IWUSR ? 'w' : 'r'); rType[2] = '\0'; sprintf(locResp, "[::%s:%d] ", mHostName, myPort); error.setErrInfo(strlen(locResp) + 3, (const char**) Resp, 2); ZTRACE(fsctl, "located at headnode: " << locResp); return SFS_DATA; } // Accept only plugin calls! if (cmd != SFS_FSCTL_PLUGIN) { return gOFS.Emsg(epname, error, EPERM, "execute non-plugin function", ""); } if (args.Arg1Len) { if (args.Arg1Len < 16384) { strncpy(ipath, args.Arg1, args.Arg1Len); ipath[args.Arg1Len] = 0; } else { return gOFS.Emsg(epname, error, EINVAL, "convert path argument - string too long", ""); } } else { ipath[0] = 0; } if (args.Arg2Len) { if (args.Arg2Len < 16384) { strncpy(iopaque, args.Arg2, args.Arg2Len); iopaque[args.Arg2Len] = 0; } else { return gOFS.Emsg(epname, error, EINVAL, "convert opaque argument - string too long", ""); } } else { iopaque[0] = 0; } // From here on we can deal with XrdOucString which is more 'comfortable' XrdOucString path = ipath; XrdOucString opaque = iopaque; XrdOucString result = ""; XrdOucEnv env(opaque.c_str()); eos_static_debug("msg=\"handle query\" tident=%s path=\"%s\" opaque=\"%s\" " "prot=\"%s\"", tident, path.c_str(), opaque.c_str(), client->prot); const char* scmd {nullptr}; if ((scmd = env.Get("fst.pcmd"))) { XrdOucString execmd = scmd; if (execmd == "debug") { return HandleDebug(env, error); } if (execmd == "resync") { return HandleResync(env, error); } if (execmd == "rtlog") { return HandleRtlog(env, error); } if (execmd == "verify") { return HandleVerify(env, error); } if (execmd == "drop") { return HandleDropFile(env, error); } if (execmd == "clean_orphans") { return HandleCleanOrphans(env, error); } if (execmd == "getfmd") { char* afid = env.Get("fst.getfmd.fid"); char* afsid = env.Get("fst.getfmd.fsid"); if ((!afid) || (!afsid)) { return Emsg(epname, error, EINVAL, "execute FSctl command", path.c_str()); } unsigned long long fileid = eos::common::FileId::Hex2Fid(afid); unsigned long fsid = atoi(afsid); auto fmd = mFmdHandler->LocalGetFmd(fileid, fsid, true); if (!fmd) { eos_static_err("msg=\"no FMD record found\" fxid=%08llx fsid=%lu", fileid, fsid); const char* err = "ERROR"; error.setErrInfo(strlen(err) + 1, err); return SFS_DATA; } auto fmdenv = fmd->FmdToEnv(); int envlen; XrdOucString fmdenvstring = fmdenv->Env(envlen); error.setErrInfo(fmdenvstring.length() + 1, fmdenvstring.c_str()); return SFS_DATA; } if (execmd == "getxattr") { char* key = env.Get("fst.getxattr.key"); char* path = env.Get("fst.getxattr.path"); if (!key) { eos_static_err("no key specified as attribute name"); const char* err = "ERROR"; error.setErrInfo(strlen(err) + 1, err); return SFS_DATA; } if (!path) { eos_static_err("no path specified to get the attribute from"); const char* err = "ERROR"; error.setErrInfo(strlen(err) + 1, err); return SFS_DATA; } char value[1024]; #ifdef __APPLE__ ssize_t attr_length = getxattr(path, key, value, sizeof(value), 0, 0); #else ssize_t attr_length = getxattr(path, key, value, sizeof(value)); #endif if (attr_length > 0) { value[1023] = 0; XrdOucString skey = key; XrdOucString attr = ""; if (skey == "user.eos.checksum") { // Checksum's are binary and need special reformatting (we swap the // byte order if they are 4 bytes long ) if (attr_length == 4) { for (ssize_t k = 0; k < 4; k++) { char hex[4]; snprintf(hex, sizeof(hex) - 1, "%02x", (unsigned char) value[3 - k]); attr += hex; } } else { for (ssize_t k = 0; k < attr_length; k++) { char hex[4]; snprintf(hex, sizeof(hex) - 1, "%02x", (unsigned char) value[k]); attr += hex; } } } else { attr = value; } error.setErrInfo(attr.length() + 1, attr.c_str()); return SFS_DATA; } else { eos_static_err("getxattr failed for path=%s", path); const char* err = "ERROR"; error.setErrInfo(strlen(err) + 1, err); return SFS_DATA; } } if (execmd == "local_rename") { if (strncmp(client->prot, "sss", 3) != 0) { eos_static_err("%s", "msg=\"only sss authenticated clients can trigger" " a local rename"); return gOFS.Emsg(epname, error, EPERM, "do local rename", "- needs sss authentication"); } std::string sfsid = (env.Get("fst.rename.fsid") ? env.Get("fst.rename.fsid") : ""); std::string sold_fid = (env.Get("fst.rename.ofid") ? env.Get("fst.rename.ofid") : ""); std::string snew_fid = (env.Get("fst.rename.nfid") ? env.Get("fst.rename.nfid") : ""); std::string ns_path = (env.Get("fst.nspath") ? env.Get("fst.nspath") : ""); unsigned long fsid {0ul}; eos::IFileMD::id_t old_fid {0ull}; eos::IFileMD::id_t new_fid {0ull}; try { fsid = std::stoul(sfsid); // File identifier are in hex! old_fid = std::stoull(sold_fid, 0, 16); new_fid = std::stoull(snew_fid, 0, 16); } catch (...) {} if (!fsid || !old_fid || !new_fid) { eos_static_err("msg=\"failed local rename, unexpected input \" " "fsid=\"%s\" old_fid=\"%s\" new_fid=\"%s\"", sfsid.c_str(), sold_fid.c_str(), snew_fid.c_str()); return gOFS.Emsg(epname, error, EINVAL, "do local rename", ""); } // Get the local mount point for the given file system id std::string fs_prefix; { eos::common::RWMutexReadLock lock(gOFS.Storage->mFsMutex); if (fsid && gOFS.Storage->mFsMap.count(fsid)) { fs_prefix = gOFS.Storage->mFsMap[fsid]->GetPath().c_str(); } } if (fs_prefix.empty()) { eos_static_err("msg=\"failed to get local prefix for file system\" " "fsid=%08llx", fsid); return gOFS.Emsg(epname, error, EINVAL, "do local rename", ""); } std::string old_path = FileId::FidPrefix2FullPath(FileId::Fid2Hex(old_fid).c_str(), fs_prefix.c_str()); std::string new_path = FileId::FidPrefix2FullPath(FileId::Fid2Hex(new_fid).c_str(), fs_prefix.c_str()); // Check that new path doesn't exist already struct stat info; if (::stat(new_path.c_str(), &info) == 0) { eos_static_err("msg=\"new path already exists on filesystem\" " "fsid=%08llx new_path=%s", fsid, new_path.c_str()); return gOFS.Emsg(epname, error, EEXIST, "do local rename", ""); } // Make sure the directory component of the new location exists mode_t mode = 0755; std::string dirs {new_path}; size_t pos = dirs.rfind('/'); if (pos != std::string::npos) { dirs.erase(dirs.rfind('/')); } if (!CreateDirHierarchy(dirs, mode)) { eos_static_err("msg=\"failed creating directory hierarchy\" " "fsid=%08llx new_path=%s", fsid, new_path.c_str()); return gOFS.Emsg(epname, error, EEXIST, "do local rename", ""); } if (::rename(old_path.c_str(), new_path.c_str())) { eos_static_err("msg=\"rename failed\" old_path=%s new_path=%s errno=%d", old_path.c_str(), new_path.c_str(), errno); return gOFS.Emsg(epname, error, EEXIST, "do local rename", ""); } else { // Update the filemd info to point to the origianl file identifier if (!mFmdHandler->UpdateFmd(new_path, new_fid)) { eos_static_err("msg=\"failed to update fid for the fmd object\" " "path=%s new_fid=%08llx", new_path.c_str(), new_fid); // Clean up the file on disk (void) unlink(new_path.c_str()); return gOFS.Emsg(epname, error, EEXIST, "do local rename", ""); } if (!ns_path.empty()) { // Update the user.eos.lfn attribute to point to the original // namespace file name if (setxattr(new_path.c_str(), "user.eos.lfn", ns_path.c_str(), ns_path.length(), 0)) { eos_static_warning("msg=\"failed to update the user.eos.lfn xattr\"" " local_path=\"%s\" ns_path=\"%s\"", new_path.c_str(), ns_path.c_str()); } // Rename any potential block xs files std::string old_xs_path = old_path + ".xsmap"; std::string new_xs_path = new_path + ".xsmap"; if (::stat(old_xs_path.c_str(), &info) == 0) { if (::rename(old_xs_path.c_str(), new_xs_path.c_str())) { eos_static_err("msg=\"block xs file rename failed\" " "old_xs_path=%s new_xs_path=%s errno=%d", old_xs_path.c_str(), new_xs_path.c_str(), errno); return gOFS.Emsg(epname, error, EEXIST, "do local rename", ""); } } } } const char* done = "OK"; error.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } } return Emsg(epname, error, EINVAL, "execute FSctl command", path.c_str()); } //------------------------------------------------------------------------------ // Stall message for the client //------------------------------------------------------------------------------ int XrdFstOfs::Stall(XrdOucErrInfo& error, // Error text & code int stime, // Seconds to stall const char* msg) // Message to give { XrdOucString smessage = msg; smessage += "; come back in "; smessage += stime; smessage += " seconds!"; EPNAME("Stall"); const char* tident = error.getErrUser(); ZTRACE(delay, "Stall " << stime << ": " << smessage.c_str()); // Place the error message in the error object and return error.setErrInfo(0, smessage.c_str()); // All done return stime; } //------------------------------------------------------------------------------ // Redirect message for the client //------------------------------------------------------------------------------ int XrdFstOfs::Redirect(XrdOucErrInfo& error, // Error text & code const char* host, int& port) { EPNAME("Redirect"); const char* tident = error.getErrUser(); ZTRACE(delay, "Redirect " << host << ":" << port); // Place the error message in the error object and return error.setErrInfo(port, host); // All done return SFS_REDIRECT; } //------------------------------------------------------------------------------ // When getting queried for checksum at the diskserver redirect to the MGM //------------------------------------------------------------------------------ int XrdFstOfs::chksum(XrdSfsFileSystem::csFunc Func, const char* csName, const char* inpath, XrdOucErrInfo& error, const XrdSecEntity* client, const char* ininfo) { int ecode = 1094; XrdOucString RedirectManager; { XrdSysMutexHelper lock(gConfig.Mutex); RedirectManager = gConfig.Manager; } int pos = RedirectManager.find(":"); if (pos != STR_NPOS) { RedirectManager.erase(pos); } return gOFS.Redirect(error, RedirectManager.c_str(), ecode); } //------------------------------------------------------------------------------ // Wait for ongoing IO operations to finish //------------------------------------------------------------------------------ bool XrdFstOfs::WaitForOngoingIO(std::chrono::seconds timeout) { bool all_done = true; std::chrono::seconds check_interval(5); auto deadline = std::chrono::steady_clock::now() + timeout; while (std::chrono::steady_clock::now() <= deadline) { all_done = true; { XrdSysMutexHelper scope_lock(OpenFidMutex); all_done = ! openedForWriting.isAnyOpen(); if (all_done) { all_done = ! openedForReading.isAnyOpen(); } if (all_done) { break; } } std::this_thread::sleep_for(check_interval); } return all_done; } //------------------------------------------------------------------------------ // Report file deletion //------------------------------------------------------------------------------ std::string XrdFstOfs::MakeDeletionReport(eos::common::FileSystem::fsid_t fsid, unsigned long long fid, struct stat& deletion_stat, bool viamq) { struct timespec ts_now; char report[16384]; eos::common::Timing::GetTimeSpec(ts_now); snprintf(report, sizeof(report) - 1, "log=%s&" "host=%s&fid=%llu&fxid=%08llx&fsid=%u&" "del_ts=%lu&del_tns=%lu&" "dc_ts=%lu&dc_tns=%lu&" "dm_ts=%lu&dm_tns=%lu&" "da_ts=%lu&da_tns=%lu&" "dsize=%li&sec.app=deletion" , this->logId, gOFS.mHostName, fid, fid, fsid , ts_now.tv_sec, ts_now.tv_nsec #ifdef __APPLE__ , deletion_stat.st_ctimespec.tv_sec , deletion_stat.st_ctimespec.tv_nsec , deletion_stat.st_mtimespec.tv_sec , deletion_stat.st_mtimespec.tv_nsec , deletion_stat.st_atimespec.tv_sec , deletion_stat.st_atimespec.tv_nsec #else , deletion_stat.st_ctim.tv_sec , deletion_stat.st_ctim.tv_nsec , deletion_stat.st_mtim.tv_sec , deletion_stat.st_mtim.tv_nsec , deletion_stat.st_atim.tv_sec , deletion_stat.st_atim.tv_nsec #endif , deletion_stat.st_size); if (viamq) { XrdOucString reportString = report; gOFS.ReportQueueMutex.Lock(); gOFS.ReportQueue.push(reportString); gOFS.ReportQueueMutex.UnLock(); } return report; } //------------------------------------------------------------------------------ // Compute adler checksum of given keytab file //------------------------------------------------------------------------------ std::string XrdFstOfs::GetKeytabChecksum(const std::string& kt_path) const { std::string kt_cks = "unaccessible"; int fd = ::open(kt_path.c_str(), O_RDONLY); if (fd >= 0) { char buffer[65535]; size_t nread = ::read(fd, buffer, sizeof(buffer)); if (nread > 0) { std::unique_ptr KeyCKS = ChecksumPlugins::GetChecksumObject(eos::common::LayoutId::kAdler); if (KeyCKS) { KeyCKS->Add(buffer, nread, 0); kt_cks = KeyCKS->GetHexChecksum(); } } close(fd); } return kt_cks; } //------------------------------------------------------------------------------ // Request broadcasts from all the registered queues //------------------------------------------------------------------------------ void XrdFstOfs::RequestBroadcasts() { using eos::fst::Config; eos_notice("%s", "msg=\"requesting broadcasts\""); // Create a wildcard broadcast XrdMqSharedHash* hash = 0; // Create a node broadcast ObjectManager.CreateSharedHash(gConfig.FstConfigQueueWildcard.c_str(), gConfig.FstDefaultReceiverQueue.c_str()); { eos::common::RWMutexReadLock rd_lock(ObjectManager.HashMutex); hash = ObjectManager.GetHash(gConfig.FstConfigQueueWildcard.c_str()); while (!hash->BroadcastRequest( gConfig.FstDefaultReceiverQueue.c_str())) { eos_static_notice("msg=\"retry broadcast request in 1 second\" hash=\"%s\"", gConfig.FstConfigQueueWildcard.c_str()); std::this_thread::sleep_for(std::chrono::seconds(1)); } } // Create a filesystem broadcast ObjectManager.CreateSharedHash(gConfig.FstQueueWildcard.c_str(), gConfig.FstDefaultReceiverQueue.c_str()); { eos::common::RWMutexReadLock rd_lock(ObjectManager.HashMutex); hash = ObjectManager.GetHash(gConfig.FstQueueWildcard.c_str()); while (!hash->BroadcastRequest( gConfig.FstDefaultReceiverQueue.c_str())) { eos_static_notice("msg=\"retry broadcast request in 1 second\" hash=\"%s\"", gConfig.FstQueueWildcard.c_str()); std::this_thread::sleep_for(std::chrono::seconds(1)); } } } //---------------------------------------------------------------------------- // Update the TPC key min/max validity values, default [2, 15] min //---------------------------------------------------------------------------- void XrdFstOfs::UpdateTpcKeyValidity() { const char* ptr = getenv("EOS_FST_TPC_KEY_MIN_VALIDITY_SEC"); if (ptr && strlen(ptr)) { std::string_view str(ptr); unsigned int min_validity = 0ul; if (eos::common::StringToNumeric(str, min_validity)) { if (min_validity < 60) { min_validity = 60; } if (min_validity > 3600) { min_validity = 3600; } mTpcKeyMinValidity = std::chrono::seconds(min_validity); fprintf(stderr, "=====> Update TPC key min validity to %li seconds\n", mTpcKeyMinValidity.count()); } } ptr = getenv("EOS_FST_TPC_KEY_MAX_VALIDITY_SEC"); if (ptr && strlen(ptr)) { std::string_view str(ptr); unsigned int max_validity = 0ul; if (eos::common::StringToNumeric(str, max_validity)) { if (max_validity < mTpcKeyMinValidity.count()) { max_validity = mTpcKeyMinValidity.count(); } if (max_validity > std::chrono::seconds(3600).count()) { max_validity = 3600; } mTpcKeyMaxValidity = std::chrono::seconds(max_validity); fprintf(stderr, "=====> Update TPC key max validity to %li seconds\n", mTpcKeyMaxValidity.count()); } } if (mTpcKeyMaxValidity.count() < mTpcKeyMinValidity.count()) { mTpcKeyMaxValidity = mTpcKeyMinValidity; } } //------------------------------------------------------------------------------ // Create directory hierarchy //------------------------------------------------------------------------------ bool XrdFstOfs::CreateDirHierarchy(const std::string& dir_hierarchy, mode_t mode) const { struct stat info; std::string path = "/"; auto lst_dirs = eos::common::StringTokenizer::split> (dir_hierarchy, '/'); for (const auto& dir : lst_dirs) { path += dir; path += "/"; if (::stat(path.c_str(), &info) == 0) { continue; } if (::mkdir(path.c_str(), mode) != 0) { return false; } } return true; } //------------------------------------------------------------------------------ // Handle debug query //------------------------------------------------------------------------------ int XrdFstOfs::HandleDebug(XrdOucEnv& env, XrdOucErrInfo& err_obj) { std::string dbg_level = (env.Get("fst.debug.level") ? env.Get("fst.debug.level") : ""); std::string dbg_filter = (env.Get("fst.debug.filter") ? env.Get("fst.debug.filter") : ""); eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); int dbg_val = g_logging.GetPriorityByString(dbg_level.c_str()); if (dbg_val < 0) { std::string msg = SSTR("unknown debug level <" << dbg_level << ">"); eos_err("msg=\"%s\"", msg.c_str()); err_obj.setErrInfo(EINVAL, msg.c_str()); return SFS_ERROR; } // We set the shared hash debug for the lowest 'debug' level if (dbg_level == "debug") { //ObjectManager.SetDebug(true); } else { ObjectManager.SetDebug(false); } g_logging.SetLogPriority(dbg_val); eos_notice("msg=\"setting debug level to <%s>\"", dbg_level.c_str()); if (dbg_filter.length()) { g_logging.SetFilter(dbg_filter.c_str()); eos_notice("setting message logid filter to <%s>", dbg_filter.c_str()); } // @todo(esindril) once xrootd bug regarding handling of SFS_OK response // in XrdXrootdXeq is fixed we can just return SFS_OK (>= XRootD 5) // return SFS_OK; const char* done = "OK"; err_obj.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } //------------------------------------------------------------------------------ // Handle resync query //------------------------------------------------------------------------------ int XrdFstOfs::HandleResync(XrdOucEnv& env, XrdOucErrInfo& err_obj) { using eos::common::FileId; if ((env.Get("fst.resync.fsid") == nullptr) || (env.Get("fst.resync.fxid") == nullptr) || (env.Get("fst.resync.force") == nullptr)) { eos_static_err("%s", "msg=\"discard resync with missing arguments\""); err_obj.setErrInfo(EINVAL, "resync missing arguments"); return SFS_ERROR; } bool force {false}; FileId::fileid_t fid = FileId::Hex2Fid(env.Get("fst.resync.fxid")); FileSystem::fsid_t fsid = strtoul(env.Get("fst.resync.fsid"), 0, 10); char* ptr = env.Get("fst.resync.force"); if (ptr && (strncmp(ptr, "1", 1) == 0)) { force = true; } if ((ptr == nullptr) || (fsid == 0ul)) { eos_static_err("msg=\"resync with invalid args\" fsid=%lu fxid=%08llx", (unsigned long) fsid, fid); err_obj.setErrInfo(EINVAL, "resync with invalid args"); return SFS_ERROR; } if (!fid) { eos_static_warning("msg=\"deleting fmd\" fsid=%lu fxid=%08llx", fsid, fid); mFmdHandler->LocalDeleteFmd(fid, fsid); } else { auto fmd = mFmdHandler->LocalGetFmd(fid, fsid, true, force); if (fmd) { if (force) { eos_static_info("msg=\"force resync\" fxid=%08llx fsid=%lu", fid, fsid); std::string fpath = eos::common::FileId::FidPrefix2FullPath (eos::common::FileId::Fid2Hex(fid).c_str(), gOFS.Storage->GetStoragePath(fsid).c_str()); if (mFmdHandler->ResyncDisk(fpath.c_str(), fsid, false) == 0) { if (!mFmdHandler->ResyncMgm(fsid, fid, nullptr)) { eos_static_err("msg=\"resync mgm failed\" fid=%08llx fsid=%lu", fid, fsid); } } else { eos_static_err("msg=\"resync disk failed\" fid=%08llx fsid=%lu", fid, fsid); } } else { // Resync of meta data from the MGM by storing the FmdHelper in the // WrittenFilesQueue to have it done asynchronously gOFS.WrittenFilesQueueMutex.Lock(); gOFS.WrittenFilesQueue.push(*fmd.get()); gOFS.WrittenFilesQueueMutex.UnLock(); } } } // @todo(esindril) once xrootd bug regarding handling of SFS_OK response // in XrdXrootdXeq is fixed we can just return SFS_OK (>= XRootD 5) // return SFS_OK; const char* done = "OK"; err_obj.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } //------------------------------------------------------------------------------ // Handle rtlog query //------------------------------------------------------------------------------ int XrdFstOfs::HandleRtlog(XrdOucEnv& env, XrdOucErrInfo& err_obj) { XrdOucString tag = env.Get("mgm.rtlog.tag"); XrdOucString lines = env.Get("mgm.rtlog.lines"); XrdOucString filter = env.Get("mgm.rtlog.filter"); XrdOucString response = ""; if (!filter.length()) { filter = " "; } if ((!lines.length()) || (!tag.length())) { eos_static_err("msg=\"rtlog illegal parameter\" lines=%s tag=%s", lines.c_str(), tag.c_str()); err_obj.setErrInfo(EINVAL, "rtlog illegal parameter"); return SFS_ERROR; } eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); if (g_logging.GetPriorityByString(tag.c_str()) == -1) { eos_static_err("%s", "msg=\"unknown rtlog tag\""); err_obj.setErrInfo(EINVAL, "rtlog unknown tag"); return SFS_ERROR; } int logtagindex = g_logging.GetPriorityByString(tag.c_str()); for (int j = 0; j <= logtagindex; j++) { for (int i = 1; i <= atoi(lines.c_str()); i++) { g_logging.gMutex.Lock(); XrdOucString logline = g_logging.gLogMemory[j][ (g_logging.gLogCircularIndex[j] - i + g_logging.gCircularIndexSize) % g_logging.gCircularIndexSize].c_str(); g_logging.gMutex.UnLock(); if (logline.length() && ((logline.find(filter.c_str())) != STR_NPOS)) { response += logline; response += "\n"; } if (!logline.length()) { break; } } } // Use XrdOucBuffPool to manage XrdOucBuffer objects that can hold redirection // info >= 2kb but not bigger than MaxSize const uint32_t aligned_sz = eos::common::GetPowerCeil(response.length() + 1); XrdOucBuffer* buff = mXrdBuffPool.Alloc(aligned_sz); if (buff == nullptr) { eos_static_err("msg=\"requested rtlog result buffer too big\" req_sz=%llu " "max_sz=%i", response.length(), mXrdBuffPool.MaxSize()); err_obj.setErrInfo(ENOMEM, "rtlog result buffer too big"); return SFS_ERROR; } eos_static_debug("msg=\"rtlog reply\" data=\"%s\"", response.c_str()); (void) strcpy(buff->Buffer(), response.c_str()); buff->SetLen(response.length() + 1); err_obj.setErrInfo(buff->DataLen(), buff); return SFS_DATA; } //------------------------------------------------------------------------------ // Handle verify query //------------------------------------------------------------------------------ int XrdFstOfs::HandleVerify(XrdOucEnv& env, XrdOucErrInfo& err_obj) { int envlen = 0; eos_static_info("ms=\"verify opaque\" data=\%s\"", env.Env(envlen)); Verify* new_verify = Verify::Create(&env); if (new_verify) { Storage->PushVerification(new_verify); } else { eos_static_err("%s", "msg=\"failed verify, illegal opaque info\""); } // @todo(esindril) once xrootd bug regarding handling of SFS_OK response // in XrdXrootdXeq is fixed we can just return SFS_OK (>= XRootD 5) // return SFS_OK; const char* done = "OK"; err_obj.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } //------------------------------------------------------------------------------ // Handle drop file query //------------------------------------------------------------------------------ int XrdFstOfs::HandleDropFile(XrdOucEnv& env, XrdOucErrInfo& err_obj) { int caprc = 0; XrdOucEnv* capOpaque {nullptr}; bool is_fsck = false; char* ptr = env.Get("fst.drop.type"); // Check if drop request comes from an fsck operation if (ptr && (strncmp(ptr, "fsck", 4) == 0)) { is_fsck = true; } if ((caprc = eos::common::SymKey::ExtractCapability(&env, capOpaque))) { eos_static_err("msg=\"extract capability failed for deletion\" errno=%d", caprc); return SFS_ERROR; } else { int envlen = 0; eos_static_debug("opaque=\"%s\"", capOpaque->Env(envlen)); std::unique_ptr new_del = Deletion::Create(capOpaque); if (new_del) { if (mEnvFsckDeleteByMove && is_fsck) { gOFS.Storage->DeleteByMove(std::move(new_del)); } else { gOFS.Storage->AddDeletion(std::move(new_del)); } } else { eos_static_err("%s", "msg=\"illegal drop opaque information\""); return SFS_ERROR; } } delete capOpaque; // @todo(esindril) once xrootd bug regarding handling of SFS_OK response // in XrdXrootdXeq is fixed we can just return SFS_OK (>= XRootD 5) // return SFS_OK; const char* done = "OK"; err_obj.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } //------------------------------------------------------------------------------ // Handle clean orphans query //------------------------------------------------------------------------------ int XrdFstOfs::HandleCleanOrphans(XrdOucEnv& env, XrdOucErrInfo& err_obj) { const char* ptr = env.Get("fst.fsid"); eos::common::FileSystem::fsid_t fsid = 0ul; if (ptr) { std::string sfsid {ptr}; try { size_t pos = 0; fsid = std::stoul(sfsid, &pos); if (pos != sfsid.length()) { throw std::invalid_argument("fsid conversion failed"); } } catch (...) { err_obj.setErrInfo(EINVAL, "fsid is not numeric"); return SFS_ERROR; } } else { err_obj.setErrInfo(EINVAL, "query missing fst.fsid key "); return SFS_ERROR; } std::ostringstream err_msg; if (!Storage->CleanupOrphans(fsid, err_msg)) { err_obj.setErrInfo(EINVAL, err_msg.str().c_str()); return SFS_ERROR; } // @todo(esindril) once xrootd bug regarding handling of SFS_OK response // in XrdXrootdXeq is fixed we can just return SFS_OK (>= XRootD 5) // return SFS_OK; const char* done = "OK"; err_obj.setErrInfo(strlen(done) + 1, done); return SFS_DATA; } //------------------------------------------------------------------------------ // Query MGM for the deletion list //------------------------------------------------------------------------------ int XrdFstOfs::Query2Delete() { const std::string mgm_endpoint = gConfig.GetManager(); if (mgm_endpoint.empty()) { eos_static_err("%s", "msg=\"no MGM endpoint available\""); return SFS_ERROR; } std::ostringstream oss; oss << "root://" << mgm_endpoint << "//dummy?xrd.wantprot=sss"; XrdCl::URL url(oss.str()); if (!url.IsValid()) { eos_static_err("msg=\"invalid url\" url=\"%s\"", oss.str().c_str()); return SFS_ERROR; } std::string request = "/?mgm.pcmd=query2delete&mgm.target.nodename="; request += gConfig.FstQueue.c_str(); XrdCl::Buffer arg; XrdCl::Buffer* raw_resp {nullptr}; std::unique_ptr resp; int attempts = 5; do { XrdCl::FileSystem fs {url}; arg.FromString(request); XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::OpaqueFile, arg, raw_resp); resp.reset(raw_resp); raw_resp = nullptr; if (!status.IsOK()) { if (status.code == XrdCl::errSocketTimeout) { eos_static_info("%s", "msg=\"retry query2delete in 2 seconds, " "MGM not ready\""); std::this_thread::sleep_for(std::chrono::seconds(2)); --attempts; } else { eos_static_err("msg=\"failed query request\" request=\"%s\" " "status=\"%s\" errno=%u", request.c_str(), status.ToStr().c_str(), status.errNo); return SFS_ERROR; } } else { break; } } while (attempts); if (resp && resp->GetBuffer()) { eos::fst::DeletionsProto del_fst; if (!del_fst.ParseFromArray(resp->GetBuffer(), resp->GetSize())) { eos_static_err("%s", "msg=\"query2delete failed to parse protobuf\""); return SFS_ERROR; } // Submit deletions for (const auto& del : del_fst.fs()) { std::vector fids; fids.reserve(del.fids().size()); fids.insert(fids.cend(), del.fids().cbegin(), del.fids().cend()); try { gOFS.Storage->AddDeletion(std::make_unique (std::move(fids), del.fsid(), del.path().c_str())); } catch (const std::bad_alloc& e) { eos_static_err("%s", "msg=\"failed to alloc deletion object\""); continue; } } } return SFS_OK; } //------------------------------------------------------------------------------ // Set various XrdCl config options more appropriate for the EOS use-case but // still allow the env variables to override them. //------------------------------------------------------------------------------ void XrdFstOfs::SetXrdClConfig() { char* ptr {nullptr}; int env_value {0}; std::map map_settings { {"TimeoutResolution", 1}, {"ConnectionWindow", 5}, {"ConnectionRetry", 1}, {"StreamErrorWindow", 0}, {"MetalinkProcessing", 0}, {"ParallelEvtLoop", 8} }; for (auto& elem : map_settings) { std::string env_name = "XRD_" + elem.first; std::transform(env_name.begin(), env_name.end(), env_name.begin(), ::toupper); ptr = getenv(env_name.c_str()); env_value = elem.second; // Env variable overrides default values if (ptr) { try { env_value = std::stoi(std::string(ptr)); } catch (...) { eos_static_err("msg=\"invalid value for env var %s keeping the defaule\" " "default_value=1", env_value); } } XrdCl::DefaultEnv::GetEnv()->PutInt(elem.first, env_value); eos_static_info("msg=\"update xrootd client timeouts\" name=%s value=%i", elem.first.c_str(), env_value); } } //------------------------------------------------------------------------------ // Get Kernel relase information //------------------------------------------------------------------------------ std::string XrdFstOfs::GetKernelRelease() { std::string kernel_release; struct utsname buf; int rc = uname(&buf); if (!rc) { kernel_release = buf.release; } return kernel_release; } EOSFSTNAMESPACE_END