//------------------------------------------------------------------------------
// File: Iostat.cc
// Authors: Andreas-Joachim Peters - CERN
// Elvin Alin Sindrilaru - CERN
// Jaroslav Guenther - CERN
//
// Implementation follows presentation from EOS Workshop in 2022
// https://indico.cern.ch/event/1103358/contributions/4758312/attachments/2402845/4109660/EOS_IO_stat_monitoring.pdf
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2021 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/table_formatter/TableFormatterBase.hh"
#include "common/Report.hh"
#include "common/Path.hh"
#include "common/JeMallocHandler.hh"
#include "common/Logging.hh"
#include "common/Timing.hh"
#include "common/StringTokenizer.hh"
#include "common/StringUtils.hh"
#include "mgm/Iostat.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/IMaster.hh"
#include "mq/MessagingRealm.hh"
#include "namespace/interface/IView.hh"
#include "namespace/ns_quarkdb/QdbContactDetails.hh"
#include "namespace/ns_quarkdb/flusher/MetadataFlusher.hh"
#include "namespace/ns_quarkdb/qclient/include/qclient/ResponseParsing.hh"
#include "namespace/Prefetcher.hh"
#include "mq/ReportListener.hh"
#include "XrdNet/XrdNetUtils.hh"
#include "XrdNet/XrdNetAddr.hh"
#include
EOSMGMNAMESPACE_BEGIN
const char* Iostat::gIostatCollect = "iostat::collect";
const char* Iostat::gIostatReport = "iostat::report";
const char* Iostat::gIostatReportNamespace = "iostat::reportnamespace";
const char* Iostat::gIostatPopularity = "iostat::popularity";
const char* Iostat::gIostatUdpTargetList = "iostat::udptargets";
FILE* Iostat::gOpenReportFD = 0;
Period LAST_DAY = Period::DAY;
Period LAST_HOUR = Period::HOUR;
Period LAST_5MIN = Period::FIVEMIN;
Period LAST_1MIN = Period::ONEMIN;
PercentComplete P90 = PercentComplete::p90;
PercentComplete P95 = PercentComplete::p95;
PercentComplete P99 = PercentComplete::p99;
PercentComplete ALL = PercentComplete::p100;
//------------------------------------------------------------------------------
// IostatPeriods implementation
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Adds transfer data on a 24h timeline of [mDataBuffer]
// Provides:
// - [mLongestTransferTime] in last 24h
// - populates [mIntegralBuffer] and every 5 min extracts the
// time of transfer for > 90/95/100 percent of data
// - [mDataBuffer] circular buffer for all transfers in the last 24h
//------------------------------------------------------------------------------
void
IostatPeriods::Add(unsigned long long val, time_t start, time_t stop,
time_t now)
{
mTotal += val;
double value = (double)val;
// Window start/end times are "|", bin start [ and end ] times
// period window = |-----------[--]----------|
if (stop > now) {
eos_static_err("%s", "msg=\"failed report digest, transfer "
"stop time in the future\"");
return;
}
time_t t_window_start = now - (sBins * sBinWidth);
if (stop <= t_window_start) {
eos_static_warning("%s", "msg=\"failed report digest, transfer stopped "
"outside of collection time window\"");
return;
}
time_t tdiff = stop - start + 1;
if (tdiff < 1) {
eos_static_err("%s", "msg=\"transfer start time after stop time\"");
return;
}
StampBufferZero(now);
mTfCount += 1;
if (stop > mLastAddTime) {
mLastAddTime = stop;
}
if (mLongestTransferTime < (unsigned int)tdiff) {
mLongestTransferTime = tdiff;
}
time_t trep = now - stop;
if (mLongestReportTime < (unsigned int)(trep)) {
mLongestReportTime = trep;
}
// cutting off data out of time window
if (start < t_window_start) {
// re-calculate data portion to save into our time window
value = ((stop - t_window_start) * value) / tdiff;
start = t_window_start;
tdiff = stop - start;
}
// Number of bins the measurement hits
size_t mbins = tdiff / sBinWidth;
double val_per_bin = value / mbins;
int index_start = (start / sBinWidth) % sBins;
for (size_t ibin = 0; ibin < mbins; ++ibin) {
int bin_index = (index_start + ibin) % sBins;
// Code block to be added and tested in case sBinWidth !=1
// double ival = val_per_bin;
// if (ibin == 0 and mbins > 1):
// time_t t_start_bin_duration = (sBins * sBinWidth) - (start - t_window_start) - sBinWidth * (mbins - 1)
// ival = (t_start_bin_duration * value) / tdiff;
// if (ibin == mbins - 1 and mbins > 1):
// time_t t_stop_bin_duration = (sBins * sBinWidth) - (now - stop) - sBinWidth * (mbins - 1)
// ival = (t_stop_bin_duration * value) / tdiff;
// mDataBuffer[bin_index] += ival;
// mIntegralBuffer[ibin] += ival;
mDataBuffer[bin_index] += val_per_bin;
mIntegralBuffer[ibin] += val_per_bin;
}
}
//------------------------------------------------------------------------------
// Update Transfer Buffer to iterate over and calculate how long does it take
// to transfer [mPercComplete] % of the data within sample rate of 5 min
// [mLastTfSampleUpdateInterval]
//------------------------------------------------------------------------------
void
IostatPeriods::UpdateTransferSampleInfo(time_t now)
{
// Sum data of all transfers
double sumTx = 0.;
// Update rating (% of transfers)
for (size_t i = 0; i < sBins; ++i) {
if (mIntegralBuffer[i]) {
sumTx += mIntegralBuffer[i];
} else {
break;
}
}
// Reset counters for current sample
mTfCountInSample = 0ull;
mAvgTfSize = 0ull;
//std::cout << "sumTx" << sumTx << std::endl;
if (sumTx > 0) {
mTfCountInSample = mTfCount;
mAvgTfSize = std::ceil((double)sumTx / mTfCountInSample);
const double multiplier = std::pow(10.0, 6);
// integrate up to [mPercComplete] and record
// the time the transfers took in [mDurationToPercComplete]
for (size_t iperc = 0; iperc < std::size(mPercComplete); ++iperc) {
double sum_percent = 0;
for (size_t ibin = 0; ibin < sBins; ibin++) {
sum_percent += mIntegralBuffer[ibin] / sumTx;
if ((unsigned int)std::ceil(sum_percent * multiplier) >=
(unsigned int)std::ceil(mPercComplete[iperc] * multiplier)) {
mDurationToPercComplete[iperc] = ((ibin + 1) * sBinWidth);
break;
}
}
}
} else {
for (size_t iperc = 0; iperc < std::size(mPercComplete); ++iperc) {
mDurationToPercComplete[iperc] = 0;
}
}
mTfCount = 0;
mLongestTransferTimeInSample = mLongestTransferTime;
mLongestReportTimeInSample = mLongestReportTime;
mLongestTransferTime = 0;
mLongestReportTime = 0;
memset(mIntegralBuffer, 0, sizeof(mIntegralBuffer));
mLastTfMaxLenUpdateTime = now;
}
//------------------------------------------------------------------------------
// Reset bin content of the buffer w.r.t. given timstamp
//------------------------------------------------------------------------------
void
IostatPeriods::StampBufferZero(time_t& now)
{
// Clean-up all bins which are older than sPeriod (24h)
// last_end_index is the index of the timestamp corresponding
// to the last transfer stop time recorded
if ((now - mLastTfMaxLenUpdateTime) > mLastTfSampleUpdateInterval) {
UpdateTransferSampleInfo(now);
}
time_t last_upd_time = std::max(mLastAddTime, mLastStampZeroTime);
int zero_bins = 0;
if (now - last_upd_time >= sPeriod) {
zero_bins = sBins;
} else {
if (last_upd_time < now) {
zero_bins = (now - last_upd_time) / sBinWidth;
} else {
if ((last_upd_time == mLastStampZeroTime) &&
(last_upd_time != mLastAddTime)) {
zero_bins = 1;
}
}
}
int start_index = (last_upd_time / sBinWidth) % sBins;
if (last_upd_time != now) {
start_index = (start_index + 1) % sBins;
}
for (int i = 0; i < zero_bins; ++i) {
int index = (start_index + i) % sBins;
mDataBuffer[index] = 0.;
}
mLastStampZeroTime = now;
}
//------------------------------------------------------------------------------
// Getting the timestamp of the last time the transfer sample was taken
//------------------------------------------------------------------------------
std::string IostatPeriods::GetLastSampleUpdateTimestamp(bool date_format) const
{
std::string ts;
if (date_format) {
ts = common::Timing::ltime(mLastTfMaxLenUpdateTime);
} else {
ts = std::to_string(mLastTfMaxLenUpdateTime);
}
eos::common::trim(ts);
return ts;
}
//------------------------------------------------------------------------------
// Get the sum of values for the given buffer period
//------------------------------------------------------------------------------
unsigned long long
IostatPeriods::GetDataInPeriod(size_t period, unsigned long long time_offset,
time_t now) const
{
double sum = 0.;
if (time_offset > sPeriod) {
time_offset = sPeriod;
}
if (time_offset + period > sPeriod) {
period = sPeriod - time_offset;
}
size_t start_index = ((now - time_offset - period) / sBinWidth) % sBins;
size_t stop_index = ((now - time_offset) / sBinWidth) % sBins;
int range = stop_index - start_index ;
if (period >= sPeriod) {
range = sBins;
start_index = 0;
}
if (range < 0) {
range = sBins + range;
}
for (int pidx_cnt = 0; pidx_cnt < range; ++pidx_cnt) {
int idx = (start_index + pidx_cnt) % sBins;
sum += mDataBuffer[idx];
}
return std::ceil(sum);
}
//------------------------------------------------------------------------------
// Iostat implementation
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Iostat constructor
//------------------------------------------------------------------------------
Iostat::Iostat():
mDoneInit(false), mFlusher(nullptr), mLegacyMode(false), mRunning(false),
mQcl(nullptr), mReport(true), mReportNamespace(false), mReportPopularity(true),
mHashKeyBase("")
{
for (size_t i = 0; i < IOSTAT_POPULARITY_HISTORY_DAYS; i++) {
IostatPopularity[i].set_deleted_key("");
IostatPopularity[i].resize(100000);
}
mLastPopularityBin = 9999999;
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
Iostat::~Iostat()
{
(void) StopCollection();
mCirculateThread.join();
}
//------------------------------------------------------------------------------
// Get hash key under which info is stored in QDB. This also included the
// current year and it's cached for ~5 minutes.
//------------------------------------------------------------------------------
std::string
Iostat::GetHashKey() const
{
using namespace std::chrono;
static std::string key;
static seconds cache_interval {300};
static auto ts = steady_clock::now();
if (key.empty() ||
(duration_cast(steady_clock::now() - ts) > cache_interval)) {
key = mHashKeyBase + eos::common::Timing::GetCurrentYear();
ts = steady_clock::now();
}
return key;
}
//------------------------------------------------------------------------------
// Perform object initialization
//------------------------------------------------------------------------------
bool
Iostat::Init(const std::string& instance_name, int port,
const std::string& legacy_file)
{
mHashKeyBase = SSTR("eos-iostat:" << instance_name << ":");
mFlusherPath = SSTR(gOFS->mQClientDir << instance_name << ":" << port
<< "_iostat");
if (gOFS) {
// QDB namespace, initialize qclient
if (!gOFS->namespaceGroup->isInMemory()) {
const eos::QdbContactDetails& qdb_details = gOFS->mQdbContactDetails;
mQcl.reset(new qclient::QClient(qdb_details.members,
qdb_details.constructOptions()));
if (!OneOffQdbMigration(legacy_file)) {
eos_static_err("%s", "msg=\"failed while attempting migration to QDB\"");
return false;
}
if (!LoadFromQdb()) {
eos_static_err("%s", "msg=\"LoadFromQdb failed\"");
return false;
}
if (mFlusher == nullptr) {
mFlusher.reset(new eos::MetadataFlusher(mFlusherPath,
gOFS->mQdbContactDetails));
}
} else {
// In-memory namespace forces the stats to be saved in the file
mLegacyMode = true;
mLegacyFilePath = legacy_file;
if (!LegacyRestoreFromFile()) {
eos_static_err("msg=\"failed to restore info from file\" path=%s",
mLegacyFilePath.c_str());
return false;
}
}
}
mCirculateThread.reset(&Iostat::Circulate, this);
mDoneInit = true;
return true;
}
//------------------------------------------------------------------------------
// One off migration from file based to QDB of IoStat information
//------------------------------------------------------------------------------
bool
Iostat::OneOffQdbMigration(const std::string& legacy_file)
{
struct stat info;
if (stat(legacy_file.c_str(), &info)) {
// File does not exist, migration was probably already done
return true;
}
FILE* fin = fopen(legacy_file.c_str(), "r");
if (!fin) {
eos_static_err("msg=\"failed to open iostat file\" path=\"%s\"",
legacy_file.c_str());
return false;
}
int item = 0;
char line[16384];
std::string tag;
std::list entries;
while ((item = fscanf(fin, "%16383s\n", line)) == 1) {
XrdOucEnv env(line);
if (env.Get("tag") && env.Get("uid") && env.Get("val")) {
entries.push_back(EncodeKey(USER_ID_TYPE, env.Get("uid"), env.Get("tag")));
entries.push_back(env.Get("val"));
}
if (env.Get("tag") && env.Get("gid") && env.Get("val")) {
entries.push_back(EncodeKey(GROUP_ID_TYPE, env.Get("gid"), env.Get("tag")));
entries.push_back(env.Get("val"));
}
}
fclose(fin);
// Push all the collected info to QDB
qclient::QHash qhash(*mQcl, GetHashKey());
try {
bool done = qhash.hmset(entries);
if (!done) {
eos_static_err("%s", "msg=\"failed while inserting entries in QDB\"");
return false;
}
} catch (const std::exception& e) {
eos_static_err("msg=\"got exception while inserting entrines in QDB\" "
"emsg=\"%s\"", e.what());
return false;
}
// Save file based iostat as a backup
const std::string bkp_path = legacy_file + ".bkp";
if (rename(legacy_file.c_str(), bkp_path.c_str())) {
eos_static_err("msg=\"failed file rename\" old_path=\"%s\" new_path=\"%s\"",
legacy_file.c_str(), bkp_path.c_str());
return false;
}
eos_static_info("msg=\"saved iostat backup successfully\" old_path=\"%s\" "
" new_path=\"%s\"", legacy_file.c_str(), bkp_path.c_str());
return true;
}
//------------------------------------------------------------------------------
// Apply instance level configuration concerning IoStats
//------------------------------------------------------------------------------
void
Iostat::ApplyIostatConfig(FsView* fsview)
{
std::string iocollect = fsview->GetGlobalConfig(gIostatCollect);
if ((iocollect == "true") || (iocollect.empty())) {
StartCollection(); // enable by default
}
std::string iopopularity = fsview->GetGlobalConfig(gIostatPopularity);
mReportPopularity = (iopopularity == "true") || (iopopularity.empty());
mReport = fsview->GetBoolGlobalConfig(gIostatReport);
mReportNamespace = fsview->GetBoolGlobalConfig(gIostatReportNamespace);
std::string udplist = fsview->GetGlobalConfig(gIostatUdpTargetList);
std::string delimiter = "|";
std::vector hostlist;
eos::common::StringConversion::Tokenize(udplist, hostlist, delimiter);
std::unique_lock scope_lock(mBcastMutex);
mUdpPopularityTarget.clear();
for (size_t i = 0; i < hostlist.size(); ++i) {
AddUdpTarget(hostlist[i], false);
}
}
//------------------------------------------------------------------------------
// Store IoStat config in the instance level configuration
//------------------------------------------------------------------------------
bool
Iostat::StoreIostatConfig(FsView* fsview) const
{
bool ok = fsview->SetGlobalConfig(gIostatPopularity, mReportPopularity) &
fsview->SetGlobalConfig(gIostatReport, mReport) &
fsview->SetGlobalConfig(gIostatReportNamespace, mReportNamespace) &
fsview->SetGlobalConfig(gIostatCollect, mRunning);
std::string udp_popularity_targets = EncodeUdpPopularityTargets();
if (!udp_popularity_targets.empty()) {
ok &= fsview->SetGlobalConfig(gIostatUdpTargetList, udp_popularity_targets);
}
return ok;
}
//------------------------------------------------------------------------------
// Start collection thread
//------------------------------------------------------------------------------
bool
Iostat::StartCollection()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (!mRunning) {
mRunning = true;
mReceivingThread.reset(&Iostat::Receive, this);
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Stop collection thread
//------------------------------------------------------------------------------
bool
Iostat::StopCollection()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (mRunning) {
mReceivingThread.join();
mRunning = false;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Start popularity thread
//------------------------------------------------------------------------------
bool
Iostat::StartPopularity()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (!mReportPopularity) {
mReportPopularity = true;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Start popularity thread
//------------------------------------------------------------------------------
bool
Iostat::StopPopularity()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (mReportPopularity) {
mReportPopularity = false;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Start daily report thread
//------------------------------------------------------------------------------
bool
Iostat::StartReport()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (!mReport) {
mReport = true;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Stop daily report thread
//------------------------------------------------------------------------------
bool
Iostat::StopReport()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (mReport) {
mReport = false;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Start namespace report thread
//------------------------------------------------------------------------------
bool
Iostat::StartReportNamespace()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (!mReportNamespace) {
mReportNamespace = true;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Stop namespace report thread
//------------------------------------------------------------------------------
bool
Iostat::StopReportNamespace()
{
std::unique_lock scope_lock(mThreadSyncMutex);
if (mReportNamespace) {
mReportNamespace = false;
StoreIostatConfig(&FsView::gFsView);
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Record measurement to the various periods it overlaps with
//------------------------------------------------------------------------------
void
Iostat::Add(const std::string& tag, uid_t uid, gid_t gid,
unsigned long long val,
time_t start, time_t stop, time_t now)
{
// Flush to QDB if not in testing mode - this can be called without a lock
// as this is only called from the thread digesting the report messages one
// by one
if (gOFS && !mLegacyMode) {
AddToQdb(tag, uid, gid, val);
}
std::unique_lock scope_lock(mDataMutex);
IostatTag[tag] += val;
IostatUid[tag][uid] += val;
IostatGid[tag][gid] += val;
IostatPeriodsUid[tag][uid].Add(val, start, stop, now);
IostatPeriodsGid[tag][gid].Add(val, start, stop, now);
IostatPeriodsTag[tag].Add(val, start, stop, now);
}
//------------------------------------------------------------------------------
// Low level implementation for Add method also sending data to QDB
//------------------------------------------------------------------------------
void
Iostat::AddToQdb(const std::string& tag, uid_t uid, gid_t gid,
unsigned long long val)
{
if (mFlusher) {
CacheUpdate(EncodeKey(USER_ID_TYPE, std::to_string(uid), tag),
EncodeKey(GROUP_ID_TYPE, std::to_string(gid), tag), val);
if (ShouldFlushCache()) {
FlushCache();
}
}
}
//------------------------------------------------------------------------------
// Save given update in the in-memory cache
//------------------------------------------------------------------------------
void
Iostat::CacheUpdate(const std::string& uid_key, const std::string& gid_key,
unsigned long long val)
{
mMapCacheUpdates[uid_key] += val;
mMapCacheUpdates[gid_key] += val;
}
//------------------------------------------------------------------------------
// Check if the cache needs to be flushed
//------------------------------------------------------------------------------
bool
Iostat::ShouldFlushCache()
{
using namespace std::chrono;
static auto timestamp = steady_clock::now();
if ((mMapCacheUpdates.size() >= mMapMaxSize) ||
(duration_cast(steady_clock::now() - timestamp) > mCacheFlushDelay)) {
timestamp = steady_clock::now();
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Flush all cached entries to the QDB backed
//------------------------------------------------------------------------------
void
Iostat::FlushCache()
{
using namespace std::chrono;
using eos::common::Timing;
static const hours timeout {1};
static auto timestamp = steady_clock::now();
static std::string hash_key = mHashKeyBase + Timing::GetCurrentYear();
// Check for change of hash key when a new year starts
if (duration_cast(steady_clock::now() - timestamp) > timeout) {
timestamp = steady_clock::now();
std::string new_hash_key = mHashKeyBase + Timing::GetCurrentYear();
if (new_hash_key != hash_key) {
hash_key = new_hash_key;
}
}
static std::vector request;
request.reserve(3 * mMapMaxSize + 1);
request.push_back("HINCRBYMULTI");
for (const auto& elem : mMapCacheUpdates) {
const std::string svalue = std::to_string(elem.second);
request.push_back(hash_key);
request.push_back(elem.first);
request.push_back(svalue);
}
mMapCacheUpdates.clear();
mFlusher->exec(request);
request.clear();
}
//------------------------------------------------------------------------------
// Get sum of measurements for the given tag (looping all uids per tag)
//------------------------------------------------------------------------------
unsigned long long
Iostat::GetTotalStatForTag(const char* tag) const
{
unsigned long long val = 0ull;
if (!IostatTag.count(tag)) {
return val;
}
val = IostatTag.find(tag)->second;
return val;
}
//------------------------------------------------------------------------------
// Get sum of measurements for the given tag an period (looping all uids per tag)
//------------------------------------------------------------------------------
unsigned long long
Iostat::GetPeriodStatForTag(const char* tag, size_t period, time_t secago) const
{
auto it = IostatPeriodsTag.find(tag);
if (it == IostatPeriodsTag.end()) {
return 0ull;
}
return it->second.GetDataInPeriod(period, secago, time(0ull));
}
//------------------------------------------------------------------------------
// Method executed by the thread receiving reports
//------------------------------------------------------------------------------
void
Iostat::Receive(ThreadAssistant& assistant) noexcept
{
eos_static_info("%s", "msg=\"starting iostat receive thread\"");
if (gOFS == nullptr) {
return;
}
while (!mDoneInit) {
assistant.wait_for(std::chrono::seconds(5));
if (assistant.terminationRequested()) {
break;
}
}
const std::string qdb_channel = "/eos/*/report";
mq::ReportListener listener(gOFS->MgmOfsBroker.c_str(), gOFS->HostName,
gOFS->mMessagingRealm->haveQDB(),
gOFS->mQdbContactDetails, qdb_channel);
while (!assistant.terminationRequested()) {
std::string newmessage;
while (listener.fetch(newmessage, &assistant)) {
if (assistant.terminationRequested()) {
break;
}
XrdOucString body = newmessage.c_str();
while (body.replace("&&", "&")) {
}
XrdOucEnv ioreport(body.c_str());
time_t now = time(0);
std::unique_ptr report(new eos::common::Report(ioreport));
Add("bytes_read", report->uid, report->gid, report->rb, report->ots,
report->cts, now);
Add("bytes_read", report->uid, report->gid, report->rvb_sum, report->ots,
report->cts, now);
Add("bytes_written", report->uid, report->gid, report->wb, report->ots,
report->cts, now);
Add("read_calls", report->uid, report->gid, report->nrc, report->ots,
report->cts, now);
Add("readv_calls", report->uid, report->gid, report->rv_op, report->ots,
report->cts, now);
Add("write_calls", report->uid, report->gid, report->nwc, report->ots,
report->cts, now);
Add("fwd_seeks", report->uid, report->gid, report->nfwds, report->ots,
report->cts, now);
Add("bwd_seeks", report->uid, report->gid, report->nbwds, report->ots,
report->cts, now);
Add("xl_fwd_seeks", report->uid, report->gid, report->nxlfwds, report->ots,
report->cts, now);
Add("xl_bwd_seeks", report->uid, report->gid, report->nxlbwds, report->ots,
report->cts, now);
Add("bytes_fwd_seek", report->uid, report->gid, report->sfwdb, report->ots,
report->cts, now);
Add("bytes_bwd_wseek", report->uid, report->gid, report->sbwdb, report->ots,
report->cts, now);
Add("bytes_xl_fwd_seek", report->uid, report->gid, report->sxlfwdb, report->ots,
report->cts, now);
Add("bytes_xl_bwd_wseek", report->uid, report->gid, report->sxlbwdb,
report->ots, report->cts, now);
Add("disk_time_read", report->uid, report->gid, report->rt,
report->ots, report->cts, now);
Add("disk_time_write", report->uid, report->gid,
report->wt, report->ots, report->cts, now);
if (report->dsize) {
Add("bytes_deleted", 0, 0, report->dsize, now - 30, now, now);
Add("files_deleted", 0, 0, 1, now - 30, now, now);
}
// Do the UDP broadcasting
UdpBroadCast(report.get());
// Do the domain accounting
if (report->path.substr(0, 11) == "/replicate:") {
// check if this is a replication path
// push into the 'eos' domain
std::unique_lock scope_lock(mDataMutex);
if (report->rb) {
IostatPeriodsDomainIOrb["eos"].Add(report->rb, report->ots, report->cts, now);
}
if (report->wb) {
IostatPeriodsDomainIOwb["eos"].Add(report->wb, report->ots, report->cts, now);
}
} else {
if (mReportPopularity) {
// do the popularity accounting here for everything which is not replication!
AddToPopularity(report->path, report->rb, report->ots, report->cts);
}
std::string sdomain = report->sec_domain;
{
std::unique_lock scope_lock(mDataMutex);
if (report->rb) {
IostatPeriodsDomainIOrb[sdomain].Add(report->rb, report->ots, report->cts, now);
}
if (report->wb) {
IostatPeriodsDomainIOwb[sdomain].Add(report->wb, report->ots, report->cts, now);
}
}
}
// do the application accounting here
std::string apptag = "other";
if (report->sec_app.length()) {
apptag = report->sec_app;
}
// Push into app accounting
{
std::unique_lock scope_lock(mDataMutex);
if (report->rb) {
IostatPeriodsAppIOrb[apptag].Add(report->rb, report->ots, report->cts, now);
}
if (report->wb) {
IostatPeriodsAppIOwb[apptag].Add(report->wb, report->ots, report->cts, now);
}
}
if (mReport && gOFS->mMaster->IsMaster()) {
WriteRecord(body.c_str());
}
if (mReportNamespace) {
// add the record into the report namespace file
char path[4096];
snprintf(path, sizeof(path) - 1, "%s/%s", gOFS->IoReportStorePath.c_str(),
report->path.c_str());
eos::common::Path cPath(path);
if (cPath.MakeParentPath(S_IRWXU | S_IRGRP | S_IXGRP)) {
FILE* freport = fopen(path, "a+");
if (freport) {
fprintf(freport, "%s\n", body.c_str());
fclose(freport);
}
}
}
}
assistant.wait_for(std::chrono::seconds(1));
}
eos_static_info("%s", "msg=\"stopping iostat receiver thread\"");
}
//------------------------------------------------------------------------------
// Write record to the stream - used by the MGM/FUSEX to push entries
//------------------------------------------------------------------------------
void
Iostat::WriteRecord(const std::string& record)
{
static uint32_t sec_per_day = 24 * 3600;
static std::mutex s_mutex;
static std::string s_report_fn = "";
static time_t s_last_ts = 0ull;
time_t now_ts = time(NULL);
std::unique_lock scope_lock(s_mutex);
if (now_ts / sec_per_day != s_last_ts / sec_per_day) {
struct tm nowtm;
if (localtime_r(&now_ts, &nowtm)) {
static char logfile[4096];
snprintf(logfile, sizeof(logfile) - 1, "%s/%04u/%02u/%04u%02u%02u.eosreport",
gOFS->IoReportStorePath.c_str(),
1900 + nowtm.tm_year,
nowtm.tm_mon + 1,
1900 + nowtm.tm_year,
nowtm.tm_mon + 1,
nowtm.tm_mday);
std::string report_fn = logfile;
if (report_fn != s_report_fn) {
if (gOpenReportFD) {
fclose(gOpenReportFD);
gOpenReportFD = nullptr;
}
eos::common::Path cPath(report_fn.c_str());
if (cPath.MakeParentPath(S_IRWXU | S_IRGRP | S_IXGRP)) {
gOpenReportFD = fopen(report_fn.c_str(), "a+");
if (!gOpenReportFD) {
eos_static_err("msg=\"failed to open report file\" path=%s",
report_fn.c_str());
return;
}
} else {
eos_static_err("msg=\"failed to create report parent path\" path=%s",
report_fn.c_str());
return;
}
s_report_fn = report_fn;
s_last_ts = now_ts;
}
}
}
if (gOpenReportFD) {
fprintf(gOpenReportFD, "%s\n", record.c_str());
fflush(gOpenReportFD);
}
}
//------------------------------------------------------------------------------
// Print IO statistics
//------------------------------------------------------------------------------
void
Iostat::PrintOut(XrdOucString& out, bool summary, bool details,
bool monitoring, bool numerical, bool top,
bool domain, bool apps, bool sample_stat, time_t time_ago,
time_t time_interval, XrdOucString option)
{
std::string format_s = (!monitoring ? "s" : "os");
std::string format_ss = (!monitoring ? "-s" : "os");
std::string format_l = (!monitoring ? "+l" : "ol");
std::string format_ll = (!monitoring ? "l." : "ol");
std::unique_lock scope_lock(mDataMutex);
time_t now = time(NULL);
bool interval = false;
time_ago = time_ago % 86400;
time_interval = time_interval % 86400;
if (time_interval != 0) {
interval = true;
}
std::vector tags;
if (summary || top) {
for (auto tit = IostatTag.begin(); tit != IostatTag.end(); ++tit) {
tags.push_back(tit->first);
}
std::sort(tags.begin(), tags.end());
}
if (summary) {
TableFormatterBase table;
TableData table_data;
if (interval) {
if (!monitoring) {
table.SetHeader({
std::make_tuple("who", 3, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("data in interval", 8, format_l),
std::make_tuple("avg rate [B/s]", 8, format_l),
});
} else {
table.SetHeader({
std::make_tuple("uid", 0, format_ss),
std::make_tuple("gid", 0, format_s),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("intervaldata", 8, format_l),
std::make_tuple("intervalrate", 8, format_l),
});
}
} else {
if (!monitoring) {
table.SetHeader({
std::make_tuple("who", 3, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("1min", 8, format_l),
std::make_tuple("5min", 8, format_l),
std::make_tuple("1h", 8, format_l),
std::make_tuple("24h", 8, format_l),
std::make_tuple("sum", 8, format_l),
});
} else {
table.SetHeader({
std::make_tuple("uid", 0, format_ss),
std::make_tuple("gid", 0, format_s),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("60s", 0, format_l),
std::make_tuple("300s", 0, format_l),
std::make_tuple("3600s", 0, format_l),
std::make_tuple("86400s", 0, format_l),
std::make_tuple("total", 0, format_l),
});
}
}
for (const auto& elem : tags) {
const char* tag = elem.c_str();
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back("all", format_ss);
if (monitoring) {
row.emplace_back("all", format_s);
}
row.emplace_back(tag, format_s);
if (interval) {
row.emplace_back(GetPeriodStatForTag(tag, time_interval, time_ago), format_ll);
row.emplace_back(GetPeriodStatForTag(tag, time_interval,
time_ago) / (float)time_interval, format_ll);
} else {
// getting tag stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(GetPeriodStatForTag(tag, 60), format_ll);
row.emplace_back(GetPeriodStatForTag(tag, 300), format_ll);
row.emplace_back(GetPeriodStatForTag(tag, 3600), format_ll);
row.emplace_back(GetPeriodStatForTag(tag, 86400), format_ll);
row.emplace_back(GetTotalStatForTag(tag), format_ll);
}
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
table_data.clear();
if (!interval) {
//! UDP Popularity Broadcast Target
std::unique_lock mLock(mBcastMutex);
if (!mUdpPopularityTarget.empty()) {
TableFormatterBase table_udp;
if (!monitoring) {
table_udp.SetHeader({
std::make_tuple("UDP Popularity Broadcast Target", 32, format_ss)
});
} else {
table_udp.SetHeader({ std::make_tuple("udptarget", 0, format_ss) });
}
for (const auto& elem : mUdpPopularityTarget) {
table_data.emplace_back();
table_data.back().emplace_back(elem.c_str(), format_ss);
}
table_udp.AddRows(table_data);
out += table_udp.GenerateTable(HEADER).c_str();
}
}
}
if (details) {
if (interval) {
std::vector>
uidout, gidout;
TableFormatterBase table_user;
TableData table_data;
//! User statistic
if (!monitoring) {
table_user.SetHeader({
std::make_tuple("user", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("data in interval", 8, format_l),
std::make_tuple("avg rate [B/s]", 8, format_l),
});
} else {
table_user.SetHeader({
std::make_tuple("uid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("intervaldata", 8, format_l),
std::make_tuple("intervalrate", 8, format_l),
});
}
for (auto tuit = IostatPeriodsUid.begin(); tuit != IostatPeriodsUid.end();
tuit++) {
for (auto it = tuit->second.begin(); it != tuit->second.end(); ++it) {
std::string username;
if (numerical) {
username = std::to_string(it->first);
} else {
int terrc = 0;
username = eos::common::Mapping::UidToUserName(it->first, terrc);
}
// getting tag stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
uidout.emplace_back(std::make_tuple(username, tuit->first.c_str(),
it->second.GetDataInPeriod(time_interval, time_ago, now),
it->second.GetDataInPeriod(time_interval, time_ago, now) / (float)time_interval
));
}
}
std::sort(uidout.begin(), uidout.end());
for (auto& tup : uidout) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
}
table_user.AddRows(table_data);
out += table_user.GenerateTable(HEADER).c_str();
table_data.clear();
// Group statistics
TableFormatterBase table_group;
if (!monitoring) {
table_group.SetHeader({
std::make_tuple("group", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("data in interval", 8, format_l),
std::make_tuple("avg rate [B/s]", 8, format_l),
});
} else {
table_group.SetHeader({
std::make_tuple("gid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("intervaldata", 8, format_l),
std::make_tuple("intervalrate", 8, format_l),
});
}
for (auto tgit = IostatPeriodsGid.begin(); tgit != IostatPeriodsGid.end();
tgit++) {
for (auto it = tgit->second.begin(); it != tgit->second.end(); ++it) {
std::string groupname;
if (numerical) {
groupname = std::to_string(it->first);
} else {
int terrc = 0;
groupname = eos::common::Mapping::GidToGroupName(it->first, terrc);
}
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
gidout.emplace_back(std::make_tuple(groupname, tgit->first.c_str(),
it->second.GetDataInPeriod(time_interval, time_ago, now),
it->second.GetDataInPeriod(time_interval, time_ago, now) / (float)time_interval
));
}
}
std::sort(gidout.begin(), gidout.end());
for (auto& tup : gidout) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
}
table_group.AddRows(table_data);
out += table_group.GenerateTable(HEADER).c_str();
table_data.clear();
} else {
std::vector>
uidout_sec, gidout_sec;
std::vector>
uidout_b, gidout_b;
//std::vector>
// uidout, gidout;
TableData table_data;
XrdOucString marker_b =
"\n┏━> Sum of bytes transferred in last 1m/5m/1h/24h and total sum: \n";
//! User statistic
TableFormatterBase table_user_b;
if (!monitoring) {
table_user_b.SetHeader({
std::make_tuple("user", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("1min", 8, format_l),
std::make_tuple("5min", 8, format_l),
std::make_tuple("1h", 8, format_l),
std::make_tuple("24h", 8, format_l),
std::make_tuple("sum", 8, format_l),
});
} else {
table_user_b.SetHeader({
std::make_tuple("uid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("60s", 0, format_l),
std::make_tuple("300s", 0, format_l),
std::make_tuple("3600s", 0, format_l),
std::make_tuple("86400s", 0, format_l),
std::make_tuple("total", 0, format_l),
});
}
XrdOucString marker_sec =
"\n┏━> Transfer (tf) sample info every 5 min: tf time for 90/95/99% of data, max tf and report times, average tf size, tf count.\n";
TableFormatterBase table_user_sec;
if (sample_stat) {
if (!monitoring) {
table_user_sec.SetHeader({
std::make_tuple("user", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("90% [s]", 8, format_l),
std::make_tuple("95% [s]", 8, format_l),
std::make_tuple("99% [s]", 8, format_l),
std::make_tuple("max [s]", 8, format_l),
std::make_tuple("max report [s]", 8, format_l),
std::make_tuple("avg tf size", 8, format_l),
std::make_tuple("tf #", 8, format_l),
std::make_tuple("sample end time", 24, format_s)
});
} else {
table_user_sec.SetHeader({
std::make_tuple("uid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("tfsecto90p", 0, format_l),
std::make_tuple("tfsecto95p", 0, format_l),
std::make_tuple("tfsecto99p", 0, format_l),
std::make_tuple("maxtransfersec", 0, format_l),
std::make_tuple("maxreportsec", 0, format_l),
std::make_tuple("avgtfsize5min", 0, format_l),
std::make_tuple("tfcount", 0, format_l),
std::make_tuple("sampletimestamp", 0, format_s)
});
}
}
for (auto tuit = IostatPeriodsUid.begin(); tuit != IostatPeriodsUid.end();
tuit++) {
for (auto it = tuit->second.begin(); it != tuit->second.end(); ++it) {
std::string username;
if (numerical) {
username = std::to_string(it->first);
} else {
int terrc = 0;
username = eos::common::Mapping::UidToUserName(it->first, terrc);
}
// getting tag stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
uidout_b.emplace_back(std::make_tuple(username, tuit->first.c_str(),
it->second.GetDataInPeriod(60, 0, now),
it->second.GetDataInPeriod(300, 0, now),
it->second.GetDataInPeriod(3600, 0, now),
it->second.GetDataInPeriod(86400, 0, now),
IostatUid[tuit->first][it->first]
));
if (sample_stat) {
std::string sample_time = "";
if (!monitoring) {
sample_time = it->second.GetLastSampleUpdateTimestamp(true);
} else {
sample_time = it->second.GetLastSampleUpdateTimestamp(false);
}
uidout_sec.emplace_back(std::make_tuple(username, tuit->first.c_str(),
it->second.GetTimeToPercComplete(P90),
it->second.GetTimeToPercComplete(P95),
it->second.GetTimeToPercComplete(P99),
it->second.GetLongestTransferTime(),
it->second.GetLongestReportTime(),
it->second.GetAvgTransferSize(),
it->second.GetTfCountInSample(),
sample_time
));
}
}
}
std::sort(uidout_b.begin(), uidout_b.end());
std::sort(uidout_sec.begin(), uidout_sec.end());
for (auto& tup : uidout_b) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
row.emplace_back(std::get<4>(tup), format_l);
row.emplace_back(std::get<5>(tup), format_l);
row.emplace_back(std::get<6>(tup), format_l);
}
table_user_b.AddRows(table_data);
out += !monitoring ? marker_b : "";
out += table_user_b.GenerateTable(HEADER).c_str();
table_data.clear();
if (sample_stat) {
for (auto& tup : uidout_sec) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
row.emplace_back(std::get<4>(tup), format_l);
row.emplace_back(std::get<5>(tup), format_l);
row.emplace_back(std::get<6>(tup), format_l);
row.emplace_back(std::get<7>(tup), format_l);
row.emplace_back(std::get<8>(tup), format_l);
row.emplace_back(std::get<9>(tup), format_s);
}
table_user_sec.AddRows(table_data);
out += !monitoring ? marker_sec : "";
out += table_user_sec.GenerateTable(HEADER).c_str();
table_data.clear();
}
//! Group statistic
TableFormatterBase table_group_b;
if (!monitoring) {
table_group_b.SetHeader({
std::make_tuple("group", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("1min", 8, format_l),
std::make_tuple("5min", 8, format_l),
std::make_tuple("1h", 8, format_l),
std::make_tuple("24h", 8, format_l),
std::make_tuple("sum", 8, format_l),
});
} else {
table_group_b.SetHeader({
std::make_tuple("gid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("60s", 0, format_l),
std::make_tuple("300s", 0, format_l),
std::make_tuple("3600s", 0, format_l),
std::make_tuple("86400s", 0, format_l),
std::make_tuple("total", 0, format_l),
});
}
TableFormatterBase table_group_sec;
if (sample_stat) {
if (!monitoring) {
table_group_sec.SetHeader({
std::make_tuple("group", 5, format_ss),
std::make_tuple("io value", 24, format_s),
std::make_tuple("90% [s]", 8, format_l),
std::make_tuple("95% [s]", 8, format_l),
std::make_tuple("99% [s]", 8, format_l),
std::make_tuple("max [s]", 8, format_l),
std::make_tuple("max report [s]", 8, format_l),
std::make_tuple("avg tf size", 8, format_l),
std::make_tuple("tf #", 8, format_l),
std::make_tuple("sample end time", 24, format_s)
});
} else {
table_group_sec.SetHeader({
std::make_tuple("gid", 0, format_ss),
std::make_tuple("measurement", 0, format_s),
std::make_tuple("tfsecto90p", 0, format_l),
std::make_tuple("tfsecto95p", 0, format_l),
std::make_tuple("tfsecto99p", 0, format_l),
std::make_tuple("maxtransfersec", 0, format_l),
std::make_tuple("maxreportsec", 0, format_l),
std::make_tuple("avgtfsize5min", 0, format_l),
std::make_tuple("tfcount", 0, format_l),
std::make_tuple("sampletimestamp", 0, format_s)
});
}
}
for (auto tgit = IostatPeriodsGid.begin(); tgit != IostatPeriodsGid.end();
tgit++) {
for (auto it = tgit->second.begin(); it != tgit->second.end(); ++it) {
std::string groupname;
if (numerical) {
groupname = std::to_string(it->first);
} else {
int terrc = 0;
groupname = eos::common::Mapping::GidToGroupName(it->first, terrc);
}
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
gidout_b.emplace_back(std::make_tuple(groupname, tgit->first.c_str(),
it->second.GetDataInPeriod(60, 0, now), it->second.GetDataInPeriod(300, 0, now),
it->second.GetDataInPeriod(3600, 0, now), it->second.GetDataInPeriod(86400, 0,
now),
IostatGid[tgit->first][it->first]
));
if (sample_stat) {
std::string sample_time = "";
if (!monitoring) {
sample_time = it->second.GetLastSampleUpdateTimestamp(true);
} else {
sample_time = it->second.GetLastSampleUpdateTimestamp(false);
}
gidout_sec.emplace_back(std::make_tuple(groupname, tgit->first.c_str(),
it->second.GetTimeToPercComplete(P90),
it->second.GetTimeToPercComplete(P95),
it->second.GetTimeToPercComplete(P99),
it->second.GetLongestTransferTime(),
it->second.GetLongestReportTime(),
it->second.GetAvgTransferSize(),
it->second.GetTfCountInSample(),
sample_time
));
}
}
}
std::sort(gidout_b.begin(), gidout_b.end());
std::sort(gidout_sec.begin(), gidout_sec.end());
for (auto& tup : gidout_b) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
row.emplace_back(std::get<4>(tup), format_l);
row.emplace_back(std::get<5>(tup), format_l);
row.emplace_back(std::get<6>(tup), format_l);
}
table_group_b.AddRows(table_data);
out += !monitoring ? marker_b : "";
out += table_group_b.GenerateTable(HEADER).c_str();
table_data.clear();
if (sample_stat) {
for (auto& tup : gidout_sec) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(tup), format_ss);
row.emplace_back(std::get<1>(tup), format_s);
row.emplace_back(std::get<2>(tup), format_l);
row.emplace_back(std::get<3>(tup), format_l);
row.emplace_back(std::get<4>(tup), format_l);
row.emplace_back(std::get<5>(tup), format_l);
row.emplace_back(std::get<6>(tup), format_l);
row.emplace_back(std::get<7>(tup), format_l);
row.emplace_back(std::get<8>(tup), format_l);
row.emplace_back(std::get<9>(tup), format_s);
}
table_group_sec.AddRows(table_data);
out += !monitoring ? marker_sec : "";
out += table_group_sec.GenerateTable(HEADER).c_str();
table_data.clear();
}
}
}
if (top) {
TableFormatterBase table;
TableData table_data;
if (!monitoring) {
table.SetHeader({
std::make_tuple("io value", 18, format_ss),
std::make_tuple("ranking by", 10, format_s),
std::make_tuple("rank", 8, format_ll),
std::make_tuple("who", 4, format_s),
std::make_tuple("sum", 8, format_l)
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("rank", 0, format_ll),
std::make_tuple("uid", 0, format_s),
std::make_tuple("gid", 0, format_s),
std::make_tuple("counter", 0, format_l)
});
}
for (auto it = tags.begin(); it != tags.end(); ++it) {
std::vector > uidout, gidout;
table.AddSeparator();
// by uid name
for (auto sit : IostatUid[*it]) {
uidout.push_back(std::make_tuple(sit.second, sit.first));
}
std::sort(uidout.begin(), uidout.end());
std::reverse(uidout.begin(), uidout.end());
int topplace = 0;
for (auto sit : uidout) {
topplace++;
uid_t uid = std::get<1>(sit);
unsigned long long counter = std::get<0>(sit);
std::string username;
if (numerical) {
username = std::to_string(uid);
} else {
int terrc = 0;
username = eos::common::Mapping::UidToUserName(uid, terrc);
}
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(it->c_str(), format_ss);
if (!monitoring) {
row.emplace_back("user", format_s);
}
row.emplace_back(topplace, format_ll);
row.emplace_back(username, format_s);
if (monitoring) {
row.emplace_back("", "", "", true);
}
row.emplace_back(counter, format_l);
}
// by gid name
for (auto sit : IostatGid[*it]) {
gidout.push_back(std::make_tuple(sit.second, sit.first));
}
std::sort(gidout.begin(), gidout.end());
std::reverse(gidout.begin(), gidout.end());
topplace = 0;
for (auto sit : gidout) {
topplace++;
uid_t gid = std::get<1>(sit);
unsigned long long counter = std::get<0>(sit);
std::string groupname;
if (numerical) {
groupname = std::to_string(gid);
} else {
int terrc = 0;
groupname = eos::common::Mapping::GidToGroupName(gid, terrc);
}
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(it->c_str(), format_ss);
if (!monitoring) {
row.emplace_back("group", format_s);
}
row.emplace_back(topplace, format_ll);
if (monitoring) {
row.emplace_back("", "", "", true);
}
row.emplace_back(groupname, format_s);
row.emplace_back(counter, format_l);
}
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
}
if (domain) {
TableData table_data;
if (interval) {
TableFormatterBase table;
//! User statistic
if (!monitoring) {
table.SetHeader({
std::make_tuple("io", 5, format_ss),
std::make_tuple("domain", 24, format_s),
std::make_tuple("data in interval", 8, format_l),
std::make_tuple("avg rate [B/s]", 8, format_l),
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("domain", 0, format_s),
std::make_tuple("intervaldata", 8, format_l),
std::make_tuple("intervalrate", 8, format_l),
});
}
// IO out bytes
for (auto it = IostatPeriodsDomainIOrb.begin();
it != IostatPeriodsDomainIOrb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = !monitoring ? "out" : "domain_io_out";
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago, now),
format_ll);
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago,
now) / (float)time_interval, format_ll);
}
// IO in bytes
for (auto it = IostatPeriodsDomainIOwb.begin();
it != IostatPeriodsDomainIOwb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = !monitoring ? "in" : "domain_io_in";
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago, now),
format_ll);
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago,
now) / (float)time_interval, format_ll);
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
table_data.clear();
} else {
XrdOucString marker_b =
"\n┏━> Sum of bytes transferred in last 1m/5m/1h/24h and total sum: \n";
//! User statistic
TableFormatterBase table_domain_b;
if (!monitoring) {
table_domain_b.SetHeader({
std::make_tuple("io", 5, format_ss),
std::make_tuple("domain", 24, format_s),
std::make_tuple("1min", 8, format_l),
std::make_tuple("5min", 8, format_l),
std::make_tuple("1h", 8, format_l),
std::make_tuple("24h", 8, format_l),
std::make_tuple("sum", 8, format_l),
});
} else {
table_domain_b.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("domain", 0, format_s),
std::make_tuple("60s", 0, format_l),
std::make_tuple("300s", 0, format_l),
std::make_tuple("3600s", 0, format_l),
std::make_tuple("86400s", 0, format_l),
std::make_tuple("total", 0, format_l),
});
}
// IO out bytes
for (auto it = IostatPeriodsDomainIOrb.begin();
it != IostatPeriodsDomainIOrb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = !monitoring ? "out" : "domain_io_out";
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(60, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(300, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(3600, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(86400, 0, now), format_l);
row.emplace_back(it->second.GetTotalSum(), format_l);
}
// IO in bytes
for (auto it = IostatPeriodsDomainIOwb.begin();
it != IostatPeriodsDomainIOwb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = !monitoring ? "in" : "domain_io_in";
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(60, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(300, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(3600, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(86400, 0, now), format_l);
row.emplace_back(it->second.GetTotalSum(), format_l);
}
table_domain_b.AddRows(table_data);
out += !monitoring ? marker_b : "";
out += table_domain_b.GenerateTable(HEADER).c_str();
table_data.clear();
}
}
if (apps) {
TableData table_data;
if (interval) {
TableFormatterBase table;
//! User statistic
if (!monitoring) {
table.SetHeader({
std::make_tuple("io", 5, format_ss),
std::make_tuple("application", 24, format_s),
std::make_tuple("data in interval", 8, format_l),
std::make_tuple("avg rate [B/s]", 8, format_l),
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("application", 0, format_s),
std::make_tuple("intervaldata", 8, format_l),
std::make_tuple("intervalrate", 8, format_l),
});
}
// IO out bytes
for (auto it = IostatPeriodsAppIOrb.begin(); it != IostatPeriodsAppIOrb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = (!monitoring ? "out" : "app_io_out");
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago, now),
format_ll);
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago,
now) / (float)time_interval, format_ll);
}
// IO in bytes
for (auto it = IostatPeriodsAppIOwb.begin(); it != IostatPeriodsAppIOwb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = (!monitoring ? "in" : "app_io_in");
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago, now),
format_ll);
row.emplace_back(it->second.GetDataInPeriod(time_interval, time_ago,
now) / (float)time_interval, format_ll);
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
table_data.clear();
} else {
XrdOucString marker_b =
"\n┏━> Sum of bytes transferred in last 1m/5m/1h/24h and total sum: \n";
//! User statistic
TableFormatterBase table_app_b;
if (!monitoring) {
table_app_b.SetHeader({
std::make_tuple("io", 5, format_ss),
std::make_tuple("application", 24, format_s),
std::make_tuple("1min", 8, format_l),
std::make_tuple("5min", 8, format_l),
std::make_tuple("1h", 8, format_l),
std::make_tuple("24h", 8, format_l),
std::make_tuple("sum", 8, format_l),
});
} else {
table_app_b.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("application", 0, format_s),
std::make_tuple("60s", 0, format_l),
std::make_tuple("300s", 0, format_l),
std::make_tuple("3600s", 0, format_l),
std::make_tuple("86400s", 0, format_l),
std::make_tuple("total", 0, format_l),
});
}
XrdOucString marker_sec =
"\n┏━> Transfer (tf) sample info every 5 min: tf time for 90/95/99% of data, max tf and report times, average tf size, tf count.\n";
TableFormatterBase table_app_sec;
if (sample_stat) {
if (!monitoring) {
table_app_sec.SetHeader({
std::make_tuple("io", 5, format_ss),
std::make_tuple("application", 24, format_s),
std::make_tuple("90% [s]", 8, format_l),
std::make_tuple("95% [s]", 8, format_l),
std::make_tuple("99% [s]", 8, format_l),
std::make_tuple("max [s]", 8, format_l),
std::make_tuple("max report [s]", 8, format_l),
std::make_tuple("avg tf size", 8, format_l),
std::make_tuple("tf #", 8, format_l),
std::make_tuple("sample end time", 24, format_s),
});
} else {
table_app_sec.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("application", 0, format_s),
std::make_tuple("tfsecto90p", 0, format_l),
std::make_tuple("tfsecto95p", 0, format_l),
std::make_tuple("tfsecto99p", 0, format_l),
std::make_tuple("maxtransfersec", 0, format_l),
std::make_tuple("maxreportsec", 0, format_l),
std::make_tuple("avgtfsize5min", 0, format_l),
std::make_tuple("tfcount", 0, format_l),
std::make_tuple("sampletimestamp", 0, format_s)
});
}
}
// IO out bytes
for (auto it = IostatPeriodsAppIOrb.begin(); it != IostatPeriodsAppIOrb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = (!monitoring ? "out" : "app_io_out");
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(60, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(300, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(3600, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(86400, 0, now), format_l);
row.emplace_back(it->second.GetTotalSum(), format_l);
}
// IO in bytes
for (auto it = IostatPeriodsAppIOwb.begin(); it != IostatPeriodsAppIOwb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = (!monitoring ? "in" : "app_io_in");
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetDataInPeriod(60, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(300, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(3600, 0, now), format_l);
row.emplace_back(it->second.GetDataInPeriod(86400, 0, now), format_l);
row.emplace_back(it->second.GetTotalSum(), format_l);
}
out += !monitoring ? marker_b : "";
table_app_b.AddRows(table_data);
out += table_app_b.GenerateTable(HEADER).c_str();
table_data.clear();
if (sample_stat) {
// IO out bytes
for (auto it = IostatPeriodsAppIOrb.begin(); it != IostatPeriodsAppIOrb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string name = (!monitoring ? "out" : "app_io_out");
std::string sample_time = "";
if (!monitoring) {
sample_time = it->second.GetLastSampleUpdateTimestamp(true);
} else {
sample_time = it->second.GetLastSampleUpdateTimestamp(false);
}
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetTimeToPercComplete(P90), format_l);
row.emplace_back(it->second.GetTimeToPercComplete(P95), format_l);
row.emplace_back(it->second.GetTimeToPercComplete(P99), format_l);
row.emplace_back(it->second.GetLongestTransferTime(), format_l);
row.emplace_back(it->second.GetLongestReportTime(), format_l);
row.emplace_back(it->second.GetAvgTransferSize(), format_l);
row.emplace_back(it->second.GetTfCountInSample(), format_l);
row.emplace_back(sample_time, format_s);
}
// IO in bytes
for (auto it = IostatPeriodsAppIOwb.begin(); it != IostatPeriodsAppIOwb.end();
++it) {
table_data.emplace_back();
TableRow& row = table_data.back();
std::string sample_time = "";
if (!monitoring) {
sample_time = it->second.GetLastSampleUpdateTimestamp(true);
} else {
sample_time = it->second.GetLastSampleUpdateTimestamp(false);
}
std::string name = (!monitoring ? "in" : "app_io_in");
row.emplace_back(name, format_ss);
row.emplace_back(it->first.c_str(), format_s);
// getting stat sums for 1day (idx=0), 1h (idx=1), 5m (idx=2), 1min (idx=3)
row.emplace_back(it->second.GetTimeToPercComplete(P90), format_l);
row.emplace_back(it->second.GetTimeToPercComplete(P95), format_l);
row.emplace_back(it->second.GetTimeToPercComplete(P99), format_l);
row.emplace_back(it->second.GetLongestTransferTime(), format_l);
row.emplace_back(it->second.GetLongestReportTime(), format_l);
row.emplace_back(it->second.GetAvgTransferSize(), format_l);
row.emplace_back(it->second.GetTfCountInSample(), format_l);
row.emplace_back(sample_time, format_s);
}
out += !monitoring ? marker_sec : "";
table_app_sec.AddRows(table_data);
out += table_app_sec.GenerateTable(HEADER).c_str();
table_data.clear();
}
}
}
}
//------------------------------------------------------------------------------
// Compute and print out the namespace popularity ranking
//------------------------------------------------------------------------------
void
Iostat::PrintNsPopularity(XrdOucString& out, XrdOucString option) const
{
size_t limit = 10;
size_t popularitybin = (((time(NULL))) % (IOSTAT_POPULARITY_DAY *
IOSTAT_POPULARITY_HISTORY_DAYS)) / IOSTAT_POPULARITY_DAY;
size_t days = 1;
time_t tmarker = time(NULL) / IOSTAT_POPULARITY_DAY * IOSTAT_POPULARITY_DAY;
bool monitoring = false;
bool bycount = false;
bool bybytes = false;
bool hotfiles = false;
if ((option.find("-m")) != STR_NPOS) {
monitoring = true;
}
if ((option.find("-a")) != STR_NPOS) {
limit = 999999999;
}
if ((option.find("-100")) != STR_NPOS) {
limit = 100;
}
if ((option.find("-1000")) != STR_NPOS) {
limit = 1000;
}
if ((option.find("-10000")) != STR_NPOS) {
limit = 10000;
}
if ((option.find("-n") != STR_NPOS)) {
bycount = true;
}
if ((option.find("-b") != STR_NPOS)) {
bybytes = true;
}
if ((option.find("-w") != STR_NPOS)) {
days = IOSTAT_POPULARITY_HISTORY_DAYS;
}
if (!(bycount || bybytes)) {
bybytes = bycount = true;
}
if ((option.find("-f") != STR_NPOS)) {
hotfiles = true;
}
std::string format_s = !monitoring ? "s" : "os";
std::string format_ss = !monitoring ? "-s" : "os";
std::string format_l = !monitoring ? "l" : "ol";
std::string format_ll = !monitoring ? "-l." : "ol";
std::string format_lll = !monitoring ? "+l" : "ol";
std::string unit = !monitoring ? "B" : "";
// The 'hotfiles' are the files with highest number of present file opens
if (hotfiles) {
eos::common::RWMutexReadLock rLock(FsView::gFsView.ViewMutex);
// print the hotfiles report
std::set::const_iterator it;
std::vector r_open_vector;
std::vector w_open_vector;
std::string key;
std::string val;
TableFormatterBase table;
TableData table_data;
if (!monitoring) {
table.SetHeader({
std::make_tuple("type", 5, format_ss),
std::make_tuple("heat", 5, format_s),
std::make_tuple("fs", 5, format_s),
std::make_tuple("host", 24, format_s),
std::make_tuple("path", 24, format_ss)
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("access", 0, format_s),
std::make_tuple("heat", 0, format_s),
std::make_tuple("fsid", 0, format_l),
std::make_tuple("path", 0, format_ss),
std::make_tuple("fxid", 0, format_s)
});
}
for (auto it = FsView::gFsView.mIdView.begin();
it != FsView::gFsView.mIdView.end(); it++) {
r_open_vector.clear();
w_open_vector.clear();
FileSystem* fs = it->second;
if (!fs) {
continue;
}
std::string r_open_hotfiles = fs->GetString("stat.ropen.hotfiles");
std::string w_open_hotfiles = fs->GetString("stat.wopen.hotfiles");
std::string node_queue = fs->GetString("queue");
auto it_node = FsView::gFsView.mNodeView.find(node_queue);
if (it_node == FsView::gFsView.mNodeView.end()) {
continue;
}
// Check if the corresponding node has a heartbeat
bool hasHeartbeat = it_node->second->HasHeartbeat();
// we only show the reports from the last minute, there could be pending values
if (!hasHeartbeat) {
r_open_hotfiles = "";
w_open_hotfiles = "";
}
if (r_open_hotfiles == " ") {
r_open_hotfiles = "";
}
if (w_open_hotfiles == " ") {
w_open_hotfiles = "";
}
eos::common::StringConversion::Tokenize(r_open_hotfiles, r_open_vector);
eos::common::StringConversion::Tokenize(w_open_hotfiles, w_open_vector);
std::string host = fs->GetString("host");
std::string path;
std::string id = fs->GetString("id");
std::vector> data;
std::vector> data_monitoring;
// Get information for read
for (size_t i = 0; i < r_open_vector.size(); i++) {
eos::common::StringConversion::SplitKeyValue(r_open_vector[i], key, val);
int rank = 0;
if (key.c_str()) {
rank = atoi(key.c_str());
}
{
unsigned long long fid = eos::common::FileId::Hex2Fid(val.c_str());
eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, fid);
eos::common::RWMutexReadLock viewLock(gOFS->eosViewRWMutex);
try {
path = gOFS->eosView->getUri(gOFS->eosFileService->getFileMD(fid).get());
} catch (eos::MDException& e) {
path = "";
}
}
if (rank > 1) {
data.emplace_back(std::make_tuple(
"read", key.c_str(), id.c_str(), host.c_str(), path.c_str()));
}
data_monitoring.emplace_back(std::make_tuple(
"hotfile", "read", key.c_str(), it->first, path.c_str(), val.c_str()));
}
// Get information for write
for (size_t i = 0; i < w_open_vector.size(); i++) {
eos::common::StringConversion::SplitKeyValue(w_open_vector[i], key, val);
int rank = 0;
if (key.c_str()) {
rank = atoi(key.c_str());
}
{
unsigned long long fid = eos::common::FileId::Hex2Fid(val.c_str());
eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, fid);
eos::common::RWMutexReadLock viewLock(gOFS->eosViewRWMutex);
try {
path = gOFS->eosView->getUri(gOFS->eosFileService->getFileMD(fid).get());
} catch (eos::MDException& e) {
path = "";
}
}
if (rank > 1) {
data.emplace_back(std::make_tuple(
"write", key.c_str(), id.c_str(), host.c_str(), path.c_str()));
}
data_monitoring.emplace_back(std::make_tuple(
"hotfile", "write", key.c_str(), it->first, path.c_str(), val.c_str()));
}
// Sort and output
if (!monitoring) {
std::sort(data.begin(), data.end());
for (auto it : data) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(it), format_ss);
row.emplace_back(std::get<1>(it), format_s);
row.emplace_back(std::get<2>(it), format_s);
row.emplace_back(std::get<3>(it), format_s);
row.emplace_back(std::get<4>(it), format_ss);
}
} else {
std::sort(data_monitoring.begin(), data_monitoring.end());
for (auto mdata : data_monitoring) {
table_data.emplace_back();
TableRow& row = table_data.back();
row.emplace_back(std::get<0>(mdata), format_ss);
row.emplace_back(std::get<1>(mdata), format_s);
row.emplace_back(std::get<2>(mdata), format_s);
row.emplace_back(std::get<3>(mdata), format_l);
row.emplace_back(std::get<4>(mdata), format_ss);
row.emplace_back(std::get<5>(mdata), format_s);
}
}
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
return;
}
//! Namespace IO ranking (popularity)
for (size_t pbin = 0; pbin < days; pbin++) {
std::unique_lock scope_lock(mPopularityMutex);
size_t sbin = (IOSTAT_POPULARITY_HISTORY_DAYS + popularitybin - pbin) %
IOSTAT_POPULARITY_HISTORY_DAYS;
google::sparse_hash_map::const_iterator it;
std::vector popularity_nread(IostatPopularity[sbin].begin(),
IostatPopularity[sbin].end());
std::vector popularity_rb(IostatPopularity[sbin].begin(),
IostatPopularity[sbin].end());
// sort them (backwards) by rb or nread
std::sort(popularity_nread.begin(), popularity_nread.end(),
PopularityCmp_nread());
std::sort(popularity_rb.begin(), popularity_rb.end(), PopularityCmp_rb());
XrdOucString marker = "\n┏━> Today\n";
switch (pbin) {
case 1:
marker = "\n┏━> Yesterday\n";
break;
case 2:
marker = "\n┏━> 2 days ago\n";
break;
case 3:
marker = "\n┏━> 3 days ago\n";
break;
case 4:
marker = "\n┏━> 4 days ago\n";
break;
case 5:
marker = "\n┏━> 5 days ago\n";
break;
case 6:
marker = "\n┏━> 6 days ago\n";
}
if (bycount) {
TableFormatterBase table;
TableData table_data;
if (!monitoring) {
table.SetHeader({
std::make_tuple("rank", 5, format_ll),
std::make_tuple("by(read count)", 12, format_s),
std::make_tuple("read bytes", 10, format_lll),
std::make_tuple("path", 24, format_ss),
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("time", 0, format_lll),
std::make_tuple("rank", 0, format_ll),
std::make_tuple("nread", 0, format_lll),
std::make_tuple("rb", 0, format_lll),
std::make_tuple("path", 0, format_ss)
});
}
size_t cnt = 0;
for (auto it : popularity_nread) {
cnt++;
if (cnt > limit) {
break;
}
table_data.emplace_back();
TableRow& row = table_data.back();
if (monitoring) {
row.emplace_back("popularitybyaccess", format_ss);
row.emplace_back((unsigned) tmarker, format_lll);
}
row.emplace_back((int) cnt, format_ll);
row.emplace_back(it.second.nread, format_lll);
row.emplace_back(it.second.rb, format_lll, unit);
row.emplace_back(it.first.c_str(), format_s);
}
if (cnt > 0) {
out += !monitoring ? marker : "";
table.AddRows(table_data);
out += table.GenerateTable(HEADER).c_str();
}
}
if (bybytes) {
TableFormatterBase table;
TableData table_data;
if (!monitoring) {
table.SetHeader({
std::make_tuple("rank", 5, format_ll),
std::make_tuple("by(read bytes)", 12, format_s),
std::make_tuple("read count", 10, format_lll),
std::make_tuple("path", 24, format_ss),
});
} else {
table.SetHeader({
std::make_tuple("measurement", 0, format_ss),
std::make_tuple("time", 0, format_lll),
std::make_tuple("rank", 0, format_ll),
std::make_tuple("nread", 0, format_lll),
std::make_tuple("rb", 0, format_lll),
std::make_tuple("path", 0, format_ss)
});
}
size_t cnt = 0;
for (auto it : popularity_rb) {
cnt++;
if (cnt > limit) {
break;
}
table_data.emplace_back();
TableRow& row = table_data.back();
if (monitoring) {
row.emplace_back("popularitybyvolume", format_ss);
row.emplace_back((unsigned) tmarker, format_lll);
}
row.emplace_back((int) cnt, format_ll);
if (!monitoring) {
row.emplace_back(it.second.rb, format_lll, unit);
row.emplace_back(it.second.nread, format_lll);
} else {
row.emplace_back(it.second.nread, format_lll);
row.emplace_back(it.second.rb, format_lll, unit);
}
row.emplace_back(it.first.c_str(), format_s);
}
table.AddRows(table_data);
out += table.GenerateTable(HEADER2).c_str();
}
}
}
//------------------------------------------------------------------------------
// Print namespace activity report for given path
//------------------------------------------------------------------------------
void
Iostat::PrintNsReport(const char* path, XrdOucString& out) const
{
XrdOucString reportFile;
reportFile = gOFS->IoReportStorePath.c_str();
reportFile += "/";
reportFile += path;
std::ifstream inFile(reportFile.c_str());
std::string reportLine;
unsigned long long totalreadbytes = 0;
unsigned long long totalwritebytes = 0;
double totalreadtime = 0;
double totalwritetime = 0;
unsigned long long rcount = 0;
unsigned long long wcount = 0;
while (std::getline(inFile, reportLine)) {
XrdOucEnv ioreport(reportLine.c_str());
auto report = std::make_unique(ioreport);
report->Dump(out);
if (!report->wb) {
rcount++;
totalreadtime += ((report->cts - report->ots) + (1.0 * (report->ctms -
report->otms) / 1000000));
totalreadbytes += report->rb;
} else {
wcount++;
totalwritetime += ((report->cts - report->ots) + (1.0 *
(report->ctms - report->otms) / 1000000));
totalwritebytes += report->wb;
}
}
out += "----------------------- SUMMARY -------------------\n";
char summaryline[4096];
XrdOucString sizestring1, sizestring2;
snprintf(summaryline, sizeof(summaryline) - 1,
"| avg. readd: %.02f MB/s | avg. write: %.02f MB/s | "
"total read: %s | total write: %s | times read: %llu | "
"times written: %llu |\n",
totalreadtime ? (totalreadbytes / totalreadtime / 1000000.0) : 0,
totalwritetime ? (totalwritebytes / totalwritetime / 1000000.0) : 0,
eos::common::StringConversion::GetReadableSizeString
(sizestring1, totalreadbytes, "B"),
eos::common::StringConversion::GetReadableSizeString
(sizestring2, totalwritebytes, "B"), (unsigned long long)rcount,
(unsigned long long)wcount);
out += summaryline;
}
//------------------------------------------------------------------------------
// Circulate the entries to get stats collected over last sec, min, hour and day
//------------------------------------------------------------------------------
void
Iostat::Circulate(ThreadAssistant& assistant) noexcept
{
while (!assistant.terminationRequested()) {
if (mLegacyMode) {
static unsigned long long sc = 0ull;
// Store once per minute the current statistics
if (sc % 117 == 0) {
sc = 0ull;
// save the current state ~ every minute
if (!LegacyStoreInFile()) {
eos_static_err("msg=\"failed store io stat dump\" path=\"%s\"",
mLegacyFilePath.c_str());
}
}
sc++;
}
assistant.wait_for(std::chrono::milliseconds(512));
google::sparse_hash_map >::iterator
tit;
google::sparse_hash_map::iterator dit;
time_t now = time(NULL);
std::unique_lock scope_lock(mDataMutex);
// loop over tags
for (tit = IostatPeriodsUid.begin(); tit != IostatPeriodsUid.end(); ++tit) {
// loop over vids
google::sparse_hash_map::iterator it;
for (it = tit->second.begin(); it != tit->second.end(); ++it) {
it->second.StampBufferZero(now);
}
}
for (tit = IostatPeriodsGid.begin(); tit != IostatPeriodsGid.end(); ++tit) {
// loop over vids
google::sparse_hash_map::iterator it;
for (it = tit->second.begin(); it != tit->second.end(); ++it) {
it->second.StampBufferZero(now);
}
}
// loop over domain accounting
for (dit = IostatPeriodsDomainIOrb.begin();
dit != IostatPeriodsDomainIOrb.end();
dit++) {
dit->second.StampBufferZero(now);
}
for (dit = IostatPeriodsDomainIOwb.begin();
dit != IostatPeriodsDomainIOwb.end();
dit++) {
dit->second.StampBufferZero(now);
}
// loop over app accounting
for (dit = IostatPeriodsAppIOrb.begin(); dit != IostatPeriodsAppIOrb.end();
dit++) {
dit->second.StampBufferZero(now);
}
for (dit = IostatPeriodsAppIOwb.begin(); dit != IostatPeriodsAppIOwb.end();
dit++) {
dit->second.StampBufferZero(now);
}
size_t popularitybin = (((time(NULL))) % (IOSTAT_POPULARITY_DAY *
IOSTAT_POPULARITY_HISTORY_DAYS)) / IOSTAT_POPULARITY_DAY;
if (mLastPopularityBin != popularitybin) {
// only if we enter a new bin we erase it
std::unique_lock scope_lock(mPopularityMutex);
IostatPopularity[popularitybin].clear();
IostatPopularity[popularitybin].resize(10000);
mLastPopularityBin = popularitybin;
}
}
eos_static_info("%s", "msg=\"stopping iostat circulate thread\"");
}
//------------------------------------------------------------------------------
// Encode the UDP popularity targets to a string using the provided separator
//------------------------------------------------------------------------------
std::string
Iostat::EncodeUdpPopularityTargets() const
{
std::string out;
std::unique_lock scope_lock(mBcastMutex);
if (mUdpPopularityTarget.empty()) {
return out;
}
for (const auto& elem : mUdpPopularityTarget) {
out += elem;
out += "|";
}
out.pop_back();
return out;
}
//------------------------------------------------------------------------------
// Add UDP target
//------------------------------------------------------------------------------
bool
Iostat::AddUdpTarget(const std::string& target, bool store_and_lock)
{
std::unique_lock scope_lock(mBcastMutex, std::defer_lock);
if (store_and_lock) {
scope_lock.lock();
}
if (mUdpPopularityTarget.insert(target).second == false) {
// Target already exists
return false;
}
// Create an UDP socket for the specified target
int udpsocket = -1;
udpsocket = socket(AF_INET, SOCK_DGRAM, 0);
if (udpsocket >= 0) {
XrdOucString a_host, a_port, hp;
int port = 0;
hp = target.c_str();
if (!eos::common::StringConversion::SplitKeyValue(hp, a_host, a_port)) {
a_host = hp;
a_port = "31000";
}
port = atoi(a_port.c_str());
mUdpSocket[target] = udpsocket;
XrdNetAddr* addrs = 0;
int nAddrs = 0;
const char* err = XrdNetUtils::GetAddrs(a_host.c_str(), &addrs, nAddrs,
XrdNetUtils::allIPv64,
XrdNetUtils::NoPortRaw);
if (err || nAddrs == 0) {
return false;
}
memcpy((struct sockaddr*) &mUdpSockAddr[target], addrs[0].SockAddr(),
sizeof(sockaddr));
delete [] addrs;
mUdpSockAddr[target].sin_family = AF_INET;
mUdpSockAddr[target].sin_port = htons(port);
}
// Store configuration if required
if (store_and_lock) {
scope_lock.unlock();
return StoreIostatConfig(&FsView::gFsView);
}
return true;
}
//------------------------------------------------------------------------------
// Remove UDP target
//------------------------------------------------------------------------------
bool
Iostat::RemoveUdpTarget(const std::string& target)
{
bool store = false;
bool retc = false;
{
std::unique_lock scop_lock(mBcastMutex);
if (mUdpPopularityTarget.count(target)) {
mUdpPopularityTarget.erase(target);
if (mUdpSocket.count(target)) {
if (mUdpSocket[target] > 0) {
// close the UDP socket
close(mUdpSocket[target]);
}
mUdpSocket.erase(target);
mUdpSockAddr.erase(target);
}
retc = true;
store = true;
}
}
if (store) {
retc &= StoreIostatConfig(&FsView::gFsView);
}
return retc;
}
//------------------------------------------------------------------------------
// Do the UDP broadcast
//------------------------------------------------------------------------------
void
Iostat::UdpBroadCast(eos::common::Report* report) const
{
std::string u = "";
char fs[1024];
std::unique_lock scope_lock(mBcastMutex);
for (auto it = mUdpPopularityTarget.cbegin();
it != mUdpPopularityTarget.cend(); ++it) {
u = "";
XrdOucString tg = it->c_str();
XrdOucString sizestring;
if (tg.endswith("/json")) {
// do json format broadcast
tg.replace("/json", "");
u += "{\"app_info\": \"";
u += report->sec_app;
u += "\",\n";
u += " \"client_domain\": \"";
u += report->sec_domain;
u += "\",\n";
u += " \"client_host\": \"";
u += report->sec_host;
u += "\",\n";
u += " \"end_time\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->cts);
u += ",\n";
u += " \"file_lfn\": \"";
u += report->path;
u += "\",\n";
u += " \"file_size\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->csize);
u += ",\n";
u += " \"read_average\": ";
u += eos::common::StringConversion::GetSizeString(sizestring,
report->rb / ((report->nrc) ? report->nrc : 999999999));
u += ",\n";
u += " \"read_bytes_at_close\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb);
u += ",\n";
u += " \"read_bytes\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb);
u += ",\n";
u += " \"read_max\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb_max);
u += ",\n";
u += " \"read_min\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb_min);
u += ",\n";
u += " \"read_operations\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->nrc);
u += ",\n";
snprintf(fs, sizeof(fs) - 1, "%.02f", report->rb_sigma);
u += " \"read_sigma\": ";
u += fs;
u += ",\n";
/* -- we have currently no access to this information */
/*
u += " \"read_single_average\": "; u += "0"; u += ",\n";
u += " \"read_single_bytes\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb); u += ",\n";
u += " \"read_single_max\": "; u += "0"; u += ",\n";
u += " \"read_single_min\": "; u += "0"; u += ",\n";
u += " \"read_single_operations\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->nrc); u += ",\n";
u += " \"read_single_sigma\": "; u += "0"; u += ",\n";
u += " \"read_vector_average\": "; u += "0"; u += ",\n";
u += " \"read_vector_bytes\": "; u += "0"; u += ",\n";
u += " \"read_vector_count_average\": "; u += "0"; u += ",\n";
u += " \"read_vector_count_max\": ";u += "0"; u += ",\n";
u += " \"read_vector_count_min\": ";u += "0"; u += ",\n";
u += " \"read_vector_count_sigma\": "; u += "0"; u += ",\n";
u += " \"read_vector_max\": "; u += "0"; u += ",\n";
u += " \"read_vector_min\": "; u += "0"; u += ",\n";
u += " \"read_vector_operations\": "; u += "0"; u += ",\n";
u += " \"read_vector_sigma\": "; u += "0"; u += ",\n"; */
u += " \"server_domain\": \"";
u += report->server_domain;
u += "\",\n";
u += " \"server_host\": \"";
u += report->server_name;
u += "\",\n";
u += " \"server_username\": \"";
u += report->sec_name;
u += "\",\n";
u += " \"start_time\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->ots);
u += ",\n";
XrdOucString stime; // stores the current time in .
u += " \"unique_id\": \"";
u += gOFS->MgmOfsInstanceName.c_str();
u += "-";
u += eos::common::StringConversion::TimeNowAsString(stime);
u += "\",\n";
u += " \"user_dn\": \"";
u += report->sec_info;
u += "\",\n";
u += " \"user_fqan\": \"";
u += report->sec_grps;
u += "\",\n";
u += " \"user_role\": \"";
u += report->sec_role;
u += "\",\n";
u += " \"user_vo\": \"";
u += report->sec_vorg;
u += "\",\n";
u += " \"write_average\": ";
u += eos::common::StringConversion::GetSizeString(sizestring,
report->wb / ((report->nwc) ? report->nwc : 999999999));
u += ",\n";
u += " \"write_bytes_at_close\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb);
u += ",\n";
u += " \"write_bytes\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb);
u += ",\n";
u += " \"write_max\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb_max);
u += ",\n";
u += " \"write_min\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb_min);
u += ",\n";
u += " \"write_operations\": ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->nwc);
u += ",\n";
snprintf(fs, sizeof(fs) - 1, "%.02f", report->wb_sigma);
u += " \"write_sigma\": ";
u += fs;
u += "}\n";
} else {
// do default format broadcast
u += "#begin\n";
u += "app_info=";
u += report->sec_app;
u += "\n";
u += "client_domain=";
u += report->sec_domain;
u += "\n";
u += "client_host=";
u += report->sec_host;
u += "\n";
u += "end_time=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->cts);
u += "\n";
u += "file_lfn = ";
u += report->path;
u += "\n";
u += "file_size = ";
u += eos::common::StringConversion::GetSizeString(sizestring, report->csize);
u += "\n";
u += "read_average=";
u += eos::common::StringConversion::GetSizeString(sizestring,
report->rb / ((report->nrc) ? report->nrc : 999999999));
u += "\n";
u += "read_bytes_at_close=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb);
u += "\n";
u += "read_bytes=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb);
u += "\n";
u += "read_min=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb_min);
u += "\n";
u += "read_max=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->rb_max);
u += "\n";
u += "read_operations=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->nrc);
u += "\n";
u += "read_sigma=";
u += "0";
u += "\n";
snprintf(fs, sizeof(fs) - 1, "%.02f", report->rb_sigma);
u += "read_sigma=";
u += fs;
u += "\n";
/* -- we have currently no access to this information */
/* u += "read_single_average="; u += "0"; u += "\n";
u += "read_single_bytes="; u += eos::common::StringConversion::GetSizeString(sizestring, report->rb); u += "\n";
u += "read_single_max="; u += "0"; u += "\n";
u += "read_single_min="; u += "0"; u += "\n";
u += "read_single_operations="; u += eos::common::StringConversion::GetSizeString(sizestring, report->nrc); u += "\n";
u += "read_single_sigma="; u += "0"; u += "\n";
u += "read_vector_average="; u += "0"; u += "\n";
u += "read_vector_bytes="; u += "0"; u += "\n";
u += "read_vector_count_average="; u += "0"; u += "\n";
u += "read_vector_count_max=";u += "0"; u += "\n";
u += "read_vector_count_min=";u += "0"; u += "\n";
u += "read_vector_count_sigma="; u += "0"; u += "\n";
u += "read_vector_max="; u += "0"; u += "\n";
u += "read_vector_min="; u += "0"; u += "\n";
u += "read_vector_operations="; u += "0"; u += "\n";
u += "read_vector_sigma="; u += "0"; u += "\n";*/
u += "server_domain=";
u += report->server_domain;
u += "\n";
u += "server_host=";
u += report->server_name;
u += "\n";
u += "server_username=";
u += report->sec_name;
u += "\n";
u += "start_time=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->ots);
u += "\n";
XrdOucString stime; // stores the current time in .
u += "unique_id=";
u += gOFS->MgmOfsInstanceName.c_str();
u += "-";
u += eos::common::StringConversion::TimeNowAsString(stime);
u += "\n";
u += "user_dn = ";
u += report->sec_info;
u += "\n";
u += "user_fqan=";
u += report->sec_grps;
u += "\n";
u += "user_role=";
u += report->sec_role;
u += "\n";
u += "user_vo=";
u += report->sec_vorg;
u += "\n";
u += "write_average=";
u += eos::common::StringConversion::GetSizeString(sizestring,
report->wb / ((report->nwc) ? report->nwc : 999999999));
u += "\n";
u += "write_bytes_at_close=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb);
u += "\n";
u += "write_bytes=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb);
u += "\n";
u += "write_min=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb_min);
u += "\n";
u += "write_max=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->wb_max);
u += "\n";
u += "write_operations=";
u += eos::common::StringConversion::GetSizeString(sizestring, report->nwc);
u += "\n";
snprintf(fs, sizeof(fs) - 1, "%.02f", report->rb_sigma);
u += "write_sigma=";
u += fs;
u += "\n";
u += "#end\n";
}
int sendretc = sendto(mUdpSocket.at(*it), u.c_str(), u.length(), 0,
(struct sockaddr*) &mUdpSockAddr.at(*it),
sizeof(struct sockaddr_in));
if (sendretc < 0) {
eos_static_err("msg=\"failed to send udp message to %s\"", it->c_str());
}
eos_static_debug("sendto_retc=%d", sendretc);
}
}
//------------------------------------------------------------------------------
// Add entry to popularity statistics
//------------------------------------------------------------------------------
void
Iostat::AddToPopularity(const std::string& path, unsigned long long rb,
time_t start, time_t stop)
{
size_t popularitybin = (((start + stop) / 2) % (IOSTAT_POPULARITY_DAY *
IOSTAT_POPULARITY_HISTORY_DAYS)) / IOSTAT_POPULARITY_DAY;
eos::common::Path cPath(path.c_str());
std::unique_lock scope_lock(mPopularityMutex);
for (size_t k = 0; k < cPath.GetSubPathSize(); ++k) {
std::string sp = cPath.GetSubPath(k);
IostatPopularity[popularitybin][sp].rb += rb;
IostatPopularity[popularitybin][sp].nread++;
}
}
//------------------------------------------------------------------------------
// Create hash map key string from the given information
//------------------------------------------------------------------------------
std::string
Iostat::EncodeKey(const std::string& id_type, const std::string& id_val,
const std::string& tag)
{
return SSTR("idt=" << id_type << "&id=" << id_val << "&tag=" << tag);
}
//------------------------------------------------------------------------------
// Decode/parse hash map key to extract entry information
//------------------------------------------------------------------------------
bool
Iostat::DecodeKey(const std::string& key, std::string& id_type,
std::string& id_val, std::string& tag)
{
std::vector tokens {};
eos::common::StringConversion::Tokenize(key, tokens, "&");
for (const auto& token : tokens) {
std::vector kv {};
eos::common::StringConversion::Tokenize(token, kv, "=");
if (kv.size() != 2) {
eos_static_err("msg=\"unexpected token format\" token=\"%s\"",
token.c_str());
return false;
}
if (kv[0] == "idt") {
id_type = kv[1];
} else if (kv[0] == "id") {
id_val = kv[1];
} else if (kv[0] == "tag") {
tag = kv[1];
} else {
eos_static_err("msg=\"unexpected key format\" key=\"%s\"", kv[0].c_str());
return false;
}
}
return true;
}
//------------------------------------------------------------------------------
// Load Iostat information from Qdb backend
//------------------------------------------------------------------------------
bool
Iostat::LoadFromQdb()
{
std::string key = GetHashKey();
eos_static_info("msg=\"loading iostat info from Qdb\" hash_map=\"%s\"",
key.c_str());
qclient::redisReplyPtr reply;
try {
reply = mQcl->exec("HGETALL", key).get();
} catch (const std::exception& e) {
eos_static_err("msg=\"failed getting entries from Qdb\", emsg=\"%s\"",
e.what());
return true;
}
qclient::HgetallParser mQdbRespParser(reply);
if (!mQdbRespParser.ok()) {
eos_static_err("%s", "msg=\"failed parsing reply from Qdb\n");
return false;
}
int id = 0;
unsigned long long val = 0ull;
std::string id_type, id_val, tag;
std::map stored_iostat = mQdbRespParser.value();
std::unique_lock scope_lock(mDataMutex);
// Clean up the memory data structures
IostatUid.clear();
IostatUid.resize(0);
IostatTag.clear();
IostatTag.resize(0);
IostatGid.clear();
IostatGid.resize(0);
for (const auto& pair : stored_iostat) {
if (!DecodeKey(pair.first, id_type, id_val, tag)) {
continue;
}
// Convert entries from string to numeric
try {
id = std::stoi(id_val);
val = std::stoull(pair.second);
} catch (...) {
eos_static_err("msg=\"failed converting to numeric format\" key=\"%s\" "
"val=\"%s\"", pair.first.c_str(), pair.second.c_str());
continue;
}
if (id_type == USER_ID_TYPE) {
IostatUid[tag][id] = val;
if (!IostatTag.count(tag)) {
IostatTag[tag] = val;
} else {
IostatTag[tag] += val;
}
} else if (id_type == GROUP_ID_TYPE) {
IostatGid[tag][id] = val;
}
}
return true;
}
//------------------------------------------------------------------------------
// Store statistics in legacy file format
//------------------------------------------------------------------------------
bool
Iostat::LegacyStoreInFile()
{
if (mLegacyFilePath.empty()) {
return false;
}
XrdOucString tmpname = mLegacyFilePath.c_str();
tmpname += ".tmp";
FILE* fout = fopen(tmpname.c_str(), "w+");
if (!fout) {
return false;
}
if (chmod(tmpname.c_str(), S_IRWXU | S_IRGRP | S_IROTH)) {
fclose(fout);
return false;
}
std::unique_lock scope_lock(mDataMutex);
// Store user counters
for (auto tuit = IostatUid.begin(); tuit != IostatUid.end(); tuit++) {
for (auto it = tuit->second.begin(); it != tuit->second.end(); ++it) {
fprintf(fout, "tag=%s&uid=%u&val=%llu\n", tuit->first.c_str(), it->first,
(unsigned long long)it->second);
}
}
// Store group counter
for (auto tgit = IostatGid.begin(); tgit != IostatGid.end(); tgit++) {
for (auto it = tgit->second.begin(); it != tgit->second.end(); ++it) {
fprintf(fout, "tag=%s&gid=%u&val=%llu\n", tgit->first.c_str(), it->first,
(unsigned long long)it->second);
}
}
fclose(fout);
return (rename(tmpname.c_str(), mLegacyFilePath.c_str()) == 0);
}
//------------------------------------------------------------------------------
// Restore statistics from legacy file format
//------------------------------------------------------------------------------
bool
Iostat::LegacyRestoreFromFile()
{
if (mLegacyFilePath.empty()) {
return false;
}
FILE* fin = fopen(mLegacyFilePath.c_str(), "r");
if (!fin) {
return true;
}
int item = 0;
char line[16384];
std::unique_lock scope_lock(mDataMutex);
while ((item = fscanf(fin, "%16383s\n", line)) == 1) {
XrdOucEnv env(line);
if (env.Get("tag") && env.Get("uid") && env.Get("val")) {
std::string tag = env.Get("tag");
uid_t uid = atoi(env.Get("uid"));
unsigned long long val = strtoull(env.Get("val"), 0, 10);
IostatUid[tag][uid] = val;
if (!IostatTag.count(tag)) {
IostatTag[tag] = val;
} else {
IostatTag[tag] += val;
}
}
if (env.Get("tag") && env.Get("gid") && env.Get("val")) {
std::string tag = env.Get("tag");
gid_t gid = atoi(env.Get("gid"));
unsigned long long val = strtoull(env.Get("val"), 0, 10);
IostatGid[tag][gid] = val;
}
}
fclose(fin);
return true;
}
EOSMGMNAMESPACE_END