//------------------------------------------------------------------------------
// File: ConvertCmd.cc
// Author: Mihai Patrascoiu - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "ConvertCmd.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/Scheduler.hh"
#include "mgm/FsView.hh"
#include "mgm/convert/ConverterDriver.hh"
#include "namespace/interface/IView.hh"
#include "namespace/interface/IFileMD.hh"
#include "namespace/interface/IContainerMD.hh"
#include "common/table_formatter/TableFormatterBase.hh"
#include
EOSMGMNAMESPACE_BEGIN
// Helper function forward declaration
static int CheckValidPath(const char*, eos::common::VirtualIdentity&,
std::string&, XrdSfsFileExistence = XrdSfsFileExistNo);
static std::string BuildConversionId(const std::string&,
eos::common::LayoutId::eChecksum,
int,
eos::common::FileId::fileid_t file_id,
const std::string&,
const std::string& = "");
//------------------------------------------------------------------------------
// Method implementing the specific behaviour of the command executed
//------------------------------------------------------------------------------
eos::console::ReplyProto
ConvertCmd::ProcessRequest() noexcept
{
eos::console::ReplyProto reply;
const eos::console::ConvertProto& convert = mReqProto.convert();
const auto& subcmd = convert.subcmd_case();
bool jsonOutput =
(mReqProto.format() == eos::console::RequestProto::JSON);
if (!gOFS->mConverterDriver) {
reply.set_std_err("error: converter engine is not enabled");
reply.set_retc(ENOTSUP);
return reply;
}
if (subcmd == eos::console::ConvertProto::kAction) {
ActionSubcmd(convert.action(), reply);
} else if (subcmd == eos::console::ConvertProto::kStatus) {
StatusSubcmd(convert.status(), reply, jsonOutput);
} else if (subcmd == eos::console::ConvertProto::kConfig) {
ConfigSubcmd(convert.config(), reply, jsonOutput);
} else if (subcmd == eos::console::ConvertProto::kFile) {
FileSubcmd(convert.file(), reply, jsonOutput);
} else if (subcmd == eos::console::ConvertProto::kRule) {
RuleSubcmd(convert.rule(), reply, jsonOutput);
} else if (subcmd == eos::console::ConvertProto::kList) {
ListSubcmd(convert.list(), reply, jsonOutput);
} else if (subcmd == eos::console::ConvertProto::kClear) {
// check if vid has admin permissions (root, sudoer, admin user, admin group)
if (!vid.uid || vid.sudoer || vid.hasUid(3) || vid.hasGid(4)) {
ClearSubcmd(convert.clear(), reply);
} else {
reply.set_retc(EPERM);
reply.set_std_err("error: you have to take role 'root' to execute this command");
}
} else {
reply.set_retc(EINVAL);
reply.set_std_err("error: command not supported");
}
return reply;
}
//------------------------------------------------------------------------------
// Execute action subcommand
//------------------------------------------------------------------------------
void ConvertCmd::ActionSubcmd(
const eos::console::ConvertProto_ActionProto& action,
eos::console::ReplyProto& reply)
{
std::ostringstream out;
auto converter_action = action.action();
if (converter_action == eos::console::ConvertProto_ActionProto::ENABLE) {
gOFS->mConverterDriver->Start();
out << "converter engine started";
} else {
gOFS->mConverterDriver->Stop();
out << "converter engine stopped";
}
reply.set_std_out(out.str());
}
//------------------------------------------------------------------------------
// Execute status subcommand
//------------------------------------------------------------------------------
void ConvertCmd::StatusSubcmd(
const eos::console::ConvertProto_StatusProto& status,
eos::console::ReplyProto& reply,
bool jsonOutput)
{
std::ostringstream out;
// Lambda function to parse threadpool information
auto parseKeyValueString = [](std::string skeyvalue) -> Json::Value {
using eos::common::StringConversion;
std::map map;
Json::Value json;
if (StringConversion::GetKeyValueMap(skeyvalue.c_str(),
map, "=", " "))
{
for (const auto& it : map) {
json[it.first] = map[it.first];
}
}
return json;
};
// Extract Converter Driver parameters
std::string threadpool = gOFS->mConverterDriver->GetThreadPoolInfo();
std::string config =
SSTR("maxthreads=" << gOFS->mConverterDriver->GetMaxThreadPoolSize()
<< " maxqueuesize=" << gOFS->mConverterDriver->GetMaxQueueSize());
uint64_t running = gOFS->mConverterDriver->NumRunningJobs();
uint64_t failed = gOFS->mConverterDriver->NumQdbFailedJobs();
int64_t pending = gOFS->mConverterDriver->NumPendingJobs();
auto state = gOFS->mConverterDriver->IsRunning() ? "enabled" : "disabled";
if (jsonOutput) {
Json::Value json;
json["threadpool"] = parseKeyValueString(threadpool.c_str());
json["config"] = parseKeyValueString(config.c_str());
json["status"] = state;
json["running"] = (Json::Value::UInt64) running;
json["pending"] = (Json::Value::UInt64) pending;
json["failed"] = (Json::Value::UInt64) failed;
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &out);
} else {
out << "Status: " << state << std::endl
<< "Config: " << config << std::endl
<< "Threadpool: " << threadpool << std::endl
<< "Running jobs: " << running << std::endl
<< "Pending jobs: " << pending << std::endl
<< "Total failed jobs : " << failed << std::endl;
}
reply.set_std_out(out.str());
}
//------------------------------------------------------------------------------
// Execute config subcommand
//------------------------------------------------------------------------------
void ConvertCmd::ConfigSubcmd(
const eos::console::ConvertProto_ConfigProto& config,
eos::console::ReplyProto& reply,
bool jsonOutput)
{
using output_map = std::map;
std::ostringstream out;
std::ostringstream err;
output_map output;
int retc = 0;
if (config.maxthreads() != 0) {
if (config.maxthreads() > 5000) {
err << "error: maxthreads value " << config.maxthreads()
<< " above 5000 limit" << std::endl;
retc = EINVAL;
} else {
gOFS->mConverterDriver->SetMaxThreadPoolSize(config.maxthreads());
output["maxthreads"] = std::to_string(config.maxthreads());
}
}
if (config.maxqueuesize()) {
gOFS->mConverterDriver->SetMaxQueueSize(config.maxqueuesize());
output["maxqueuesize"] = std::to_string(config.maxqueuesize());
}
if (output.empty()) {
err << "error: no config values given" << std::endl;
retc = ENODATA;
} else if (jsonOutput) {
Json::Value json;
for (auto it = output.begin(); it != output.end(); it++) {
json[it->first] = it->second;
}
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &out);
} else {
out << "Config values updated:" << std::endl;
for (auto it = output.begin(); it != output.end(); it++) {
out << it->first << "=" << it->second << std::endl;
}
}
reply.set_std_out(out.str());
reply.set_std_err(err.str());
reply.set_retc(retc);
}
//------------------------------------------------------------------------------
// Execute file subcommand
//------------------------------------------------------------------------------
void ConvertCmd::FileSubcmd(const eos::console::ConvertProto_FileProto& file,
eos::console::ReplyProto& reply,
bool jsonOutput)
{
using eos::common::LayoutId;
std::ostringstream out;
std::ostringstream err;
std::string errmsg;
int retc = 0;
auto enforce_file = XrdSfsFileExistence::XrdSfsFileExistIsFile;
std::string path = PathFromIdentifierProto(file.identifier(), errmsg);
if (!path.length()) {
reply.set_std_err(errmsg);
reply.set_retc(errno);
return;
}
if ((retc = CheckValidPath(path.c_str(), mVid, errmsg, enforce_file))) {
reply.set_std_err(errmsg);
reply.set_retc(retc);
return;
}
// Extract file id, layout id and replica location
eos::IFileMD::id_t file_id = 0;
eos::IFileMD::layoutId_t file_layoutid = 0;
eos::IFileMD::location_t replica_location = 0;
try {
eos::common::RWMutexReadLock vlock(gOFS->eosViewRWMutex);
auto fmd = gOFS->eosView->getFile(path).get();
file_id = fmd->getId();
file_layoutid = fmd->getLayoutId();
replica_location = fmd->getLocations().at(0);
} catch (eos::MDException& e) {
eos_debug("msg=\"exception retrieving file metadata\" path=%s "
"ec=%d emsg=\"%s\"", path.c_str(), e.getErrno(),
e.getMessage().str().c_str());
err << "error: failed to retrieve file metadata '" << path << "'";
reply.set_std_err(err.str());
reply.set_retc(e.getErrno());
return;
} catch (const std::out_of_range& e) {
eos_static_err("msg=\"exception getting replica locations\" path=%s "
"emsg=\"%s\"", path.c_str(), e.what());
err << "error: files without replicas can not be converted";
reply.set_std_err(err.str());
reply.set_retc(EINVAL);
return;
}
// Handle conversion space
const auto& conversion = file.conversion();
if ((retc = CheckConversionProto(conversion, errmsg))) {
reply.set_std_err(errmsg);
reply.set_retc(retc);
return;
}
std::string space = conversion.space();
if (space.empty()) {
eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex);
auto filesystem = FsView::gFsView.mIdView.lookupByID(replica_location);
if (filesystem) {
space = filesystem->GetString("schedgroup");
} else {
err << "error: unable to retrieve "
<< "filesystem location for '" << path << "'";
reply.set_std_err(err.str());
reply.set_retc(EINVAL);
return;
}
}
// Handle checksum
LayoutId::eChecksum echecksum;
if (conversion.checksum().length()) {
echecksum = static_cast
(LayoutId::GetChecksumFromString(conversion.checksum()));
} else {
echecksum = static_cast
(LayoutId::GetChecksum(file_layoutid));
}
// Schedule conversion job
std::string conversion_id = BuildConversionId(conversion.layout(), echecksum,
conversion.replica(), file_id,
space, conversion.placement());
eos_info("msg=\"scheduling conversion job\" path=%s conversion_id=%s",
path.c_str(), conversion_id.c_str());
if (!gOFS->mConverterDriver->ScheduleJob(file_id, conversion_id)) {
err << "error: unable to push conversion job '" << conversion_id.c_str()
<< "' to QuarkDB";
reply.set_std_err(err.str());
reply.set_retc(EIO);
return;
}
if (jsonOutput) {
Json::Value json;
json["conversion_id"] = conversion_id;
json["path"] = path;
json["space"] = space;
json["checksum"] = LayoutId::GetChecksumString(echecksum);
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &out);
} else {
out << "Scheduled conversion job: " << conversion_id;
}
reply.set_std_out(out.str());
}
//------------------------------------------------------------------------------
// Execute rule subcommand
//------------------------------------------------------------------------------
void ConvertCmd::RuleSubcmd(const eos::console::ConvertProto_RuleProto& rule,
eos::console::ReplyProto& reply,
bool jsonOutput)
{
using eos::common::LayoutId;
auto conversion = rule.conversion();
XrdOucErrInfo errInfo;
std::ostringstream out;
std::ostringstream err;
std::string errmsg;
std::string path;
int retc = 0;
auto enforce_dir = XrdSfsFileExistence::XrdSfsFileExistIsDirectory;
path = PathFromIdentifierProto(rule.identifier(), errmsg);
if (!path.length()) {
reply.set_std_err(errmsg);
reply.set_retc(errno);
return;
}
if ((retc = CheckValidPath(path.c_str(), mVid, errmsg, enforce_dir))) {
reply.set_std_err(errmsg);
reply.set_retc(retc);
return;
}
if ((retc = CheckConversionProto(conversion, errmsg))) {
reply.set_std_err(errmsg);
reply.set_retc(retc);
return;
}
if (conversion.checksum().empty()) {
err << "error: no conversion checksum provided";
reply.set_std_err(err.str());
reply.set_retc(EINVAL);
return;
}
// Handle space default scenario
std::string space = conversion.space().empty() ?
"default.0" : conversion.space();
// Handle checksum
LayoutId::eChecksum echecksum = static_cast(
LayoutId::GetChecksumFromString(conversion.checksum()));
//------------------------------------------
// This part acts as a placeholder
//------------------------------------------
// Build conversion rule
std::string conversion_rule = BuildConversionId(conversion.layout(), echecksum,
conversion.replica(), 0, space,
conversion.placement());
size_t pos = conversion_rule.find(":");
if (pos != std::string::npos) {
conversion_rule.erase(0, pos + 1);
}
// Set rule as extended attribute
eos_info("msg=\"placing conversion rule\" path=%s conversion_rule=%s",
path.c_str(), conversion_rule.c_str());
if (gOFS->_attr_set(path.c_str(), errInfo, mVid, 0, "sys.eos.convert.rule",
conversion_rule.c_str())) {
err << "error: could not set conversion rule '" << conversion_rule
<< "' on path '" << path << "' -- emsg=" << errInfo.getErrText();
reply.set_std_err(err.str());
reply.set_retc(errInfo.getErrInfo());
return;
}
if (jsonOutput) {
Json::Value json;
json["conversion_rule"] = conversion_rule;
json["path"] = path;
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &out);
} else {
out << "Set conversion rule '" << conversion_rule
<< "' on path '" << path << "'";
}
reply.set_std_out(out.str());
}
//------------------------------------------------------------------------------
// List jobs subcommand
//------------------------------------------------------------------------------
void
ConvertCmd::ListSubcmd(const eos::console::ConvertProto_ListProto& list,
eos::console::ReplyProto& reply, bool jsonOutput)
{
std::ostringstream oss;
if (list.type() == "pending") {
auto pending = gOFS->mConverterDriver->GetPendingJobs();
if (pending.empty()) {
reply.set_std_out("info: no pending conversions");
return;
}
if (jsonOutput) {
Json::Value json;
for (const auto& elem : pending) {
json[std::to_string(elem.first)] = elem.second;
}
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &oss);
} else {
TableFormatterBase table;
TableHeader header;
TableData body;
header.push_back(std::make_tuple("Fxid", 10, "-s"));
header.push_back(std::make_tuple("Conversion string", 0, "-s"));
table.SetHeader(header);
for (const auto& elem : pending) {
TableRow row;
row.emplace_back(eos::common::FileId::Fid2Hex(elem.first), "-s");
row.emplace_back(elem.second, "-s");
body.push_back(row);
}
table.AddRows(body);
oss << table.GenerateTable();
}
} else if (list.type() == "failed") {
auto failed = gOFS->mConverterDriver->GetFailedJobs();
if (failed.empty()) {
reply.set_std_out("info: no failed conversions");
return;
}
if (jsonOutput) {
Json::Value json;
for (const auto& elem : failed) {
json[elem.first] = elem.second;
}
Json::StreamWriterBuilder builder;
std::unique_ptr jsonwriter(
builder.newStreamWriter());
jsonwriter->write(json, &oss);
} else {
TableFormatterBase table;
TableHeader header;
TableData body;
header.push_back(std::make_tuple("Conversion string", 0, "-s"));
header.push_back(std::make_tuple("Failure", 80, "-s"));
table.SetHeader(header);
for (const auto& elem : failed) {
TableRow row;
std::string err_msg {elem.second};
std::replace(err_msg.begin(), err_msg.end(), '\0', ';');
row.emplace_back(elem.first, "-s");
row.emplace_back(err_msg, "-s");
body.push_back(row);
}
table.AddRows(body);
oss << table.GenerateTable();
}
}
if (!oss.str().empty()) {
reply.set_std_out(oss.str());
}
}
//------------------------------------------------------------------------------
// Clear jobs subcommand
//------------------------------------------------------------------------------
void
ConvertCmd::ClearSubcmd(const eos::console::ConvertProto_ClearProto& clear,
eos::console::ReplyProto& reply)
{
if (clear.type() == "pending") {
gOFS->mConverterDriver->ClearPendingJobs();
} else if (clear.type() == "failed") {
gOFS->mConverterDriver->ClearFailedJobs();
}
reply.set_std_out("info: list cleared");
}
//------------------------------------------------------------------------------
// Translate the identifier proto into a namespace path
//------------------------------------------------------------------------------
std::string ConvertCmd::PathFromIdentifierProto(
const eos::console::ConvertProto_IdentifierProto& identifier,
std::string& err_msg)
{
using eos::console::ConvertProto;
const auto& type = identifier.Identifier_case();
std::string path = "";
if (type == ConvertProto::IdentifierProto::kPath) {
path = identifier.path().c_str();
} else if (type == ConvertProto::IdentifierProto::kFileId) {
GetPathFromFid(path, identifier.fileid(), err_msg);
} else if (type == ConvertProto::IdentifierProto::kContainerId) {
GetPathFromCid(path, identifier.containerid(), err_msg);
} else {
err_msg = "error: received empty string path";
}
return path;
}
//------------------------------------------------------------------------------
// Check that the given proto conversion is valid
//------------------------------------------------------------------------------
int ConvertCmd::CheckConversionProto(
const eos::console::ConvertProto_ConversionProto& conversion,
std::string& err_msg)
{
using eos::common::LayoutId;
if (LayoutId::GetLayoutFromString(conversion.layout()) == -1) {
err_msg = "error: invalid conversion layout";
return EINVAL;
}
if (conversion.replica() < 1 || conversion.replica() > 32) {
err_msg = "error: invalid replica number (must be between 1 and 32)";
return EINVAL;
}
if (conversion.checksum().length()) {
auto xs_id = LayoutId::GetChecksumFromString(conversion.checksum());
if ((xs_id == -1) || (xs_id == LayoutId::eChecksum::kNone)) {
err_msg = "error: invalid conversion checksum";
return EINVAL;
}
}
if (conversion.placement().length() &&
eos::mgm::Scheduler::PlctPolicyFromString(conversion.placement()) == -1) {
err_msg = "error: invalid conversion placement policy";
return EINVAL;
}
return 0;
}
//------------------------------------------------------------------------------
//! Check that the given path points to a valid entry
//!
//! @param path the path to check
//! @param vid virtual identity of the client
//! @param err_msg string to place error message
//! @param enforce_type expected entry type (file or directory)
//!
//! @return 0 if path is valid, error code otherwise
//------------------------------------------------------------------------------
static int CheckValidPath(const char* path,
eos::common::VirtualIdentity& vid,
std::string& err_msg,
XrdSfsFileExistence enforce_type)
{
XrdSfsFileExistence fileExists;
XrdOucErrInfo errInfo;
// Check for path existence
if (gOFS->_exists(path, fileExists, errInfo, vid)) {
err_msg = "error: unable to check for path existence";
return errInfo.getErrInfo();
}
if (fileExists == XrdSfsFileExistNo) {
err_msg = "error: path does not point to a valid entry";
return EINVAL;
} else if ((fileExists != XrdSfsFileExistIsFile) &&
(fileExists != XrdSfsFileExistIsDirectory)) {
err_msg = "error: path does not point to a file or container";
return EINVAL;
}
if ((enforce_type != XrdSfsFileExistNo) && (fileExists != enforce_type)) {
std::string type = (fileExists == XrdSfsFileExistIsFile) ?
"file" : "directory";
err_msg = "error: path must point to a " + type;
return EINVAL;
}
return 0;
}
//------------------------------------------------------------------------------
//! Build and return a conversion id from the provided arguments
//!
//! @param layout the conversion layout
//! @param echecksum the conversion checksum
//! @param stripes the conversion number of stripes
//! @param file_id the conversion file id
//! @paran space the conversion target space
//! @param placement the conversion placement type
//!
//! @return 0 if path is valid, error code otherwise
//------------------------------------------------------------------------------
static std::string BuildConversionId(const std::string& layout,
eos::common::LayoutId::eChecksum echecksum,
int stripes,
eos::common::FileId::fileid_t file_id,
const std::string& space,
const std::string& placement)
{
using eos::common::LayoutId;
unsigned long layoutid = 0;
layoutid = LayoutId::GetId(LayoutId::GetLayoutFromString(layout),
echecksum,
stripes,
LayoutId::eBlockSize::k4M,
LayoutId::eChecksum::kCRC32C,
0, // excess replicas
LayoutId::GetRedundancyFromLayoutString(layout));
char buff[4096];
snprintf(buff, std::size(buff), "%016llx:%s#%08lx",
file_id, space.c_str(), layoutid);
std::string conversion {buff};
if (!placement.empty()) { // ~
conversion += "~";
conversion += placement;
}
return conversion;
}
EOSMGMNAMESPACE_END