//------------------------------------------------------------------------------
// File: LRU.cc
// Author: Andreas-Joachim Peters - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/Logging.hh"
#include "common/LayoutId.hh"
#include "common/Mapping.hh"
#include "common/RWMutex.hh"
#include "common/ParseUtils.hh"
#include "common/Path.hh"
#include "common/IntervalStopwatch.hh"
#include "mgm/Quota.hh"
#include "mgm/LRU.hh"
#include "mgm/Stat.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/convert/ConverterDriver.hh"
#include "namespace/interface/IView.hh"
#include "namespace/interface/ContainerIterators.hh"
#include "namespace/Prefetcher.hh"
#include "namespace/ns_quarkdb/explorer/NamespaceExplorer.hh"
#include "namespace/ns_quarkdb/NamespaceGroup.hh"
#include
//! Attribute name defining any LRU policy
const char* LRU::gLRUPolicyPrefix = "sys.lru.*";
EOSMGMNAMESPACE_BEGIN
using namespace eos::common;
//------------------------------------------------------------------------------
// Start the LRU thread
//------------------------------------------------------------------------------
void LRU::Start()
{
mThread.reset(&LRU::backgroundThread, this);
}
//------------------------------------------------------------------------------
// Stop the LRU thread
//------------------------------------------------------------------------------
void LRU::Stop()
{
mThread.join();
}
//------------------------------------------------------------------------------
// Retrieve "lru.interval" configuration option as string, or empty if
// cannot be found. Assumes gFsView.ViewMutex is at-least readlocked.
//------------------------------------------------------------------------------
std::string LRU::getLRUIntervalConfig() const
{
if (FsView::gFsView.mSpaceView.count("default") == 0) {
return "";
}
return FsView::gFsView.mSpaceView["default"]->GetConfigMember("lru.interval");
}
//------------------------------------------------------------------------------
// Retrieve current LRU configuration options
//------------------------------------------------------------------------------
LRU::Options LRU::getOptions()
{
LRU::Options opts;
// Default options
opts.enabled = false;
opts.interval = std::chrono::minutes(30);
eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex);
if (FsView::gFsView.mSpaceView.count("default") &&
(FsView::gFsView.mSpaceView["default"]->GetConfigMember("lru") == "on")) {
opts.enabled = true;
}
std::string interval = getLRUIntervalConfig();
int64_t intv = 0;
if (opts.enabled && (interval.empty() || !common::ParseInt64(interval, intv))) {
eos_static_crit("%s", "msg=\"unable to parse space config lru.interval "
"option, disabling LRU!\"");
opts.enabled = false;
} else {
opts.interval = std::chrono::seconds(intv);
}
if (opts.enabled) {
eos_static_info("msg=\"lru is enabled\" interval=%ds", opts.interval.count());
}
// Set long interval in case LRU is de-activated, prevent the background
// thread from spinning
if (!opts.enabled || opts.interval == std::chrono::seconds(0)) {
opts.interval = std::chrono::minutes(30);
}
return opts;
}
//------------------------------------------------------------------------------
// Constructor. To run the LRU thread, call Start
//------------------------------------------------------------------------------
LRU::LRU() :
mQcl(nullptr), mRootVid(eos::common::VirtualIdentity::Root()),
mRefresh(false)
{
}
//------------------------------------------------------------------------------
// Destructor - stop the background thread, if running
//------------------------------------------------------------------------------
LRU::~LRU()
{
Stop();
}
//------------------------------------------------------------------------------
// Parse an "sys.lru.expire.match" policy
//------------------------------------------------------------------------------
bool LRU::parseExpireMatchPolicy(const std::string& policy,
std::map& matchAgeMap)
{
matchAgeMap.clear();
std::map tmpMap;
if (!StringConversion::GetKeyValueMap(policy.c_str(), tmpMap, ":")) {
// Failed splitting on ":", cannot parse further
return false;
}
for (auto it = tmpMap.begin(); it != tmpMap.end(); it++) {
uint64_t out;
if (!StringConversion::GetSizeFromString(it->second, out)) {
eos_static_err("msg=\"LRU match attribute has illegal age\" "
"match=\"%s\", age=\"%s\"",
it->first.c_str(),
it->second.c_str());
} else {
matchAgeMap[it->first] = out;
eos_static_info("msg=\"add expire policy\" rule=\"%s %llu\"",
it->first.c_str(), out);
}
}
return true;
}
//------------------------------------------------------------------------------
// Perform a single LRU cycle, QDB namespace
//------------------------------------------------------------------------------
void LRU::performCycleQDB(ThreadAssistant& assistant) noexcept
{
eos_static_info("%s", "msg=\"start LRU scan on QDB\"");
// Build exploration options..
ExplorationOptions opts;
opts.populateLinkedAttributes = true;
opts.view = gOFS->eosView;
opts.ignoreFiles = true;
opts.depthLimit = eos::common::Path::MAX_LEVELS;
// Initialize qclient..
if (!mQcl) {
mQcl.reset(new qclient::QClient(gOFS->mQdbContactDetails.members,
gOFS->mQdbContactDetails.constructOptions()));
}
// Start exploring
NamespaceExplorer
explorer("/", opts, *(mQcl.get()),
static_cast(gOFS->namespaceGroup.get())->getExecutor());
NamespaceItem item;
int64_t processed = 0;
while (explorer.fetch(item)) {
eos_static_debug("lru-dir-qdb=\"%s\" attrs=%d", item.fullPath.c_str(),
item.attrs.size());
processDirectory(item.fullPath,item.attrs);
processed++;
if (processed % 1000 == 0) {
eos_static_info("msg=\"LRU scan in progress\" num_scanned_dirs=%lli",
processed);
if (assistant.terminationRequested()) {
eos_static_info("%s", "msg=\"termination requested, quit LRU\"");
break;
}
}
}
eos_static_info("msg=\"LRU scan done\" num_scanned_dirs=%lli", processed);
}
//------------------------------------------------------------------------------
// LRU method doing the actual policy scrubbing
//
// This thread loops in regular intervals over all directories which have
// a LRU policy attribute set (sys.lru.*) and applies the defined policy.
//------------------------------------------------------------------------------
void LRU::backgroundThread(ThreadAssistant& assistant) noexcept
{
// Eternal thread doing LRU scans
eos_static_notice("%s", "msg=\"starting LRU thread\"");
gOFS->WaitUntilNamespaceIsBooted(assistant);
// Wait that current MGM becomes a master
do {
eos_static_debug("%s", "msg=\"LRU waiting for master MGM\"");
assistant.wait_for(std::chrono::seconds(10));
} while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());
while (!assistant.terminationRequested()) {
// every now and then we wake up
Options opts = getOptions();
common::IntervalStopwatch stopwatch(opts.interval);
// Only a master needs to run LRU
if (opts.enabled && gOFS->mMaster->IsMaster()) {
performCycleQDB(assistant);
}
while (stopwatch.timeRemainingInCycle() >= std::chrono::seconds(5)) {
assistant.wait_for(std::chrono::seconds(5));
if (assistant.terminationRequested() || mRefresh) {
mRefresh = false;
break;
}
}
}
eos_static_notice("%s", "msg=\"stopped LRU thread\"");
}
//------------------------------------------------------------------------------
// Process the given directory, apply all policies
//------------------------------------------------------------------------------
void LRU::processDirectory(const std::string& dir,
eos::IContainerMD::XAttrMap& map)
{
// No LRU on "/"
if (dir == "/" || dir == "") {
return;
}
// Don't walk into the proc directory
if (dir.substr(0,gOFS->MgmProcPath.length()) == gOFS->MgmProcPath.c_str()) {
eos_static_debug("skipping proc tree %s\n", dir.c_str());
return;
}
// Sort out the individual LRU policies
if (map.count("sys.lru.expire.empty")) {
// Remove empty directories older than
AgeExpireEmpty(dir.c_str(), map["sys.lru.expire.empty"]);
}
if (map.count("sys.lru.expire.match")) {
// Files with a given match will be removed after expiration time
AgeExpire(dir.c_str(), map["sys.lru.expire.match"]);
}
if (map.count("sys.lru.lowwatermark") && map.count("sys.lru.highwatermark")) {
// If the space in this directory reaches highwatermark, files are
// cleaned up according to the LRU policy
CacheExpire(dir.c_str(), map["sys.lru.lowwatermark"],
map["sys.lru.highwatermark"]);
}
if (map.count("sys.lru.convert.match")) {
// Files with a given match/age will be automatically converted
ConvertMatch(dir.c_str(), map);
}
}
//------------------------------------------------------------------------------
// Remove empty directories if they are older than age given in policy
//------------------------------------------------------------------------------
void
LRU::AgeExpireEmpty(const char* dir, const std::string& policy)
{
struct stat buf;
eos_static_debug("dir=%s", dir);
if (!gOFS->_stat(dir, &buf, mError, mRootVid, "")) {
// check if there is any child in that directory
if (buf.st_blksize) {
eos_static_debug("dir=%s children=%d", dir, buf.st_blksize);
return;
} else {
time_t now = time(NULL);
XrdOucString sage = policy.c_str();
time_t age = StringConversion::GetSizeFromString(sage);
eos_static_debug("ctime=%u age=%u now=%u", buf.st_ctime, age, now);
if ((buf.st_ctime + age) < now) {
eos_static_notice("msg=\"delete empty directory\" path=\"%s\"", dir);
if (gOFS->_remdir(dir, mError, mRootVid, "")) {
eos_static_err("msg=\"failed to delete empty directory\" "
"path=\"%s\"", dir);
}
}
}
}
}
//------------------------------------------------------------------------------
// Remove all files in the directory older than the policy defines
//------------------------------------------------------------------------------
void
LRU::AgeExpire(const char* dir, const std::string& policy)
{
eos_static_info("msg=\"applying age deletion policy\" dir=\"%s\" age=\"%s\"",
dir, policy.c_str());
std::map lMatchAgeMap;
if (!parseExpireMatchPolicy(policy, lMatchAgeMap)) {
eos_static_err("msg=\"LRU match attribute is illegal\" val=\"%s\"",
policy.c_str());
return;
}
time_t now = time(NULL);
std::vector lDeleteList;
{
// Check the directory contents
std::shared_ptr cmd;
eos::Prefetcher::prefetchContainerMDWithChildrenAndWait(gOFS->eosView, dir);
RWMutexReadLock lock(gOFS->eosViewRWMutex);
try {
cmd = gOFS->eosView->getContainer(dir);
lock.Release();
std::shared_ptr fmd;
// Loop through all file names
for (auto it = eos::FileMapIterator(cmd); it.valid(); it.next()) {
fmd = cmd->findFile(it.key());
std::string fullpath = dir;
fullpath += fmd->getName();
eos_static_debug("check_file=\"%s\"", fullpath.c_str());
// Loop over the match map
for (auto mit = lMatchAgeMap.begin(); mit != lMatchAgeMap.end(); mit++) {
XrdOucString fname = fmd->getName().c_str();
eos_static_debug("check_rule=\"%s\" maches=%d", mit->first.c_str(),
fname.matches(mit->first.c_str()));
if (fname.matches(mit->first.c_str())) {
// Full match check the age policy
eos::IFileMD::ctime_t ctime;
fmd->getCTime(ctime);
time_t age = mit->second;
if ((ctime.tv_sec + age) < now) {
// This entry can be deleted
eos_static_notice("msg=\"delete expired file\" path=\"%s\" "
"ctime=%u policy-age=%u age=%u",
fullpath.c_str(), ctime.tv_sec, age,
now - ctime.tv_sec);
lDeleteList.push_back(fullpath);
break;
}
}
}
}
} catch (eos::MDException& e) {
errno = e.getErrno();
cmd = std::shared_ptr((eos::IContainerMD*)0);
eos_static_err("msg=\"exception\" ec=%d emsg=\"%s\"",
e.getErrno(), e.getMessage().str().c_str());
}
}
for (auto it = lDeleteList.begin(); it != lDeleteList.end(); it++) {
if (gOFS->_rem(it->c_str(), mError, mRootVid, "")) {
eos_static_err("msg=\"failed to expire file\" path=\"%s\"", it->c_str());
}
}
}
//------------------------------------------------------------------------------
// Expire the oldest files to go under the low watermark
//------------------------------------------------------------------------------
void
LRU::CacheExpire(const char* dir, std::string& lowmark, std::string& highmark)
{
eos_static_info("msg=\"applying volume deletion policy\" "
"dir=\"%s\" low-mark=\"%s\" high-mark=\"%s\"",
dir, lowmark.c_str(), highmark.c_str());
// Update space quota, return if this is not a ns quota node
if (!Quota::UpdateFromNsQuota(dir, 0, 0)) {
return;
}
// Check for project quota
auto map_quotas = Quota::GetGroupStatistics(dir, Quota::gProjectId);
long long target_volume = map_quotas[SpaceQuota::kGroupBytesTarget];
long long is_volume = map_quotas[SpaceQuota::kGroupBytesIs];
if (target_volume <= 0) {
return;
}
errno = 0;
double lwm = strtod(lowmark.c_str(), 0);
if (!lwm || errno || (lwm >= 100)) {
eos_static_err("msg=\"low watermark value is illegal - "
"must be 0 < lw < 100\" low-watermark=\"%s\"",
lowmark.c_str());
return;
}
errno = 0;
double hwm = strtod(highmark.c_str(), 0);
if (!hwm || errno || (hwm < lwm) || (hwm >= 100)) {
eos_static_err("msg = \"high watermark value is illegal - "
"must be 0 < lw < hw < 100\" "
"low_watermark=\"%s\" high-watermark=\"%s\"",
lowmark.c_str(), highmark.c_str());
return;
}
double cwm = 100.0 * is_volume / target_volume;
eos_static_debug("cwm=%.02f hwm=%.02f", cwm, hwm);
// check if we have to do cache cleanup e.g. current is over high water mark
if (cwm < hwm) {
return;
}
unsigned long long bytes_to_free = is_volume - (lwm * target_volume / 100.0);
XrdOucString sizestring;
eos_static_notice("low-mark=%.02f high-mark=%.02f current-mark=%.02f "
"deletion-bytes=%s", lwm, hwm, cwm,
StringConversion::GetReadableSizeString(sizestring, bytes_to_free, "B"));
// Build the LRU list
std::map > cachedirs;
XrdOucString stdErr;
time_t ms = 0;
// map with path/mtime pairs
std::set lru_map;
unsigned long long lru_size = 0;
if (!gOFS->_find(dir, mError, stdErr, mRootVid, cachedirs, "", "", false, ms)) {
// Loop through the result and build an LRU list
// We just keep as many entries in the LRU list to have the required
// number of bytes to free available.
for (auto dit = cachedirs.begin(); dit != cachedirs.end(); dit++) {
eos_static_debug("path=%s", dit->first.c_str());
for (auto fit = dit->second.begin(); fit != dit->second.end(); fit++) {
// build the full path name
std::string fpath = dit->first;
fpath += *fit;
struct stat buf;
eos_static_debug("path=%s", fpath.c_str());
// get the current ctime & size information
if (!gOFS->_stat(fpath.c_str(), &buf, mError, mRootVid, "")) {
if (lru_map.size())
if ((lru_size > bytes_to_free) &&
lru_map.size() &&
((--lru_map.end())->ctime < buf.st_ctime)) {
// this entry is newer than all the rest
continue;
}
// add LRU entry in front
lru_entry_t lru;
lru.path = fpath;
lru.ctime = buf.st_ctime;
lru.size = buf.st_blocks * buf.st_blksize;
lru_map.insert(lru);
lru_size += lru.size;
eos_static_debug("msg=\"adding\" file=\"%s\" "
"bytes-free=\"%llu\" lru-size=\"%llu\"",
fpath.c_str(),
bytes_to_free,
lru_size);
// check if we can shrink the LRU map
if (lru_map.size() && (lru_size > bytes_to_free)) {
while (lru_map.size() &&
((lru_size - (--lru_map.end())->size) > bytes_to_free)) {
// remove the last element of the map
auto it = lru_map.end();
it--;
// subtract the size
lru_size -= it->size;
eos_static_info("msg=\"clean-up\" path=\"%s\"", it->path.c_str());
lru_map.erase(it);
}
}
}
}
}
} else {
eos_static_err("msg=\"%s\"", stdErr.c_str());
}
eos_static_notice("msg=\"cleaning LRU cache\" files-to-delete=%llu",
lru_map.size());
// Delete starting with the 'oldest' entry until we have freed enough space
// to go under the low watermark
for (auto it = lru_map.begin(); it != lru_map.end(); it++) {
eos_static_notice("msg=\"delete LRU file\" path=\"%s\" ctime=%lu size=%llu",
it->path.c_str(),
it->ctime,
it->size);
if (gOFS->_rem(it->path.c_str(), mError, mRootVid, "")) {
eos_static_err("msg=\"failed to expire file\" "
"path=\"%s\"", it->path.c_str());
}
}
}
//------------------------------------------------------------------------------
//! Convert all files matching
//------------------------------------------------------------------------------
void
LRU::ConvertMatch(const char* dir,
eos::IContainerMD::XAttrMap& map)
{
eos_static_info("msg=\"applying match policy\" dir=\"%s\" match=\"%s\"",
dir, map["sys.lru.convert.match"].c_str());
std::map < std::string, std::string> lMatchMap;
std::map < std::string, time_t> lMatchAgeMap;
std::map < std::string, ssize_t> lMatchSizeMap;
time_t now = time(NULL);
if (!StringConversion::GetKeyValueMap(map["sys.lru.convert.match"].c_str(),
lMatchMap,
":")
) {
eos_static_err("msg=\"LRU match attribute is illegal\" val=\"%s\"",
map["sys.lru.convert.match"].c_str());
return;
}
for (auto it = lMatchMap.begin(); it != lMatchMap.end(); it++) {
std::string time_tag;
std::string size_tag;
eos::common::StringConversion::SplitKeyValue(it->second, time_tag, size_tag);
if (time_tag.empty()) {
time_tag = it->second;
}
bool size_smaller = false;
bool size_larger = false;
size_t size_limit = 0;
if (size_tag.length()) {
if (size_tag.substr(0, 1) == "<") {
size_smaller = true;
}
if (size_tag.substr(0, 1) == ">") {
size_larger = true;
}
size_tag.erase(0, 1);
if (!size_smaller && !size_larger) {
eos_static_err("msg=\"LRU match attribute has illegal size\" "
" match=\"%s\", size=\"%s\"",
it->first.c_str(),
size_tag.c_str());
} else {
size_limit = eos::common::StringConversion::GetSizeFromString(size_tag.c_str());
}
}
eos_static_info("time-tag=%s size-tag=%s <%d >%d limit=%lu", time_tag.c_str(),
size_tag.c_str(), size_smaller, size_larger, size_limit);
time_t t = eos::common::StringConversion::GetSizeFromString(time_tag.c_str());
if (errno) {
eos_static_err("msg=\"LRU match attribute has illegal age\" "
"match=\"%s\", age=\"%s\"", it->first.c_str(),
time_tag.c_str());
} else {
std::string conv_attr = "sys.conversion.";
conv_attr += it->first;
if (map.count(conv_attr)) {
lMatchAgeMap[it->first] = t;
if (size_smaller) {
lMatchSizeMap[it->first] = -size_limit;
}
if (size_larger) {
lMatchSizeMap[it->first] = +size_limit;
}
eos_static_info("rule=\"%s %u\"", it->first.c_str(), t);
} else {
eos_static_err("msg=\"LRU match attribute has no conversion "
"attribute defined\" attr-missing=\"%s\"",
conv_attr.c_str());
}
}
}
std::vector < std::pair > lConversionList;
{
// Check the directory contents
std::shared_ptr cmd;
eos::Prefetcher::prefetchContainerMDWithChildrenAndWait(gOFS->eosView, dir);
RWMutexReadLock lock(gOFS->eosViewRWMutex);
try {
cmd = gOFS->eosView->getContainer(dir);
lock.Release();
std::shared_ptr fmd;
for (auto fit = eos::FileMapIterator(cmd); fit.valid(); fit.next()) {
fmd = cmd->findFile(fit.key());
if (fmd == nullptr) {
eos_static_err("msg=\"file is null\" fxid=%08llx", fit.key().c_str());
continue;
}
std::string fullpath = dir;
fullpath += fmd->getName();
eos_static_debug("check_file=\"%s\"", fullpath.c_str());
// Loop over the match map
for (auto mit = lMatchAgeMap.begin(); mit != lMatchAgeMap.end(); mit++) {
XrdOucString fname = fmd->getName().c_str();
eos_static_debug("check_rule=\"%s\" matched=%d", mit->first.c_str(),
fname.matches(mit->first.c_str()));
if (fname.matches(mit->first.c_str())) {
// Full match check the age policy
eos::IFileMD::ctime_t ctime;
fmd->getCTime(ctime);
time_t age = mit->second;
if ((ctime.tv_sec + age) < now) {
std::string conv_attr = "sys.conversion.";
conv_attr += mit->first;
// Check if this file has already the proper layout
std::string conversion = map[conv_attr];
std::string plctplcy;
if (((int)conversion.find("|")) != STR_NPOS) {
eos::common::StringConversion::SplitKeyValue(conversion, conversion, plctplcy,
"|");
}
unsigned long long lid = strtoll(map[conv_attr].c_str(), 0, 16);
if (fmd->getLayoutId() == lid) {
eos_static_debug("msg=\"skipping conversion - file has already "
"the desired target layout\" fxid=%08llx", fmd->getId());
continue;
}
if (lMatchSizeMap.count(mit->first)) {
if (lMatchSizeMap[mit->first] < 0) {
// check that this file is smaller as the required size
if ((ssize_t)fmd->getSize() >= (-lMatchSizeMap[mit->first])) {
eos_static_debug("msg=\"skipping conversion - file is larger "
"than required\" fxid=%08llx", fmd->getId());
continue;
} else {
eos_static_info("msg=\"converting according to age+size specification\" "
"path='%s' fxid=%08llx required-size < %ld size=%ld layout:%08x :=> %08x",
fullpath.c_str(), fmd->getId(), -lMatchSizeMap[mit->first],
(ssize_t)fmd->getSize(), lid, fmd->getLayoutId());
}
}
if (lMatchSizeMap[mit->first] > 0) {
// check that this file is larger than the required size
if ((ssize_t)fmd->getSize() <= lMatchSizeMap[mit->first]) {
eos_static_debug("msg=\"skipping conversion - file is smaller "
"than required\" fxid=%08llx", fmd->getId());
continue;
} else {
eos_static_info("msg=\"converting according to age+size specification\" "
"path='%s' fxid=%08llx required-size > %ld size=%ld layout:%08x "
":=> %08x", fullpath.c_str(), fmd->getId(), lMatchSizeMap[mit->first],
(ssize_t)fmd->getSize(), lid, fmd->getLayoutId());
}
}
} else {
eos_static_info("msg=\"converting according to age specification\" path='%s' "
"fxid=%08llx layout:%08x :=> %08x", fullpath.c_str(),
fmd->getId(), lid, fmd->getLayoutId());
}
// This entry can be converted
eos_static_notice("msg=\"convert expired file\" path=\"%s\" "
"ctime=%u policy-age=%u age=%u fxid=%08llx "
"layout=\"%s\"", fullpath.c_str(), ctime.tv_sec,
age, now - ctime.tv_sec, (unsigned long long) fmd->getId(),
map[conv_attr].c_str());
lConversionList.push_back(std::make_pair(fmd->getId(), map[conv_attr]));
break;
}
}
}
}
} catch (eos::MDException& e) {
errno = e.getErrno();
cmd.reset();
eos_static_err("msg=\"exception\" ec=%d emsg=\"%s\"",
e.getErrno(), e.getMessage().str().c_str());
}
}
for (auto it = lConversionList.begin(); it != lConversionList.end(); it++) {
const eos::common::FileId::fileid_t fid = it->first;
std::string conversion = it->second;
std::string plctplcy;
if (((int)conversion.find("|")) != STR_NPOS) {
eos::common::StringConversion::SplitKeyValue(conversion, conversion, plctplcy,
"|");
plctplcy = "~" + plctplcy;
}
char conversiontagfile[1024];
std::string space;
if (map.count("user.forced.space")) {
space = map["user.forced.space"];
}
if (map.count("sys.forced.space")) {
space = map["sys.forced.space"];
}
if (map.count("sys.lru.conversion.space")) {
space = map["sys.lru.conversion.space"];
}
// the conversion value can be directory an layout env representation like
// "eos.space=...&eos.layout ..."
XrdOucEnv cenv(conversion.c_str());
if (cenv.Get("eos.space")) {
space = cenv.Get("eos.space");
}
snprintf(conversiontagfile, sizeof(conversiontagfile) - 1,
"%s/%016llx:%s#%s%s",
gOFS->MgmProcConversionPath.c_str(), it->first, space.c_str(),
conversion.c_str(), plctplcy.c_str());
std::string conv_tag = conversiontagfile;
conv_tag.erase(0, gOFS->MgmProcConversionPath.length() + 1);
// For the new converted we need to tell it explicitly that we want the
// ctime to be updated since this doesn't happen by default
conv_tag += eos::mgm::ConversionInfo::UPDATE_CTIME;
if (gOFS->mConverterDriver->ScheduleJob(fid, conv_tag)) {
eos_static_info("msg=\"LRU scheduled conversion job\" tag=\"%s\"",
conv_tag.c_str());
} else {
eos_static_err("msg=\"LRU failed to schedule conversion job\" "
"tag=\"%s\"", conv_tag.c_str());
}
}
}
EOSMGMNAMESPACE_END