//------------------------------------------------------------------------------
// File: FuseServer/Clients.cc
// Author: Andreas-Joachim Peters - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include
#include
#include
#include "mgm/FuseServer/Clients.hh"
#include "common/Logging.hh"
#include "common/CommentLog.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/Stat.hh"
#include "mgm/ZMQ.hh"
EOSMGMNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Retrieve global eosxd client statistics
//
// @param nclients       out: number of entries in the client map
// @param active_clients out: clients whose tagged ops time is at most 300
//                       seconds old; clients without an ops time get an idle
//                       time of -1 and are therefore counted as active too
// @param locked_clients out: clients reporting blockedms() above 5 minutes
//------------------------------------------------------------------------------
void
FuseServer::Clients::ClientStats(size_t& nclients, size_t& active_clients,
                                 size_t& locked_clients)
{
  // blockedms() is compared against 5 minutes expressed in milliseconds
  const int locked_threshold_ms = 5 * 1000 * 60;
  // idle times up to this many seconds still count as active
  const int active_threshold_sec = 300;
  locked_clients = 0;
  active_clients = 0;
  nclients = 0;
  struct timespec now;
  eos::common::Timing::GetTimeSpec(now, true);
  eos::common::RWMutexReadLock rd_lock(*this);

  for (auto& entry : this->map()) {
    auto& client = entry.second;
    ++nclients;

    if (client.statistics().blockedms() > locked_threshold_ms) {
      ++locked_clients;
    }

    // -1 marks a client which never had its ops time tagged
    int64_t idle_sec = -1;

    if (client.get_opstime_sec()) {
      idle_sec = now.tv_sec - client.get_opstime_sec();
    }

    if (idle_sec <= active_threshold_sec) {
      ++active_clients;
    }
  }
}
//------------------------------------------------------------------------------
// Monitor heart beat
//------------------------------------------------------------------------------
void
FuseServer::Clients::MonitorHeartBeat()
{
eos_static_info("msg=\"starting fusex heart beat thread\"");
while (true) {
EXEC_TIMING_BEGIN("Eosxd::int::MonitorHeartBeat");
client_uuid_t evictmap;
client_uuid_t evictversionmap;
struct timespec tsnow;
{
eos::common::RWMutexReadLock lLock(*this);
eos::common::Timing::GetTimeSpec(tsnow);
for (auto it = map().begin(); it != map().end(); ++it) {
double last_heartbeat = tsnow.tv_sec - it->second.heartbeat().clock() +
(((int64_t) tsnow.tv_nsec - (int64_t) it->second.heartbeat().clock_ns())
* 1.0 / 1000000000.0);
if (it->second.heartbeat().shutdown()) {
evictmap[it->second.heartbeat().uuid()] = it->first;
it->second.set_state(Client::EVICTED);
eos_static_info("client='%s' shutdown [ %s ] ",
it->first.c_str(), Info(it->first).c_str());
gOFS->MgmStats.Add("Eosxd::prot::umount", 0, 0, 1);
} else {
if (last_heartbeat > mHeartBeatWindow) {
if (last_heartbeat > mHeartBeatOfflineWindow) {
if (last_heartbeat > mHeartBeatRemoveWindow) {
evictmap[it->second.heartbeat().uuid()] = it->first;
it->second.set_state(Client::EVICTED);
eos_static_info("client='%s' evicted [ %s ] ",
it->first.c_str(), Info(it->first).c_str());
gOFS->MgmStats.Add("Eosxd::prot::evicted", 0, 0, 1);
} else {
// drop locks once
if (it->second.state() != Client::OFFLINE) {
gOFS->zMQ->gFuseServer.Locks().dropLocks(it->second.heartbeat().uuid());
eos_static_info("client='%s' offline [ %s ] ",
it->first.c_str(), Info(it->first).c_str());
gOFS->MgmStats.Add("Eosxd::prot::offline", 0, 0, 1);
}
it->second.set_state(Client::OFFLINE);
}
} else {
it->second.set_state(Client::VOLATILE);
}
} else {
it->second.set_state(Client::ONLINE);
}
}
if (it->second.heartbeat().protversion() < it->second.heartbeat().PROTOCOLV2) {
// protocol version mismatch, evict this client
evictversionmap[it->second.heartbeat().uuid()] = it->first;
it->second.set_state(Client::EVICTED);
}
}
}
// Delete clients to be evicted
if (!evictmap.empty()) {
{
std::set uuids;
{
eos::common::RWMutexWriteLock lLock(*this);
for (auto it = evictmap.begin(); it != evictmap.end(); ++it) {
uuids.insert(it->first);
mMap.erase(it->second);
mUUIDView.erase(it->first);
gOFS->zMQ->gFuseServer.Locks().dropLocks(it->first);
}
}
for (auto it = uuids.begin(); it != uuids.end(); ++it) {
gOFS->zMQ->gFuseServer.Cap().dropCaps(*it);
}
}
}
// Delete client ot be evicted because of a version mismatch
if (!evictversionmap.empty()) {
for (auto it = evictversionmap.begin(); it != evictversionmap.end(); ++it) {
std::string versionerror =
"Server supports PROTOCOLV4 and requires atleast PROTOCOLV2";
std::string uuid = it->first;
Evict(uuid, versionerror);
eos::common::RWMutexWriteLock lLock(*this);
mMap.erase(it->second);
mUUIDView.erase(it->first);
}
}
gOFS->zMQ->gFuseServer.Flushs().expireFlush();
EXEC_TIMING_END("Eosxd::int::MonitorHeartBeat");
std::this_thread::sleep_for(std::chrono::seconds(1));
if (should_terminate()) {
break;
}
if (gOFS) {
gOFS->MgmStats.Add("Eosxd::int::MonitorHeartBeat", 0, 0, 1);
}
}
return ;
}
//------------------------------------------------------------------------------
//
//------------------------------------------------------------------------------
bool
FuseServer::Clients::Dispatch(const std::string identity,
eos::fusex::heartbeat& hb)
{
gOFS->MgmStats.Add("Eosxd::int::Heartbeat", 0, 0, 1);
EXEC_TIMING_BEGIN("Eosxd::int::Heartbeat");
bool rc = true;
eos::common::RWMutexWriteLock lLock(*this);
std::set caps_to_revoke;
if (this->map().count(identity)) {
rc = false;
}
// if heartbeats are older than the offline window, we just ignore them to avoid client 'waving'
struct timespec tsnow;
eos::common::Timing::GetTimeSpec(tsnow);
double heartbeat_delay = tsnow.tv_sec - hb.clock() + (((
int64_t) tsnow.tv_nsec - (int64_t) hb.clock_ns()) * 1.0 / 1000000000.0);
if (heartbeat_delay > mHeartBeatOfflineWindow) {
eos_static_warning("delayed heartbeat from client=%s - delay=%.02f - dropping heartbeat",
identity.c_str(), heartbeat_delay);
return rc;
}
if (hb.log().size()) {
gOFS->mFusexLogTraces->Add(time(NULL), hb.host().c_str(), hb.uuid().c_str(),
hb.version().c_str(), std::string(hb.host() + ":" + hb.mount()).c_str() ,
hb.log().c_str(), 0);
hb.clear_log();
}
if (hb.trace().size()) {
gOFS->mFusexStackTraces->Add(time(NULL), hb.host().c_str(), hb.uuid().c_str(),
hb.version().c_str(), std::string(hb.host() + ":" + hb.mount()).c_str() ,
hb.trace().c_str(), 0);
hb.clear_trace();
}
(this->map())[identity].heartbeat() = hb;
// tag first ops time
if (!this->map()[identity].get_opstime_sec()) {
this->map()[identity].tag_opstime();
}
(this->uuidview())[hb.uuid()] = identity;
lLock.Release();
{
// apply auth revocation requested by the client
auto map = hb.mutable_authrevocation();
for (auto it = map->begin(); it != map->end(); ++it) {
Caps::shared_cap cap = gOFS->zMQ->gFuseServer.Cap().GetTS(it->first);
if (cap) {
caps_to_revoke.insert(cap);
eos_static_debug("cap-revocation: authid=%s vtime:= %u",
it->first.c_str(),
(*cap)()->vtime());
}
}
}
if (rc) {
{
eos::common::RWMutexReadLock lLock(*this);
eos_static_info("client='%s' mount [ %s ] ", identity.c_str(),
Info(identity).c_str());
}
gOFS->MgmStats.Add("Eosxd::prot::mount", 0, 0, 1);
// ask a client to drop all caps when we see him the first time because we might have lost our caps due to a restart/failover
BroadcastDropAllCaps(identity, hb);
// communicate our current heart-beat interval
eos::fusex::config cfg;
cfg.set_hbrate(mHeartBeatInterval);
cfg.set_dentrymessaging(true);
cfg.set_writesizeflush(true);
cfg.set_appname(true);
cfg.set_mdquery(true);
cfg.set_hideversion(true);
cfg.set_serverversion(std::string(VERSION) + std::string("::") + std::string(
RELEASE));
BroadcastConfig(identity, cfg);
} else {
if (caps_to_revoke.size()) {
gOFS->MgmStats.Add("Eosxd::int::AuthRevocation", 0, 0, caps_to_revoke.size());
EXEC_TIMING_BEGIN("Eosxd::int::AuthRevocation");
// revoke LEASES by cap
for (auto it = caps_to_revoke.begin(); it != caps_to_revoke.end(); ++it) {
gOFS->zMQ->gFuseServer.Cap().RemoveTS(*it);
}
EXEC_TIMING_END("Eosxd::int::AuthRevocation");
}
}
EXEC_TIMING_END("Eosxd::int::Heartbeat");
return rc;
}
//------------------------------------------------------------------------------
// Get Clients Info - the caller must hold a lock as we access the map!
//
// @param identity key of the client in the client map
//
// @return one line of key=value pairs describing the client, or an empty
//         string if the identity is not in the map
//------------------------------------------------------------------------------
std::string
FuseServer::Clients::Info(const std::string& identity)
{
  struct timespec tsnow;
  eos::common::Timing::GetTimeSpec(tsnow);
  auto found = this->map().find(identity);

  if (found == this->map().end()) {
    return std::string();
  }

  auto& client = found->second;
  const auto& hb = client.heartbeat();
  char line[16384];
  snprintf(line, sizeof(line),
           "name=%s host=%s version=%s state=%s start=%s dt=[%.02f:%.02f] uuid=%s pid=%u fds=%u type=%s mount=%s app=%s",
           hb.name().c_str(),
           hb.host().c_str(),
           hb.version().c_str(),
           client.status[client.state()],
           eos::common::Timing::utctime(hb.starttime()).c_str(),
           // seconds elapsed since the clock carried by the last heartbeat
           static_cast<int64_t>(tsnow.tv_sec) - static_cast<int64_t>(hb.clock()) +
           ((static_cast<int64_t>(tsnow.tv_nsec) -
             static_cast<int64_t>(hb.clock_ns())) * 1.0 / 1000000000.0),
           hb.delta() * 1000,
           hb.uuid().c_str(),
           hb.pid(),
           client.statistics().open_files(),
           hb.automounted() ? "autofs" : "static",
           hb.mount().c_str(),
           hb.appname().c_str()
          );
  return std::string(line);
}
//------------------------------------------------------------------------------
//
//------------------------------------------------------------------------------
void
FuseServer::Clients::Print(std::string& out, std::string options)
{
struct timespec tsnow;
eos::common::Timing::GetTimeSpec(tsnow);
std::unordered_map clientcaps;
for (const auto& cap : gOFS->zMQ->gFuseServer.Cap().GetAllCaps()) {
if ((*cap)()->id()) {
clientcaps[(*cap)()->clientuuid()]++;
}
}
struct timespec now_time;
eos::common::Timing::GetTimeSpec(now_time, true);
eos::common::RWMutexReadLock lLock(*this);
for (auto it = this->map().begin(); it != this->map().end(); ++it) {
char formatline[4096];
bool t5min_idle, t1h_idle, t1d_idle, t1w_idle;
t5min_idle = t1h_idle = t1d_idle = t1w_idle = false;
int64_t idletime = (it->second.get_opstime_sec()) ? (now_time.tv_sec -
it->second.get_opstime_sec()) : -1;
if (idletime >= 300) {
t5min_idle = it->second.validate_opstime(now_time, 300);
t1h_idle = it->second.validate_opstime(now_time, 3600);
t1d_idle = it->second.validate_opstime(now_time, 86400);
t1w_idle = it->second.validate_opstime(now_time, 7 * 86400);
}
std::string idle;
std::string lockup;
std::string lockfunc;
// preset the idle string
if (idletime > 300) {
if (t1w_idle) {
idle = ">1w";
} else {
if (t1d_idle) {
idle = ">1d";
} else {
if (t1h_idle) {
idle = ">1h";
} else {
if (t5min_idle) {
idle = ">5m";
} else {
idle = "act";
}
}
}
}
} else {
idle = "act";
}
// if ( it->second.statistics().blockedms() > (5*1000*60) ) {
if (it->second.statistics().blockedms() > (5 * 1000)) {
// a mutex hanging for longer than 5 minutes marks a client as locked up
lockup = "locked:";
lockup += it->second.statistics().blockedfunc();
lockfunc = it->second.statistics().blockedfunc();
} else {
lockup = "vacant";
}
if (options.find("m") == std::string::npos) {
snprintf(formatline, sizeof(formatline),
"client : %-8s %32s %-8s %-8s %s %.02f %.02f %36s p=%u caps=%lu fds=%u %s [%s] %s mount=%s prot=%d app=%s\n",
it->second.heartbeat().name().c_str(),
it->second.heartbeat().host().c_str(),
it->second.heartbeat().version().c_str(),
it->second.status[it->second.state()],
eos::common::Timing::utctime(it->second.heartbeat().starttime()).c_str(),
(int64_t)tsnow.tv_sec - (int64_t)it->second.heartbeat().clock() +
(((int64_t) tsnow.tv_nsec -
(int64_t) it->second.heartbeat().clock_ns()) * 1.0 / 1000000000.0),
it->second.heartbeat().delta() * 1000,
it->second.heartbeat().uuid().c_str(),
it->second.heartbeat().pid(),
clientcaps[it->second.heartbeat().uuid()],
it->second.statistics().open_files(),
it->second.heartbeat().automounted() ? "autofs" : "static",
lockup.c_str(),
idle.c_str(),
it->second.heartbeat().mount().c_str(),
it->second.heartbeat().protversion(),
it->second.heartbeat().appname().c_str()
);
out += formatline;
}
if (options.find("l") != std::string::npos) {
snprintf(formatline, sizeof(formatline),
"...... ino : %ld\n"
"...... ino-to-del : %ld\n"
"...... ino-backlog : %ld\n"
"...... ino-ever : %ld\n"
"...... ino-ever-del : %ld\n"
"...... threads : %d\n"
"...... total-ram : %.03f GB\n"
"...... free-ram : %.03f GB\n"
"...... vsize : %.03f GB\n"
"...... rsize : %.03f GB\n"
"...... wr-buf-mb : %.00f MB\n"
"...... ra-buf-mb :%.00f MB\n"
"...... load1 : %.02f\n"
"...... leasetime : %u s\n"
"...... open-files : %u\n"
"...... logfile-size : %lu\n"
"...... rbytes : %lu\n"
"...... wbytes : %lu\n"
"...... n-op : %lu\n"
"...... rd60 : %.02f MB/s\n"
"...... wr60 : %.02f MB/s\n"
"...... iops60 : %.02f \n"
"...... xoff : %lu\n"
"...... ra-xoff : %lu\n"
"...... ra-nobuf : %lu\n"
"...... wr-nobuf : %lu\n"
"...... idle : %ld\n"
"...... recovery-ok : %d\n"
"...... recovery-fail: %d\n"
"...... blockedms : %.02f [%s]\n"
"...... blockedops : %u\n"
"...... blockedroot : %s\n",
it->second.statistics().inodes(),
it->second.statistics().inodes_todelete(),
it->second.statistics().inodes_backlog(),
it->second.statistics().inodes_ever(),
it->second.statistics().inodes_ever_deleted(),
it->second.statistics().threads(),
it->second.statistics().total_ram_mb() / 1024.0,
it->second.statistics().free_ram_mb() / 1024.0,
it->second.statistics().vsize_mb() / 1024.0,
it->second.statistics().rss_mb() / 1024.0,
it->second.statistics().wr_buf_mb(),
it->second.statistics().ra_buf_mb(),
it->second.statistics().load1(),
it->second.heartbeat().leasetime() ? it->second.heartbeat().leasetime() : 300,
it->second.statistics().open_files(),
it->second.statistics().logfilesize(),
it->second.statistics().rbytes(),
it->second.statistics().wbytes(),
it->second.statistics().nio(),
it->second.statistics().rd_rate_60_mb(),
it->second.statistics().wr_rate_60_mb(),
it->second.statistics().iops_60(),
it->second.statistics().xoff(),
it->second.statistics().raxoff(),
it->second.statistics().ranobuf(),
it->second.statistics().wrnobuf(),
idletime,
it->second.statistics().recovery_ok(),
it->second.statistics().recovery_fail(),
it->second.statistics().blockedms(),
it->second.statistics().blockedfunc().length() ?
it->second.statistics().blockedfunc().c_str() : "none",
it->second.statistics().blockedops(),
it->second.statistics().blockedroot() ? "true" : "false"
);
out += formatline;
}
if (options.find("m") != std::string::npos) {
snprintf(formatline, sizeof(formatline),
"client=%s host=%s version=%s state=%s time=\"%s\" tof=%.02f delta=%.02f uuid=%s pid=%u caps=%lu fds=%u type=%s mount=\"%s\" prot=%d app=%s "
"ino=%ld "
"ino-to-del=%ld "
"ino-backlog=%ld "
"ino-ever=%ld "
"ino-ever-del=%ld "
"threads=%d "
"total-ram-gb=%.03f "
"free-ram-gb=%.03f "
"vsize-gb=%.03f "
"rsize-gb=%.03f "
"wr-buf-mb=%.00f "
"ra-buf-mb=%.00f "
"load1=%.02f "
"leasetime=%u "
"open-files=%u "
"logfile-size=%lu "
"rbytes=%lu "
"wbytes=%lu "
"n-op=%lu "
"rd60-rate-mb=%.02f "
"wr60-rate-mb=%.02f "
"iops60=%.02f "
"xoff=%lu "
"ra-xoff=%lu "
"ra-nobuf=%lu "
"wr-nobuf=%lu "
"idle=%ld "
"recovery-ok=%d "
"recovery-fail=%d "
"blockedms=%f "
"blockedfunc=%s "
"blockedops=%u "
"blockedroot=%d\n",
it->second.heartbeat().name().c_str(),
it->second.heartbeat().host().c_str(),
it->second.heartbeat().version().c_str(),
it->second.status[it->second.state()],
eos::common::Timing::utctime(it->second.heartbeat().starttime()).c_str(),
(int64_t)tsnow.tv_sec - (int64_t)it->second.heartbeat().clock() +
(((int64_t) tsnow.tv_nsec -
(int64_t) it->second.heartbeat().clock_ns()) * 1.0 / 1000000000.0),
it->second.heartbeat().delta() * 1000,
it->second.heartbeat().uuid().c_str(),
it->second.heartbeat().pid(),
clientcaps[it->second.heartbeat().uuid()],
it->second.statistics().open_files(),
it->second.heartbeat().automounted() ? "autofs" : "static",
it->second.heartbeat().mount().c_str(),
it->second.heartbeat().protversion(),
it->second.heartbeat().appname().c_str(),
it->second.statistics().inodes(),
it->second.statistics().inodes_todelete(),
it->second.statistics().inodes_backlog(),
it->second.statistics().inodes_ever(),
it->second.statistics().inodes_ever_deleted(),
it->second.statistics().threads(),
it->second.statistics().total_ram_mb() / 1024.0,
it->second.statistics().free_ram_mb() / 1024.0,
it->second.statistics().vsize_mb() / 1024.0,
it->second.statistics().rss_mb() / 1024.0,
it->second.statistics().wr_buf_mb(),
it->second.statistics().ra_buf_mb(),
it->second.statistics().load1(),
it->second.heartbeat().leasetime() ? it->second.heartbeat().leasetime() : 300,
it->second.statistics().open_files(),
it->second.statistics().logfilesize(),
it->second.statistics().rbytes(),
it->second.statistics().wbytes(),
it->second.statistics().nio(),
it->second.statistics().rd_rate_60_mb(),
it->second.statistics().wr_rate_60_mb(),
it->second.statistics().iops_60(),
it->second.statistics().xoff(),
it->second.statistics().raxoff(),
it->second.statistics().ranobuf(),
it->second.statistics().wrnobuf(),
idletime,
it->second.statistics().recovery_ok(),
it->second.statistics().recovery_fail(),
it->second.statistics().blockedms(),
it->second.statistics().blockedfunc().length() ?
it->second.statistics().blockedfunc().c_str() : "none",
it->second.statistics().blockedops(),
it->second.statistics().blockedroot());
out += formatline;
}
if (options.find("k") != std::string::npos) {
std::map> rlocks;
std::map> wlocks;
gOFS->zMQ->gFuseServer.Locks().lsLocks(it->second.heartbeat().uuid(), rlocks,
wlocks);
for (auto rit = rlocks.begin(); rit != rlocks.end(); ++rit) {
if (rit->second.size()) {
snprintf(formatline, sizeof(formatline), " t:rlock i:%016lx p:",
rit->first);
out += formatline;
std::string pidlocks;
for (auto pit = rit->second.begin(); pit != rit->second.end(); ++pit) {
if (pidlocks.length()) {
pidlocks += ",";
}
char spid[16];
snprintf(spid, sizeof(spid), "%u", *pit);
pidlocks += spid;
}
out += pidlocks;
out += "\n";
}
}
for (auto wit = wlocks.begin(); wit != wlocks.end(); ++wit) {
if (wit->second.size()) {
snprintf(formatline, sizeof(formatline), " t:wlock i:%016lx p:",
wit->first);
out += formatline;
std::string pidlocks;
for (auto pit = wit->second.begin(); pit != wit->second.end(); ++pit) {
if (pidlocks.length()) {
pidlocks += ",";
}
char spid[16];
snprintf(spid, sizeof(spid), "%u", *pit);
pidlocks += spid;
}
out += pidlocks;
out += "\n";
}
}
}
}
}
//------------------------------------------------------------------------------
// Get the lease time announced by a client - requires a Client read lock
//
// @param uuid uuid of the client
//
// @return lease time from the client's heartbeat capped at one week, or 0 if
//         the uuid or the identity it maps to is unknown
//------------------------------------------------------------------------------
size_t
FuseServer::Clients::leasetime(const std::string& uuid)
{
  // don't allow longer lease times as a week
  const size_t max_leasetime = 7 * 86400;
  size_t result = 0;
  auto uuid_it = this->uuidview().find(uuid);

  if (uuid_it != this->uuidview().end()) {
    auto client_it = this->map().find(uuid_it->second);

    if (client_it != this->map().end()) {
      result = client_it->second.heartbeat().leasetime();
    }
  }

  return (result > max_leasetime) ? max_leasetime : result;
}
//------------------------------------------------------------------------------
// Evict a client or a class of clients
//
// @param uuid        uuid of the client to evict, or one of the class names
//                    "static" / "autofs" to select clients by mount type
// @param reason      for a single uuid: text sent to the client as eviction
//                    reason; for a class: selection criterion, either
//                    "mem:<MB>" (resident memory above <MB>) or
//                    "idle:<seconds>" (idle for longer than <seconds>)
// @param evicted_out optional list to which one description per evicted
//                    client is appended
//
// @return single uuid: 0 if the eviction message was sent, ENOENT if the uuid
//         is unknown (stale locks of that uuid are dropped nevertheless).
//         class: the per-client results or'ed together, EINVAL if the
//         criterion is neither "mem:" nor "idle:"
//------------------------------------------------------------------------------
int
FuseServer::Clients::Evict(std::string& uuid, std::string reason,
                           std::vector<std::string>* evicted_out)
{
  if ((uuid == "static") ||
      (uuid == "autofs")) {
    // (uuid, reason) of every client matching the criterion
    std::vector<std::pair<std::string, std::string>> evict_uuids;
    // "autofs" selects automounted clients, "static" all the others
    const bool want_automounted = (uuid == "autofs");

    // evict static or autofs clients with criteria
    if (reason.substr(0, 4) == "mem:") {
      // evict according to memory footprint
      uint64_t memory_condition = strtoull(reason.substr(4).c_str(), 0, 10);

      if (memory_condition) {
        eos::common::RWMutexReadLock lLock(*this);

        for (auto it = this->map().begin(); it != this->map().end(); ++it) {
          if (static_cast<bool>(it->second.heartbeat().automounted()) !=
              want_automounted) {
            continue;
          }

          if (it->second.statistics().rss_mb() > memory_condition) {
            std::string memreason = "consuming ";
            memreason += std::to_string(it->second.statistics().rss_mb());
            memreason += " MB of resident memory";
            evict_uuids.emplace_back(it->second.heartbeat().uuid(), memreason);
          }
        }
      }
    } else if (reason.substr(0, 5) == "idle:") {
      // evict according to idle time
      int64_t idle_condition = strtoull(reason.substr(5).c_str(), 0, 10);

      if (idle_condition) {
        struct timespec now_time;
        eos::common::Timing::GetTimeSpec(now_time, true);
        eos::common::RWMutexReadLock lLock(*this);

        for (auto it = this->map().begin(); it != this->map().end(); ++it) {
          if (static_cast<bool>(it->second.heartbeat().automounted()) !=
              want_automounted) {
            continue;
          }

          // -1 marks a client which never had its ops time tagged
          int64_t idletime = (it->second.get_opstime_sec()) ? (now_time.tv_sec -
                             it->second.get_opstime_sec()) : -1;

          if (idletime > idle_condition) {
            std::string idlereason = "longer than ";
            idlereason += std::to_string(idletime);
            idlereason += " seconds idle";
            evict_uuids.emplace_back(it->second.heartbeat().uuid(), idlereason);
          }
        }
      }
    } else {
      // unsupported selection criterion
      return EINVAL;
    }

    // the read lock taken during the selection is released at this point;
    // every selected client is evicted by its own uuid
    int retc = 0;

    for (auto& victim : evict_uuids) {
      retc |= Evict(victim.first, victim.second, evicted_out);
    }

    return retc;
  }

  // prepare eviction message for a client by uuid
  eos::fusex::response rsp;
  rsp.set_type(rsp.EVICT);
  rsp.mutable_evict_()->set_reason(reason);
  std::string rspstream;
  rsp.SerializeToString(&rspstream);
  eos::common::RWMutexReadLock lLock(*this);
  auto uuid_it = mUUIDView.find(uuid);

  if (uuid_it == mUUIDView.end()) {
    // even if this uuid does not exist we can use it to remove stale locks
    gOFS->zMQ->gFuseServer.Locks().dropLocks(uuid);
    return ENOENT;
  }

  // copy the identity, the iterator must not be used once the lock is gone
  std::string id = uuid_it->second;
  lLock.Release();
  eos_static_info("msg=\"evicting client\" uuid=%s name=%s",
                  uuid.c_str(), id.c_str());

  if (evicted_out) {
    std::string out = "uuid=";
    out += uuid;
    out += " name=";
    out += id;
    out += " reason='";
    out += reason;
    out += "'";
    evicted_out->push_back(out);
  }

  gOFS->zMQ->mTask->reply(id, rspstream);
  return 0;
}
//------------------------------------------------------------------------------
//
//------------------------------------------------------------------------------
int
FuseServer::Clients::DeleteEntry(uint64_t md_ino,
const std::string& uuid,
const std::string& clientid,
const std::string& name,
struct timespec& pt_mtime
)
{
gOFS->MgmStats.Add("Eosxd::int::DeleteEntry", 0, 0, 1);
EXEC_TIMING_BEGIN("Eosxd::int::DeleteEntry");
// prepare release cap message
eos::fusex::response rsp;
rsp.set_type(rsp.DENTRY);
rsp.mutable_dentry_()->set_type(eos::fusex::dentry::REMOVE);
rsp.mutable_dentry_()->set_name(name);
rsp.mutable_dentry_()->set_md_ino(md_ino);
rsp.mutable_dentry_()->set_clientid(clientid);
rsp.mutable_dentry_()->set_pt_mtime(pt_mtime.tv_sec);
rsp.mutable_dentry_()->set_pt_mtime_ns(pt_mtime.tv_nsec);
std::string rspstream;
rsp.SerializeToString(&rspstream);
eos::common::RWMutexReadLock lLock(*this);
if (!mUUIDView.count(uuid)) {
return ENOENT;
}
std::string id = mUUIDView[uuid];
lLock.Release();
eos_static_info("msg=\"asking dentry deletion\" uuid=%s clientid=%s id=%lx name=%s",
uuid.c_str(), clientid.c_str(), md_ino, name.c_str());
gOFS->zMQ->mTask->reply(id, rspstream);
EXEC_TIMING_END("Eosxd::int::DeleteEntry");
return 0;
}
//------------------------------------------------------------------------------
// Ask a client to refresh an inode
//
// @param md_ino   inode to refresh
// @param uuid     uuid of the client to notify
// @param clientid client id, used for logging only
// @param notprot5 if true the message is suppressed for clients announcing a
//                 protocol version of at least PROTOCOLV5
//
// @return 0 if the message was sent or deliberately suppressed, ENOENT if the
//         uuid is unknown or maps to an identity missing in the client map
//------------------------------------------------------------------------------
int
FuseServer::Clients::RefreshEntry(uint64_t md_ino,
                                  const std::string& uuid,
                                  const std::string& clientid,
                                  bool notprot5
                                 )
{
  EXEC_TIMING_BEGIN("Eosxd::int::RefreshEntry");
  // prepare the refresh message
  eos::fusex::response rsp;
  rsp.set_type(rsp.REFRESH);
  rsp.mutable_refresh_()->set_md_ino(md_ino);
  std::string rspstream;
  rsp.SerializeToString(&rspstream);
  eos::common::RWMutexReadLock lLock(*this);
  auto uuid_it = mUUIDView.find(uuid);

  if (uuid_it == mUUIDView.end()) {
    return ENOENT;
  }

  const std::string id = uuid_it->second;
  // use find(): operator[] would create a missing entry, i.e. modify the map
  // while only the read lock is held
  auto client_it = map().find(id);

  if (client_it == map().end()) {
    return ENOENT;
  }

  // copy what is needed, nothing may reference the map once the lock is gone
  const std::string version = client_it->second.heartbeat().version();
  const auto protversion = client_it->second.heartbeat().protversion();
  const bool is_prot5 = (protversion >=
                         client_it->second.heartbeat().PROTOCOLV5);
  eos_static_info("client=%s\n", version.c_str());

  if (notprot5 && is_prot5) {
    // this protocol version does not need refresh messages
    if (EOS_LOGS_DEBUG) {
      eos_static_debug("suppressing refresh to client '%s' version='%s' protocl='%d'",
                       clientid.c_str(), version.c_str(),
                       protversion);
    }
  } else if (DeferClient(version, "4.4.18")) {
    // dont' send refresh to client version < 4.4.18 (4.4.17 deadlocks, others ignore)
    eos_static_info("suppressing refresh to client '%s' version='%s'",
                    clientid.c_str(), version.c_str());
  } else {
    gOFS->MgmStats.Add("Eosxd::int::RefreshEntry", 0, 0, 1);
    lLock.Release();
    eos_static_info("msg=\"asking dentry refresh\" uuid=%s clientid=%s id=%lx",
                    uuid.c_str(), clientid.c_str(), md_ino);
    gOFS->zMQ->mTask->reply(id, rspstream);
  }

  EXEC_TIMING_END("Eosxd::int::RefreshEntry");
  return 0;
}
//------------------------------------------------------------------------------
// Send a meta data update to a client
//
// @param md       meta data record copied into the message
// @param uuid     uuid of the client to notify
// @param clientid client id stored in the message
// @param md_ino   inode stored in the message
// @param md_pino  parent inode stored in the message
// @param clock    clock value stored in the message
// @param p_mtime  stored as pt_mtime only if its tv_sec is not 0
//
// @return 0 if the message was handed over for sending, ENOENT if the uuid
//         is unknown
//------------------------------------------------------------------------------
int
FuseServer::Clients::SendMD(const eos::fusex::md& md,
                            const std::string& uuid,
                            const std::string& clientid,
                            uint64_t md_ino,
                            uint64_t md_pino,
                            uint64_t clock,
                            struct timespec& p_mtime
                           )
{
  gOFS->MgmStats.Add("Eosxd::int::SendMD", 0, 0, 1);
  EXEC_TIMING_BEGIN("Eosxd::int::SendMD");
  // prepare update message
  eos::fusex::response rsp;
  rsp.set_type(rsp.MD);
  auto* out_md = rsp.mutable_md_();
  *out_md = md;
  out_md->set_type(eos::fusex::md::MD);
  // the client needs this to sort out the quota accounting using the cap map
  out_md->set_clientid(clientid);
  // when a file is created the inode is not yet written in the const md object
  out_md->set_md_ino(md_ino);
  out_md->set_md_pino(md_pino);

  if (p_mtime.tv_sec) {
    out_md->set_pt_mtime(p_mtime.tv_sec);
    out_md->set_pt_mtime_ns(p_mtime.tv_nsec);
  }

  out_md->set_clock(clock);
  std::string payload;
  rsp.SerializeToString(&payload);
  // resolve the uuid into the identity used to address the client
  eos::common::RWMutexReadLock rd_lock(*this);
  auto uuid_it = mUUIDView.find(uuid);

  if (uuid_it == mUUIDView.end()) {
    return ENOENT;
  }

  std::string target = uuid_it->second;
  rd_lock.Release();
  eos_static_debug("msg=\"sending md update\" uuid=%s clientid=%s id=%lx",
                   uuid.c_str(), clientid.c_str(), md_ino);
  gOFS->zMQ->mTask->reply(target, payload);
  EXEC_TIMING_END("Eosxd::int::SendMD");
  return 0;
}
//------------------------------------------------------------------------------
// Send a cap update to the client owning the cap
//
// @param cap cap to send; the receiver is selected by the cap's client uuid
//
// @return 0 if the message was handed over for sending, ENOENT if the
//         client uuid of the cap is unknown
//------------------------------------------------------------------------------
int
FuseServer::Clients::SendCAP(FuseServer::Caps::shared_cap cap)
{
  gOFS->MgmStats.Add("Eosxd::int::SendCAP", 0, 0, 1);
  EXEC_TIMING_BEGIN("Eosxd::int::SendCAP");
  // prepare update message
  eos::fusex::response rsp;
  rsp.set_type(rsp.CAP);
  auto* out_cap = rsp.mutable_cap_();
  *out_cap = *(*cap)();
  const std::string& uuid = (*cap)()->clientuuid();
  std::string payload;
  rsp.SerializeToString(&payload);
  // resolve the uuid into the identity used to address the client
  eos::common::RWMutexReadLock rd_lock(*this);
  auto uuid_it = mUUIDView.find(uuid);

  if (uuid_it == mUUIDView.end()) {
    return ENOENT;
  }

  std::string clientid = uuid_it->second;
  rd_lock.Release();
  eos_static_info("msg=\"sending cap update\" uuid=%s clientid=%s cap-id=%lx",
                  uuid.c_str(), clientid.c_str(), (*cap)()->id());
  gOFS->zMQ->mTask->reply(clientid, payload);
  EXEC_TIMING_END("Eosxd::int::SendCAP");
  return 0;
}
//------------------------------------------------------------------------------
// Store the statistics reported by a client
//
// @param identity key of the client in the client map; a missing entry is
//                 created
// @param stats    statistics record replacing the stored one
//------------------------------------------------------------------------------
void
FuseServer::Clients::HandleStatistics(const std::string identity,
                                      const eos::fusex::statistics& stats)
{
  eos::common::RWMutexWriteLock wr_lock(*this);
  auto& client = (this->map())[identity];
  const uint64_t ops_before = client.statistics().nio();
  client.statistics() = stats;
  // update the last ops time whenever the operations counter changes
  // this is very rough and only precise to the interval of statistic updates
  const bool ops_changed = !ops_before ||
                           (client.statistics().nio() != ops_before);

  if (ops_changed) {
    client.tag_opstime();
  }

  if (EOS_LOGS_DEBUG) {
    eos_static_debug("");
  }
}
//------------------------------------------------------------------------------
//
//------------------------------------------------------------------------------
bool
FuseServer::Clients::DeferClient(std::string clientversion,
std::string allowversion)
{
std::vector v1;
std::vector v2;
eos::common::StringConversion::Tokenize(clientversion, v1, ".");
eos::common::StringConversion::Tokenize(allowversion, v2, ".");
unsigned long client_v = 0;
unsigned long allowd_v = 0;
if (v1.size() == v2.size()) {
for (size_t i = 0; i != v1.size(); i++) {
if (i != 0) {
client_v *= 1000;
allowd_v *= 1000;
}
client_v += strtoul(v1[i].c_str(), 0, 10);
allowd_v += strtoul(v2[i].c_str(), 0, 10);
}
}
if (EOS_LOGS_DEBUG) {
eos_static_debug("client-v:%lu allowd-v:%lu (%s/%s)", client_v, allowd_v,
clientversion.c_str(), allowversion.c_str());
}
return (client_v < allowd_v);
}
//------------------------------------------------------------------------------
// Set the heartbeat interval and broadcast it to all clients
//
// @param interval new heartbeat rate stored in mHeartBeatInterval and sent
//                 as hbrate to the clients
//
// @return always 0
//------------------------------------------------------------------------------
int
FuseServer::Clients::SetHeartbeatInterval(int interval)
{
  // broadcast to all clients
  eos::common::RWMutexWriteLock lLock(*this);
  mHeartBeatInterval = interval;
  // the configuration is the same for every client, build it only once
  eos::fusex::config cfg;
  cfg.set_hbrate(interval);
  cfg.set_dentrymessaging(true);
  cfg.set_writesizeflush(true);
  cfg.set_appname(true);
  cfg.set_mdquery(true);
  cfg.set_hideversion(true);
  cfg.set_serverversion(std::string(VERSION) + std::string("::") + std::string(
                          RELEASE));

  for (auto it = this->map().begin(); it != this->map().end(); ++it) {
    // use find(): operator[] would add an empty mapping for every uuid
    // missing in the uuid view
    auto uuid_it = mUUIDView.find(it->second.heartbeat().uuid());

    if ((uuid_it != mUUIDView.end()) && uuid_it->second.length()) {
      BroadcastConfig(uuid_it->second, cfg);
    }
  }

  return 0;
}
/*----------------------------------------------------------------------------*/
int
FuseServer::Clients::SetQuotaCheckInterval(int interval)
{
eos::common::RWMutexWriteLock lLock(*this);
mQuotaCheckInterval = interval;
return 0;
}
//------------------------------------------------------------------------------
// Send a configuration message to a client
//
// @param identity identity used to address the client
// @param cfg      configuration copied into the message
//
// @return always 0
//------------------------------------------------------------------------------
int
FuseServer::Clients::BroadcastConfig(const std::string& identity,
                                     eos::fusex::config& cfg)
{
  gOFS->MgmStats.Add("Eosxd::int::BcConfig", 0, 0, 1);
  EXEC_TIMING_BEGIN("Eosxd::int::BcConfig");
  // wrap the configuration into a response of type CONFIG
  eos::fusex::response rsp;
  rsp.set_type(rsp.CONFIG);
  auto* out_cfg = rsp.mutable_config_();
  *out_cfg = cfg;
  std::string payload;
  rsp.SerializeToString(&payload);
  eos_static_info("msg=\"broadcast config to client\" name=%s heartbeat-rate=%d",
                  identity.c_str(), cfg.hbrate());
  gOFS->zMQ->mTask->reply(identity, payload);
  EXEC_TIMING_END("Eosxd::int::BcConfig");
  return 0;
}
//------------------------------------------------------------------------------
// Ask a client to drop all its caps
//
// @param identity identity used to address the client
// @param hb       heartbeat of the client, only its uuid is used for logging
//
// @return always 0
//------------------------------------------------------------------------------
int
FuseServer::Clients::BroadcastDropAllCaps(const std::string& identity,
    eos::fusex::heartbeat& hb)
{
  gOFS->MgmStats.Add("Eosxd::int::BcDropAll", 0, 0, 1);
  EXEC_TIMING_BEGIN("Eosxd::int::BcDropAll");
  // the message consists of the DROPCAPS type only
  eos::fusex::response rsp;
  rsp.set_type(rsp.DROPCAPS);
  std::string payload;
  rsp.SerializeToString(&payload);
  eos_static_info("msg=\"broadcast drop-all-caps to client\" uuid=%s name=%s",
                  hb.uuid().c_str(), identity.c_str());
  gOFS->zMQ->mTask->reply(identity, payload);
  EXEC_TIMING_END("Eosxd::int::BcDropAll");
  return 0;
}
/*----------------------------------------------------------------------------*/
void
FuseServer::Clients::SetBroadCastMaxAudience(int size)
/*----------------------------------------------------------------------------*/
{
mMaxBroadCastAudience = size;
}
/*----------------------------------------------------------------------------*/
void
FuseServer::Clients::SetBroadCastAudienceSuppressMatch(const std::string& match)
{
mMaxbroadCastAudienceMatch = match;
}
EOSMGMNAMESPACE_END