//------------------------------------------------------------------------------
//! @file md.cc
//! @author Andreas-Joachim Peters CERN
//! @brief meta data handling class
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include "cap/cap.hh"
#include "common/Logging.hh"
#include "common/Path.hh"
#include "common/StackTrace.hh"
#include "common/StringConversion.hh"
#include
#include "eosfuse.hh"
#include "kv/kv.hh"
#include "md/md.hh"
#include "md/kernelcache.hh"
#include "misc/MacOSXHelper.hh"
#include "misc/longstring.hh"
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Constructor - pre-seeds the meta-data map with a placeholder record for
//! inode 1 (the mount root); the real root record is re-fetched in init().
//------------------------------------------------------------------------------
metad::metad() : last_heartbeat(0), mdflush(0), mCb(0),
  mdqueue_max_backlog(1000), z_ctx(0), z_socket(0)
{
  // make a mapping for inode 1, it is re-loaded afterwards in init '/'
  {
    inomap.insert(1, 1);
  }
  // NOTE(review): template argument missing in the source as provided -
  // presumably std::make_shared<mdx>(1); confirm against the repository
  shared_md md = std::make_shared(1);
  (*md)()->set_nlink(1);
  (*md)()->set_mode(S_IRWXU | S_IRWXG | S_IRWXO | S_IFDIR);
  (*md)()->set_name(":root:");
  (*md)()->set_pid(1);
  // account the placeholder in the inode statistics
  stat.inodes_inc();
  stat.inodes_ever_inc();
  set_is_visible(0);
  mdbackend = 0;
  mdmap.insertTS(1, md);
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Destructor - releases the ZMQ socket and its context if they were created
//! by connect().
//------------------------------------------------------------------------------
metad::~metad()
{
  // 'delete nullptr' is a no-op, so the explicit null checks were redundant;
  // the socket must be destroyed before the context it was created from
  delete z_socket;
  delete z_ctx;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Initialize the meta-data service: attach the backend, store the root node
//! locally and connect the map to the persistent KV store.
//!
//! @param _mdbackend backend object used for all upstream MD calls
//------------------------------------------------------------------------------
void
metad::init(backend* _mdbackend)
{
  mdbackend = _mdbackend;
  // NOTE(review): 'mdstream' is never used in this function
  std::string mdstream;
  // load the root node
  fuse_id fuseid;
  XrdSysMutexHelper mLock(mdmap);
  // local-store the pre-seeded root record (inode 1)
  update(fuseid, mdmap[1], "", true);
  mdmap.init(EosFuse::Instance().getKV());
  // server capability flags - set once the server version is negotiated
  dentrymessaging = false;
  writesizeflush = false;
  appname = false;
  mdquery = false;
  serverversion = "";
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! (Re-)establish the ZMQ DEALER connection used for server callbacks.
//!
//! Empty string arguments keep the previously configured value.
//!
//! @param zmqtarget      endpoint to connect to (e.g. tcp://host:port)
//! @param zmqidentity    ZMQ routing identity of this client
//! @param zmqname        symbolic name of the connection
//! @param zmqclienthost  client host name
//! @param zmqclientuuid  client UUID, also propagated to the backend
//! @return 0 on success, otherwise the ZMQ error number of the failed connect
//------------------------------------------------------------------------------
int
metad::connect(std::string zmqtarget, std::string zmqidentity,
               std::string zmqname, std::string zmqclienthost,
               std::string zmqclientuuid)
{
  set_zmq_wants_to_connect(1);
  std::lock_guard connectionMutex(zmq_socket_mutex);

  if (z_socket && z_socket->handle() && (zmqtarget != zmq_target)) {
    // delete the existing ZMQ connection
    // NOTE(review): the pointers are not reset to nullptr after deletion, and
    // when the target is unchanged the previous socket/context are leaked by
    // the unconditional re-creation below - confirm intended lifetime
    delete z_socket;
    delete z_ctx;
  }

  if (zmqtarget.length()) {
    zmq_target = zmqtarget;
  }

  if (zmqidentity.length()) {
    zmq_identity = zmqidentity;
  }

  if (zmqname.length()) {
    zmq_name = zmqname;
  }

  if (zmqclienthost.length()) {
    zmq_clienthost = zmqclienthost;
  }

  if (zmqclientuuid.length()) {
    zmq_clientuuid = zmqclientuuid;
  }

  eos_static_info("metad connect %s as %s %d",
                  zmq_target.c_str(), zmq_identity.c_str(), zmq_identity.length());
  z_ctx = new zmq::context_t(1);
  z_socket = new zmq::socket_t(*z_ctx, ZMQ_DEALER);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
  z_socket->set(zmq::sockopt::routing_id, zmq_identity);
#else
  z_socket->setsockopt(ZMQ_IDENTITY, zmq_identity.c_str(), zmq_identity.length());
#endif
  // enable TCP keepalive so half-open connections are detected
  z_socket->set(zmq::sockopt::tcp_keepalive, 1);
  z_socket->set(zmq::sockopt::tcp_keepalive_idle, 90);
  z_socket->set(zmq::sockopt::tcp_keepalive_intvl, 90);
#pragma GCC diagnostic pop

  // retry the connect only while it is interrupted (EINTR); any other error
  // is returned to the caller
  while (1) {
    try {
      z_socket->connect(zmq_target);
      int linger = 0;
      z_socket->set(zmq::sockopt::linger, linger);
      eos_static_notice("connected to %s", zmq_target.c_str());
      break;
    } catch (zmq::error_t& e) {
      if (e.num() != EINTR) {
        eos_static_err("msg=\"%s\" rc=%d", e.what(), e.num());
        return e.num();
      }

      eos_static_err("msg=\"%s\" rc=%d", e.what(), e.num());
    }
  }

  if (zmqclientuuid.length()) {
    mdbackend->set_clientuuid(zmq_clientuuid);
  }

  set_zmq_wants_to_connect(0);
  return 0;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Resolve 'name' inside directory 'parent' and return its meta-data record.
//!
//! @param req    fuse request (credentials)
//! @param parent local inode of the parent directory
//! @param name   entry name; "." and ".." are handled for NFS exports
//! @return shared_md of the entry; on failure a fresh record whose err() is
//!         set (e.g. ENOENT) is returned - callers must check err()
//------------------------------------------------------------------------------
metad::shared_md
metad::lookup(fuse_req_t req, fuse_ino_t parent, const char* name)
{
  eos_static_info("ino=%#lx name=%s", parent, name);
  // --------------------------------------------------
  // STEP 1 : retrieve the required parent MD
  // --------------------------------------------------
  shared_md pmd = get(req, parent, "", false);
  shared_md md;

  if ((*pmd)()->id() == parent) {
    XrdSysMutexHelper mLock(pmd->Locker());
    fuse_ino_t inode = 0; // inode referenced by parent + name

    // self lookup required for NFS exports
    if (!strcmp(name, ".")) {
      return pmd;
    }

    // parent lookup required for NFS exports
    if (!strcmp(name, "..")) {
      const uint64_t pmd_pid = (*pmd)()->pid();
      // release the parent lock before calling get() again (lock ordering)
      mLock.UnLock();
      shared_md ppmd = get(req, pmd_pid, "", false);
      return ppmd;
    }

    // --------------------------------------------------
    // STEP 2: check if we hold a cap for that directory
    // --------------------------------------------------
    if (pmd->cap_count() && !pmd->needs_refresh()) {
      // --------------------------------------------------
      // if we have a cap and we listed this directory, we trust the child information
      // --------------------------------------------------
      if (pmd->local_children().count(
            eos::common::StringConversion::EncodeInvalidUTF8(name))) {
        inode = pmd->local_children().at(
                  eos::common::StringConversion::EncodeInvalidUTF8(name));
      } else {
        // negative lookup cache - the entry is known to not exist
        if (pmd->local_enoent().count(name)) {
          md = std::make_shared();
          (*md)()->set_err(ENOENT);
          return md;
        }

        // if we are still having the creator MD record, we can be sure, that we know everything about this directory
        if ((*pmd)()->creator() ||
            ((*pmd)()->type() == (*pmd)()->MDLS)) {
          // no entry - TODO return a NULLMD object instead of creating it all the time
          md = std::make_shared();
          (*md)()->set_err((*pmd)()->err());
          return md;
        }

        if (pmd->get_todelete().count(eos::common::StringConversion::EncodeInvalidUTF8(
                                        name))) {
          // if this has been deleted, we just say this
          md = std::make_shared();
          (*md)()->set_err((*pmd)()->err());

          if (EOS_LOGS_DEBUG) {
            eos_static_debug("in deletion list %016lx name=%s", (*pmd)()->id(), name);
          }

          return md;
        }
      }
    } else {
      // --------------------------------------------------
      // if we don't have a cap, get will result in an MGM call anyway
      // --------------------------------------------------
    }

    // --------------------------------------------------
    // try to get the meta data record
    // --------------------------------------------------
    const std::string pfullpath = (*pmd)()->fullpath();
    // unlock before the (potentially remote) get() call
    mLock.UnLock();
    md = get(req, inode, "", false, pmd, name);

    if (md) {
      md->Locker().Lock();
      md->store_fullpath(pfullpath, name);
      md->Locker().UnLock();
    } else {
      md = std::make_shared();
      (*md)()->set_err(ENOENT);
    }
  } else {
    // --------------------------------------------------
    // no md available
    // --------------------------------------------------
    md = std::make_shared();
    (*md)()->set_err((*pmd)()->err());
  }

  return md;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Kernel 'forget' callback: drop 'nlookup' references on 'ino' and erase the
//! local record when it is no longer needed (no pending flush, no parent cap,
//! no open directory on the parent, not the mount point).
//!
//! @return 0 on success, ENOENT if the inode is unknown, EAGAIN if the record
//!         is still referenced or not yet valid
//------------------------------------------------------------------------------
int
metad::forget(fuse_req_t req, fuse_ino_t ino, int nlookup)
{
  shared_md md;
  uint64_t pino = 0;

  if (!mdmap.retrieveTS(ino, md)) {
    return ENOENT;
  }

  {
    // we should lock this, but since we are in a kernel call-back function, we skip it and
    // accept an unsafe access on pid() later
    // XrdSysMutexHelper mLock(md->Locker());
    if (!(*md)()->id()) {
      return EAGAIN;
    }

    if (EOS_LOGS_DEBUG) {
      eos_static_debug("count=%d(-%d) - ino=%#lx", md->lookup_is(), nlookup, ino);
    }

    // lookup_dec returns false while kernel references remain
    if (!md->lookup_dec(nlookup)) {
      eos_static_debug("count=%d(-%d) - ino=%#lx", md->lookup_is(), nlookup, ino);
      return EAGAIN;
    }

    pino = (*md)()->pid();
  }

  if (has_flush(ino)) {
    // entry is still queued for an upstream flush - keep it cached
    eos_static_debug("flush - ino=%016x", ino);
    return 0;
  }

  if ((pino > 1) && (ino != pino)) {
    // this does not make sense for the mount directory (inode 1)
    shared_md pmd;

    if (!mdmap.retrieveTS(pino, pmd)) {
      return ENOENT;
    }

    if (pmd->cap_count()) {
      // parent holds capabilities - children stay cached
      eos_static_debug("caps %d - ino=%016x", pmd->cap_count(), ino);
      return 0;
    }

    if (pmd->opendir_is()) {
      // parent directory is currently open - keep the entry
      eos_static_debug("opendir %d - ino=%016x", pmd->opendir_is(), ino);
      return 0;
    }
  } else {
    // we don't remove the mount point
    return 0;
  }

  if (EOS_LOGS_DEBUG) {
    XrdSysMutexHelper mLock(md->Locker());
    eos_static_debug("delete md object - ino=%#lx name=%s", ino,
                     (*md)()->name().c_str());
  }

  if (mdmap.eraseTS(ino)) {
    stat.inodes_dec();
  }

  // - we currently don't forget old mappings, because it creates race conditions with overlaying caps
  // PUTMEBACK-LATER: inomap.erase_bwd(ino);
  return 0;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Fill a fuse_entry_param from this meta-data record.
//!
//! @param e        entry to fill (inode, stat attributes, cache timeouts)
//! @param lifetime kernel cache lifetime in seconds, applied when the kernel
//!                 md cache is enabled (entry timeout capped at 30s)
//!
//! Hard-links (marked via the 'sys.eos.mdino' extended attribute) are
//! redirected to their target inode and link count; FIFOs (marked via
//! 'sys.eos.fifo') get S_IFIFO instead of S_IFREG in st_mode.
//------------------------------------------------------------------------------
void
metad::mdx::convert(struct fuse_entry_param& e, double lifetime)
{
  const char* k_mdino = "sys.eos.mdino";
  const char* k_fifo = "sys.eos.fifo";
  auto attrMap = (*this)()->attr();
  e.ino = (*this)()->id();
  e.attr.st_dev = 0;
  e.attr.st_ino = (*this)()->id();
  e.attr.st_mode = (*this)()->mode();
  e.attr.st_nlink = (*this)()->nlink();

  if (attrMap.count(k_mdino)) {
    // hard-link: expose the link target's inode and link count
    uint64_t mdino = std::stoull(attrMap[k_mdino]);
    uint64_t local_ino = EosFuse::Instance().mds.inomap.forward(mdino);
    shared_md tmd = EosFuse::Instance().mds.getlocal(NULL, local_ino);

    if (!(*tmd)()->id()) {
      // target is not in the local cache - fall back to the remote inode and
      // a conservative link count of 2
      local_ino = mdino;
      e.attr.st_nlink = 2;
      eos_static_err("converting hard-link %s target inode %#lx remote %#lx not in cache, nlink set to %d",
                     (*this)()->name().c_str(), local_ino, mdino, e.attr.st_nlink);
    } else {
      if (EOS_LOGS_DEBUG) {
        eos_static_debug("hlnk convert name=%s id=%#lx target local_ino=%#lx nlink0=",
                         (*this)()->name().c_str(), (*this)()->id(), local_ino, (*tmd)()->nlink());
      }

      e.attr.st_nlink = (*tmd)()->nlink();
    }

    e.ino = e.attr.st_ino = local_ino;
  }

  if (attrMap.count(k_fifo)) {
    // fix: clear the regular-file bit with the bitwise complement;
    // the previous '&= !S_IFREG' (logical not) evaluated to '&= 0' and
    // wiped all mode/permission bits before setting S_IFIFO
    e.attr.st_mode &= ~S_IFREG;
    e.attr.st_mode |= S_IFIFO;
  }

  e.attr.st_uid = (*this)()->uid();
  e.attr.st_gid = (*this)()->gid();
  e.attr.st_rdev = 0;
  e.attr.st_size = (*this)()->size();
  e.attr.st_blksize = 4096;
  // 512-byte blocks, rounded up
  e.attr.st_blocks = (e.attr.st_size + 511) / 512;
  e.attr.st_atime = (*this)()->atime();
  e.attr.st_mtime = (*this)()->mtime();
  e.attr.st_ctime = (*this)()->ctime();
  e.attr.MTIMESPEC.tv_sec = (*this)()->mtime();
  e.attr.MTIMESPEC.tv_nsec = (*this)()->mtime_ns();
  e.attr.CTIMESPEC.tv_sec = (*this)()->ctime();
  e.attr.CTIMESPEC.tv_nsec = (*this)()->ctime_ns();

  if (!e.attr.st_atime) {
    // if 0 atime, we adopt MTIME as ATIME
    e.attr.ATIMESPEC.tv_sec = e.attr.MTIMESPEC.tv_sec;
    e.attr.ATIMESPEC.tv_nsec = e.attr.MTIMESPEC.tv_nsec;
  } else {
    e.attr.ATIMESPEC.tv_sec = (*this)()->atime();
    e.attr.ATIMESPEC.tv_nsec = (*this)()->atime_ns();
  }

  if (EosFuse::Instance().Config().options.md_kernelcache) {
    e.attr_timeout = lifetime;
    e.entry_timeout = (lifetime > 30) ? 30 : lifetime;
  } else {
    e.attr_timeout = 0;
    e.entry_timeout = 0;
  }

  if (EosFuse::Instance().Config().options.overlay_mode) {
    e.attr.st_mode |= EosFuse::Instance().Config().options.overlay_mode;
  }

  // fix: this directory special-casing was duplicated twice in a row;
  // one pass (including the st_blocks recomputation) is sufficient
  if (S_ISDIR(e.attr.st_mode)) {
    if (!EosFuse::Instance().Config().options.show_tree_size) {
      // show 4kb directory size
      e.attr.st_size = 4096;
      e.attr.st_blocks = (e.attr.st_size + 511) / 512;
    }

    // we mask these bits for the moment
    e.attr.st_mode &= (~S_ISGID);
    e.attr.st_mode &= (~S_ISUID);
  }

  if (S_ISLNK(e.attr.st_mode)) {
    // symlinks report the target string length as size
    e.attr.st_size = (*this)()->target().size();
  }

  e.generation = 1;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Render the stat-like fields of this meta-data record as one log line.
//! @return formatted string (ino, mode, nlink, sizes and timestamps)
//------------------------------------------------------------------------------
std::string
metad::mdx::dump()
{
  auto proto = (*this)();
  char buffer[16384];
  snprintf(buffer, sizeof(buffer),
           "ino=%#lx dev=%#lx mode=%#o nlink=%u uid=%05u gid=%05u rdev=%#lx "
           "size=%llu bsize=%lu blocks=%llu atime=%lu.%lu mtime=%lu.%lu ctime=%lu.%lu",
           (unsigned long) proto->id(), (unsigned long) 0,
           (unsigned int) proto->mode(),
           (unsigned int) proto->nlink(),
           (unsigned int) proto->uid(), (unsigned int) proto->gid(),
           (unsigned long) 0,
           (unsigned long long) proto->size(), (unsigned long) 4096,
           (unsigned long long) proto->size() / 512,
           (unsigned long) proto->atime(), (unsigned long) proto->atime_ns(),
           (unsigned long) proto->mtime(), (unsigned long) proto->mtime_ns(),
           (unsigned long) proto->ctime(), (unsigned long) proto->ctime_ns());
  return std::string(buffer);
}
/* -------------------------------------------------------------------------- */
std::string
metad::mdx::dump(struct fuse_entry_param& e)
{
char sout[16384];
snprintf(sout, sizeof(sout),
"ino=%#lx dev=%#lx mode=%#o nlink=%u uid=%05u gid=%05u rdev=%#lx "
"size=%llu bsize=%lu blocks=%llu atime=%lu.%lu mtime=%lu.%lu ctime=%lu.%lu "
"attr-timeout=%lu entry-timeout=%lu",
(unsigned long) e.attr.st_ino, (unsigned long) e.attr.st_dev,
(unsigned int) e.attr.st_mode, (unsigned int) e.attr.st_nlink,
(unsigned int) e.attr.st_uid, (unsigned int) e.attr.st_gid,
(unsigned long) e.attr.st_rdev,
(unsigned long long) e.attr.st_size, (unsigned long) e.attr.st_blksize,
(unsigned long long) e.attr.st_blocks,
(unsigned long) e.attr.ATIMESPEC.tv_sec,
(unsigned long) e.attr.ATIMESPEC.tv_nsec,
(unsigned long) e.attr.MTIMESPEC.tv_sec,
(unsigned long) e.attr.MTIMESPEC.tv_nsec,
(unsigned long) e.attr.CTIMESPEC.tv_sec,
(unsigned long) e.attr.CTIMESPEC.tv_nsec,
(unsigned long) e.attr_timeout,
(unsigned long) e.entry_timeout);
return sout;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Merge a freshly received remote directory listing into the local children
//! map of 'pmd', creating local inode mappings and placeholder records for
//! unknown entries. The remote children list is cleared afterwards.
//!
//! @param pmd parent directory record (caller provides required locking)
//! @return always true
//------------------------------------------------------------------------------
bool
metad::map_children_to_local(shared_md pmd)
{
  bool ret = true;
  // map a remote listing to a local one
  std::set names;
  std::vector names_to_delete;

  // we always merge remote contents, for changes our cap will be dropped
  for (auto map = (*pmd)()->children().begin(); map != (*pmd)()->children().end();
       ++map) {
    if (EOS_LOGS_DEBUG) {
      eos_static_debug("translate %s [%#lx]",
                       eos::common::StringConversion::EncodeInvalidUTF8(map->first).c_str(),
                       map->second);
    }

    uint64_t remote_ino = map->second;
    uint64_t local_ino = inomap.forward(remote_ino);

    if (EosFuse::Instance().Config().options.hide_versions &&
        EosFuse::Instance().mds.supports_hideversion()) {
      // check for version prefixes
      if (map->first.substr(0,
                            strlen(EOS_COMMON_PATH_VERSION_FILE_PREFIX)) ==
          EOS_COMMON_PATH_VERSION_FILE_PREFIX) {
        // check if there is actually a 'babysitting' reference file for this version, if now we display it!
        std::string nvfile = map->first.substr(strlen(
                               EOS_COMMON_PATH_VERSION_FILE_PREFIX));
        eos_static_crit("hide %d:%d %s",
                        EosFuse::Instance().Config().options.hide_versions,
                        EosFuse::Instance().mds.supports_hideversion(),
                        nvfile.c_str());

        // hide the version entry only when the referenced plain file exists
        if ((*pmd)()->children().count(nvfile)) {
          continue;
        }
      }
    }

    // skip entries we already know, if we don't have the mapping we have forgotten already this one
    if (pmd->local_children().count(
          eos::common::StringConversion::EncodeInvalidUTF8(map->first)) && local_ino) {
      continue;
    }

    // skip entries which are the deletion list
    if (pmd->get_todelete().count(eos::common::StringConversion::EncodeInvalidUTF8(
                                    map->first))) {
      continue;
    }

    shared_md md;

    if (!mdmap.retrieveTS(local_ino, md)) {
      // unknown locally: adopt the remote inode as local inode and create a
      // placeholder record
      local_ino = remote_ino;
      inomap.insert(remote_ino, local_ino);
      stat.inodes_inc();
      stat.inodes_ever_inc();
      md = std::make_shared();
      mdmap.insertTS(local_ino, md);
    }

    if (EOS_LOGS_DEBUG)
      eos_static_debug("store-lookup r-ino %016lx <=> l-ino %016lx", remote_ino,
                       local_ino);

    pmd->local_children()[eos::common::StringConversion::EncodeInvalidUTF8(
                            map->first)] = local_ino;
  }

  if (EOS_LOGS_DEBUG) {
    for (auto map = pmd->local_children().begin();
         map != pmd->local_children().end(); ++map) {
      eos_static_debug("listing: %s [%#lx]", map->first.c_str(), map->second);
    }
  }

  (*pmd)()->set_nchildren(pmd->local_children().size());
  // the remote listing is merged - drop it to free memory
  (*pmd)()->mutable_children()->clear();
  return ret;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
metad::wait_backlog(uint64_t id, size_t minfree)
/* -------------------------------------------------------------------------- */
{
  // ------------------------------------------------------------------------
  // wait_backlog should be called with mdflush locked.
  //
  // id: if non-zero the caller holds a lock on the associated mdx
  // (order for lock acquisition is mdx, then mdflush). This should
  // be the only mdx lock the caller holds.
  // minfree: if possible wait for the number of elements in mdqueue map to
  // have this much headroom below the mdqueue_max_backlog level.
  //
  // if the mdcflush thread is currently attempting to process "id", skip the
  // wait as we risk a deadlock with mdcflush.
  // ------------------------------------------------------------------------
  while (mdqueue.size() + minfree > mdqueue_max_backlog) {
    if (id && (id == mdqueue_current)) {
      // mdcflush is working on our entry - waiting would deadlock
      return;
    }

    mdflush.WaitMS(25);
  }
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
metad::wait_upstream(fuse_req_t req,
                     fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
  // Block until the given inode has left the flush queue, i.e. all pending
  // meta-data updates for it have been pushed upstream.
  shared_md md;

  if (!mdmap.retrieveTS(ino, md)) {
    return;
  }

  if (!md || !(*md)()->id()) {
    return;
  }

  bool queued = true;

  while (queued) {
    // check under the flush lock whether the entry is still queued
    mdflush.Lock();
    queued = mdqueue.count((*md)()->id());
    mdflush.UnLock();

    if (queued) {
      eos_static_notice("waiting for entry to be synced upstream ino=%#lx",
                        (*md)()->id());
      std::this_thread::sleep_for(std::chrono::microseconds(500));
    }
  }
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
metad::shared_md
/* -------------------------------------------------------------------------- */
metad::getlocal(fuse_req_t req,
                fuse_ino_t ino)
{
  // Return the locally cached record for 'ino' without contacting the
  // backend; unknown inodes yield a fresh record with err()=ENOENT.
  eos_static_info("ino=%1llx", ino);
  shared_md md;

  if (!mdmap.retrieveTS(ino, md)) {
    md = std::make_shared();
    (*md)()->set_err(ENOENT);
  }

  return md;
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
std::string
/* -------------------------------------------------------------------------- */
metad::getpath(fuse_ino_t ino)
{
  // Return the cached full path of 'ino', or an empty string if the inode is
  // not in the local map.
  eos_static_info("ino=%1llx", ino);
  shared_md md;

  if (mdmap.retrieveTS(ino, md)) {
    // TODO: we don't take md lock here; introduce a more robust approach.
    // Currently return via the c-string to reduce disturbance of the source str
    return std::string((*md)()->fullpath().c_str());
  }

  return "";
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Retrieve the meta-data record for an inode, consulting the local cache
//! first and falling back to the backend (MGM) when the cached state cannot
//! be trusted.
//!
//! @param req     fuse request (credentials)
//! @param ino     local inode; 0 means lookup by parent + name (CASE 2)
//! @param authid  authorization id passed to the backend
//! @param listing true if a full directory listing is requested
//! @param pmd     optional parent record (used for cap-based trust decisions)
//! @param name    entry name, required when ino == 0
//! @param readdir true when called from an opendir/readdir context
//! @return shared_md of the entry; on failure a fresh record with err() set
//------------------------------------------------------------------------------
metad::shared_md
metad::get(fuse_req_t req,
           fuse_ino_t ino,
           std::string authid,
           bool listing,
           shared_md pmd,
           const char* name,
           bool readdir)
{
  eos_static_info("ino=%#lx pino=%#lx name=%s listing=%d", ino,
                  pmd ? (*pmd)()->id() : 0, name, listing);
  shared_md md;

  if (ino) {
    if (!mdmap.retrieveTS(ino, md)) {
      // not cached - prepare a record carrying the known remote inode
      md = std::make_shared();
      (*md)()->set_md_ino(inomap.backward(ino));
    } else {
      if (ino != 1) {
        // we need this to refetch a hard link target which was removed server side
        XrdSysMutexHelper mLock(md->Locker());
        (*md)()->set_md_ino(ino);
      }
    }

    if (EOS_LOGS_DEBUG) {
      eos_static_debug("MD:\n%s", (!md) ? "" : dump_md(md).c_str());
    }
  } else {
    // -------------------------------------------------------------------------
    // this happens if we get asked for a child, which was never listed before
    // -------------------------------------------------------------------------
    md = std::make_shared();
  }

  if (!md || !(*md)()->id()) {
    // -------------------------------------------------------------------------
    // there is no local meta data available, this can only be found upstream
    // -------------------------------------------------------------------------
  } else {
    // -------------------------------------------------------------------------
    // there is local meta data, we have to decide if we can 'trust' it, or we
    // need to refresh it from upstream - TODO !
    // -------------------------------------------------------------------------
    if (readdir && !listing) {
      eos_static_info("returning opendir(readdir) entry");
      return md;
    }

    // a cap (or creator flag) on the parent lets us trust the cached child
    if (pmd && (pmd->cap_count() || (*pmd)()->creator()) && !pmd->needs_refresh() &&
        !md->needs_refresh()) {
      eos_static_info("returning cap entry");
      return md;
    } else {
      eos_static_info("pmd=%#lx cap-cnt=%d", pmd ? (*pmd)()->id() : 0,
                      pmd ? pmd->cap_count() : 0);
      uint64_t md_pid = 0;
      mode_t md_mode = 0;
      {
        XrdSysMutexHelper mLock(md->Locker());

        // a cap on the record itself is sufficient, unless a listing is
        // requested and we only hold a non-listing type record
        if (((!listing) || (listing && (*md)()->type() == (*md)()->MDLS)) &&
            (*md)()->md_ino() &&
            md->cap_count() && !md->needs_refresh()) {
          eos_static_info("returning cap entry via parent lookup cap-count=%d",
                          md->cap_count());

          if (EOS_LOGS_DEBUG) {
            eos_static_debug("MD:\n%s", dump_md(md, false).c_str());
          }

          return md;
        }

        md_pid = (*md)()->pid();
        md_mode = (*md)()->mode();
      }

      if (!S_ISDIR(md_mode)) {
        // files are covered by the CAP of the parent, so if there is a cap
        // on the parent we can return this entry right away
        if (mdmap.retrieveTS(md_pid, pmd)) {
          if (pmd && (*pmd)()->id() && pmd->cap_count() && !md->needs_refresh()) {
            return md;
          }
        }
      }
    }

    XrdSysMutexHelper mLock(md->Locker());

    if (((*md)()->id() != 1) && !(*md)()->pid() && !md->needs_refresh()) {
      // this must have been generated locally, we return this entry
      eos_static_info("returning generated entry");

      if (EOS_LOGS_DEBUG) {
        eos_static_debug("MD:\n%s", dump_md(md, false).c_str());
      }

      return md;
    }
  }

  // ---------------------------------------------------------------------------
  // we will get meta data from upstream
  // ---------------------------------------------------------------------------
  int rc = 0; // response code to a backend getMD call
  std::vector contv; // response container
  int thecase = 0;

  if (ino == 1)
    // -------------------------------------------------------------------------
    // CASE 1: root mount
    // -------------------------------------------------------------------------
  {
    thecase = 1;
    // -------------------------------------------------------------------------
    // the root inode is the only one we get by full path, all the others
    // go by parent-ino + name or inode
    // -------------------------------------------------------------------------
    std::string root_path = "/";
    // request the root meta data
    rc = mdbackend->getMD(req, root_path, contv, listing, authid);
    // set ourselfs as parent of root since we might mount
    // a remote directory != '/'
    (*md)()->set_pid(1);
  } else if (!ino)
    // -------------------------------------------------------------------------
    // CASE 2: by remote parent inode + name
    // -------------------------------------------------------------------------
  {
    thecase = 2;

    if (pmd) {
      // prevent resyning when we have deletions pending
      /*while (1)
      {
      XrdSysMutexHelper mdLock(pmd->Locker());
      if (pmd->WaitSync(1))
      {
      if (pmd->get_todelete().size())
      continue;
      break;
      }
      }
      */
      pmd->Locker().Lock();
      uint64_t pmd_ino = (*pmd)()->md_ino();
      pmd->Locker().UnLock();

      if (pmd_ino) {
        rc = mdbackend->getMD(req, pmd_ino, name, contv, listing, authid);
      } else {
        rc = ENOENT;
      }
    } else {
      rc = ENOENT;
    }
  } else
    // -------------------------------------------------------------------------
    // CASE 3: by remote inode
    // -------------------------------------------------------------------------
  {
    thecase = 3;
    XrdSysMutexHelper mLock(md->Locker());

    if ((*md)()->md_ino()) {
      /*
      // prevent resyncing when we have deletions pending
      while (1)
      {
      XrdSysMutexHelper mdLock(md->Locker());
      if (md->WaitSync(1))
      {
      if (md->get_todelete().size())
      continue;
      break;
      }
      }
      */
      eos_static_info("ino=%016lx type=%d", (*md)()->md_ino(), (*md)()->type());
      // pass a clock of 0 to force a full fetch when upgrading a non-listing
      // record to a listing
      rc = mdbackend->getMD(req, (*md)()->md_ino(),
                            listing ? (((*md)()->type() != (*md)()->MDLS)
                                       ? 0 : (*md)()->clock()) : (*md)()->clock(),
                            contv, listing, authid);
    } else {
      if ((*md)()->id()) {
        // that can be a locally created entry which is not yet upstream
        rc = 0;

        if (EOS_LOGS_DEBUG) {
          eos_static_debug("MD:\n%s", dump_md(md).c_str());
        }

        return md;
      } else {
        rc = ENOENT;
      }
    }
  }

  if (!rc) {
    // -------------------------------------------------------------------------
    // we need to store all response data and eventually create missing
    // hierarchical entries
    // -------------------------------------------------------------------------
    eos_static_debug("apply vector=%d", contv.size());

    for (auto it = contv.begin(); it != contv.end(); ++it) {
      if (it->ref_inode_()) {
        if (ino) {
          // the response contains the remote inode according to the request
          inomap.insert(it->ref_inode_(), ino);
        }

        uint64_t l_ino;

        // store the retrieved meta data blob
        if (!(l_ino = apply(req, *it, listing))) {
          eos_static_crit("msg=\"failed to apply response\"");
        } else {
          ino = l_ino;
        }
      } else {
        // we didn't get the md back
      }
    }

    // if the md record was returned, it is accessible after the apply function
    // attached it. We should also attach to the parent to be able to add
    // a not yet published child entry at the parent.
    if (md) {
      std::string md_name;
      mdmap.retrieveWithParentTS(ino, md, pmd, md_name);
      eos_static_info("ino=%08llx pino=%08llx name=%s listing=%d", ino,
                      pmd ? (*pmd)()->id() : 0, name, listing);

      switch (thecase) {
      case 1:
        // nothing to do
        break;

      case 2: {
        // we make sure, that the meta data record is attached to the local parent
        if (pmd && (*pmd)()->id()) {
          std::string encname = eos::common::StringConversion::EncodeInvalidUTF8
                                (md_name);
          XrdSysMutexHelper mLock(pmd->Locker());

          if (!pmd->local_children().count(encname) &&
              !pmd->get_todelete().count(encname) &&
              !md->deleted()) {
            eos_static_info("attaching %s [%#lx] to %s [%#lx]",
                            encname.c_str(), ino,
                            (*pmd)()->name().c_str(), (*pmd)()->id());
            // persist this hierarchical dependency
            pmd->local_children()[encname] = ino;
            update(req, pmd, "", true);
          }
        }

        break;
      }

      case 3:
        break;
      }
    } else {
      rc = ENOENT;
    }
  }

  if (rc) {
    // return an error record carrying the failure code
    shared_md md = std::make_shared();
    (*md)()->set_err(rc);

    if (EOS_LOGS_DEBUG) {
      eos_static_debug("MD:\n%s", dump_md(md).c_str());
    }

    return md;
  }

  if (EOS_LOGS_DEBUG) {
    eos_static_debug("MD:\n%s", dump_md(md).c_str());
  }

  return md;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Insert a meta-data record into the map under its own inode.
//! @param md     record to insert
//! @param authid authorization id (unused here, kept for interface symmetry)
//! @return the inode the record was stored under
//------------------------------------------------------------------------------
uint64_t
metad::insert(metad::shared_md md, std::string authid)
{
  if (EOS_LOGS_DEBUG) {
    eos_static_debug("%s", dump_md(md, false).c_str());
  }

  mdmap.insertTS((*md)()->id(), md);
  return (*md)()->id();
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Wait until a pending flush of 'md' has completed upstream.
//!
//! Called with md->Locker() held; the lock is released while waiting and
//! re-acquired before returning.
//!
//! @return the record's err() if its inode mapping has disappeared (the flush
//!         failed/removed it), 0 otherwise
//------------------------------------------------------------------------------
int
metad::wait_flush(fuse_req_t req, metad::shared_md md)
{
  // logic to wait for a completion of request
  md->Locker().UnLock();

  while (1) {
    if (md->WaitSync(1)) {
      if (has_flush((*md)()->id())) {
        // if a deletion was issued, OP state is (*md)()->RM not (*md)()->NONE
        // hence we would never leave this loop
        continue;
      }

      break;
    }
  }

  eos_static_info("waited for sync rc=%d bw=%#lx", (*md)()->err(),
                  inomap.backward((*md)()->id()));

  if (!inomap.backward((*md)()->id())) {
    md->Locker().Lock();
    return (*md)()->err();
  } else {
    md->Locker().Lock();
    return 0;
  }
}
/* -------------------------------------------------------------------------- */
/* -------------------------------------------------------------------------- */
bool
/* -------------------------------------------------------------------------- */
metad::has_flush(fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
  // Report whether 'ino' currently has entries queued for upstream flushing.
  mdflush.Lock();
  const bool in_flush = (mdqueue.count(ino) > 0);
  mdflush.UnLock();
  return in_flush;
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Queue an update for 'md' - convenience overload deriving the fuse_id from
//! the fuse request.
//------------------------------------------------------------------------------
void
metad::update(fuse_req_t req, shared_md md, std::string authid,
              bool localstore)
{
  fuse_id id(req);
  update(id, md, authid, localstore);
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Queue an update for 'md' on the flush queue.
//!
//! @param fuseid     credentials to flush with
//! @param md         record to flush
//! @param authid     authorization id
//! @param localstore true stores only locally (LSTORE), false also pushes the
//!                   change upstream (UPDATE) and is subject to backlog limits
//------------------------------------------------------------------------------
void
metad::update(fuse_id fuseid, shared_md md, std::string authid,
              bool localstore)
{
  mdflush.Lock();
  stat.inodes_backlog_store(mdqueue.size());
  const uint64_t id = (*md)()->id();

  if (!localstore) {
    // only updates initiated from FUSE limited,
    // server response updates pass
    wait_backlog(id, 1);
  }

  flushentry fe(id, authid, localstore ? mdx::LSTORE : mdx::UPDATE, fuseid);

  if (!localstore) {
    fe.bind();
  }

  // count the queued operations per inode and append the flush entry
  mdqueue[id]++;
  mdflushqueue.push_back(fe);
  eos_static_info("added ino=%#lx flushentry=%s queue-size=%u local-store=%d",
                  id, flushentry::dump(fe).c_str(), mdqueue.size(), localstore);
  mdflush.Signal();
  mdflush.UnLock();
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Attach child record 'md' to parent 'pmd' and queue the corresponding
//! flush entries (child ADD upstream unless localstore, parent LSTORE).
//!
//! Called with md->Locker() held; the lock is temporarily released to take
//! the parent lock (lock order: child unlock -> parent -> child relock).
//!
//! @param req        fuse request
//! @param pmd        parent record
//! @param md         child record (lock held by caller)
//! @param authid     authorization id
//! @param localstore true to only store locally, no upstream ADD
//------------------------------------------------------------------------------
void
metad::add(fuse_req_t req, metad::shared_md pmd, metad::shared_md md,
           std::string authid, bool localstore)
{
  using eos::common::StringConversion;
  // this is called with a lock on the md object
  stat.inodes_inc();
  stat.inodes_ever_inc();
  uint64_t pid = 0;
  const uint64_t id = (*md)()->id();
  uint64_t pmd_ino = 0;
  const std::string encname = StringConversion::EncodeInvalidUTF8((
                                *md)()->name());

  if (EOS_LOGS_DEBUG)
    eos_static_debug("child=%s parent=%s inode=%016lx authid=%s localstore=%d",
                     (*md)()->name().c_str(),
                     (*pmd)()->name().c_str(), (*md)()->id(), authid.c_str(), localstore);

  // avoid lock-order violation
  md->Locker().UnLock();
  {
    XrdSysMutexHelper mLock(pmd->Locker());

    // only bump the child count for a genuinely new entry
    if (!pmd->local_children().count(encname)) {
      (*pmd)()->set_nchildren((*pmd)()->nchildren() + 1);
    }

    pmd->local_children()[encname] = id;
    (*pmd)()->set_nlink(1);
    // the entry is (re-)created - drop any pending deletion of the same name
    pmd->get_todelete().erase(encname);
    pid = (*pmd)()->id();
    pmd_ino = (*pmd)()->md_ino();
  }
  md->Locker().Lock();
  {
    // store the local and remote parent inode
    (*md)()->set_pid(pid);
    (*md)()->set_md_pino(pmd_ino);
  }
  mdflush.Lock();
  stat.inodes_backlog_store(mdqueue.size());

  if (!localstore) {
    // need headroom for two entries: the child ADD and the parent LSTORE
    wait_backlog(id, 2);
    flushentry fe(id, authid, mdx::ADD, req);
    fe.bind();
    mdqueue[id]++;
    mdflushqueue.push_back(fe);
  }

  flushentry fep(pid, authid, mdx::LSTORE, req);
  fep.bind();
  mdqueue[pid]++;
  mdflushqueue.push_back(fep);
  mdflush.Signal();
  mdflush.UnLock();
}
/* -------------------------------------------------------------------------- */
//------------------------------------------------------------------------------
//! Synchronously create child record 'md' below 'pmd': wait until the parent
//! left the flush queue, push the child to the backend (putMD) and, on
//! success, attach it to the parent and queue a local store of the parent.
//!
//! Called with md->Locker() held; the lock is temporarily released to take
//! the parent lock (same lock-order dance as add()).
//!
//! @return 0 on success, otherwise the backend putMD error code (in which
//!         case the local record is erased to force a refresh)
//------------------------------------------------------------------------------
int
metad::add_sync(fuse_req_t req, shared_md pmd, shared_md md, std::string authid)
{
  // this is called with a lock on the md object
  int rc = 0;
  // store the local and remote parent inode
  XrdSysMutexHelper mLockParent(pmd->Locker());
  const uint64_t pid = (*pmd)()->id();
  (*md)()->set_pid(pid);
  (*md)()->set_md_pino((*pmd)()->md_ino());
  mLockParent.UnLock();
  mdx::md_op op = mdx::ADD;

  if (EOS_LOGS_DEBUG) {
    eos_static_debug("metacache::sync ino=%016lx authid=%s op=%d", (*md)()->id(),
                     authid.c_str(), (int) op);
  }

  (*md)()->set_operation((*md)()->SET);
  eos_static_info("metacache::sync backend::putMD - start");

  while (1) {
    // wait that the parent is leaving the mdqueue
    mdflush.Lock();

    if (mdqueue.count(pid)) {
      mdflush.UnLock();
      eos_static_info("waiting for parent directory to be synced upstream parent-ino= %#lx ino=%#lx",
                      pid, (*md)()->id());
      std::this_thread::sleep_for(std::chrono::microseconds(500));
    } else {
      mdflush.UnLock();
      break;
    }
  }

  // push to backend
  if ((rc = mdbackend->putMD(req, (*md)(), authid, &(md->Locker())))) {
    eos_static_err("metad::add_sync backend::putMD failed rc=%d", rc);
    // in this case we always clean this MD record to force a refresh
    inomap.erase_bwd((*md)()->id());
    md->setop_none();
    (*md)()->set_err(rc);

    if ((*md)()->id()) {
      if (mdmap.eraseTS((*md)()->id())) {
        stat.inodes_dec();
        stat.inodes_ever_inc();
      }
    }

    return rc;
  } else {
    // adopt the server-assigned inode and register the mapping
    (*md)()->set_id((*md)()->md_ino());
    inomap.insert((*md)()->md_ino(), (*md)()->id());
    md->setop_none();
  }

  eos_static_info("metad::add_sync backend::putMD - stop");
  std::string mdstream;
  std::string md_name = (*md)()->name();
  (*md)()->SerializeToString(&mdstream);
  stat.inodes_inc();
  stat.inodes_ever_inc();

  if (EOS_LOGS_DEBUG)
    eos_static_debug("child=%s parent=%s inode=%016lx authid=%s",
                     (*md)()->name().c_str(),
                     (*pmd)()->name().c_str(), (*md)()->id(), authid.c_str());

  const uint64_t id = (*md)()->id();
  // avoid lock-order violation
  md->Locker().UnLock();
  {
    XrdSysMutexHelper mLock(pmd->Locker());

    // only bump the child count for a genuinely new entry
    if (!pmd->local_children().count(
          eos::common::StringConversion::EncodeInvalidUTF8(md_name))) {
      (*pmd)()->set_nchildren((*pmd)()->nchildren() + 1);
    }

    pmd->local_children()[eos::common::StringConversion::EncodeInvalidUTF8(
                            md_name)] = id;
    (*pmd)()->set_nlink(1);
    pmd->get_todelete().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                md_name));
  }
  md->Locker().Lock();
  mdflush.Lock();
  stat.inodes_backlog_store(mdqueue.size());
  wait_backlog(id, 1);
  // persist the parent locally - the child itself was already pushed above
  flushentry fep(pid, authid, mdx::LSTORE, req);
  fep.bind();
  mdqueue[pid]++;
  mdflushqueue.push_back(fep);
  mdflush.Signal();
  mdflush.UnLock();
  return 0;
}
/* -------------------------------------------------------------------------- */
int
metad::begin_flush(fuse_req_t req, shared_md emd, std::string authid)
{
  // Announce to the backend that a flush sequence starts for the remote
  // inode of 'emd' by sending a BEGINFLUSH record. Returns the backend rc.
  shared_md flushmd = std::make_shared();
  (*flushmd)()->set_operation((*flushmd)()->BEGINFLUSH);

  if (!(*emd)()->md_ino()) {
    //TODO wait for the remote inode to be known
  }

  XrdSysMutexHelper scope_lock(flushmd->Locker());
  (*flushmd)()->set_md_ino((*emd)()->md_ino());
  int rc = mdbackend->putMD(req, (*flushmd)(), authid, &(flushmd->Locker()));

  if (rc) {
    eos_static_err("metad::begin_flush backend::putMD failed rc=%d", rc);
  }

  return rc;
}
/* -------------------------------------------------------------------------- */
int
metad::end_flush(fuse_req_t req, shared_md emd, std::string authid)
{
  // Announce to the backend that the flush sequence for the remote inode of
  // 'emd' is finished by sending an ENDFLUSH record. Returns the backend rc.
  shared_md md = std::make_shared();
  (*md)()->set_operation((*md)()->ENDFLUSH);
  int rc = 0;

  if (!(*emd)()->md_ino()) {
    //TODO wait for the remote inode to be known
  }

  XrdSysMutexHelper mLock(md->Locker());
  (*md)()->set_md_ino((*emd)()->md_ino());

  if ((rc = mdbackend->putMD(req, (*md)(), authid, &(md->Locker())))) {
    // fix: this message previously said 'metad::begin_flush' (copy/paste)
    eos_static_err("metad::end_flush backend::putMD failed rc=%d", rc);
  }

  return rc;
}
/* -------------------------------------------------------------------------- */
void
metad::remove(fuse_req_t req, metad::shared_md pmd, metad::shared_md md,
              std::string authid,
              bool upstream)
{
  // Mark 'md' as deleted, detach it from the parent's listing and - unless
  // 'upstream' is false - queue a remote deletion (RM) for the child plus a
  // local store (LSTORE) for the parent.
  // this is called with the md object locked
  if (EOS_LOGS_DEBUG)
    eos_static_debug("child=%s parent=%s inode=%#lx upstreaqm=%d",
                     (*md)()->name().c_str(),
                     (*pmd)()->name().c_str(), (*md)()->id(), upstream);

  struct timespec ts;
  eos::common::Timing::GetTimeSpec(ts);

  if (!md->deleted()) {
    // hold one extra lookup reference until the RM flush has been processed
    // (released in the flush thread)
    md->lookup_inc();
    stat.inodes_deleted_inc();
    stat.inodes_deleted_ever_inc();
  }

  (*md)()->set_mtime(ts.tv_sec);
  (*md)()->set_mtime_ns(ts.tv_nsec);
  md->setop_delete();

  if (EosFuse::Instance().Config().options.hide_versions &&
      EosFuse::Instance().mds.supports_hideversion()) {
    // indicate the MGM to remove also all versions
    (*md)()->set_opflags(eos::fusex::md::DELETEVERSIONS);
  }

  std::string name = (*md)()->name();
  const uint64_t id = (*md)()->id();
  // avoid lock order violation: release the child before taking the parent
  md->Locker().UnLock();
  uint64_t pid = 0;
  {
    // drop the entry from the parent listing and remember it in the parent's
    // to-delete map; also bump the parent's mtime
    XrdSysMutexHelper mLock(pmd->Locker());
    pid = (*pmd)()->id();
    pmd->local_children().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                  name));
    (*pmd)()->set_nchildren((*pmd)()->nchildren() - 1);
    pmd->get_todelete()[eos::common::StringConversion::EncodeInvalidUTF8(
                          name)] = id;
    (*pmd)()->set_mtime(ts.tv_sec);
    (*pmd)()->set_mtime_ns(ts.tv_nsec);
  }
  // re-acquire the child lock (caller expects it held)
  md->Locker().Lock();

  if (!upstream) {
    // local-only removal - nothing to push to the backend
    return;
  }

  mdflush.Lock();
  // wait for there to be space in the queue
  wait_backlog(id, 2);
  flushentry fe(id, authid, mdx::RM, req);
  fe.bind();
  flushentry fep(pid, authid, mdx::LSTORE, req);
  fep.bind();
  mdqueue[pid]++;
  mdqueue[id]++;
  mdflushqueue.push_back(fe);
  mdflushqueue.push_back(fep);
  stat.inodes_backlog_store(mdqueue.size());
  mdflush.Signal();
  mdflush.UnLock();
}
/* -------------------------------------------------------------------------- */
void
metad::mv(fuse_req_t req, shared_md p1md, shared_md p2md, shared_md md,
          std::string newname, std::string authid1, std::string authid2)
{
  // Rename/move 'md' from parent 'p1md' (source, authid1) to parent 'p2md'
  // (target, authid2) under the new name 'newname', then queue UPDATE flushes
  // for all involved objects.
  if (EOS_LOGS_DEBUG)
    eos_static_debug("child=%s new-name=%s parent=%s newparent=%s inode=%016lx",
                     (*md)()->name().c_str(),
                     newname.c_str(),
                     (*p1md)()->name().c_str(), (*p2md)()->name().c_str(), (*md)()->id());

  XrdSysMutexHelper mLock(md->Locker());
  struct timespec ts;
  eos::common::Timing::GetTimeSpec(ts);
  uint64_t p1id = 0, p2id = 0;

  if (p1md != p2md) {
    // move between directories. We need to run an expensive algorithm to
    // determine the correct lock order, but a rename should be rather uncommon,
    // anyway.
    MdLocker locker(p1md, p2md, determineLockOrder(p1md, p2md));
    std::string oldname = (*md)()->name();
    p1id = (*p1md)()->id();
    p2id = (*p2md)()->id();

    // only bump the target child count if we are not overwriting an entry
    if (!p2md->local_children().count(
          eos::common::StringConversion::EncodeInvalidUTF8(newname))) {
      (*p2md)()->set_nchildren((*p2md)()->nchildren() + 1);
    }

    p2md->local_children()[eos::common::StringConversion::EncodeInvalidUTF8(
                             newname)] = (*md)()->id();
    p1md->local_children().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                   (*md)()->name()));
    (*p1md)()->set_nchildren((*p1md)()->nchildren() - 1);
    // touch mtime/ctime on both parents
    (*p1md)()->set_mtime(ts.tv_sec);
    (*p1md)()->set_mtime_ns(ts.tv_nsec);
    (*p1md)()->clear_pmtime();
    (*p1md)()->clear_pmtime_ns();
    (*p1md)()->set_ctime(ts.tv_sec);
    (*p1md)()->set_ctime_ns(ts.tv_nsec);
    (*p2md)()->set_mtime(ts.tv_sec);
    (*p2md)()->set_mtime_ns(ts.tv_nsec);
    (*p2md)()->clear_pmtime();
    (*p2md)()->clear_pmtime_ns();
    (*p2md)()->set_ctime(ts.tv_sec);
    (*p2md)()->set_ctime_ns(ts.tv_nsec);
    // re-parent the child
    (*md)()->set_name(newname);
    (*md)()->set_pid(p2id);
    (*md)()->set_md_pino((*p2md)()->md_ino());
    p1md->get_todelete()[eos::common::StringConversion::EncodeInvalidUTF8(
                           oldname)] = 0; //(*md)()->id(); // make it known as deleted
    p2md->get_todelete().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                 newname)); // the new target is not deleted anymore
    p2md->local_enoent().erase(newname); // remove a possible enoent entry
    md->setop_update();
    p1md->setop_update();
    p2md->setop_update();
  } else {
    // move within directory
    XrdSysMutexHelper m1Lock(p1md->Locker());
    p1id = p2id = (*p1md)()->id();

    // overwriting an existing target entry reduces the child count
    if (p2md->local_children().count(
          eos::common::StringConversion::EncodeInvalidUTF8(newname))) {
      (*p2md)()->set_nchildren((*p2md)()->nchildren() - 1);
    }

    p2md->local_children()[eos::common::StringConversion::EncodeInvalidUTF8(
                             newname)] = (*md)()->id();
    p1md->local_children().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                   (*md)()->name()));
    p1md->get_todelete()[eos::common::StringConversion::EncodeInvalidUTF8(
                           (*md)()->name())] = (*md)()->id(); // make it known as deleted
    p2md->get_todelete().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                 newname)); // the new target is not deleted anymore
    p2md->local_enoent().erase(newname); // remove a possible enoent entry
    (*md)()->set_name(newname);
    md->setop_update();
    (*p1md)()->set_mtime(ts.tv_sec);
    (*p1md)()->set_mtime_ns(ts.tv_nsec);
    (*p1md)()->clear_pmtime();
    (*p1md)()->clear_pmtime_ns();
    (*p1md)()->set_ctime(ts.tv_sec);
    (*p1md)()->set_ctime_ns(ts.tv_nsec);
    p1md->setop_update();
  }

  (*md)()->clear_pmtime();
  (*md)()->clear_pmtime_ns();
  (*md)()->set_ctime(ts.tv_sec);
  (*md)()->set_ctime_ns(ts.tv_nsec);
  (*md)()->set_mv_authid(authid1); // store also the source authid
  // queue UPDATE flushes: both parents (if distinct) and the moved child
  mdflush.Lock();
  wait_backlog((*md)()->id(), (p1id != p2id) ? 3 : 2);
  flushentry fe1(p1id, authid1, mdx::UPDATE, req);
  fe1.bind();
  mdqueue[p1id]++;
  mdflushqueue.push_back(fe1);

  if (p1id != p2id) {
    flushentry fe2(p2id, authid2, mdx::UPDATE, req);
    fe2.bind();
    mdqueue[p2id]++;
    mdflushqueue.push_back(fe2);
  }

  flushentry fe((*md)()->id(), authid2, mdx::UPDATE, req);
  fe.bind();
  mdqueue[(*md)()->id()]++;
  mdflushqueue.push_back(fe);
  stat.inodes_backlog_store(mdqueue.size());
  mdflush.Signal();
  mdflush.UnLock();
}
/* -------------------------------------------------------------------------- */
int
metad::rmrf(fuse_req_t req, shared_md md)
{
  // Recursive removal: the whole subtree deletion is delegated to the backend.
  return mdbackend->rmRf(req, (*md)());
}
/* -------------------------------------------------------------------------- */
std::string
metad::dump_md(shared_md md, bool lock)
{
  // Render a human-readable JSON-ish dump of an md record, including the
  // local augmentation: children map, to-delete list, negative (enoent)
  // entries, cap count, LRU links and refresh state. Debugging aid only.
  // If 'lock' is true the md mutex is held while reading.
  if (!(md)) {
    return "";
  }

  google::protobuf::util::JsonPrintOptions options;
  options.add_whitespace = true;
  options.always_print_primitive_fields = true;
  std::string jsonstring;

  if (lock) {
    md->Locker().Lock();
  }

  (void) google::protobuf::util::MessageToJsonString(*((eos::fusex::md*)((
        *md)())),
      &jsonstring, options);
  char capcnt[16];
  snprintf(capcnt, sizeof(capcnt), "%d", md->cap_count());
  jsonstring += "\nlocal-children: {\n";

  for (auto it = md->local_children().begin();
       it != md->local_children().end(); ++it) {
    char buff[32];
    jsonstring += "\"";
    jsonstring += it->first;
    jsonstring += "\" : ";
    jsonstring += longstring::to_decimal(it->second, buff);
    // fix: the former last-element test compared 'it' against end() inside
    // the loop body (always false) and appended a stray '",' to every entry;
    // emit a comma separator only between entries
    auto next_child = it;
    ++next_child;

    if (next_child != md->local_children().end()) {
      jsonstring += ",";
    }
  }

  jsonstring += "}\n";
  jsonstring += "\nto-delete: {\n";

  for (auto it = md->get_todelete().begin();
       it != md->get_todelete().end(); ++it) {
    jsonstring += "\"";
    jsonstring += it->first.c_str();
    jsonstring += "\"";
    // fix: always close the quote; separate entries with a comma (the old
    // in-loop end() check never fired, leaving a trailing comma)
    auto next_del = it;
    ++next_del;

    if (next_del != md->get_todelete().end()) {
      jsonstring += ",";
    }
  }

  jsonstring += "}\n";
  jsonstring += "\nenoent: {\n";

  for (auto it = md->local_enoent().begin();
       it != md->local_enoent().end(); ++it) {
    jsonstring += "\"";
    jsonstring += *it;
    jsonstring += "\"";
    // same fix as above for the negative-entry list
    auto next_ent = it;
    ++next_ent;

    if (next_ent != md->local_enoent().end()) {
      jsonstring += ",";
    }
  }

  jsonstring += "}\n";
  jsonstring += "\ncap-cnt: ";
  jsonstring += capcnt;
  jsonstring += "\nlru-prev: ";
  jsonstring += std::to_string(md->lru_prev());
  jsonstring += "\nlru_next: ";
  jsonstring += std::to_string(md->lru_next());
  jsonstring += "\n";
  jsonstring += "\nrefresh: ";
  jsonstring += md->needs_refresh() ? "true" : "false";
  jsonstring += "\n";

  if (lock) {
    md->Locker().UnLock();
  }

  return jsonstring;
}
/* -------------------------------------------------------------------------- */
std::string
metad::dump_md(eos::fusex::md& md)
{
  // Serialize a protobuf md record into pretty-printed JSON for debugging.
  std::string out;
  google::protobuf::util::JsonPrintOptions opts;
  opts.add_whitespace = true;
  opts.always_print_primitive_fields = true;
  (void) google::protobuf::util::MessageToJsonString(md, &out, opts);
  return out;
}
/* -------------------------------------------------------------------------- */
std::string
metad::dump_container(eos::fusex::container& cont)
{
  // Serialize a protobuf container record into pretty-printed JSON for
  // debugging.
  std::string out;
  google::protobuf::util::JsonPrintOptions opts;
  opts.add_whitespace = true;
  opts.always_print_primitive_fields = true;
  (void) google::protobuf::util::MessageToJsonString(cont, &out, opts);
  return out;
}
/* -------------------------------------------------------------------------- */
int
metad::getlk(fuse_req_t req, shared_md md, struct flock* lock)
{
  // POSIX advisory lock query (F_GETLK): encode the caller's flock request
  // into the md protobuf, ask the backend synchronously and translate the
  // response back into '*lock'. Returns 0 or an errno value.
  XrdSysMutexHelper locker(md->Locker());
  // fill lock request structure
  (*md)()->mutable_flock()->set_pid(fuse_req_ctx(req)->pid);
  (*md)()->mutable_flock()->set_len(lock->l_len);
  (*md)()->mutable_flock()->set_start(lock->l_start);
  (*md)()->set_operation((*md)()->GETLK);

  // translate the POSIX lock type into the protobuf enum
  switch (lock->l_type) {
  case F_RDLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::RDLCK);
    break;

  case F_WRLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::WRLCK);
    break;

  case F_UNLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::UNLCK);
    break;

  default:
    return EINVAL;
  }

  // do sync upstream lock call - the md mutex is handed to the backend
  int rc = mdbackend->doLock(req, *(*md)(), &(md->Locker()));

  // digest the response
  if (!rc) {
    // store the md->flock response into the flock structure
    eos_static_notice("pid=%llu len=%llu start=%llu type=%llu\n",
                      (*md)()->flock().pid(),
                      (*md)()->flock().len(),
                      (*md)()->flock().start(),
                      (*md)()->flock().type());
    lock->l_pid = (*md)()->flock().pid();
    lock->l_len = (*md)()->flock().len();
    lock->l_start = (*md)()->flock().start();
    lock->l_whence = SEEK_SET;

    // map the protobuf lock type back to POSIX; anything else carries the
    // error number from the remote side
    switch ((*md)()->flock().type()) {
    case eos::fusex::lock::RDLCK:
      lock->l_type = F_RDLCK;
      break;

    case eos::fusex::lock::WRLCK:
      lock->l_type = F_WRLCK;
      break;

    case eos::fusex::lock::UNLCK:
      lock->l_type = F_UNLCK;
      break;

    default:
      rc = (*md)()->flock().err_no();
    }
  } else {
    // backend call failed - report as temporarily unavailable
    rc = EAGAIN;
  }

  // clean the lock structure;
  (*md)()->clear_flock();
  return rc;
}
/* -------------------------------------------------------------------------- */
int
metad::setlk(fuse_req_t req, shared_md md, struct flock* lock, int sleep)
{
  // POSIX advisory lock set (F_SETLK / F_SETLKW if 'sleep' is non-zero):
  // encode the request into the md protobuf, call the backend and maintain
  // the local LockTable used to release locks on flush. Returns 0 or errno.
  XrdSysMutexHelper locker(md->Locker());
  // fill lock request structure
  (*md)()->mutable_flock()->set_pid(fuse_req_ctx(req)->pid);
  (*md)()->mutable_flock()->set_len(lock->l_len);
  (*md)()->mutable_flock()->set_start(lock->l_start);

  if (sleep) {
    (*md)()->set_operation((*md)()->SETLKW);
  } else {
    (*md)()->set_operation((*md)()->SETLK);
  }

  // translate the POSIX lock type into the protobuf enum
  switch (lock->l_type) {
  case F_RDLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::RDLCK);
    break;

  case F_WRLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::WRLCK);
    break;

  case F_UNLCK:
    (*md)()->mutable_flock()->set_type(eos::fusex::lock::UNLCK);
    break;

  default:
    eos_static_notice("unsupported lock operation op:%x", lock->l_type);
    return EINVAL;
  }

  bool backend_call = true;

  if (lock->l_type == F_UNLCK) {
    backend_call = false;

    // check that we have actually a lock for that before doing an upstream call
    for (auto it = md->LockTable().begin(); it != md->LockTable().end(); ++it) {
      if (it->l_pid == (pid_t)(*md)()->flock().pid()) {
        backend_call = true;
      }
    }
  }

  // do sync upstream lock call - the md mutex is handed to the backend
  int rc = 0;

  if (backend_call) {
    rc = mdbackend->doLock(req, *(*md)(), &(md->Locker()));
  }

  // digest the response
  if (!rc) {
    rc = (*md)()->flock().err_no();
  } else {
    // backend call failed - report as temporarily unavailable
    rc = EAGAIN;
  }

  if (!rc) {
    // store in the lock table - unlocking done during flush
    if (lock->l_type != F_UNLCK) {
      md->LockTable().push_back(*lock);
    } else {
      // remove from LockTable - not the most efficient
      auto it = md->LockTable().begin();

      while (it != md->LockTable().end()) {
        if (it->l_pid == (pid_t)(*md)()->flock().pid()) {
          it = md->LockTable().erase(it);
        } else {
          it++;
        }
      }
    }
  }

  // clean the lock structure;
  (*md)()->clear_flock();
  return rc;
}
/* -------------------------------------------------------------------------- */
int
metad::statvfs(fuse_req_t req, struct statvfs* svfs)
{
  // Filesystem statistics come straight from the backend.
  const int rc = mdbackend->statvfs(req, svfs);
  return rc;
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
metad::cleanup(shared_md md)
/* -------------------------------------------------------------------------- */
{
  // Downgrade a (directory) md object after its capability expired: collect
  // kernel cache invalidations for children/negative entries, reset the type
  // to a plain MD record and forget unreferenced file children.
  // called with locked md, returns with unlocked md
  eos_static_debug("id=%16x", (*md)()->id());
  std::vector inval_entry_name;
  std::vector inval_files;
  std::vector inval_dirs;

  for (auto it = md->local_children().begin();
       it != md->local_children().end(); ++it) {
    shared_md cmd;

    if (mdmap.retrieveTS(it->second, cmd)) {
      // XrdSysMutexHelper cmLock(cmd->Locker());
      bool in_flush = has_flush(it->second);

      if (!S_ISDIR((*cmd)()->mode())) {
        if (!in_flush && !EosFuse::Instance().datas.has((*cmd)()->id())) {
          // clean-only entries, which are not in the flush queue and not open
          inval_files.push_back(it->second);
          cmd->force_refresh();
        }
      }

      if (!dentrymessaging) {
        // if the server does not provide a dentry invalidation message
        inval_entry_name.push_back(it->first);
      } else {
        // if the server provides a dentry invalidation message
        // files and directories never get an inval_entry call, only when we see an explicit deletion or a negative cache entry needs cleanup
      }
    }
  }

  // negative cache entries are always invalidated
  for (auto it : md->local_enoent()) {
    inval_entry_name.push_back(it);
  }

  md->local_enoent().clear();
  // release the md lock while calling into the kernel cache - those calls
  // can block and must not be done under the md mutex
  md->Locker().UnLock();

  if (EosFuse::Instance().Config().options.md_kernelcache) {
    for (auto it = inval_entry_name.begin(); it != inval_entry_name.end(); ++it) {
      kernelcache::inval_entry((*md)()->id(), *it);
    }
  }

  md->Locker().Lock();
  // reset to a non-listing, non-creator record
  (*md)()->set_type((*md)()->MD);
  (*md)()->set_creator(false);
  md->cap_count_reset();
  (*md)()->set_nchildren(md->local_children().size());
  md->get_todelete().clear();
  md->setop_none(); /* so that wait_flush() returns */
  md->Locker().UnLock();

  if ((EosFuse::Instance().Config().options.data_kernelcache) ||
      (EosFuse::Instance().Config().options.md_kernelcache)) {
    // drop the collected file children from the local cache
    for (auto it = inval_files.begin(); it != inval_files.end(); ++it) {
      forget(0, *it, 0);
    }
  }
}
/* -------------------------------------------------------------------------- */
void
/* -------------------------------------------------------------------------- */
metad::cleanup(fuse_ino_t ino)
/* -------------------------------------------------------------------------- */
{
  // Convenience overload: resolve the inode and run cleanup on the md
  // object; cleanup(shared_md) is entered locked and returns unlocked.
  shared_md md;

  if (!mdmap.retrieveTS(ino, md)) {
    return;
  }

  md->Locker().Lock();
  cleanup(md);
}
/* -------------------------------------------------------------------------- */
uint64_t
metad::apply(fuse_req_t req, eos::fusex::container& cont, bool listing)
{
  // Merge an incoming backend response into the local md cache and return
  // the affected local inode (0 when nothing was applied).
  // apply receives either a single MD record or a parent MD + all children MD
  // we have to make sure that the modification of children is atomic in the parent object
  shared_md md;
  shared_md pmd;

  if (EOS_LOGS_DEBUG) {
    eos_static_debug(dump_container(cont).c_str());
  }

  if (cont.type() == cont.MD) {
    // ---- single meta-data record ----
    int mderr = 0;

    if ((mderr = cont.md_().err())) {
      eos_static_info("ref_ino=%016lx skipped applying MD container with err=%d",
                      (long) cont.ref_inode_(), mderr);
      return 0;
    }

    uint64_t md_ino = cont.md_().md_ino();
    uint64_t md_pino = cont.md_().md_pino();
    uint64_t ino = inomap.forward(md_ino);
    bool is_new = false;
    {
      // Create a new md object, if none is found in the cache
      if (!mdmap.retrieveTS(ino, md)) {
        is_new = true;
        md = std::make_shared();
      }

      md->Locker().Lock();

      if (EOS_LOGS_DEBUG) {
        eos_static_debug("%s op=%d deleted=%d", md->dump().c_str(), md->getop(),
                         md->deleted());
      }

      if (md->deleted()) {
        // a locally deleted object is never resurrected by a remote update
        md->Locker().UnLock();
        return ino;
      }
    }
    uint64_t p_ino = inomap.forward(md_pino);

    if (!p_ino) {
      p_ino = md_pino;
      // it might happen that we don't know yet anything about this parent
      inomap.insert(md_pino, p_ino);
      eos_static_debug("msg=\"creating lookup entry for parent inode\" md-pino=%016lx pino=%016lx md-ino=%016lx ino=%016lx",
                      md_pino, p_ino, md_ino, ino);
    }

    if (is_new) {
      // in this case we need to create a new one
      (*md)()->set_id(md_ino);
      uint64_t new_ino = insert(md, (*md)()->authid());
      ino = new_ino;
    }

    if (!S_ISDIR((*md)()->mode())) {
      // if its a file we need to have a look at parent cap-count, so we get the parent md
      md->Locker().UnLock();
      mdmap.retrieveTS(p_ino, pmd);
      md->Locker().Lock();
    }

    {
      if (!has_flush(ino)) {
        // no pending local changes - take the remote state, but preserve the
        // locally computed fullpath
        std::string fullpath = (*md)()->fullpath();
        (*md)()->CopyFrom(cont.md_());
        (*md)()->set_fullpath(fullpath);
        shared_md d_md = EosFuse::Instance().datas.retrieve_wr_md(ino);

        if (d_md) {
          // see if this file is open for write, because in that case
          // we have to keep the local size information and modification times
          (*md)()->set_size((*d_md)()->size());
          (*md)()->set_mtime((*d_md)()->mtime());
          (*md)()->set_mtime_ns((*d_md)()->mtime_ns());
        }
      } else {
        eos_static_warning("deferring MD overwrite local-ino=%016lx remote-ino=%016lx ",
                           (long) ino, (long) md_ino);
      }

      (*md)()->set_nchildren(md->local_children().size());

      if (EOS_LOGS_DEBUG) {
        eos_static_debug("store md for local-ino=%016lx remote-ino=%016lx -",
                         (long) ino, (long) md_ino);
        eos_static_debug("%s", md->dump().c_str());
      }
    }
    (*md)()->set_pid(p_ino);
    (*md)()->set_id(ino);
    md->clear_refresh();
    eos_static_info("store local pino=%016lx for %016lx", (*md)()->pid(),
                    (*md)()->id());
    inomap.insert(md_ino, ino);
    md->Locker().UnLock();

    if (is_new) {
      // publish the newly created object in the md map
      XrdSysMutexHelper mLock(mdmap);
      mdmap[ino] = md;
      stat.inodes_inc();
      stat.inodes_ever_inc();
    }

    return ino;
  } else if (cont.type() == cont.MDMAP) {
    // ---- parent meta-data plus a map of children ----
    uint64_t p_ino = inomap.forward(cont.ref_inode_());

    for (auto map = cont.md_map_().md_map_().begin();
         map != cont.md_map_().md_map_().end(); ++map) {
      // loop over the map of meta data objects
      int mderr = 0;

      if ((mderr = map->second.err())) {
        eos_static_info("ref_ino=%016lx ino=%016lx skipped applying MDMAP "
                        "container entry with err=%d",
                        (long) cont.ref_inode_(), (long) map->first, mderr);
        continue;
      }

      uint64_t ino = inomap.forward(map->first);
      eos::fusex::cap cap_received;
      cap_received.set_id(0);

      if (EOS_LOGS_DEBUG) {
        eos_static_debug("remote-ino=%016lx local-ino=%016lx", (long) map->first, ino);
      }

      if (mdmap.retrieveTS(ino, md)) {
        // this is an already known inode
        eos_static_debug("lock mdmap");
        {
          bool child = false;

          if (map->first != cont.ref_inode_()) {
            // entry is a child of the reference inode
            child = true;

            if (!S_ISDIR(map->second.mode())) {
              shared_md child_pmd;

              if (mdmap.retrieveTS(p_ino, child_pmd)) {
                // NOTE(review): cap_received.id() is still 0 here (the
                // capability is only extracted further down) - this branch
                // looks dead; confirm against the protocol
                if (cap_received.id()) {
                  // store cap
                  EosFuse::Instance().getCap().store(req, cap_received);
                  md->cap_inc();
                }

                // don't overwrite md to be flushed
                if (has_flush(ino)) {
                  continue;
                }
              }
            }

            md->Locker().Lock();
          } else {
            // entry is the reference (parent) inode itself
            md->Locker().Lock();
            pmd = md;

            if (EOS_LOGS_DEBUG) {
              eos_static_debug("lock pmd ino=%#lx", (*pmd)()->id());
            }
          }

          if (map->second.has_capability()) {
            // extract any new capability
            cap_received = map->second.capability();
          }

          if (child) {
            // case 1: merge remote state into a known child, keeping local
            // size/mtime when the file is open for write or has a pending flush
            eos_static_debug("case 1 %s", (*md)()->name().c_str());
            eos::fusex::md::TYPE mdtype = (*md)()->type();
            size_t local_size = (*md)()->size();
            uint64_t local_mtime = (*md)()->mtime();
            uint64_t local_mtime_ns = (*md)()->mtime_ns();
            (*md)()->CopyFrom(map->second);
            md->clear_refresh();
            shared_md d_md = EosFuse::Instance().datas.retrieve_wr_md(ino);

            if (d_md) {
              // see if this file is open for write, because in that case
              // we have to keep the local size information and modification times
              (*md)()->set_size((*d_md)()->size());
              (*md)()->set_mtime((*d_md)()->mtime());
              (*md)()->set_mtime_ns((*d_md)()->mtime_ns());
            } else {
              if (has_flush(ino)) {
                (*md)()->set_size(local_size);
                (*md)()->set_mtime(local_mtime);
                (*md)()->set_mtime_ns(local_mtime_ns);
              }
            }

            (*md)()->set_nchildren(md->local_children().size());
            // if this object was a listing type, keep that
            (*md)()->set_type(mdtype);
          } else {
            // we have to overlay the listing
            std::map todelete;
            mdflush.Lock();

            if (!mdqueue.count((*md)()->id())) {
              // case 2: no pending flush - take the full remote record but
              // preserve the local to-delete list
              eos_static_debug("case 2 %s id %#lx", (*md)()->name().c_str(), (*md)()->id());
              mdflush.UnLock();
              todelete = md->get_todelete();
              // overwrite local meta data with remote state
              (*md)()->CopyFrom(map->second);
              md->get_todelete() = todelete;
              (*md)()->set_type((*md)()->MD);
              (*md)()->set_nchildren(md->local_children().size());

              if (!md->get_todelete().size()) {
                // if there are no local deletions anymore, we can trust the remote value of nchildren
                md->clear_refresh();
              }
            } else {
              // case 3: flush pending - only replace the children listing,
              // keep the locally modified attributes
              eos_static_debug("case 3 %s children=%d listing=%d", (*md)()->name().c_str(),
                               map->second.children().size(), listing);
              mdflush.UnLock();
              todelete = md->get_todelete();
              // copy only the listing
              (*md)()->mutable_children()->clear();

              for (auto it = map->second.children().begin();
                   it != map->second.children().end(); ++it) {
                (*((*md)()->mutable_children()))[eos::common::StringConversion::EncodeInvalidUTF8(
                                                   it->first)] = it->second;
              }

              // keep the listing
              md->get_todelete() = todelete;
              (*md)()->set_type((*md)()->MD);
              (*md)()->set_nchildren(md->local_children().size());

              if (!md->get_todelete().size()) {
                // if there are no local deletions anymore, we can trust the remote value of nchildren
                md->clear_refresh();
              }
            }
          }

          // capabilities never stay embedded in the cached record
          (*md)()->clear_capability();
          (*md)()->set_id(ino);
          p_ino = inomap.forward((*md)()->md_pino());
          (*md)()->set_pid(p_ino);
          eos_static_info("store remote-ino=%016lx local pino=%016lx for %016lx",
                          (*md)()-> md_pino(), (*md)()->pid(), (*md)()->id());

          for (auto it = md->get_todelete().begin(); it != md->get_todelete().end();
               ++it) {
            eos_static_info("%016lx to-delete=%s", (*md)()->id(), it->first.c_str());
          }

          if (EOS_LOGS_DEBUG) {
            eos_static_debug("store md for local-ino=%08ld remote-ino=%016lx type=%d -",
                             (long) ino, (long) map->first, (*md)()->type());
            eos_static_debug("%s", md->dump().c_str());
          }

          md->Locker().UnLock();

          if (!child) {
            if (EOS_LOGS_DEBUG) {
              eos_static_debug("cap count %d\n", pmd->cap_count());
            }

            if (!pmd->cap_count()) {
              // no capability left on the parent - drop cached children which
              // are neither in the flush queue nor open
              if (EOS_LOGS_DEBUG) {
                eos_static_debug("clearing out %0016lx", (*pmd)()->id());
              }

              // we don't wipe meta-data children which we still have to flush or have open
              // if they have to be deleted, there is an explicit callback arriving for deletion
              XrdSysMutexHelper scope_lock(pmd->Locker());
              std::vector clear_children;

              for (auto it = pmd->local_children().begin() ;
                   it != pmd->local_children().end(); ++it) {
                bool in_flush = has_flush(it->second);
                bool is_attached = EosFuse::Instance().datas.has(it->second);

                if (!in_flush && !is_attached) {
                  clear_children.push_back(it->first);
                }
              }

              for (auto it = clear_children.begin(); it != clear_children.end(); ++it) {
                pmd->local_children().erase(*it);
              }

              pmd->get_todelete().clear();
            }
          }

          if (cap_received.id()) {
            // store cap
            EosFuse::Instance().getCap().store(req, cap_received);
            md->cap_inc();
          }
        }
      } else {
        // this is a new inode we don't know yet
        md = std::make_shared();

        if (map->second.has_capability()) {
          // extract any new capability
          cap_received = map->second.capability();
        }

        *md = map->second;
        (*md)()->clear_capability();
        md->clear_refresh();

        if ((!pmd) && (map->first == cont.ref_inode_())) {
          // the first record matching the reference inode is the parent
          pmd = md;
          (*md)()->set_type((*pmd)()->MD);
        }

        uint64_t new_ino = 0;
        new_ino = inomap.forward((*md)()->md_ino());
        (*md)()->set_id(new_ino);
        insert(md, (*md)()->authid());

        if (!listing) {
          p_ino = inomap.forward((*md)()->md_pino());
        }

        (*md)()->set_pid(p_ino);
        eos_static_info("store local pino=%016lx for %016lx", (*md)()->pid(),
                        (*md)()->id());
        inomap.insert(map->first, new_ino);
        {
          mdmap.insertTS(new_ino, md);
          stat.inodes_inc();
          stat.inodes_ever_inc();
        }

        if ((pmd == md)) {
          if (EOS_LOGS_DEBUG) {
            eos_static_debug("cap count %d\n", pmd->cap_count());
          }

          if (!pmd->cap_count()) {
            // no capability - the local listing cannot be trusted anymore
            if (EOS_LOGS_DEBUG) {
              eos_static_debug("clearing out %0016lx", (*pmd)()->id());
            }

            XrdSysMutexHelper scope_lock(pmd->Locker());
            pmd->local_children().clear();
            pmd->get_todelete().clear();
          }
        }

        if (cap_received.id()) {
          // store cap
          EosFuse::Instance().getCap().store(req, cap_received);
          md->cap_inc();
        }

        if (EOS_LOGS_DEBUG) {
          eos_static_debug("store md for local-ino=%016lx remote-ino=%016lx type=%d -",
                           (long) new_ino, (long) map->first, (*md)()->type());
        }

        if (EOS_LOGS_DEBUG) {
          eos_static_debug("%s", md->dump().c_str());
        }
      }
    }

    if (pmd) {
      pmd->Locker().Lock();
    }

    if (pmd && listing) {
      // reconcile the remote children listing with local state
      bool ret = false;

      if (!(ret = map_children_to_local(pmd))) {
        eos_static_err("local mapping has failed %d", ret);
        assert(0);
      }

      if (EOS_LOGS_DEBUG)
        for (auto map = pmd->local_children().begin();
             map != pmd->local_children().end(); ++map) {
          eos_static_debug("listing: %s [%#lx]", map->first.c_str(), map->second);
        }

      // now flag as a complete listing
      (*pmd)()->set_type((*pmd)()->MDLS);
    }

    if (pmd) {
      pmd->Locker().UnLock();
    }
  }

  if (pmd) {
    return (*pmd)()->id();
  } else {
    return 0;
  }
}
/* -------------------------------------------------------------------------- */
void
metad::mdcflush(ThreadAssistant& assistant)
{
  // Flush thread: drains mdflushqueue and pushes the queued meta-data
  // operations (ADD/UPDATE/RM/LSTORE) to the backend until termination is
  // requested.
  uint64_t lastflushid = 0;
  ThreadAssistant::setSelfThreadName("metad::mdcflush");

  while (!assistant.terminationRequested()) {
    {
      mdflush.Lock();

      if (mdqueue.count(lastflushid)) {
        // remove entries from the mdqueue, if their ref count is 0
        if (!mdqueue[lastflushid]) {
          mdqueue.erase(lastflushid);
        }
      }

      mdqueue_current = 0;
      stat.inodes_backlog_store(mdqueue.size());

      while (mdqueue.size() == 0) {
        // TODO(gbitzes): Fix this, so we don't need to poll. Have ThreadAssistant
        // accept callbacks for when termination is requested, so we can wake up
        // any condvar.
        mdflush.Wait(1);

        if (assistant.terminationRequested()) {
          mdflush.UnLock();
          return;
        }
      }

      // TODO: add an optimzation to merge requests in the queue
      // pop the head entry and decrement its inode's refcount under the lock
      auto it = mdflushqueue.begin();
      uint64_t ino = it->id();
      std::string authid = it->authid();
      fuse_id f_id = it->get_fuse_id();
      mdx::md_op op = it->op();
      lastflushid = ino;
      eos_static_info("metacache::flush ino=%#lx flushqueue-size=%u", ino,
                      mdflushqueue.size());
      eos_static_info("metacache::flush %s", flushentry::dump(*it).c_str());
      mdflushqueue.erase(it);
      mdqueue[ino]--;
      mdqueue_current = ino;
      mdflush.UnLock();

      if (assistant.terminationRequested()) {
        return;
      }

      if (EOS_LOGS_DEBUG) {
        eos_static_debug("metacache::flush ino=%016lx authid=%s op=%d", ino,
                         authid.c_str(), (int) op);
      }

      {
        shared_md md;

        if (!mdmap.retrieveTS(ino, md)) {
          eos_static_crit("metacache::flush failed to retrieve ino=%016lx", ino);
          continue;
        }

        eos_static_info("metacache::flush ino=%016lx", (unsigned long long) ino);

        if (op != metad::mdx::LSTORE) {
          XrdSysMutexHelper mdLock(md->Locker());

          if (!(*md)()->md_pino()) {
            // when creating objects locally faster than pushed upstream
            // we might not know the remote parent id when we insert a local
            // creation request
            shared_md pmd;

            if (mdmap.retrieveTS((*md)()->pid(), pmd)) {
              // TODO: check if we need to lock pmd? But then we have to enforce
              // locking order child -> parent
              uint64_t md_pino = (*pmd)()->md_ino();
              eos_static_info("metacache::flush providing parent inode %016lx to %016lx",
                              (*md)()->id(), md_pino);
              (*md)()->set_md_pino(md_pino);
            } else {
              eos_static_crit("metacache::flush ino=%016lx parent remote inode not known",
                              (unsigned long long) ino);
            }
          }
        }

        md->Locker().Lock();

        if ((*md)()->id()) {
          uint64_t removeentry = 0;
          const std::string md_name = (*md)()->name();
          {
            int rc = 0;

            if (op == metad::mdx::RM) {
              (*md)()->set_operation((*md)()->DELETE);
            } else {
              (*md)()->set_operation((*md)()->SET);
            }

            // remote operations (not the local store) go to the backend;
            // the root inode (id 1) is never pushed
            if (((op == metad::mdx::ADD) ||
                 (op == metad::mdx::UPDATE) ||
                 (op == metad::mdx::RM)) &&
                (*md)()->id() != 1) {
              eos_static_info("metacache::flush backend::putMD - start");
              eos::fusex::md::TYPE mdtype = (*md)()->type();
              (*md)()->set_type((*md)()->MD);

              // push to backend
              if ((rc = mdbackend->putMD(f_id, (*md)(), authid, &(md->Locker())))) {
                eos_static_err("metacache::flush backend::putMD failed rc=%d", rc);
                // we just set an error code
                //! inomap.erase_bwd((*md)()->id());
                //! removeentry=(*md)()->id();
                (*md)()->set_err(rc);
              } else {
                inomap.insert((*md)()->md_ino(), (*md)()->id());
              }

              if (md->getop() != metad::mdx::RM) {
                md->setop_none();
                (*md)()->clear_mv_authid();
              }

              (*md)()->set_type(mdtype);
              // wake up any thread in wait_flush()
              md->Signal();
              eos_static_info("metacache::flush backend::putMD - stop");
            }

            if ((op == metad::mdx::ADD) || (op == metad::mdx::UPDATE) ||
                (op == metad::mdx::LSTORE)) {
              // TODO: local MD store is now disabled - delete this code
              //! std::string mdstream;
              //! md->SerializeToString(&mdstream);
              //! EosFuse::Instance().getKV()->put(ino, mdstream);
              md->Locker().UnLock();
            } else {
              md->Locker().UnLock();

              if (op == metad::mdx::RM) {
                // this step is coupled to the forget function, since we cannot
                // forget an entry if we didn't process the outstanding KV changes
                stat.inodes_deleted_dec();

                if (EOS_LOGS_DEBUG) {
                  eos_static_debug("count=%d(-%d) - ino=%#lx", md->lookup_is(), 1, ino);
                }

                XrdSysMutexHelper mLock(md->Locker());

                if (md->lookup_dec(1)) {
                  // forget this inode
                  removeentry = ino;
                }
              }
            }
          }

          if (removeentry) {
            // last lookup reference dropped - remove the md object
            shared_md pmd;

            if (EOS_LOGS_DEBUG) {
              eos_static_debug("delete md object - ino=%#lx", removeentry);
            }

            {
              if (EOS_LOGS_DEBUG) {
                eos_static_debug("calling forget function %#lx", removeentry);
              }

              forget(0, removeentry, 0);
            }
            {
              // NOTE(review): pmd is never assigned above, so this branch is
              // currently dead - confirm whether the parent lookup was lost
              if (pmd) {
                XrdSysMutexHelper mmLock(pmd->Locker());
                // we don't remote entries from the local deletion list because there could be
                // a race condition of a thread doing MDLS overwriting the locally deleted entry
                pmd->get_todelete().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                            md_name));
                pmd->Signal();
              }
            }
          }
        } else {
          md->Locker().UnLock();
        }
      }
    }
  }
}
/* -------------------------------------------------------------------------- */
void
metad::mdstackfree(ThreadAssistant& assistant)
{
  //----------------------------------------------------------------------
  //! Background housekeeping thread:
  //!  - every ~64s cleans the operation tracker
  //!  - every ~128s prunes orphaned/deleted inodes from the in-memory map
  //!  - if a local md cache dir is configured, swaps least-recently-used
  //!    inodes out into the kv store until the in-memory count is below
  //!    the configured 'inmemory_inodes' limit
  //----------------------------------------------------------------------
  size_t cnt = 0;
  int max_inodes = EosFuse::Instance().Config().options.inmemory_inodes;
  ThreadAssistant::setSelfThreadName("metad::mdstackfree");
  while (!assistant.terminationRequested()) {
    cnt++;
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    // do this ~every 64 seconds
    if (!(cnt % 128)) {
      EosFuse::Instance().Tracker().clean();
    }
    // do this ~every 128 seconds
    if (!(cnt % 256)) {
      XrdSysMutexHelper mLock(mdmap);
      for (auto it = mdmap.begin(); it != mdmap.end();) {
        if (!it->second) {
          // null map value = entry is swapped out - nothing to prune
          it++;
          continue;
        }
        // Try if we can acquire a md lock, if yes, then remove them
        // from the map & LRU if not,we try the next cycle
        std::optional pid;
        if (it->second->Locker().CondLock()) {
          pid = it->second->pid();
          it->second->Locker().UnLock();
        }
        // if the parent is gone, we can remove the child
        if ((pid && !mdmap.count(*pid)) &&
            (!S_ISDIR((*(it->second))()->mode()) || it->second->deleted())) {
          eos_static_debug("removing orphaned inode from mdmap ino=%#lx path=%s",
                           it->first, (*(it->second))()->fullpath().c_str());
          mdmap.lru_remove(it->first);
          it = mdmap.erase(it);
          stat.inodes_dec();
        } else {
          if (it->second->deleted()) {
            // deleted entries may only go once there is no pending flush
            // and no open data object for that inode
            if ((!has_flush(it->first)) &&
                (!EosFuse::Instance().datas.has(it->first))) {
              eos_static_debug("removing deleted inode from mdmap ino=%#lx path=%s",
                               it->first, (*(it->second))()->fullpath().c_str());
              mdmap.lru_remove(it->first);
              it = mdmap.erase(it);
              stat.inodes_dec();
            } else {
              it++;
            }
          } else {
            it++;
          }
        }
      }
    } //end mLock(mdmap)
    if (!EosFuse::Instance().Config().mdcachedir.empty()) {
      // level the inodes stored in memory and eventually swap out into kv store
      int swap_out_inodes = 0 ;
      do {
        swap_out_inodes = mdmap.sizeTS() - max_inodes -
                          EosFuse::Instance().mds.stats().inodes_stacked();
        if (swap_out_inodes > 0) {
          eos_static_info("swap-out %d inodes", swap_out_inodes);
          // grab the last lru inode and swap out
          mdmap.Lock();
          mdmap.lru_dump();
          uint64_t inode_to_swap = mdmap.lru_oldest();
          if (!inode_to_swap) {
            // nothing in the lru list anymore
            stat.lru_resets_inc();
            mdmap.lru_reset();
            mdmap.UnLock();
            break;
          }
          if (mdmap.count(inode_to_swap)) {
            shared_md md = mdmap[inode_to_swap];
            // skip objects still referenced elsewhere or holding file locks
            if ((md.use_count() > 2) ||
                (md && md->LockTable().size())) {
              eos_static_info("swap-out skipping referenced ino=%#llx ref-count=%lu\n",
                              inode_to_swap,
                              md.use_count());
              if (md) {
                // keep it tracked but refresh its LRU position
                mdmap.lru_update(inode_to_swap, md);
              }
              mdmap.UnLock();
              continue;
            }
            if (md) {
              eos_static_info("swap-out lru-removed ino=%#llx oldest=%#llx", inode_to_swap,
                              mdmap.lru_oldest());
              mdmap.lru_remove(inode_to_swap);
              // a null map entry marks the inode as swapped out
              mdmap[inode_to_swap] = 0;
              if (mdmap.swap_out(inode_to_swap, md)) {
                eos_static_err("swap-out failed for ino=%#llx", inode_to_swap);
              }
            }
          } else {
            // the inode to be swapped isn't there, reset LRU list
            mdmap.lru_remove(inode_to_swap);
            stat.lru_resets_inc();
            mdmap.lru_reset();
          }
          mdmap.UnLock();
        }
      } while ((swap_out_inodes > 0) &&
               (!assistant.terminationRequested()));
    }
  }
  return;
}
/* -------------------------------------------------------------------------- */
bool
metad::determineLockOrder(shared_md md1, shared_md md2)
{
  // Decide which of two md objects has to be locked first.
  //
  // Children are _always_ locked before their parents! Only when neither
  // object is an ancestor of the other is the order decided by increasing
  // inode number.
  //
  // Example 1: /a/b/c and /a/    -> /a/b/c locked first (child of /a/)
  // Example 2: /a/b/c and /a/b/d -> lower inode locked first
  //
  // @return true if md1 has to be locked before md2
  // This procedure is very expensive - it walks the ancestry via isChild().
  fuse_ino_t inode1;
  fuse_ino_t inode2;
  {
    XrdSysMutexHelper lock1(md1->Locker());
    inode1 = (*md1)()->id();
  }
  {
    XrdSysMutexHelper lock2(md2->Locker());
    inode2 = (*md2)()->id();
  }
  if (isChild(md1, inode2)) {
    return true;
  }
  if (isChild(md2, inode1)) {
    return false;
  }
  // unrelated objects: order by increasing inode
  return inode1 < inode2;
}
/* -------------------------------------------------------------------------- */
bool
metad::isChild(shared_md potentialChild, fuse_ino_t parentId)
{
  // Walk the parent chain of 'potentialChild' and report whether
  // 'parentId' appears on it (the node itself counts as a match).
  // The walk stops at the root (id 1), an unset id (0), or a parent
  // that cannot be resolved from the md map.
  shared_md node = potentialChild;
  while (true) {
    XrdSysMutexHelper guard(node->Locker());
    fuse_ino_t id = (*node)()->id();
    if (id == 1 || id == 0) {
      return false;
    }
    if (id == parentId) {
      return true;
    }
    shared_md parent;
    if (!mdmap.retrieveTS((*node)()->pid(), parent)) {
      eos_static_warning("could not lookup parent ino=%d of %d when determining lock order..",
                         (*node)()->pid(), (*node)()->id());
      return false;
    }
    // release the child lock before stepping up to the parent
    guard.UnLock();
    node = parent;
  }
}
/* -------------------------------------------------------------------------- */
int
metad::calculateDepth(shared_md md)
{
  // Compute the directory depth of 'md' by walking up the parent chain
  // recursively; the caller is expected to hold the lock on 'md' - each
  // recursion step locks the next parent before descending, so the locks
  // of the whole ancestry are held until the recursion unwinds.
  //
  // @param md meta-data object (locked by the caller)
  // @return depth (root = 1, direct root children = 2), or -1 if an
  //         ancestor cannot be resolved from the md map
  if ((*md)()->id() == 1 || (*md)()->id() == 0) {
    return 1;
  }
  fuse_ino_t pino = (*md)()->pid();
  if (pino == 1 || pino == 0) {
    return 2;
  }
  shared_md pmd;
  if (!mdmap.retrieveTS(pino, pmd)) {
    eos_static_warning("could not lookup parent ino=%d of %d when calculating depth..",
                       pino, (*md)()->id());
    return -1;
  }
  // keep the parent locked while recursing
  XrdSysMutexHelper mmLock(pmd->Locker());
  return calculateDepth(pmd) + 1;
}
/* -------------------------------------------------------------------------- */
std::string
metad::calculateLocalPath(shared_md md)
{
  // Compute the mount-relative path of 'md' by walking up the parent chain
  // recursively; the caller is expected to hold the lock on 'md' - each
  // recursion step locks the next parent before descending, so the locks
  // of the whole ancestry are held until the recursion unwinds.
  //
  // @param md meta-data object (locked by the caller)
  // @return local path, "/" for the root, or "" if an ancestor cannot be
  //         resolved from the md map
  if ((*md)()->id() == 1 || (*md)()->id() == 0) {
    // root (or unset) inode
    return "/";
  }
  std::string lpath = "/" + (*md)()->name();
  fuse_ino_t pino = (*md)()->pid();
  if (pino == 1 || pino == 0) {
    // direct child of the root
    return lpath;
  }
  shared_md pmd;
  if (!mdmap.retrieveTS(pino, pmd)) {
    // fix: message used to say 'calculating depth' (copy-paste from
    // calculateDepth)
    eos_static_warning("could not lookup parent ino=%d of %d when calculating local path..",
                       pino, (*md)()->id());
    return "";
  }
  // keep the parent locked while recursing
  XrdSysMutexHelper mmLock(pmd->Locker());
  return calculateLocalPath(pmd) + lpath;
}
/* -------------------------------------------------------------------------- */
void
metad::mdcallback(ThreadAssistant& assistant)
{
  //----------------------------------------------------------------------
  //! Consumer thread for asynchronous MD-server responses: pops messages
  //! queued by mdcommunicate() from mCbQueue and applies them - eviction
  //! instructions, cap drops, config updates, dentry removals, refreshes,
  //! lease releases, cap quota updates and md record updates.
  //----------------------------------------------------------------------
  bool shutdown = false;
  std::string sendlog = "";
  std::string stacktrace = "";
  ThreadAssistant::setSelfThreadName("metad::mdcallback");
  // NOTE(review): 'shutdown' is never set to true in this function, so the
  // '||' keeps the loop alive until the inner termination check returns;
  // presumably '&&' was intended - confirm
  while (!assistant.terminationRequested() || shutdown == false) {
    mCb.Lock();
    // block until a response is queued, re-checking termination every second
    while (!mCbQueue.size()) {
      mCb.WaitMS(1000);
      if (assistant.terminationRequested() || shutdown == true) {
        mCb.UnLock();
        return;
      }
    }
    // pop the oldest queued response
    auto it = mCbQueue.begin();
    shared_response rsp = *it;
    mCbQueue.erase(it);
    mCb.UnLock();
    if (rsp->type() == rsp->EVICT) {
      // EVICT carries an out-of-band instruction in its 'reason' string
      eos_static_crit("evict message from MD server - instruction: %s",
                      rsp->evict_().reason().c_str());
      if (rsp->evict_().reason().find("setlog") != std::string::npos) {
        // switch the local log priority as instructed
        if (rsp->evict_().reason().find("debug") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_DEBUG);
        }
        if (rsp->evict_().reason().find("info") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_INFO);
        }
        if (rsp->evict_().reason().find("error") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_ERR);
        }
        if (rsp->evict_().reason().find("notice") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_NOTICE);
        }
        if (rsp->evict_().reason().find("warning") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_WARNING);
        }
        if (rsp->evict_().reason().find("crit") != std::string::npos) {
          eos::common::Logging::GetInstance().SetLogPriority(LOG_CRIT);
        }
      } else {
        if (rsp->evict_().reason().find("stacktrace") != std::string::npos) {
          // produce a gdb stacktrace of all threads and stage it for the
          // next heartbeat (picked up by mdcommunicate)
          std::string stacktrace_file = EosFuse::Instance().Config().logfilepath;
          stacktrace_file += ".strace";
          eos::common::StackTrace::GdbTrace(0, getpid(), "thread apply all bt",
                                            stacktrace_file.c_str(), &stacktrace);
          mCb.Lock();
          mCbTrace = stacktrace;
          mCb.UnLock();
        } else {
          if (rsp->evict_().reason().find("sendlog") != std::string::npos) {
            // collect up to 512 lines per priority from the circular
            // in-memory log buffers and stage them for the next heartbeat
            std::string refs;
            XrdCl::Proxy::WriteAsyncHandler::DumpReferences(refs);
            eos_static_warning("\n%s\n", refs.c_str());
            sendlog = "";
            int logtagindex =
              eos::common::Logging::GetInstance().GetPriorityByString("debug");
            for (int j = 0; j <= logtagindex; j++) {
              for (int i = 1; i <= 512; i++) {
                std::string logline;
                eos::common::Logging::GetInstance().gMutex.Lock();
                const char* log = eos::common::Logging::GetInstance().gLogMemory[j][
                                    (eos::common::Logging::GetInstance().gLogCircularIndex[j] - i +
                                     eos::common::Logging::GetInstance().gCircularIndexSize) %
                                    eos::common::Logging::GetInstance().gCircularIndexSize].c_str();
                if (log) {
                  logline = log;
                }
                eos::common::Logging::GetInstance().gMutex.UnLock();
                if (logline.length()) {
                  sendlog += logline;
                  sendlog += "\n";
                }
              }
            }
            mCb.Lock();
            mCbLog = sendlog;
            mCb.UnLock();
          } else {
            if (rsp->evict_().reason().find("resetbuffer") != std::string::npos) {
              eos_static_warning("MGM asked us to reset the buffer in flight");
              XrdCl::Proxy::sWrBufferManager.reset();
              XrdCl::Proxy::sRaBufferManager.reset();
            } else if (rsp->evict_().reason().find("log2big") != std::string::npos) {
              // we were asked to truncate our logfile
              EosFuse::Instance().truncateLogFile();
            } else {
              // suicide
              if (rsp->evict_().reason().find("abort") != std::string::npos) {
                kill(getpid(), SIGABRT);
              } else {
                kill(getpid(), SIGTERM);
              }
              pause();
            }
          }
        }
      }
    }
    if (rsp->type() == rsp->DROPCAPS) {
      eos_static_notice("MGM asked us to drop all known caps");
      // a newly started MGM requests this as a response to the first heartbeat
      EosFuse::Instance().caps.reset();
    }
    if (rsp->type() == rsp->CONFIG) {
      // runtime re-configuration pushed by the MGM
      if (rsp->config_().hbrate()) {
        eos_static_warning("MGM asked us to set our heartbeat interval to %d seconds, %s dentry-messaging, %s writesizeflush, %s appname, %s mdquery versions %s and server-version=%s",
                           rsp->config_().hbrate(),
                           rsp->config_().dentrymessaging() ? "enable" : "disable",
                           rsp->config_().writesizeflush() ? "enable" : "disable",
                           rsp->config_().appname() ? "accepts" : "rejects",
                           rsp->config_().mdquery() ? "accepts" : "rejects",
                           rsp->config_().hideversion() ? "hidden" : "visible",
                           rsp->config_().serverversion().c_str());
        XrdSysMutexHelper cLock(EosFuse::Instance().mds.ConfigMutex);
        EosFuse::Instance().mds.dentrymessaging = rsp->config_().dentrymessaging();
        EosFuse::Instance().mds.writesizeflush = rsp->config_().writesizeflush();
        EosFuse::Instance().mds.appname = rsp->config_().appname();
        EosFuse::Instance().mds.mdquery = rsp->config_().mdquery();
        EosFuse::Instance().mds.hideversion = rsp->config_().hideversion();
        EosFuse::Instance().mds.hb_interval = (int) rsp->config_().hbrate();
        if (rsp->config_().serverversion().length()) {
          EosFuse::Instance().mds.serverversion = rsp->config_().serverversion();
        }
      }
    }
    if (rsp->type() == rsp->DENTRY) {
      uint64_t md_ino = rsp->dentry_().md_ino();
      std::string authid = rsp->dentry_().authid();
      std::string name = rsp->dentry_().name();
      uint64_t ino = inomap.forward(md_ino);
      uint64_t pt_mtime = rsp->dentry_().pt_mtime();
      uint64_t pt_mtime_ns = rsp->dentry_().pt_mtime_ns();
      if (rsp->dentry_().type() == rsp->dentry_().ADD) {
        // ADD dentries are currently ignored
      } else if (rsp->dentry_().type() == rsp->dentry_().REMOVE) {
        eos_static_notice("remove-dentry: remote-ino=%#lx ino=%#lx clientid=%s authid=%s name=%s",
                          md_ino, ino, rsp->lease_().clientid().c_str(), authid.c_str(), name.c_str());
        // remove directory entry
        if (EosFuse::Instance().Config().options.md_kernelcache) {
          kernelcache::inval_entry(ino, name);
        }
        shared_md pmd;
        if (ino && mdmap.retrieveTS(ino, pmd)) {
          {
            XrdSysMutexHelper mLock(pmd->Locker());
            // drop the child from the local listing and the deletion list,
            // and propagate the parent mtime/ctime shipped with the message
            if (pmd->local_children().count(
                  eos::common::StringConversion::EncodeInvalidUTF8(name))) {
              pmd->local_children().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                            name));
              pmd->get_todelete().erase(eos::common::StringConversion::EncodeInvalidUTF8(
                                          name));
              (*pmd)()->set_nchildren((*pmd)()->nchildren() - 1);
              if (pt_mtime || pt_mtime_ns) {
                (*pmd)()->set_mtime(pt_mtime);
                (*pmd)()->set_mtime_ns(pt_mtime_ns);
                (*pmd)()->set_ctime(pt_mtime);
                (*pmd)()->set_ctime_ns(pt_mtime_ns);
              }
            }
            kernelcache::inval_inode(ino, false);
          }
        }
      }
    }
    if (rsp->type() == rsp->REFRESH) {
      uint64_t md_ino = rsp->refresh_().md_ino();
      uint64_t ino = inomap.forward(md_ino);
      mode_t mode = 0;
      eos_static_notice("refresh-dentry: remote-ino=%#lx ino=%#lx",
                        md_ino, ino);
      shared_md md;
      // force meta data refresh
      if (ino && mdmap.retrieveTS(ino, md)) {
        XrdSysMutexHelper mLock(md->Locker());
        md->force_refresh();
        mode = (*md)()->mode();
      }
      if (EOS_LOGS_DEBUG) {
        eos_static_debug("%s", dump_md(md).c_str());
      }
      if (EosFuse::Instance().Config().options.md_kernelcache) {
        eos_static_info("invalidate metadata cache for ino=%#lx", ino);
        kernelcache::inval_inode(ino, S_ISDIR(mode) ? false : true);
      }
    }
    if (rsp->type() == rsp->LEASE) {
      uint64_t md_ino = rsp->lease_().md_ino();
      std::string authid = rsp->lease_().authid();
      uint64_t ino = inomap.forward(md_ino);
      eos_static_notice("lease: remote-ino=%#lx ino=%#lx clientid=%s authid=%s",
                        md_ino, ino, rsp->lease_().clientid().c_str(), authid.c_str());
      shared_md check_md;
      if (ino && mdmap.retrieveTS(ino, check_md)) {
        std::string capid = cap::capx::capid(ino, rsp->lease_().clientid());
        // wait that the inode is flushed out of the mdqueue
        do {
          mdflush.Lock();
          if (mdqueue.count(ino)) {
            mdflush.UnLock();
            eos_static_info("lease: delaying cap-release remote-ino=%#lx ino=%#lx clientid=%s authid=%s",
                            md_ino, ino, rsp->lease_().clientid().c_str(), authid.c_str());
            std::this_thread::sleep_for(std::chrono::milliseconds(25));
            if (assistant.terminationRequested()) {
              return;
            }
          } else {
            mdflush.UnLock();
            break;
          }
        } while (1);
        eos_static_debug("");
        // NOTE(review): this 'ino' shadows the outer 'ino' computed above -
        // confirm the cap-forget return value is really meant to replace it
        fuse_ino_t ino = EosFuse::Instance().getCap().forget(capid);
        shared_md md;
        bool is_locked = false;
        if (mdmap.retrieveTS(ino, md)) {
          is_locked = true;
          md->Locker().Lock();
        }
        // invalidate children
        if (md) {
          if ((*md)()->id()) {
            // force an update of the metadata with next access
            eos_static_info("md=%16x", (*md)()->id());
            // presumably cleanup() takes ownership of / releases the lock
            // taken above - confirm against its implementation
            cleanup(md);
            if (EOS_LOGS_DEBUG) {
              eos_static_debug("%s", dump_md(md).c_str());
            }
          } else {
            if (is_locked) {
              // in case this should somehow happen
              md->Locker().UnLock();
            }
          }
        }
      } else {
        // there might have been several caps and the first has wiped already the MD,
        // still we want to remove the cap entry
        std::string capid = cap::capx::capid(ino, rsp->lease_().clientid());
        eos_static_debug("");
        EosFuse::Instance().getCap().forget(capid);
      }
    }
    if (rsp->type() == rsp->CAP) {
      // quota update for an existing cap
      std::string clientid = rsp->cap_().clientid();
      uint64_t ino = inomap.forward(rsp->cap_().id());
      cap::shared_cap cap = EosFuse::Instance().caps.get(ino, clientid);
      eos_static_notice("cap-update: cap-id=%#lx %s", rsp->cap_().id(),
                        cap->dump().c_str());
      if ((*cap)()->id()) {
        EosFuse::Instance().caps.update_quota(cap, rsp->cap_()._quota());
        eos_static_notice("cap-update: cap-id=%#lx %s", rsp->cap_().id(),
                          cap->dump().c_str());
      }
    }
    if (rsp->type() == rsp->MD) {
      fuse_id fuseid;
      uint64_t md_ino = rsp->md_().md_ino();
      std::string authid = rsp->md_().authid();
      uint64_t ino = inomap.forward(md_ino);
      eos_static_notice("md-update: remote-ino=%#lx ino=%#lx authid=%s",
                        md_ino, ino, authid.c_str());
      // we get this when a file update/flush appeared
      shared_md md;
      int64_t bookingsize = 0;
      uint64_t pino = 0;
      mode_t mode = 0;
      std::string md_clientid;
      std::string old_name;
      if (mdmap.retrieveTS(ino, md)) {
        eos_static_notice("md-update: (existing) remote-ino=%#lx ino=%#lx authid=%s",
                          md_ino, ino, authid.c_str());
        // updated file MD
        if (EOS_LOGS_DEBUG) {
          eos_static_debug("%s op=%d", md->dump().c_str(), md->getop());
        }
        md->Locker().Lock();
        // size delta drives the local quota adjustment further below
        bookingsize = rsp->md_().size() - (*md)()->size();
        md_clientid = rsp->md_().clientid();
        eos_static_info("md-update: %s %s", (*md)()->name().c_str(),
                        rsp->md_().name().c_str());
        // check if this implies a rename
        if ((*md)()->name() != rsp->md_().name()) {
          // NOTE(review): this stores the *incoming* (new) name; the kernel
          // dentry invalidation below looks like it needs the previous
          // local name ((*md)()->name()) instead - confirm
          old_name = rsp->md_().name();
        }
        // verify that this record is newer than
        if (rsp->md_().clock() >= (*md)()->clock()) {
          eos_static_info("overwriting clock MD %#lx => %#lx", (*md)()->clock(),
                          rsp->md_().clock());
          std::string fullpath = (*md)()->fullpath(); // keep the fullpath information
          *md = rsp->md_();
          (*md)()->set_creator(false);
          (*md)()->set_bc_time(time(NULL));
          (*md)()->set_fullpath(fullpath);
        } else {
          eos_static_warning("keeping clock MD %#lx => %#lx", (*md)()->clock(),
                             rsp->md_().clock());
        }
        (*md)()->clear_clientid();
        // re-establish the local inode numbers after the record copy
        pino = inomap.forward((*md)()->md_pino());
        (*md)()->set_id(ino);
        (*md)()->set_pid(pino);
        mode = (*md)()->mode();
        if (EOS_LOGS_DEBUG) {
          eos_static_debug("%s op=%d", md->dump().c_str(), md->getop());
        }
        // update the local store
        update(fuseid, md, authid, true);
        std::string name = (*md)()->name();
        md->Locker().UnLock();
        // adjust local quota
        cap::shared_cap cap = EosFuse::Instance().caps.get(pino, md_clientid);
        if ((*cap)()->id()) {
          if (bookingsize >= 0) {
            EosFuse::Instance().caps.book_volume(cap, (uint64_t) bookingsize);
          } else {
            EosFuse::Instance().caps.free_volume(cap, (uint64_t) - bookingsize);
          }
          // NOTE(review): lowercase 'instance()' while 'Instance()' is used
          // everywhere else in this file - verify this alias exists
          EosFuse::instance().caps.book_inode(cap);
        } else {
          eos_static_debug("missing quota node for pino=%#lx and clientid=%s",
                           pino, (*md)()->clientid().c_str());
        }
        // possibly invalidate kernel cache
        if (EosFuse::Instance().Config().options.md_kernelcache ||
            EosFuse::Instance().Config().options.data_kernelcache) {
          eos_static_info("invalidate data cache for ino=%#lx", ino);
          kernelcache::inval_inode(ino, S_ISDIR(mode) ? false : true);
        }
        if (EosFuse::Instance().Config().options.md_kernelcache) {
          if (old_name.length()) {
            eos_static_info("invalidate previous name for ino=%#lx old-name=%s", ino,
                            old_name.c_str());
            kernelcache::inval_entry(pino, old_name.c_str());
          }
          kernelcache::inval_inode(pino, false);
        }
        if (S_ISREG(mode)) {
          // invalidate local disk cache
          EosFuse::Instance().datas.invalidate_cache(ino);
          eos_static_info("invalidate local disk cache for ino=%#lx", ino);
        }
      } else {
        eos_static_info("md-update: (new) remote-ino=%#lx ino=%#lx authid=%s",
                        md_ino, ino, authid.c_str());
        // new file
        md = std::make_shared();
        *md = rsp->md_();
        (*md)()->set_id(md_ino);
        insert(md, authid);
        uint64_t md_pino = (*md)()->md_pino();
        std::string md_clientid = (*md)()->clientid();
        uint64_t md_size = (*md)()->size();
        md->Locker().Lock();
        uint64_t pino = inomap.forward(md_pino);
        shared_md pmd;
        if (pino && mdmap.retrieveTS(pino, pmd)) {
          // propagate the parent mtime/ctime shipped with the record
          if ((*md)()->pt_mtime()) {
            (*pmd)()->set_mtime((*md)()->pt_mtime());
            (*pmd)()->set_mtime_ns((*md)()->pt_mtime_ns());
            (*pmd)()->set_ctime((*md)()->pt_mtime());
            (*pmd)()->set_ctime_ns((*md)()->pt_mtime_ns());
          }
          (*md)()->clear_pt_mtime();
          (*md)()->clear_pt_mtime_ns();
          inomap.insert((*md)()->md_ino(), (*md)()->id());
          add(0, pmd, md, authid, true);
          // adjust local quota
          cap::shared_cap cap = EosFuse::Instance().caps.get(pino, md_clientid);
          if ((*cap)()->id()) {
            EosFuse::Instance().caps.book_volume(cap, md_size);
            EosFuse::instance().caps.book_inode(cap);
          } else {
            eos_static_debug("missing quota node for pino=%#llx and clientid=%s",
                             pino, (*md)()->clientid().c_str());
          }
          const std::string md_name = (*md)()->name();
          md->Locker().UnLock();
          // possibly invalidate kernel cache for parent
          if (EosFuse::Instance().Config().options.md_kernelcache) {
            eos_static_info("invalidate md cache for ino=%016lx", pino);
            kernelcache::inval_entry(pino, md_name);
            kernelcache::inval_inode(pino, false);
            XrdSysMutexHelper mLock(pmd->Locker());
            pmd->local_enoent().erase(md_name);
          }
        } else {
          eos_static_err("missing parent mapping pino=%16x for ino%16x",
                         md_pino, md_ino);
          md->Locker().UnLock();
        }
      }
    }
  }
}
/* -------------------------------------------------------------------------- */
void
metad::mdcommunicate(ThreadAssistant& assistant)
{
  //----------------------------------------------------------------------
  //! Communication thread: polls the ZMQ socket for server responses
  //! (handing parsed messages over to the mdcallback thread) and sends a
  //! heartbeat - including clock, staged logs/stacktraces, periodic
  //! statistics and pending cap revocations - every hb_interval seconds.
  //----------------------------------------------------------------------
  eos::fusex::container hb;
  // static part of the heartbeat message, filled once
  hb.mutable_heartbeat_()->set_name(zmq_name);
  hb.mutable_heartbeat_()->set_host(zmq_clienthost);
  hb.mutable_heartbeat_()->set_uuid(zmq_clientuuid);
  hb.mutable_heartbeat_()->set_version(VERSION);
  hb.mutable_heartbeat_()->set_protversion(FUSEPROTOCOLVERSION);
  hb.mutable_heartbeat_()->set_pid((int32_t) getpid());
  hb.mutable_heartbeat_()->set_starttime(time(NULL));
  hb.mutable_heartbeat_()->set_leasetime(
    EosFuse::Instance().Config().options.leasetime);
  hb.mutable_heartbeat_()->set_mount(EosFuse::Instance().Config().localmountdir);
  hb.mutable_heartbeat_()->set_automounted(
    EosFuse::Instance().Config().options.automounted);
  hb.mutable_heartbeat_()->set_appname(EosFuse::Instance().Config().appname);
  hb.set_type(hb.HEARTBEAT);
  size_t cnt = 0;
  EosFuse::Instance().mds.hb_interval = 10;
  bool shutdown = false;
  bool first = true;
  ThreadAssistant::setSelfThreadName("metad::mdcommunicate");
  // loop until termination is requested AND the shutdown heartbeat was sent
  while (!assistant.terminationRequested() || shutdown == false) {
    try {
      std::unique_lock connectionMutex(zmq_socket_mutex);
      eos_static_debug("");
      zmq::pollitem_t items[] = {
        {static_cast(*z_socket), 0, ZMQ_POLLIN, 0}
      };
      struct timespec ts;
      eos::common::Timing::GetTimeSpec(ts);
      do {
        // if there is a ZMQ reconnection to be done we release the ZQM socket mutex
        if (zmq_wants_to_connect()) {
          connectionMutex.unlock();
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          continue;
        }
        if (first) {
          // we want to see the first hearteat directly after startup
          first = false;
          break;
        }
        // 10 milliseconds
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
        zmq_poll(items, 1, 10);
#pragma GCC diagnostic pop
        if (assistant.terminationRequested()) {
          // announce the shutdown with a final heartbeat
          shutdown = true;
          EosFuse::Instance().caps.reset();
          eos_static_notice("sending shutdown heartbeat message");
          hb.mutable_heartbeat_()->set_shutdown(true);
          break;
        }
        if (items[0].revents & ZMQ_POLLIN) {
          // drain one (possibly multi-part) message from the socket
          int rc;
          int64_t more = 0;
          size_t more_size = sizeof(more);
          zmq_msg_t message;
          rc = zmq_msg_init(&message);
          if (rc) {
            rc = 0;
          }
          do {
            (void)zmq_msg_recv(&message, static_cast(*z_socket), 0);
            zmq_getsockopt(static_cast(*z_socket), ZMQ_RCVMORE, &more, &more_size);
          } while (more);
          std::string s((const char*) zmq_msg_data(&message), zmq_msg_size(&message));
          shared_response rsp = std::make_shared();
          eos_static_notice("parsing response");
          if (rsp->ParseFromString(s)) {
            // hand the parsed response over to the mdcallback thread
            mCb.Lock();
            mCbQueue.push_back(rsp);
            mCb.Signal();
            mCb.UnLock();
          } else {
            eos_static_err("unable to parse message");
          }
          zmq_msg_close(&message);
        }
        // leave the loop to send a heartbeat after the given interval
        if ((eos::common::Timing::GetCoarseAgeInNs(&ts,
             0) >= (EosFuse::Instance().mds.hb_interval * 1000000000ll))) {
          break;
        }
      } while (1);
      eos_static_debug("send");
      // prepare a heart-beat message
      struct timespec tsnow;
      eos::common::Timing::GetTimeSpec(tsnow);
      hb.mutable_heartbeat_()->set_clock(tsnow.tv_sec);
      hb.mutable_heartbeat_()->set_clock_ns(tsnow.tv_nsec);
      // attach log/stacktrace payload staged by mdcallback, if any
      mCb.Lock();
      if (mCbLog.length()) {
        hb.mutable_heartbeat_()->set_log(mCbLog);
        mCbLog.clear();
      }
      if (mCbTrace.length()) {
        hb.mutable_heartbeat_()->set_trace(mCbTrace);
        mCbTrace.clear();
      }
      mCb.UnLock();
      if (!(cnt % (60 / EosFuse::Instance().mds.hb_interval))) {
        // we send a statistics update every 60 heartbeats
        EosFuse::Instance().getHbStat((*hb.mutable_statistics_()));
        std::string blocker;
        std::string origin;
        uint64_t blocker_inode;
        size_t blocked_ops;
        bool root_blocked;
        hb.mutable_statistics_()->set_blockedms(
          EosFuse::Instance().Tracker().blocked_ms(blocker, blocker_inode, origin,
              blocked_ops, root_blocked));
        hb.mutable_statistics_()->set_blockedfunc(blocker + std::string(":") + origin);
        hb.mutable_statistics_()->set_blockedops(blocked_ops);
        hb.mutable_statistics_()->set_blockedroot(root_blocked);
      } else {
        hb.clear_statistics_();
      }
      {
        // add caps to be revoked
        XrdSysMutexHelper rLock(EosFuse::Instance().getCap().get_revocationLock());
        // clear the hb map
        hb.mutable_heartbeat_()->mutable_authrevocation()->clear();
        auto rmap = hb.mutable_heartbeat_()->mutable_authrevocation();
        cap::revocation_set_t& revocationset =
          EosFuse::Instance().getCap().get_revocationmap();
        size_t n_revocations = 0;
        for (auto it = revocationset.begin(); it != revocationset.end();) {
          (*rmap)[*it] = 0;
          eos_static_notice("cap-revocation: authid=%s", it->c_str());
          it = revocationset.erase(it);
          n_revocations++;
          // cap the per-heartbeat revocation batch size
          if (n_revocations > 32 * 1024) {
            eos_static_notice("stopped revocations after 32k entries");
            break;
          }
        }
        eos_static_debug("cap-revocation: map-size=%u",
                         revocationset.size());
      }
      // serialize and ship the heartbeat
      std::string hbstream;
      zmq::message_t hb_msg;
      hb.SerializeToString(&hbstream);
      hb_msg.rebuild(hbstream.c_str(), hbstream.length());
      if (!z_socket->send(hb_msg, zmq::send_flags::none)) {
        eos_static_err("err sending heartbeat: hbstream.c_str()=%s, hbstream.length()=%d, hbstream:hex=%s",
                       hbstream.c_str(), hbstream.length(),
                       eos::common::stringToHex(hbstream).c_str());
      } else {
        eos_static_debug("debug sending heartbeat: hbstream.c_str()=%s, hbstream.length()=%d, hbstream:hex=%s",
                         hbstream.c_str(), hbstream.length(),
                         eos::common::stringToHex(hbstream).c_str());
        last_heartbeat = time(NULL);
      }
      if (!is_visible()) {
        set_is_visible(1);
      }
      // one-shot payloads: drop them after sending
      hb.mutable_heartbeat_()->clear_log();
      hb.mutable_heartbeat_()->clear_trace();
    } catch (std::exception& e) {
      eos_static_err("catched exception %s", e.what());
    }
    cnt++;
  }
}
/* -------------------------------------------------------------------------- */
void
metad::vmap::insert(fuse_ino_t a, fuse_ino_t b)
{
  // we only keep mappings where one side is inode 1
  if ((a != 1) && (b != 1)) {
    return;
  }
  eos_static_info("inserting %llx <=> %llx", a, b);
  XrdSysMutexHelper mLock(mMutex);
  auto fit = fwd_map.find(a);
  if ((fit != fwd_map.end()) && (fit->second == b)) {
    // mapping already present
    return;
  }
  auto bit = bwd_map.find(b);
  if (bit != bwd_map.end()) {
    // drop the stale forward entry that still points at b
    fwd_map.erase(bit->second);
  }
  fwd_map[a] = b;
  bwd_map[b] = a;
}
/* -------------------------------------------------------------------------- */
std::string
metad::vmap::dump()
{
  // Render both mapping directions as text for debugging.
  // NOTE(review): runs without taking mMutex (the lock is commented out
  // upstream) - presumably to stay callable while the lock is held; confirm.
  std::string sout;
  char header[1024];
  snprintf(header, sizeof(header), "%lu this=%llx forward=%lu backward=%lu",
           time(NULL), (unsigned long long) this, fwd_map.size(), bwd_map.size());
  sout += header;
  sout += "\n";
  for (const auto& fwd : fwd_map) {
    char out[1024];
    snprintf(out, sizeof(out), "%16lx => %16lx\n", fwd.first, fwd.second);
    sout += out;
  }
  for (const auto& bwd : bwd_map) {
    char out[1024];
    snprintf(out, sizeof(out), "%16lx <= %16lx\n", bwd.first, bwd.second);
    sout += out;
  }
  sout += "end\n";
  return sout;
}
/* -------------------------------------------------------------------------- */
void
metad::vmap::erase_fwd(fuse_ino_t lookup)
{
  // Remove 'lookup' from the forward map and its counterpart from the
  // backward map. Uses a single find() instead of the previous
  // count() + operator[] + erase(key) triple lookup; no-op if absent.
  XrdSysMutexHelper mLock(mMutex);
  auto it = fwd_map.find(lookup);
  if (it != fwd_map.end()) {
    bwd_map.erase(it->second);
    fwd_map.erase(it);
  }
}
/* -------------------------------------------------------------------------- */
void
metad::vmap::erase_bwd(fuse_ino_t lookup)
{
  // Remove 'lookup' from the backward map and its counterpart from the
  // forward map. Uses a single find() instead of the previous
  // count() + operator[] + erase(key) triple lookup; no-op if absent.
  XrdSysMutexHelper mLock(mMutex);
  auto it = bwd_map.find(lookup);
  if (it != bwd_map.end()) {
    fwd_map.erase(it->second);
    bwd_map.erase(it);
  }
}
/* -------------------------------------------------------------------------- */
fuse_ino_t
metad::vmap::forward(fuse_ino_t lookup)
{
  // translate a remote inode to the local one; fall back to the input
  // when there is no (non-zero) mapping
  XrdSysMutexHelper mLock(mMutex);
  auto entry = fwd_map.find(lookup);
  if ((entry == fwd_map.end()) || (entry->second == 0)) {
    return lookup;
  }
  return entry->second;
}
/* -------------------------------------------------------------------------- */
fuse_ino_t
metad::vmap::backward(fuse_ino_t lookup)
{
  // translate a local inode back to the remote one; identity if unmapped
  XrdSysMutexHelper mLock(mMutex);
  auto entry = bwd_map.find(lookup);
  if (entry == bwd_map.end()) {
    return lookup;
  }
  return entry->second;
}
/* -------------------------------------------------------------------------- */
size_t
metad::pmap::sizeTS()
{
  // thread-safe size(): takes the map mutex before reading
  XrdSysMutexHelper mLock(this);
  return size();
}
/* -------------------------------------------------------------------------- */
bool
metad::pmap::retrieveOrCreateTS(fuse_ino_t ino, shared_md& ret)
{
  // Thread-safe lookup with creation fallback.
  // @return false when 'ino' was already present (ret points at it),
  //         true when a fresh md object was created (and, for a non-zero
  //         inode, registered in the map)
  XrdSysMutexHelper mLock(this);
  bool created = !this->retrieve(ino, ret);
  if (created) {
    ret = std::make_shared<mdx>();
    if (ino) {
      (*this)[ino] = ret;
    }
  }
  return created;
}
/* -------------------------------------------------------------------------- */
bool
metad::pmap::retrieveTS(fuse_ino_t ino, shared_md& ret)
{
  // thread-safe wrapper around retrieve()
  XrdSysMutexHelper mLock(this);
  bool found = this->retrieve(ino, ret);
  return found;
}
/* -------------------------------------------------------------------------- */
bool
metad::pmap::retrieve(fuse_ino_t ino, shared_md& ret)
{
  // Look up 'ino' in the map (caller holds the map lock). A null map
  // entry means the object was swapped out into the kv store and is
  // transparently swapped back in here. On failure an error md is placed
  // into 'ret' (only if the caller passed none) and false is returned.
  auto entry = this->find(ino);
  if (entry == this->end()) {
    if (!ret) {
      ret = std::make_shared<mdx>();
      (*ret)()->set_err(ENOENT);
    }
    return false;
  }
  shared_md md = entry->second;
  eos_static_debug("retc=%x", (bool)(md));
  if (!md) {
    // swap-in this inode
    md = std::make_shared<mdx>();
    const int rc = swap_in(ino, md);
    if (rc) {
      eos_static_crit("failed to swap-in ino=%#llx", ino);
      if (!ret) {
        ret = std::make_shared<mdx>();
        (*ret)()->set_err(rc);
      }
      return false;
    }
    // attach the restored object and track it in the LRU list
    (*this)[ino] = md;
    lru_add(ino, md);
  }
  ret = md;
  // every retrieval refreshes the LRU position
  lru_update(ino, ret);
  return true;
}
/* -------------------------------------------------------------------------- */
uint64_t
metad::pmap::lru_oldest() const
{
  // tail of the LRU list = least recently used inode (0 if empty)
  return lru_last;
}
/* -------------------------------------------------------------------------- */
uint64_t
metad::pmap::lru_newest() const
{
  // head of the LRU list = most recently used inode (0 if empty)
  return lru_first;
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::lru_add(fuse_ino_t ino, shared_md md)
{
  // Insert 'ino' as the new head (most-recently-used end) of the intrusive
  // LRU list; the prev/next links live inside the md objects themselves.
  // The caller must hold the map lock. Inodes 0 and 1 are never tracked.
  if (ino <= 1) {
    return;
  }
  md->set_lru_prev(lru_first);
  md->set_lru_next(0);
  // lru list insert with outside lock handling
  if (this->count(lru_first)) {
    if ((*this)[lru_first]) {
      // connect the new inode to the head of the lru list
      (*this)[lru_first]->set_lru_next(ino);
    } else {
      // points to swapped-out entry
      // NOTE(review): resetting the tail to the new head looks suspicious
      // here - confirm this is the intended repair action
      lru_last = ino;
    }
  }
  lru_first = ino;
  if (!lru_last) {
    // first tracked element: head and tail at once
    lru_last = ino;
  }
  eos_static_info("ino=%#llx first=%#llx last=%#llx prev=%llx next=%#llx", ino,
                  lru_first, lru_last, md->lru_prev(), md->lru_next());
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::lru_remove(fuse_ino_t ino)
{
  // Unlink 'ino' from the intrusive LRU list, patching the neighbour links
  // and the head/tail markers. The caller must hold the map lock.
  // Inodes 0 and 1 are never tracked.
  if (ino <= 1) {
    return;
  }
  uint64_t prev = 0;
  uint64_t next = 0;
  if (EOS_LOGS_DEBUG)
    eos_static_debug("ino=%#llx first=%#llx last=%#llx", ino,
                     lru_first, lru_last);
  // lru list handling with outside lock handling
  if (this->count(ino)) {
    shared_md smd = (*this)[ino];
    if (smd) {
      prev = (*this)[ino]->lru_prev();
      next = (*this)[ino]->lru_next();
      if (this->count(prev) && (*this)[prev]) {
        // bridge the predecessor to the successor
        (*this)[prev]->set_lru_next(next);
      } else {
        // this is the tail of the LRU list
        lru_last = next;
        if (next && this->count(next) && (*this)[next]) {
          (*this)[next]->set_lru_prev(0);
        }
      }
      if (this->count(next) && (*this)[next]) {
        // bridge the successor back to the predecessor
        (*this)[next]->set_lru_prev(prev);
      } else {
        // this is the head of the LRU list
        lru_first = prev;
        if (prev && this->count(prev) && (*this)[prev]) {
          (*this)[prev]->set_lru_next(0);
        }
      }
    }
    if (EOS_LOGS_DEBUG) {
      eos_static_debug("last:%#llx => %#llx (prev=%#llx)", lru_last, next, prev);
    }
  }
  if (EOS_LOGS_DEBUG)
    eos_static_debug("ino=%#llx first=%#llx last=%#llx prev=%#llx next=%#llx", ino,
                     lru_first, lru_last, prev, next);
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::lru_update(fuse_ino_t ino, shared_md md)
{
  // Move 'ino' to the head (most-recently-used end) of the intrusive LRU
  // list. The caller must hold the map lock.
  // NOTE(review): guards only ino == 1 while lru_add/lru_remove guard
  // ino <= 1 - confirm inode 0 really cannot arrive here
  if (ino == 1) {
    return;
  }
  if (lru_first == ino) {
    // already at the head - nothing to do
    return;
  }
  if (EOS_LOGS_DEBUG)
    eos_static_debug("ino=%#llx first=%#llx last=%#llx", ino,
                     lru_first, lru_last);
  // move an lru item to the head of the list
  uint64_t prev = md->lru_prev();
  uint64_t next = md->lru_next();
  if (this->count(prev) && (*this)[prev]) {
    (*this)[prev]->set_lru_next(next);
  } else {
    // 'ino' was the tail - advance the tail marker
    if (next) {
      lru_last = next;
    } else {
      lru_last = ino;
    }
  }
  if (this->count(next) && (*this)[next]) {
    (*this)[next]->set_lru_prev(prev);
  }
  if (this->count(lru_first) && (*this)[lru_first]) {
    // link in front of the current head; when the head entry is swapped
    // out (null map value) the relink is skipped entirely
    (*this)[lru_first]->set_lru_next(ino);
    md->set_lru_prev(lru_first);
    md->set_lru_next(0);
    lru_first = ino;
  }
  if (EOS_LOGS_DEBUG)
    eos_static_debug("ino=%#llx first=%#llx last=%#llx prev=%#llx next=%#llx", ino,
                     lru_first, lru_last, prev, next);
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::lru_dump(bool force)
{
  // Walk the LRU list from the head (lru_first) towards the tail following
  // the lru_prev links and print every entry to stderr; 'force' dumps
  // unconditionally, otherwise only in debug mode. A trivial self-loop is
  // reported as corruption and terminates the walk.
  if (!EOS_LOGS_DEBUG && !force) {
    return;
  }
  uint64_t start = lru_first;
  std::stringstream ss;
  size_t cnt = 0;
  // cap the walk at the map size to survive a corrupted (cyclic) list
  size_t max = this->size();
  ss << std::endl;
  do {
    if (this->count(start) && (*this)[start]) {
      shared_md md = (*this)[start];
      ss << "[ " << std::setw(16) << std::hex << md->lru_next();
      ss << " <- ";
      ss << std::setw(16) << std::hex << start;
      ss << " -> ";
      ss << std::setw(16) << std::hex << md->lru_prev();
      ss << " ]" << std::endl;
      if (start == md->lru_prev()) {
        eos_static_crit("corruption in list");
        break;
      }
      start = md->lru_prev();
      cnt++;
    } else {
      start = 0;
    }
  } while (start && (cnt < max));
  if (force) {
    std::cerr << ss.str();
    // fix: cnt/max are size_t - '%d' was undefined behavior on LP64
    eos_static_crit("first=%#llx last=%#llx cnt=%lu max=%lu",
                    lru_first, lru_last, (unsigned long) cnt, (unsigned long) max);
  } else {
    std::cerr << ss.str();
    eos_static_debug("first=%#llx last=%#llx",
                     lru_first, lru_last);
  }
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::lru_reset()
{
  // Rebuild the intrusive LRU list from scratch: clear all per-object
  // links, then re-add every resident (non-swapped-out) entry in backward
  // inode order. The caller must hold the map lock.
  eos_static_crit("resetting LRU list");
  // force an output of that list
  lru_dump(true);
  // wipe lru list
  this->lru_first = this->lru_last = 0;
  // clear all links (use it->second directly - the previous code did a
  // redundant operator[] lookup per entry and relied on a declared
  // iterator type)
  for (auto it = this->rbegin(); it != this->rend(); ++it) {
    if (it->second) {
      it->second->set_lru_prev(0);
      it->second->set_lru_next(0);
    }
  }
  // recreate lru list in backward inode order
  for (auto it = this->rbegin(); it != this->rend(); ++it) {
    if (it->second) {
      lru_add(it->first, it->second);
    }
  }
  // done
}
/* -------------------------------------------------------------------------- */
int
metad::mdx::state_serialize(std::string& mdsstream)
{
  // Capture the volatile in-memory state (op, counters, flags, deletion
  // list, local children and negative-lookup cache) into an
  // eos::fusex::md_state protobuf and serialize it into 'mdsstream'.
  // @return 0 on success, EFAULT if protobuf serialization fails
  eos::fusex::md_state state;
  state.set_op(op);
  state.set_lookup_cnt(lookup_cnt);
  state.set_cap_cnt(cap_cnt);
  state.set_opendir_cnt(opendir_cnt);
  state.set_lock_remote(lock_remote);
  state.set_refresh(refresh);
  state.set_rmrf(rmrf);
  for (const auto& entry : todelete) {
    (*(state.mutable_todelete()))[entry.first] = entry.second;
  }
  for (const auto& child : _local_children) {
    (*(state.mutable_children()))[child.first] = child.second;
  }
  for (const auto& name : _local_enoent) {
    (*(state.mutable_enoent()))[name] = 0;
  }
  return state.SerializeToString(&mdsstream) ? 0 : EFAULT;
}
/* -------------------------------------------------------------------------- */
int
metad::mdx::state_deserialize(std::string& mdsstream)
{
  // Restore the volatile in-memory state of this md object from the
  // serialized protobuf blob in 'mdsstream'. Map/set contents are merged
  // into (not replacing) the current containers.
  // @return 0 on success, EFAULT if the blob cannot be parsed.
  eos::fusex::md_state state;
  if (!state.ParseFromString(mdsstream)) {
    return EFAULT;
  }
  op = (metad::mdx::md_op)state.op();
  lookup_cnt = state.lookup_cnt();
  cap_cnt = state.cap_cnt();
  opendir_cnt = state.opendir_cnt();
  lock_remote = state.lock_remote();
  refresh = state.refresh();
  rmrf = state.rmrf();
  for (const auto& entry : state.todelete()) {
    todelete[entry.first] = entry.second;
  }
  for (const auto& entry : state.children()) {
    _local_children[entry.first] = entry.second;
  }
  for (const auto& entry : state.enoent()) {
    // only the key carries information; the mapped value is a dummy
    _local_enoent.insert(entry.first);
  }
  return 0;
}
/* -------------------------------------------------------------------------- */
int
metad::pmap::swap_out(fuse_ino_t ino, shared_md md)
{
  // Serialize an in-memory md object (protobuf payload + volatile state)
  // and push both blobs into the external KV store under the keys
  // "md.<ino>" and "mds.<ino>". The stacked-inode counter is bumped even
  // without a store, mirroring the unconditional decrement in swap_in().
  // @return 0 on success, EFAULT on serialization error, EIO on KV error.
  std::string mdstream;
  if (!(*md)()->SerializeToString(&mdstream)) {
    return EFAULT;
  }
  std::string mdsstream;
  if (md->state_serialize(mdsstream)) {
    return EFAULT;
  }
  if (store) {
    const std::string ino_str = std::to_string(ino);
    std::string md_key = "md." + ino_str;
    if (store->put(md_key, mdstream)) {
      return EIO;
    }
    std::string md_state_key = "mds." + ino_str;
    if (store->put(md_state_key, mdsstream)) {
      return EIO;
    }
  }
  EosFuse::Instance().mds.stats().inodes_stacked_inc();
  return 0;
}
/* -------------------------------------------------------------------------- */
int
metad::pmap::swap_in(fuse_ino_t ino, shared_md md)
{
  // Deserialize an in-memory md object (protobuf payload + volatile state)
  // from the external KV store keys "md.<ino>" / "mds.<ino>".
  // @return 0 on success, EIO if a blob is missing, EFAULT on parse errors.
  std::string mdstream;
  std::string mdsstream;
  if (store) {
    std::string md_key = "md.";
    md_key += std::to_string(ino);
    if (store->get(md_key, mdstream)) {
      // fix: log the inode as %#llx (file-wide convention) - the previous
      // "%16x" specifier read only 32 bits of the 64-bit fuse_ino_t from
      // the varargs, which is undefined behavior and truncated the inode
      eos_static_err("unable to swap-in md ino=%#llx errno=%d",
                     (unsigned long long) ino, EIO);
      return EIO;
    }
    if (!(*md)()->ParseFromString(mdstream)) {
      eos_static_err("unable to swap-in md ino=%#llx errno=%d",
                     (unsigned long long) ino, EFAULT);
      return EFAULT;
    }
    std::string md_state_key = "mds.";
    md_state_key += std::to_string(ino);
    if (store->get(md_state_key, mdsstream)) {
      eos_static_err("unable to swap-in md-state ino=%#llx errno=%d",
                     (unsigned long long) ino, EIO);
      return EIO;
    }
    if (md->state_deserialize(mdsstream)) {
      eos_static_err("unable to swap-in md-state ino=%#llx errno=%d",
                     (unsigned long long) ino, EFAULT);
      return EFAULT;
    }
  }
  // account the inode as un-stacked again (symmetric to swap_out)
  EosFuse::Instance().mds.stats().inodes_stacked_dec();
  return 0;
}
/* -------------------------------------------------------------------------- */
int
metad::pmap::swap_rm(fuse_ino_t ino)
{
  // Drop the serialized payload ("md.<ino>") and state ("mds.<ino>") blobs
  // for an inode from the external KV store.
  // @return 0 on success (or if no store is attached), EIO on erase failure.
  if (!store) {
    return 0;
  }
  const std::string ino_str = std::to_string(ino);
  std::string md_key = "md." + ino_str;
  if (store->erase(md_key)) {
    return EIO;
  }
  std::string md_state_key = "mds." + ino_str;
  if (store->erase(md_state_key)) {
    return EIO;
  }
  return 0;
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::insertTS(fuse_ino_t ino, shared_md& md)
{
  // Thread-safe insert/overwrite of 'md' under 'ino'. Inodes that were not
  // present before are additionally linked into the LRU list; overwrites
  // keep their existing LRU position.
  XrdSysMutexHelper mLock(this);
  const bool was_present = (this->count(ino) != 0);
  (*this)[ino] = md;
  if (!was_present) {
    lru_add(ino, md);
  }
  lru_dump();
}
/* -------------------------------------------------------------------------- */
bool
metad::pmap::eraseTS(fuse_ino_t ino)
{
  // Thread-safe removal of 'ino' from the map, the LRU list and the
  // external KV store.
  // @return true if the inode was present in the map, false otherwise.
  XrdSysMutexHelper mLock(this);
  lru_remove(ino);
  bool found = false;
  auto it = this->find(ino);
  // NOTE: an entry keyed by inode 0 is deliberately treated as not present
  if ((it != this->end()) && it->first) {
    found = true;
    if (!it->second) {
      // deletion of a stacked (swapped-out) inode has to be accounted for
      EosFuse::Instance().mds.stats().inodes_stacked_dec();
    }
    this->erase(it);
  }
  swap_rm(ino); // ignore return code
  return found;
}
/* -------------------------------------------------------------------------- */
void
metad::pmap::retrieveWithParentTS(fuse_ino_t ino, shared_md& md, shared_md& pmd,
                                  std::string& md_name)
{
  // Atomically retrieve md objects for an inode, and its parent.
  // On return 'md' holds the entry for 'ino', 'pmd' the entry for its
  // parent pid, and 'md_name' the entry's name; all three are left
  // reset/cleared when 'ino' is not in the map.
  while (true) {
    // In this particular case, we need to first lock mdmap, and then
    // md.. The following algorithm is meant to avoid deadlocks with code
    // which locks md first, and then mdmap.
    md.reset();
    pmd.reset();
    md_name.clear();
    XrdSysMutexHelper mLock(this);
    if (!retrieve(ino, md)) {
      return; // ino not there, nothing to do
    }
    // md has been found. Can we lock it? (try-lock only - blocking here
    // while holding the map lock could deadlock against the reverse order)
    if (md->Locker().CondLock()) {
      // Success! Read parent and name while holding both locks.
      retrieve((*md)()->pid(), pmd);
      md_name = (*md)()->name();
      md->Locker().UnLock();
      return;
    }
    // Nope, unlock mdmap, back off briefly so the md holder can progress,
    // and try again from scratch.
    mLock.UnLock();
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
}