// ----------------------------------------------------------------------
// File: ReplicationTracker.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/Constants.hh"
#include "common/Path.hh"
#include "common/FileId.hh"
#include "common/IntervalStopwatch.hh"
#include "common/LayoutId.hh"
#include "mgm/FsView.hh"
#include "mgm/tracker/ReplicationTracker.hh"
#include "mgm/proc/ProcCommand.hh"
#include "mgm/XrdMgmOfs.hh"
#include "namespace/interface/IView.hh"
#include "namespace/Resolver.hh"
#include "namespace/Prefetcher.hh"
EOSMGMNAMESPACE_BEGIN
using namespace eos::common;
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
ReplicationTracker::ReplicationTracker(const char* path) : mPath(path)
{
mVid = eos::common::VirtualIdentity::Root();
mThread.reset(&ReplicationTracker::backgroundThread, this);
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
ReplicationTracker::~ReplicationTracker()
{
mThread.join();
}
//------------------------------------------------------------------------------
// Create a new file
//------------------------------------------------------------------------------
void
ReplicationTracker::Create(std::shared_ptr fmd)
{
if (!enabled()) {
return;
}
std::string prefix = Prefix(fmd);
std::string tag = prefix + eos::common::FileId::Fid2Hex(fmd->getId());
std::shared_ptr dmd;
try {
gOFS->eosView->createContainer(prefix, true);
dmd = gOFS->eosView->getContainer(prefix);
dmd->setCTimeNow();
gOFS->eosView->updateContainerStore(dmd.get());
} catch (const MDException& e) {
}
try {
fmd = gOFS->eosView->createFile(tag.c_str(), 0, 0);
} catch (const MDException& e) {
eos_static_crit("failed to create tag file='%s'", tag.c_str());
return;
}
std::string uri = gOFS->eosView->getUri(fmd.get());
eos_static_info("op=created tag='%s' uri='%s'", tag.c_str(), uri.c_str());
return;
}
std::string
ReplicationTracker::ConversionPolicy(bool injection, int fsid)
{
std::string space = FsView::gFsView.mIdView.lookupSpaceByID(fsid);
eos_static_debug("%s %d", space.c_str(), fsid);
if (space.length()) {
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
auto it = FsView::gFsView.mSpaceView.find(space);
if (it != FsView::gFsView.mSpaceView.end()) {
if (injection) {
return it->second->GetConfigMember("policy.conversion.injection");
} else {
return it->second->GetConfigMember("policy.conversion.creation");
}
}
}
return "";
}
std::string
ReplicationTracker::ConversionSizePolicy(bool injection, int fsid)
{
std::string space = FsView::gFsView.mIdView.lookupSpaceByID(fsid);
eos_static_debug("%s %d", space.c_str(), fsid);
if (space.length()) {
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
auto it = FsView::gFsView.mSpaceView.find(space);
if (it != FsView::gFsView.mSpaceView.end()) {
if (injection) {
return it->second->GetConfigMember("policy.conversion.injection.size");
} else {
return it->second->GetConfigMember("policy.conversion.creation.size");
}
}
}
return "";
}
//------------------------------------------------------------------------------
// Commit a file
//------------------------------------------------------------------------------
void
ReplicationTracker::Commit(std::shared_ptr fmd)
{
// check if this is still a 'temporary' name
if (fmd->getName().substr(0,
strlen(EOS_COMMON_PATH_ATOMIC_FILE_PREFIX)) ==
EOS_COMMON_PATH_ATOMIC_FILE_PREFIX) {
// check if this is still a 'temporary' name
return;
}
bool tapecopy = fmd->hasLocation(TAPE_FS_ID);
// check replica count
if ((fmd->getNumLocation() - tapecopy) ==
(eos::common::LayoutId::GetStripeNumber(fmd->getLayoutId()) + 1)) {
if (conversion_enabled()) {
// determine the space from the first filesystem ID stored
int fsid = fmd->getLocations()[0];
if (fsid == TAPE_FS_ID) {
if (fmd->getNumLocation() > 1) {
fsid = fmd->getLocations()[1];
}
}
{
std::string policy = ConversionPolicy(tapecopy, fsid);
if (policy.length()) {
size_t cutoff_size = 0;
bool do_conversion = true;
std::string size_policy = ConversionSizePolicy(tapecopy, fsid);
if (size_policy.length()) {
switch ((size_policy.at(0))) {
case '<':
// max size policy
cutoff_size = std::stol(size_policy.substr(1));
if (fmd->getSize() >= cutoff_size) {
if (EOS_LOGS_DEBUG) {
eos_static_debug("suppressing conversion because of minimum size policy '%s' fxid:%08llx",
policy.c_str(), fmd->getId());
}
do_conversion = false;
}
break;
case '>':
// min size policy
cutoff_size = std::stol(size_policy.substr(1));
if (fmd->getSize() <= cutoff_size) {
if (EOS_LOGS_DEBUG) {
eos_static_debug("suppressing conversion because of maximum size policy '%s' fxid:%08llx",
policy.c_str(), fmd->getId());
}
do_conversion = false;
}
default:
eos_static_warning("illegal space conversion policy size: should be empty '', size '>1000");
break;
}
}
if (do_conversion) {
// create a conversion job for this file according to the policy definition
eos_static_info("triggering conversion policy '%s' for fxid:%08llx",
policy.c_str(), fmd->getId());
std::string layout;
std::string space;
if (eos::common::StringConversion::SplitKeyValue(policy,
layout,
space,
"@")) {
std::string info = "mgm.cmd=file&mgm.subcmd=convert&mgm.convert.layout=";
info += layout;
info += "&mgm.convert.space=";
info += space;
info += "&mgm.file.id=";
info += std::to_string(fmd->getId());
XrdOucErrInfo error;
eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root();
ProcCommand cmd;
cmd.open("/proc/user", info.c_str(), rootvid, &error);
cmd.close();
int rc = cmd.GetRetc();
if (rc) {
eos_static_err("converions-hook failed with rc=%d for fxid:%08llx", rc,
fmd->getId());
}
}
}
}
}
}
if (!enabled()) {
return;
}
std::string prefix = Prefix(fmd);
std::string tag = prefix + eos::common::FileId::Fid2Hex(fmd->getId());
std::string uri = gOFS->eosView->getUri(fmd.get());
std::shared_ptr entry_fmd;
eos::common::RWMutexWriteLock nslock(gOFS->eosViewRWMutex);
try {
entry_fmd = gOFS->eosView->getFile(tag);
gOFS->eosView->unlinkFile(entry_fmd.get());
} catch (const MDException& e) {
if (e.getErrno() != ENOENT) {
eos_static_crit("failed to remove tag file='%s' error='%s'", tag.c_str(),
e.what());
}
return;
}
eos_static_info("op=removed tag='%s' uri='%s'", tag.c_str(), uri.c_str());
}
return;
}
//------------------------------------------------------------------------------
// Validate a file
//------------------------------------------------------------------------------
void
ReplicationTracker::Validate(std::shared_ptr fmd)
{
}
//------------------------------------------------------------------------------
// Get Tag file prefix
//------------------------------------------------------------------------------
std::string
ReplicationTracker::Prefix(std::shared_ptr fmd)
{
char strackerfile[4096];
eos::IFileMD::ctime_t ctime;
fmd->getCTime(ctime);
time_t now = ctime.tv_sec;
struct tm nowtm;
localtime_r(&now, &nowtm);
snprintf(strackerfile, sizeof(strackerfile), "%s/%04u/%02u/%02u/",
mPath.c_str(),
1900 + nowtm.tm_year,
nowtm.tm_mon + 1,
nowtm.tm_mday);
return strackerfile;
}
//------------------------------------------------------------------------------
// Retrieve current LRU configuration options
//------------------------------------------------------------------------------
ReplicationTracker::Options ReplicationTracker::getOptions()
{
eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex);
ReplicationTracker::Options opts;
// Default options
opts.enabled = false;
opts.interval = std::chrono::minutes(60);
if (FsView::gFsView.mSpaceView.count("default") &&
(FsView::gFsView.mSpaceView["default"]->GetConfigMember("tracker") == "on")) {
opts.enabled = true;
}
if (opts.enabled) {
enable();
eos_static_debug("creation tracker is enabled");
} else {
disable();
}
// this is hardcoded to 2 days, it could be 'dangerous' to make this really configurable
opts.atomic_cleanup_age = 2 * 86400;
return opts;
}
//------------------------------------------------------------------------------
// Background Thread cleaning up left-over atomic uploads
//------------------------------------------------------------------------------
void
ReplicationTracker::backgroundThread(ThreadAssistant& assistant) noexcept
{
gOFS->WaitUntilNamespaceIsBooted(assistant);
// set the initial state after boot
Options opts = getOptions();
if (opts.enabled) {
enable();
} else {
disable();
}
assistant.wait_for(std::chrono::seconds(10));
eos_static_info("msg=\"async thread started\"");
while (!assistant.terminationRequested()) {
// every now and then we wake up
Options opts = getOptions();
// Only a master needs to run a ReplicationTracker
if (opts.enabled) {
enable();
} else {
disable();
}
common::IntervalStopwatch stopwatch(enabled() ? opts.interval :
std::chrono::seconds(10));
if (gOFS->mMaster->IsMaster()) {
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
auto it = FsView::gFsView.mSpaceView.find("default");
if (it != FsView::gFsView.mSpaceView.end()) {
if (it->second->GetConfigMember("policy.conversion") == "on") {
if (!conversion_enabled()) {
conversion_enable();
eos_static_info("enabling space conversion hooks");
}
} else {
if (conversion_enabled()) {
conversion_disable();
eos_static_info("disabling space conversion hooks");
}
}
}
}
if (opts.enabled && gOFS->mMaster->IsMaster()) {
eos_static_info("msg=\"scan started!\"");
Scan(opts.atomic_cleanup_age, true, 0);
eos_static_info("msg=\"scan finished!\"");
}
assistant.wait_for(stopwatch.timeRemainingInCycle());
}
}
//------------------------------------------------------------------------------
// Scan entries in creation tracker - opt cleanup or output
//------------------------------------------------------------------------------
void
ReplicationTracker::Scan(uint64_t atomic_age, bool cleanup, std::string* out)
{
eos::common::RWMutexReadLock viewReadLock;
time_t now = time(NULL);
std::map > found;
XrdOucString stdErr;
if (!enabled()) {
*out += "# tracker is disabled - use 'eos space config default space.tracker=on'\n";
}
if (!gOFS->_find(mPath.c_str(),
mError,
stdErr,
mVid,
found,
0,
0,
false,
10)
) {
for (auto rfoundit = found.rbegin(); rfoundit != found.rend(); rfoundit++) {
if (!rfoundit->second.size()) {
std::string creationpath = mPath + "/";
if (rfoundit->first == creationpath) {
// don't delete the creation proc entry
continue;
}
std::shared_ptr dmd;
eos::IContainerMD::ctime_t ctime;
// delete this directory if it is older than atomic_age
eos::common::RWMutexWriteLock viewWriteLock(gOFS->eosViewRWMutex);
try {
dmd = gOFS->eosView->getContainer(rfoundit->first);
dmd->getCTime(ctime);
uint64_t age = now - ctime.tv_sec;
if (age > atomic_age &&
!dmd->getNumFiles() &&
!dmd->getNumContainers()) {
gOFS->eosView->removeContainer(rfoundit->first);
}
} catch (const MDException& e) {
eos_static_crit("failed to remove directory='%s'", rfoundit->first.c_str());
}
viewWriteLock.Release();
} else {
for (auto fileit = rfoundit->second.begin(); fileit != rfoundit->second.end();
fileit++) {
std::string fspath = rfoundit->first;
std::string entry = *fileit;
std::string entry_path = fspath + "/" + entry;
XrdOucString fxid = "fxid:";
std::string fullpath;
bool flag_deletion = false;
bool is_atomic = false;
std::string reason = "KEEPIT";
fxid += entry.c_str();
std::shared_ptr fmd;
std::shared_ptr entry_fmd;
size_t n_rep = 0;
size_t n_layout_rep = 0;
unsigned long long fid = Resolver::retrieveFileIdentifier(
fxid).getUnderlyingUInt64();
eos::IFileMD::ctime_t ctime;
// reference by fxid
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, fid);
viewReadLock.Grab(gOFS->eosViewRWMutex);
try {
fmd = gOFS->eosFileService->getFileMD(fid);
fmd->getCTime(ctime);
fullpath = gOFS->eosView->getUri(fmd.get());
if (fmd->getName().substr(0,
strlen(EOS_COMMON_PATH_ATOMIC_FILE_PREFIX)) ==
EOS_COMMON_PATH_ATOMIC_FILE_PREFIX) {
is_atomic = true;
}
n_rep = fmd->getNumLocation();
n_layout_rep = (eos::common::LayoutId::GetStripeNumber(fmd->getLayoutId()) + 1);
if (n_rep < n_layout_rep) {
reason = "REPLOW";
} else {
reason = "REP-OK";
flag_deletion = true;
}
} catch (eos::MDException& e) {
errno = e.getErrno();
stdErr = "error: cannot retrieve file meta data - ";
stdErr += e.getMessage().str().c_str();
eos_static_debug("caught exception %d %s\n", e.getErrno(),
e.getMessage().str().c_str());
reason = "ENOENT";
flag_deletion = true;
ctime.tv_sec = now - atomic_age - 1;
}
viewReadLock.Release();
uint64_t age = now - ctime.tv_sec;
if (is_atomic && (age > atomic_age)) {
flag_deletion = true;
reason = "ATOMIC";
}
if (out) {
if (reason == "ENOENT") {
// don't show files which had been deleted
continue;
}
char outline[16384];
snprintf(outline, sizeof(outline),
"key=%s age=%lu (s) delete=%d rep=%lu/%lu atomic=%d reason=%s uri='%s'\n",
entry.c_str(),
age,
flag_deletion,
n_rep,
n_layout_rep,
is_atomic,
reason.c_str(),
fullpath.c_str());
*out += outline;
if (out->size() > (128 * 1024 * 1024)) {
* out += "# ... list has been truncated\n";
return;
}
} else {
if (reason == "ENOENT") {
// mark for tag deletion
flag_deletion = 1;
}
eos_static_info("key=%s age=%lu (s) delete=%d rep=%lu/%lu atomic=%d reason=%s uri='%s'",
entry.c_str(),
age,
flag_deletion,
n_rep,
n_layout_rep,
is_atomic,
reason.c_str(),
fullpath.c_str());
}
if (cleanup && flag_deletion) {
eos::common::RWMutexWriteLock viewWriteLock(gOFS->eosViewRWMutex);
// cleanup the tag entry
try {
entry_fmd = gOFS->eosView->getFile(entry_path);
gOFS->eosView->unlinkFile(entry_fmd.get());
} catch (const MDException& e) {
eos_static_crit("failed to remove tag file='%s'", entry_path.c_str());
}
if (reason == "ATOMIC") {
// cleanup the atomic left-over
try {
fmd = gOFS->eosFileService->getFileMD(fid);
gOFS->eosView->unlinkFile(fmd.get());
} catch (const MDException& e) {
eos_static_crit("failed to cleanup atomic target file='%s'", fullpath.c_str());
}
}
viewWriteLock.Release();
}
}
}
}
} else {
eos_static_err("find failed in path='%s' errmsg='%s'",
mPath.c_str(),
stdErr.c_str());
}
}
EOSMGMNAMESPACE_END