// ---------------------------------------------------------------------- // File: Publish.cc // Author: Andreas-Joachim Peters - CERN // ---------------------------------------------------------------------- /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "fst/storage/Storage.hh" #include "fst/XrdFstOfs.hh" #include "fst/Config.hh" #include "fst/storage/FileSystem.hh" #include "qclient/Formatting.hh" #include "common/LinuxStat.hh" #include "common/ShellCmd.hh" #include "common/Timing.hh" #include "common/StringTokenizer.hh" #include "common/StringUtils.hh" #include "common/IntervalStopwatch.hh" #include "common/SymKeys.hh" #include "XrdVersion.hh" XrdVERSIONINFOREF(XrdgetProtocol); EOSFSTNAMESPACE_BEGIN constexpr std::chrono::minutes Storage::sConsistencyTimeout; //------------------------------------------------------------------------------ // Open random temporary file in /tmp/ // // @return string temporary file path, if open failed return empty string //------------------------------------------------------------------------------ std::string MakeTemporaryFile() { char tmp_name[] = "/tmp/fst.publish.XXXXXX"; int tmp_fd = mkstemp(tmp_name); if (tmp_fd == -1) { eos_static_crit("%s", "msg=\"failed to create temporary file!\""); return ""; } (void) close(tmp_fd); return tmp_name; } //------------------------------------------------------------------------------ // Serialize hot files vector into std::string // Return " " if given an empty vector, instead of "". // // This is to keep the entry in the hash, even if no opened files exist. //------------------------------------------------------------------------------ static std::string HotFilesToString( const std::vector& entries) { if (entries.size() == 0u) { return " "; } std::ostringstream ss; for (size_t i = 0; i < entries.size(); i++) { ss << entries[i].uses; ss << ":"; ss << eos::common::FileId::Fid2Hex(entries[i].fid); ss << " "; } return ss.str(); } //------------------------------------------------------------------------------ // Retrieve net speed //------------------------------------------------------------------------------ static uint64_t GetNetSpeed(const std::string& tmpname) { if (getenv("EOS_FST_NETWORK_SPEED")) { return strtoull(getenv("EOS_FST_NETWORK_SPEED"), nullptr, 10); } std::string getNetspeedCommand = SSTR( "ip route list | sed -ne '/^default/s/.*dev //p' | cut -d ' ' -f1 |" " xargs -i ethtool {} 2>&1 | grep Speed | cut -d ' ' -f2 | cut -d 'M' -f1 > " << tmpname); eos::common::ShellCmd scmd1(getNetspeedCommand.c_str()); eos::common::cmd_status rc = scmd1.wait(5); unsigned long long netspeed = 1000000000; if (rc.exit_code) { eos_static_err("%s", "msg=\"ip route list call failed to get netspeed\""); return netspeed; } FILE* fnetspeed = fopen(tmpname.c_str(), "r"); if (fnetspeed) { if ((fscanf(fnetspeed, "%llu", &netspeed)) == 1) { // we get MB as a number => convert into bytes netspeed *= 1000000; eos_static_info("ethtool:networkspeed=%.02f GB/s", 1.0 * netspeed / 1000000000.0); } fclose(fnetspeed); } return netspeed; } //------------------------------------------------------------------------------ // Retrieve uptime //------------------------------------------------------------------------------ static std::string GetUptime(const std::string& tmpname) { eos::common::ShellCmd cmd(SSTR("uptime | tr -d \"\n\"| tr -s \" \" > " << tmpname)); eos::common::cmd_status rc = cmd.wait(5); if (rc.exit_code) { eos_static_err("%s", "msg=\"retrieve uptime call failed\""); return "N/A"; } std::string retval; eos::common::StringConversion::LoadFileIntoString(tmpname.c_str(), retval); return retval; } //------------------------------------------------------------------------------ // Get uptime information in a more pretty format //------------------------------------------------------------------------------ static std::map GetUptimePrettyFormat(const std::string tmpname) { using eos::common::StringTokenizer; std::map map_info; { // Get uptime information in seconds eos::common::ShellCmd cmd(SSTR("cat /proc/uptime | sed 's/\\..*//' > " << tmpname)); eos::common::cmd_status rc = cmd.wait(5); if (rc.exit_code) { eos_static_err("%s", "msg=\"failed to retrieve uptime\""); return map_info; } std::string uptime_sec; eos::common::StringConversion::LoadFileIntoString(tmpname.c_str(), uptime_sec); eos::common::trim(uptime_sec); map_info["sys.stat.uptime_sec"] = uptime_sec; } { // Get load information eos::common::ShellCmd cmd(SSTR("cat /proc/loadavg > " << tmpname)); eos::common::cmd_status rc = cmd.wait(2); if (rc.exit_code) { eos_static_err("%s", "msg=\"failed to retrieve loadavg\""); return map_info; } std::string load_info; eos::common::StringConversion::LoadFileIntoString(tmpname.c_str(), load_info); std::list load_tokens = StringTokenizer::split>(load_info, ' '); if (load_tokens.size() >= 3) { map_info["sys.stat.load_avg_1m"] = load_tokens.front(); load_tokens.pop_front(); map_info["sys.stat.load_avg_5m"] = load_tokens.front(); load_tokens.pop_front(); map_info["sys.stat.load_avg_15m"] = load_tokens.front(); load_tokens.pop_front(); } } return map_info; } //------------------------------------------------------------------------------ // Retrieve xrootd version //------------------------------------------------------------------------------ static std::string GetXrootdVersion() { XrdOucString v = XrdVERSIONINFOVAR(XrdgetProtocol).vStr; int pos = v.find(" "); if (pos != STR_NPOS) { v.erasefromstart(pos + 1); } return v.c_str(); } //------------------------------------------------------------------------------ // Retrieve eos version //------------------------------------------------------------------------------ static std::string GetEosVersion() { return SSTR(VERSION << "-" << RELEASE); } //------------------------------------------------------------------------------ // Retrieve FST network interface //------------------------------------------------------------------------------ static std::string GetNetworkInterface() { if (getenv("EOS_FST_NETWORK_INTERFACE")) { return getenv("EOS_FST_NETWORK_INTERFACE"); } return "eth0"; } //------------------------------------------------------------------------------ // Retrieve number of TCP sockets in the system //------------------------------------------------------------------------------ static std::string GetNumOfTcpSockets(const std::string& tmpname) { std::string command = SSTR("cat /proc/net/tcp | wc -l | tr -d \"\n\" > " << tmpname); eos::common::ShellCmd cmd(command.c_str()); eos::common::cmd_status rc = cmd.wait(5); if (rc.exit_code) { eos_static_err("%s", "msg=\"retrieve #socket call failed\""); } std::string retval; eos::common::StringConversion::LoadFileIntoString(tmpname.c_str(), retval); return retval; } //------------------------------------------------------------------------------ // Get size of subtree by using the system "du -sb" command //------------------------------------------------------------------------------ static std::string GetSubtreeSize(const std::string& path) { const std::string tmp_name = MakeTemporaryFile(); const std::string command = SSTR("du -sb " << path << " | cut -f1 > " << tmp_name); eos::common::ShellCmd cmd(command.c_str()); eos::common::cmd_status rc = cmd.wait(5); if (rc.exit_code) { eos_static_err("msg=\"failed to compute subtree size\" path=%s", path.c_str()); } std::string retval; eos::common::StringConversion::LoadFileIntoString(tmp_name.c_str(), retval); (void) unlink(tmp_name.c_str()); return retval; } //------------------------------------------------------------------------------ // Overwrite statfs statistics for testing environment //------------------------------------------------------------------------------ static void OverwriteTestingStatfs(const std::string& path, std::map& output) { const char* ptr = getenv("EOS_FST_TESTING"); if (ptr == nullptr) { return; } eos_static_info("msg=\"overwrite statfs values\" path=%s", path.c_str()); uint64_t subtree_max_size = 10ull * 1024 * 1024 * 1024; // 10GB ptr = getenv("EOS_FST_SUBTREE_MAX_SIZE"); if (ptr) { if (!eos::common::StringToNumeric(std::string(ptr), subtree_max_size, subtree_max_size)) { eos_static_err("msg=\"failed convertion\" data=\"%s\"", ptr); } } uint64_t bsize = 4096; (void) eos::common::StringToNumeric(output["stat.statfs.bsize"], bsize, bsize); uint64_t used_bytes {0ull}; const std::string sused_bytes = GetSubtreeSize(path); (void) eos::common::StringToNumeric(sused_bytes, used_bytes); double filled = 100.0 - ((double) 100.0 * (subtree_max_size - used_bytes) / subtree_max_size); output["stat.statfs.filled"] = std::to_string(filled); output["stat.statfs.usedbytes"] = std::to_string(used_bytes); output["stat.statfs.freebytes"] = std::to_string(subtree_max_size - used_bytes); output["stat.statfs.capacity"] = std::to_string(subtree_max_size); } //------------------------------------------------------------------------------ // Get statistics about this FST, used for publishing //------------------------------------------------------------------------------ std::map Storage::GetFstStatistics(const std::string& tmpfile, unsigned long long netspeed) { eos::common::LinuxStat::linux_stat_t osstat; if (!eos::common::LinuxStat::GetStat(osstat)) { eos_crit("failed to get the memory usage information"); } std::map output; // Kernel version output["stat.sys.kernel"] = gConfig.KernelVersion.c_str(); // Virtual memory size output["stat.sys.vsize"] = SSTR(osstat.vsize); // rss usage output["stat.sys.rss"] = SSTR(osstat.rss); // number of active threads on this machine output["stat.sys.threads"] = SSTR(osstat.threads); // eos version output["stat.sys.eos.version"] = GetEosVersion(); // xrootd version output["stat.sys.xrootd.version"] = GetXrootdVersion(); // adler32 of keytab output["stat.sys.keytab"] = gConfig.KeyTabAdler.c_str(); // machine uptime output["stat.sys.uptime"] = GetUptime(tmpfile); auto uptime_info = GetUptimePrettyFormat(tmpfile); output.insert(uptime_info.begin(), uptime_info.end()); // active TCP sockets output["stat.sys.sockets"] = GetNumOfTcpSockets(tmpfile); // startup time of the FST daemon output["stat.sys.eos.start"] = gConfig.StartDate.c_str(); // FST geotag output["stat.geotag"] = gOFS.GetGeoTag(); // http port output["http.port"] = SSTR(gOFS.mHttpdPort); // debug level eos::common::Logging& g_logging = eos::common::Logging::GetInstance(); output["debug.state"] = eos::common::StringConversion::ToLower (g_logging.GetPriorityString (g_logging.gPriorityLevel)).c_str(); // net info output["stat.net.ethratemib"] = SSTR(netspeed / (8 * 1024 * 1024)); output["stat.net.inratemib"] = SSTR( mFstLoad.GetNetRate(GetNetworkInterface().c_str(), "rxbytes") / 1024.0 / 1024.0); output["stat.net.outratemib"] = SSTR( mFstLoad.GetNetRate(GetNetworkInterface().c_str(), "txbytes") / 1024.0 / 1024.0); // publish timestamp output["stat.publishtimestamp"] = SSTR( eos::common::getEpochInMilliseconds().count()); return output; } //------------------------------------------------------------------------------ // Insert statfs info into the map //------------------------------------------------------------------------------ static void InsertStatfs(struct statfs* statfs, std::map& output) { output["stat.statfs.type"] = std::to_string(statfs->f_type); output["stat.statfs.bsize"] = std::to_string(statfs->f_bsize); output["stat.statfs.blocks"] = std::to_string(statfs->f_blocks); output["stat.statfs.bfree"] = std::to_string(statfs->f_bfree); output["stat.statfs.bavail"] = std::to_string(statfs->f_bavail); output["stat.statfs.files"] = std::to_string(statfs->f_files); output["stat.statfs.ffree"] = std::to_string(statfs->f_ffree); #ifdef __APPLE__ output["stat.statfs.namelen"] = std::to_string(MNAMELEN); #else output["stat.statfs.namelen"] = std::to_string(statfs->f_namelen); #endif output["stat.statfs.freebytes"] = std::to_string(statfs->f_bfree * statfs->f_bsize); output["stat.statfs.usedbytes"] = std::to_string((statfs->f_blocks - statfs->f_bfree) * statfs->f_bsize); output["stat.statfs.filled"] = std::to_string( (double) 100.0 * ((double)(statfs->f_blocks - statfs->f_bfree) / (double)( 1 + statfs->f_blocks))); output["stat.statfs.capacity"] = std::to_string(statfs->f_blocks * statfs->f_bsize); output["stat.statfs.fused"] = std::to_string(statfs->f_files - statfs->f_ffree); } //------------------------------------------------------------------------------ // Get statistics about this FileSystem, used for publishing //------------------------------------------------------------------------------ std::map Storage::GetFsStatistics(FileSystem* fs) { if (!fs) { eos_static_crit("asked to publish statistics for a null filesystem"); return {}; } eos::common::FileSystem::fsid_t fsid = fs->GetLocalId(); if (!fsid) { // during the boot phase we can find a filesystem without ID eos_static_warning("asked to publish statistics for filesystem with fsid=0"); return {}; } std::map output; // Publish statfs std::unique_ptr statfs = fs->GetStatfs(); if (statfs) { InsertStatfs(statfs->GetStatfs(), output); OverwriteTestingStatfs(fs->GetPath(), output); } // Publish stat.disk.* double readratemb; double writeratemb; double diskload; std::map iostats; if (fs->GetFileIOStats(iostats)) { readratemb = strtod(iostats["read-mb-second"].c_str(), 0); writeratemb = strtod(iostats["write-mb-second"].c_str(), 0); diskload = strtod(iostats["load"].c_str(), 0); } else { readratemb = mFstLoad.GetDiskRate(fs->GetPath().c_str(), "readSectors") * 512.0 / 1000000.0; writeratemb = mFstLoad.GetDiskRate(fs->GetPath().c_str(), "writeSectors") * 512.0 / 1000000.0; diskload = mFstLoad.GetDiskRate(fs->GetPath().c_str(), "millisIO") / 1000.0; } output["stat.disk.readratemb"] = std::to_string(readratemb); output["stat.disk.writeratemb"] = std::to_string(writeratemb); output["stat.disk.load"] = std::to_string(diskload); // Publish stat.health.* std::map health; if (!fs->GetHealthInfo(health)) { health = mFstHealth.getDiskHealth(fs->GetPath()); } output["stat.health"] = (health.count("summary") ? health["summary"].c_str() : "N/A"); // set some reasonable defaults if information is not available output["stat.health.indicator"] = (health.count("indicator") ? health["indicator"] : "N/A"); output["stat.health.drives_total"] = (health.count("drives_total") ? health["drives_total"] : "1"); output["stat.health.drives_failed"] = (health.count("drives_failed") ? health["drives_failed"] : "0"); output["stat.health.redundancy_factor"] = (health.count("redundancy_factor") ? health["redundancy_factor"] : "1"); { // don't publish smart info too often, it is few kb per filesystem! time_t now = time(NULL); static std::map smartPublishing; static XrdSysMutex smartPublishingMutex; bool publish = false; { XrdSysMutexHelper scope_lock(smartPublishingMutex); if (!smartPublishing[fs] || (smartPublishing[fs] < now)) { smartPublishing[fs] = now + 3600; publish = true; } } if (publish) { // compress the json smart info eos::common::SymKey::ZBase64(health["attributes"], output["stat.health.z64smart"]); } } // Publish generic statistics, related to free space and current load long long r_open = (long long) gOFS.openedForReading.getOpenOnFilesystem(fsid); long long w_open = (long long) gOFS.openedForWriting.getOpenOnFilesystem(fsid); output["stat.ropen"] = std::to_string(r_open); output["stat.wopen"] = std::to_string(w_open); if (auto kv = output.find("stat.statfs.fused"); kv != output.end()) { // FIXME: Actually subtract the statfs of the .eosorphans, also count // checksums & scrub files! output["stat.usedfiles"] = kv->second; } output["stat.boot"] = fs->GetStatusAsString(fs->GetStatus()); output["stat.geotag"] = gOFS.GetGeoTag(); output["stat.publishtimestamp"] = std::to_string( eos::common::getEpochInMilliseconds().count()); output["stat.disk.iops"] = std::to_string(fs->getIOPS()); output["stat.disk.bw"] = std::to_string(fs->getSeqBandwidth()); // in MB output["stat.http.port"] = std::to_string(gOFS.mHttpdPort); // FST alias if (gConfig.HostAlias.length()) { output["stat.alias.host"] = gConfig.HostAlias.c_str(); } // FST port alias if (gConfig.PortAlias.length()) { output["stat.alias.port"] = gConfig.PortAlias.c_str(); } // debug level output["stat.ropen.hotfiles"] = HotFilesToString( gOFS.openedForReading.getHotFiles(fsid, 10)); output["stat.wopen.hotfiles"] = HotFilesToString( gOFS.openedForWriting.getHotFiles(fsid, 10)); return output; } //------------------------------------------------------------------------------ // Publish statistics about the given filesystem //------------------------------------------------------------------------------ bool Storage::PublishFsStatistics(FileSystem* fs) { if (!fs) { eos_static_crit("%s", "msg=\"asked to publish statistics for a null fs\""); return false; } eos::common::FileSystem::fsid_t fsid = fs->GetLocalId(); if (!fsid) { // during the boot phase we can find a filesystem without ID eos_static_warning("%s", "msg=\"asked to publish statistics for fsid=0\""); return false; } common::FileSystemUpdateBatch batch; std::map fsStats = GetFsStatistics(fs); for (auto it = fsStats.begin(); it != fsStats.end(); it++) { batch.setStringTransient(it->first, it->second); } CheckFilesystemFullness(fs, fsid); return fs->applyBatch(batch); } //------------------------------------------------------------------------------ // Publish //------------------------------------------------------------------------------ void Storage::Publish(ThreadAssistant& assistant) noexcept { eos_static_info("%s", "msg=\"publisher activated\""); std::string tmp_name = MakeTemporaryFile(); if (tmp_name.empty()) { return; } unsigned long long netspeed = GetNetSpeed(tmp_name); eos_static_info("msg=\"publish networkspeed=%.02f GB/s\"", 1.0 * netspeed / 1000000000.0); // The following line acts as a barrier that prevents progress // until the config queue becomes known gConfig.getFstNodeConfigQueue("Publish"); while (!assistant.terminationRequested()) { std::chrono::milliseconds randomizedReportInterval = gConfig.getRandomizedPublishInterval(); common::IntervalStopwatch stopwatch(randomizedReportInterval); { // Publish with a MuxTransaction all file system changes eos::common::RWMutexReadLock fs_rd_lock(mFsMutex); if (!gOFS.ObjectManager.OpenMuxTransaction()) { eos_static_err("%s", "msg=\"cannot open mux transaction\""); } else { std::map> map_futures; // Copy out statfs info in parallel to speed-up things for (const auto& elem : mFsMap) { auto fs = elem.second; if (!fs) { continue; } try { map_futures.emplace(fs, std::async(std::launch::async, &Storage::PublishFsStatistics, this, fs)); } catch (const std::system_error& e) { eos_static_err("msg=\"exception while collecting fs statistics\" " "fsid=%lu msg=\"%s\"", elem.first, e.what()); } } for (auto& elem : map_futures) { if (elem.second.get() == false) { eos_static_err("msg=\"failed to publish fs stats\" fspath=%s", elem.first->GetPath().c_str()); } } auto fstStats = GetFstStatistics(tmp_name, netspeed); // Set node status values common::SharedHashLocator locator = gConfig.getNodeHashLocator("Publish"); if (!locator.empty()) { mq::SharedHashWrapper::Batch batch; for (auto it = fstStats.begin(); it != fstStats.end(); it++) { batch.SetTransient(it->first, it->second); } mq::SharedHashWrapper hash(gOFS.mMessagingRealm.get(), locator, true, false); hash.set(batch); } gOFS.ObjectManager.CloseMuxTransaction(); } } std::chrono::milliseconds sleepTime = stopwatch.timeRemainingInCycle(); if (sleepTime == std::chrono::milliseconds(0)) { eos_static_warning("msg=\"publisher cycle exceeded %d millisec - took %d " "millisec", randomizedReportInterval.count(), stopwatch.timeIntoCycle()); } else { assistant.wait_for(sleepTime); } } (void) unlink(tmp_name.c_str()); } EOSFSTNAMESPACE_END