//------------------------------------------------------------------------------
//! @file IProcCommand.cc
//! @author Elvin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2017 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/Path.hh"
#include "common/CommentLog.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/proc/IProcCommand.hh"
#include "mgm/proc/ProcInterface.hh"
#include "mgm/Macros.hh"
#include "namespace/interface/IView.hh"
#include "json/json.h"
#include
EOSMGMNAMESPACE_BEGIN
std::atomic_uint_least64_t IProcCommand::uuid{0};
std::mutex IProcCommand::mMapCmdsMutex;
std::map
IProcCommand::mCmdsExecuting;
//------------------------------------------------------------------------------
// Open a proc command e.g. call the appropriate user or admin command and
// store the output in a resultstream or in case of find in a temporary output
// file.
//------------------------------------------------------------------------------
int
IProcCommand::open(const char* path, const char* info,
eos::common::VirtualIdentity& vid,
XrdOucErrInfo* error)
{
// @todo (esindril): configure delay based on the type of command
int delay = 5;
if (!mExecRequest) {
if (HasSlot()) {
LaunchJob();
mExecRequest = true;
} else {
eos_notice("%s", SSTR("cmd_type=" << mReqProto.command_case() <<
" no more slots, stall client 3 seconds").c_str());
return delay - 2;
}
}
if (mFuture.wait_for(std::chrono::seconds(delay)) !=
std::future_status::ready) {
// Stall the client
std::string msg = "command not ready, stall the client 5 seconds";
eos_notice("%s", msg.c_str());
error->setErrInfo(0, msg.c_str());
return delay;
} else {
eos::console::ReplyProto reply = mFuture.get();
// Routing redirect encountered
if (reply.retc() == SFS_REDIRECT) {
eos_notice("msg=\"routing redirect\" path=%s hostport=%s:%d "
"stall_timeout=%d", mRoutingInfo.path.c_str(),
mRoutingInfo.host.c_str(), mRoutingInfo.port,
mRoutingInfo.stall_timeout);
if (mRoutingInfo.stall_timeout) {
// Force re-execution of the command upon return from stall
mExecRequest = false;
std::string stall_msg = "No master MGM available";
return gOFS->Stall(*error, mRoutingInfo.stall_timeout,
stall_msg.c_str());
}
return gOFS->Redirect(*error, mRoutingInfo.host.c_str(),
mRoutingInfo.port);
}
// Output is written in file
if (!ofstdoutStreamFilename.empty() && !ofstderrStreamFilename.empty()) {
ifstdoutStream.open(ofstdoutStreamFilename, std::ifstream::in);
ifstderrStream.open(ofstderrStreamFilename, std::ifstream::in);
iretcStream.str(std::string("&mgm.proc.retc=") + std::to_string(reply.retc()));
readStdOutStream = true;
} else {
std::ostringstream oss;
if (mReqProto.format() == eos::console::RequestProto::FUSE) {
// The proto dumpmd issued by the FST uses the FUSE format
// (resync metadata, background Fsck and standalone Fsck)
// @todo This format should be dropped once Quarkdb migration is complete
// and the NS will be queried directly
oss << reply.std_out();
} else {
oss << "mgm.proc.stdout=" << reply.std_out().c_str()
<< "&mgm.proc.stderr=" << reply.std_err().c_str()
<< "&mgm.proc.retc=" << reply.retc();
}
mTmpResp = oss.str();
}
// Store the client's command comment in the comments logbook
if ((vid.uid <= 2) || (vid.sudoer)) {
// Only instance users or sudoers can add to the logbook
if (mComment.length() && gOFS->mCommentLog) {
std::string argsJson;
(void) google::protobuf::util::MessageToJsonString(mReqProto, &argsJson);
if (!gOFS->mCommentLog->Add(mTimestamp, "", "", argsJson.c_str(),
mComment.c_str(), stdErr.c_str(), // @note stErr or reply.std_err()?
reply.retc())) {
eos_err("failed to log to comments logbook");
}
}
}
}
return SFS_OK;
}
//------------------------------------------------------------------------------
// Read a part of the result stream created during open
//------------------------------------------------------------------------------
size_t
IProcCommand::read(XrdSfsFileOffset offset, char* buff, XrdSfsXferSize blen)
{
size_t cpy_len = 0;
if (readStdOutStream && ifstdoutStream.is_open() && ifstderrStream.is_open()) {
ifstdoutStream.read(buff, blen);
cpy_len = (size_t)ifstdoutStream.gcount();
if (cpy_len < (size_t)blen) {
readStdOutStream = false;
readStdErrStream = true;
ifstderrStream.read(buff + cpy_len, blen - cpy_len);
cpy_len += (size_t)ifstderrStream.gcount();
}
} else if (readStdErrStream && ifstderrStream.is_open()) {
ifstderrStream.read(buff, blen);
cpy_len = (size_t)ifstderrStream.gcount();
if (cpy_len < (size_t)blen) {
readStdErrStream = false;
readRetcStream = true;
iretcStream.read(buff + cpy_len, blen - cpy_len);
cpy_len += (size_t)iretcStream.gcount();
}
} else if (readRetcStream) {
iretcStream.read(buff, blen);
cpy_len = (size_t)iretcStream.gcount();
if (cpy_len < (size_t)blen) {
readRetcStream = false;
}
} else if ((size_t)offset < mTmpResp.length()) {
cpy_len = std::min((size_t)(mTmpResp.size() - offset), (size_t)blen);
memcpy(buff, mTmpResp.data() + offset, cpy_len);
}
return cpy_len;
}
//------------------------------------------------------------------------------
// Launch command asynchronously, creating the corresponding promise and future
//------------------------------------------------------------------------------
void
IProcCommand::LaunchJob()
{
if (mDoAsync) {
mFuture = ProcInterface::sProcThreads.PushTask
([this]() -> eos::console::ReplyProto {
return ProcessRequest();
});
if (EOS_LOGS_DEBUG) {
eos_debug("%s", ProcInterface::sProcThreads.GetInfo().c_str());
}
} else {
std::promise promise;
mFuture = promise.get_future();
promise.set_value(ProcessRequest());
}
}
//------------------------------------------------------------------------------
// Check if we can safely delete the current object as there is no async
// thread executing the ProcessResponse method
//------------------------------------------------------------------------------
bool
IProcCommand::KillJob()
{
bool is_killed = true;
if (mDoAsync) {
mForceKill.store(true);
if (mFuture.valid()) {
is_killed = (mFuture.wait_for(std::chrono::seconds(0)) ==
std::future_status::ready);
}
}
return is_killed;
}
//------------------------------------------------------------------------------
// Open temporary output files for file based results
//------------------------------------------------------------------------------
bool
IProcCommand::OpenTemporaryOutputFiles()
{
ostringstream tmpdir;
tmpdir << "/var/tmp/eos/mgm/";
tmpdir << uuid++;
ofstdoutStreamFilename = tmpdir.str();
ofstdoutStreamFilename += ".stdout";
ofstderrStreamFilename = tmpdir.str();
ofstderrStreamFilename += ".stderr";
eos::common::Path cPath(ofstdoutStreamFilename.c_str());
if (!cPath.MakeParentPath(S_IRWXU)) {
eos_err("Unable to create temporary outputfile directory %s",
tmpdir.str().c_str());
return false;
}
// own the directory by daemon
if (::chown(cPath.GetParentPath(), 2, 2)) {
eos_err("Unable to own temporary outputfile directory %s",
cPath.GetParentPath());
}
ofstdoutStream.open(ofstdoutStreamFilename, std::ofstream::out);
ofstderrStream.open(ofstderrStreamFilename, std::ofstream::out);
if ((!ofstdoutStream) || (!ofstderrStream)) {
if (ofstdoutStream.is_open()) {
ofstdoutStream.close();
}
if (ofstderrStream.is_open()) {
ofstderrStream.close();
}
return false;
}
ofstdoutStream << "mgm.proc.stdout=";
ofstderrStream << "&mgm.proc.stderr=";
return true;
}
//------------------------------------------------------------------------------
// Open temporary output files for file based results
//------------------------------------------------------------------------------
bool
IProcCommand::CloseTemporaryOutputFiles()
{
ofstdoutStream.close();
ofstderrStream.close();
return !(ofstdoutStream.is_open() || ofstderrStream.is_open());
}
//------------------------------------------------------------------------------
// Format console output string as json
//------------------------------------------------------------------------------
Json::Value
IProcCommand::ConvertOutputToJsonFormat(const std::string& stdOut)
{
using eos::common::StringConversion;
std::stringstream ss(stdOut);
Json::Value jsonOut;
std::string line;
do {
Json::Value jsonEntry;
line.clear();
if (!std::getline(ss, line)) {
break;
}
if (!line.length()) {
continue;
}
XrdOucString sline = line.c_str();
while (sline.replace("", "n")) {}
while (sline.replace("?configstatus@rw", "_rw")) {}
line = sline.c_str();
std::map map;
StringConversion::GetKeyValueMap(line.c_str(), map, "=", " ");
// These values violate the JSON hierarchy and have to be rewritten
StringConversion::ReplaceMapKey(map, "cfg.balancer",
"cfg.balancer.status");
StringConversion::ReplaceMapKey(map, "cfg.geotagbalancer",
"cfg.geotagbalancer.status");
StringConversion::ReplaceMapKey(map, "cfg.geobalancer",
"cfg.geobalancer.status");
StringConversion::ReplaceMapKey(map, "cfg.groupbalancer",
"cfg.groupbalancer.status");
StringConversion::ReplaceMapKey(map, "geotagbalancer",
"geotagbalancer.status");
StringConversion::ReplaceMapKey(map, "geobalancer",
"geobalancer.status");
StringConversion::ReplaceMapKey(map, "groupbalancer",
"groupbalancer.status");
StringConversion::ReplaceMapKey(map, "cfg.wfe", "cfg.wfe.status");
StringConversion::ReplaceMapKey(map, "cfg.lru", "cfg.lru.status");
StringConversion::ReplaceMapKey(map, "local.drain", "local.drain.status");
StringConversion::ReplaceMapKey(map, "stat.health", "stat.health.status");
StringConversion::ReplaceMapKey(map, "wfe", "wfe.status");
StringConversion::ReplaceMapKey(map, "lru", "lru.status");
StringConversion::ReplaceMapKey(map, "balancer", "balancer.status");
StringConversion::ReplaceMapKey(map, "converter", "converter.status");
for (auto& it : map) {
std::vector token;
char* conv;
errno = 0;
StringConversion::Tokenize(it.first, token, ".");
double val = strtod(it.second.c_str(), &conv);
std::string value;
if (token.empty()) {
continue;
}
if (it.second.length()) {
value = it.second;
} else {
value = "NULL";
}
auto* jep = &(jsonEntry[token[0]]);
for (int i = 1; i < (int)token.size(); i++) {
jep = &((*jep)[token[i]]);
}
// Unquote value
std::stringstream quoted_ss(value);
quoted_ss >> std::quoted(value);
// Seal value
XrdOucString svalue = value.c_str();
eos::common::StringConversion::Seal(svalue);
value = svalue.c_str();
if (errno || (!val && (conv == it.second.c_str())) ||
((conv - it.second.c_str()) != (long long)it.second.length())) {
// non numeric
(*jep) = value;
} else {
// numeric
(*jep) = val;
}
}
jsonOut.append(jsonEntry);
} while (true);
return jsonOut;
}
//------------------------------------------------------------------------------
// Create a JSON string from the command output, error and return code
//------------------------------------------------------------------------------
std::string
IProcCommand::ResponseToJsonString(const std::string& out,
const std::string& err, int rc)
{
Json::Value json;
try {
json["result"] = ConvertOutputToJsonFormat(out);
json["errormsg"] = err;
json["retc"] = std::to_string(rc);
} catch (Json::Exception& e) {
eos_err("Json conversion exception cmd_type=%s emsg=\"%s\"",
SSTR(mReqProto.command_case()).c_str(), e.what());
json["errormsg"] = "illegal string in json conversion";
json["retc"] = std::to_string(EFAULT);
}
return SSTR(json);
}
//------------------------------------------------------------------------------
// Retrieve the file's full path given its numeric id
//------------------------------------------------------------------------------
// drop when we drop non-proto commands using it
void
IProcCommand::GetPathFromFid(XrdOucString& path, unsigned long long fid,
const std::string& err_msg_prefix)
{
std::string serr;
std::string spath(path.c_str());
retc = GetPathFromFid(spath, fid, serr);
path = spath.c_str();
stdErr = serr.c_str();
}
int
IProcCommand::GetPathFromFid(std::string& path, unsigned long long fid,
std::string& err_msg)
{
if (path.empty()) {
if (fid == 0ULL) {
err_msg += "error: fid is 0";
return EINVAL;
}
try {
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
std::string temp = gOFS->eosView->getUri(gOFS->eosFileService->getFileMD(
fid).get());
path = temp;
return 0;
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("caught exception %d %s\n", e.getErrno(),
e.getMessage().str().c_str());
err_msg = "error: " + e.getMessage().str() + '\n';
return errno;
}
}
return EINVAL;
}
//------------------------------------------------------------------------------
// Retrieve the container's full path given its numeric id
//------------------------------------------------------------------------------
// drop when we drop non-proto commands using it
void
IProcCommand::GetPathFromCid(XrdOucString& path, unsigned long long cid,
const std::string& err_msg_prefix)
{
std::string serr;
std::string spath(path.c_str());
retc = GetPathFromCid(spath, cid, serr);
path = spath.c_str();
stdErr = serr.c_str();
}
int
IProcCommand::GetPathFromCid(std::string& path, unsigned long long cid,
std::string& err_msg)
{
if (path.empty()) {
if (cid == 0ULL) {
err_msg += "error: cid is 0";
return EINVAL;
}
try {
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
std::string temp = gOFS->eosView->getUri
(gOFS->eosDirectoryService->getContainerMD(cid).get());
path = temp;
return 0;
} catch (eos::MDException& e) {
errno = e.getErrno();
eos_debug("caught exception %d %s\n", e.getErrno(),
e.getMessage().str().c_str());
err_msg = "error: " + e.getMessage().str() + '\n';
return errno;
}
}
return EINVAL;
}
//------------------------------------------------------------------------------
// Check if operation forbidden
//------------------------------------------------------------------------------
bool
IProcCommand::IsOperationForbidden(const std::string& path,
const eos::common::VirtualIdentity& vid,
std::string& err_check, int& errno_check) const
{
if (eos::mgm::ProcBounceIllegalNames(path, err_check, errno_check) ||
eos::mgm::ProcBounceNotAllowed(path, mVid, err_check, errno_check)) {
return true;
}
return false;
}
//----------------------------------------------------------------------------
// Fill routing information if a routing redirect should happen
//----------------------------------------------------------------------------
bool
IProcCommand::ShouldRoute(const std::string& path,
eos::console::ReplyProto& reply)
{
eos_debug("msg=\"applying routing\" path=%s is_redirect=%d",
path.c_str(), gOFS->IsRedirect);
if (gOFS->IsRedirect) {
if (gOFS->ShouldRoute(__FUNCTION__, 0, mVid, path.c_str(), 0,
mRoutingInfo.host, mRoutingInfo.port,
mRoutingInfo.stall_timeout)) {
mRoutingInfo.path = path;
reply.set_retc(SFS_REDIRECT);
return true;
}
}
return false;
}
//------------------------------------------------------------------------------
// Check if there is still an available slot for the current type of command
//------------------------------------------------------------------------------
bool
IProcCommand::HasSlot()
{
static const uint64_t slot_limit {50};
std::unique_lock lock(mMapCmdsMutex);
auto it = mCmdsExecuting.find(mReqProto.command_case());
if (it == mCmdsExecuting.end()) {
mCmdsExecuting[mReqProto.command_case()] = 1;
mHasSlot = true;
} else {
if (it->second >= slot_limit) {
return false;
} else {
++it->second;
mHasSlot = true;
}
}
return true;
}
EOSMGMNAMESPACE_END