//------------------------------------------------------------------------------
//! @file PrepareManager.cc
//! @author Cedric Caffy - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2017 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "PrepareManager.hh"
#include "common/Constants.hh"
#include "mgm/Stat.hh"
#include "mgm/EosCtaReporter.hh"
#include "mgm/bulk-request/response/QueryPrepareResponse.hh"
#include
#include
#include
#include "common/Timing.hh"
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdSfs/XrdSfsFlags.hh"
EOSBULKNAMESPACE_BEGIN
PrepareManager::PrepareManager(std::unique_ptr&&
mgmFsInterface): mMgmFsInterface(std::move(mgmFsInterface))
{
}
int PrepareManager::prepare(XrdSfsPrep& pargs, XrdOucErrInfo& error,
const XrdSecEntity* client) noexcept
{
return doPrepare(pargs, error, client);
}
int PrepareManager::prepare(XrdSfsPrep& pargs, XrdOucErrInfo& error,
const common::VirtualIdentity* vid) noexcept
{
XrdSecEntity client;
return doPrepare(pargs, error, &client, vid);
}
void PrepareManager::initializeStagePrepareRequest(XrdOucString& reqid,
const common::VirtualIdentity& vid)
{
// Override the XRootD-supplied request ID. The request ID can be any arbitrary string, so long as
// it is guaranteed to be unique for each request.
//
// Note: To use the default request ID supplied in pargs.reqid, return SFS_OK instead of SFS_DATA.
// Overriding is only possible in the case of PREPARE. In the case of ABORT and QUERY requests,
// pargs.reqid should contain the request ID that was returned by the corresponding PREPARE.
// Request ID = XRootD-generated request ID + timestamp
ostringstream ss;
ss << ':' << time(0);
reqid.append(ss.str().c_str());
}
void PrepareManager::initializeCancelPrepareRequest(XrdOucString& reqid)
{
//Nothing to do as cancellation does not require the creation of an ID
}
int PrepareManager::doPrepare(XrdSfsPrep& pargs, XrdOucErrInfo& error,
const XrdSecEntity* client, const common::VirtualIdentity* vidClient) noexcept
{
EXEC_TIMING_BEGIN("Prepare");
eos_info("prepareOpts=\"%s\"",
PrepareUtils::prepareOptsToString(pargs.opts).c_str());
static const char* epname = mEpname.c_str();
const char* tident;
XrdOucTList* pptr = pargs.paths;
XrdOucTList* optr = pargs.oinfo;
std::string info;
info = (optr ? (optr->text ? optr->text : "") : "");
eos::common::VirtualIdentity vid;
if (vidClient != nullptr) {
vid = *vidClient;
tident = vid.tident.c_str();
} else {
tident = error.getErrUser();
eos::common::Mapping::IdMap(client, info.c_str(), tident, vid);
mMgmFsInterface->addStats("IdMap", vid.uid, vid.gid, 1);
}
ACCESSMODE_W;
MAYSTALL;
{
const char* path = "/";
const char* ininfo = "";
MAYREDIRECT;
}
const int nbFilesProvidedByUser =
eos::common::XrdUtils::countNbElementsInXrdOucTList(pargs.paths);
{
mMgmFsInterface->addStats("Prepare", vid.uid, vid.gid,
nbFilesProvidedByUser);
}
std::string cmd = "mgm.pcmd=event";
std::list> pathsToPrepare;
// Initialise the request ID for the Prepare request to the one provided by XRootD
XrdOucString reqid(pargs.reqid);
// Validate the event type
std::string event;
#if (XrdMajorVNUM(XrdVNUMBER) == 4 && XrdMinorVNUM(XrdVNUMBER) >= 10) || XrdMajorVNUM(XrdVNUMBER) >= 5
// Strip "quality of service" bits from pargs.opts so that only the action to
// be taken is left
const int pargsOptsAction = getPrepareActionsFromOpts(pargs.opts);
// The XRootD prepare actions are mutually exclusive
switch (pargsOptsAction) {
case 0:
if (mMgmFsInterface->isTapeEnabled()) {
mMgmFsInterface->Emsg(epname, error, EINVAL,
"prepare with empty pargs.opts on tape-enabled back-end");
return SFS_ERROR;
}
break;
case Prep_STAGE:
event = "sync::prepare";
mPrepareAction = PrepareAction::STAGE;
initializeStagePrepareRequest(reqid, vid);
break;
case Prep_CANCEL:
mPrepareAction = PrepareAction::ABORT;
initializeCancelPrepareRequest(reqid);
event = "sync::abort_prepare";
break;
case Prep_EVICT:
mPrepareAction = PrepareAction::EVICT;
event = "sync::evict_prepare";
break;
default:
// More than one flag was set or there is an unknown flag
mMgmFsInterface->Emsg(epname, error, EINVAL,
"prepare - invalid value for pargs.opts =",
std::to_string(pargs.opts).c_str());
return SFS_ERROR;
}
#else
// The XRootD prepare flags are mutually exclusive
switch (pargs.opts) {
case 0:
if (mMgmFsInterface.isTapeEnabled()) {
mMgmFsInterface.Emsg(epname, error, EINVAL,
"prepare with empty pargs.opts on tape-enabled back-end");
return SFS_ERROR;
}
break;
case Prep_STAGE:
event = "sync::prepare";
break;
case Prep_FRESH:
event = "sync::abort_prepare";
break;
default:
// More than one flag was set or there is an unknown flag
mMgmFsInterface.Emsg(epname, error, EINVAL,
"prepare - invalid value for pargs.opts =",
std::to_string(pargs.opts).c_str());
return SFS_ERROR;
}
#endif
int error_counter = 0;
XrdOucErrInfo first_error;
struct timespec ts_now;
eos::common::Timing::GetTimeSpec(ts_now);
// check that all files exist
for (
; pptr
; pptr = pptr->next, optr = optr ? optr->next : optr) {
XrdOucString prep_path = (pptr->text ? pptr->text : "");
std::string orig_path = prep_path.c_str();
std::unique_ptr currentFile = nullptr;
EosCtaReporterPrepareReq eosLog([&](const std::string & in) {
mMgmFsInterface->writeEosReportRecord(in);
});
eosLog
.addParam(EosCtaReportParam::SEC_APP, "tape_prepare")
.addParam(EosCtaReportParam::LOG, std::string(mMgmFsInterface->get_logId()))
.addParam(EosCtaReportParam::PATH, orig_path)
.addParam(EosCtaReportParam::RUID, vid.uid)
.addParam(EosCtaReportParam::RGID, vid.gid)
.addParam(EosCtaReportParam::TD, vid.tident.c_str())
.addParam(EosCtaReportParam::HOST, mMgmFsInterface->get_host())
.addParam(EosCtaReportParam::PREP_REQ_REQID, reqid.c_str())
.addParam(EosCtaReportParam::TS, ts_now.tv_sec)
.addParam(EosCtaReportParam::TNS, ts_now.tv_nsec);
eos_info("msg=\"checking file exists\" path=\"%s\"", prep_path.c_str());
{
const char* inpath = prep_path.c_str();
const char* ininfo = "";
NAMESPACEMAP;
//Valgrind Source and destination overlap in strncpy(...)
if (prep_path.c_str() != path) {
prep_path = path;
}
}
{
const char* path = prep_path.c_str();
const char* ininfo = "";
MAYREDIRECT;
}
XrdSfsFileExistence check;
if (prep_path.length() == 0) {
std::string errorMsg = "prepare - path empty or uses forbidden characters";
mMgmFsInterface->Emsg(epname, error, ENOENT,
errorMsg.append(":").c_str(),
orig_path.c_str());
if (error_counter == 0) {
first_error = error;
}
error_counter++;
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, false)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, false)
.addParam(EosCtaReportParam::PREP_REQ_ERROR, errorMsg);
continue;
}
currentFile = std::make_unique(prep_path.c_str());
if (mMgmFsInterface->_exists(prep_path.c_str(), check, error, vid, "") ||
(check != XrdSfsFileExistIsFile)) {
std::string errorMsg =
"prepare - file does not exist or is not accessible to you";
mMgmFsInterface->Emsg(epname, error, ENOENT,
errorMsg.append(":").c_str(),
prep_path.c_str());
currentFile->setError(errorMsg);
if (error_counter == 0) {
first_error = error;
}
error_counter++;
addFileToBulkRequest(std::move(currentFile));
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, false)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, false)
.addParam(EosCtaReportParam::PREP_REQ_ERROR, errorMsg);
continue;
}
//Extended attributes for the current file's parent directory
eos::IContainerMD::XAttrMap attributes;
if (!event.empty() &&
mMgmFsInterface->_attr_ls(eos::common::Path(prep_path.c_str()).GetParentPath(),
error, vid,
nullptr, attributes) == 0) {
bool foundPrepareTag = false;
std::string eventAttr = "sys.workflow." + event;
eosLog.addParam(EosCtaReportParam::PREP_REQ_EVENT, event);
for (const auto& attrEntry : attributes) {
foundPrepareTag |= attrEntry.first.find(eventAttr) == 0;
}
if (foundPrepareTag) {
pathsToPrepare.emplace_back(&(pptr->text),
optr != nullptr ? & (optr->text) : nullptr,
std::move(eosLog));
} else {
// don't do workflow if no such tag
std::ostringstream oss;
oss << "No prepare workflow set on the directory " << eos::common::Path(
prep_path.c_str()).GetParentPath();
currentFile->setError(oss.str());
addFileToBulkRequest(std::move(currentFile));
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, false)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, true);
continue;
}
} else {
// don't do workflow if event not set or we can't check attributes
if (!event.empty()) {
std::ostringstream oss;
oss << "Unable to check the extended attributes of the directory "
<< eos::common::Path(prep_path.c_str()).GetParentPath();
currentFile->setError(oss.str());
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, false)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, false)
.addParam(EosCtaReportParam::PREP_REQ_ERROR, oss.str());
}
addFileToBulkRequest(std::move(currentFile));
continue;
}
// check that we have write permission on path
// This can only be done after we confirm that there the directory contains a prepare workflow attribute
if (mMgmFsInterface->_access(prep_path.c_str(), P_OK, error, vid, "")) {
std::string errorMsg = "prepare - you don't have prepare permission";
mMgmFsInterface->Emsg(epname, error, EPERM,
errorMsg.append(":").c_str(),
prep_path.c_str());
currentFile->setError(errorMsg);
pathsToPrepare.pop_back();
if (error_counter == 0) {
first_error = error;
}
error_counter++;
addFileToBulkRequest(std::move(currentFile));
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, true)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, true)
.addParam(EosCtaReportParam::PREP_REQ_ERROR, errorMsg);
continue;
}
if (currentFile != nullptr) {
addFileToBulkRequest(std::move(currentFile));
}
}
try {
saveBulkRequest();
} catch (const PersistencyException& ex) {
return ex.fillXrdErrInfo(error, EIO);
}
if (isStagePrepare() && nbFilesProvidedByUser == error_counter) {
//All stage request failed
eos_err("Unable to prepare - failed to prepare all files with reqID %s",
reqid.c_str());
if (error_counter > 0) {
int err_code;
std::stringstream err_message;
err_message << first_error.getErrText(err_code);
if (error_counter > 1) {
err_message << " (all " << (error_counter - 1) <<
" other files also failed with errors)";
}
error.setErrInfo(err_code, err_message.str().c_str());
}
if (!ignorePrepareFailures()) {
return SFS_ERROR;
}
}
//Trigger the prepare workflow
triggerPrepareWorkflow(pathsToPrepare, cmd, event, reqid, error, vid);
int retc = SFS_OK;
#if (XrdMajorVNUM(XrdVNUMBER) == 4 && XrdMinorVNUM(XrdVNUMBER) >= 10) || XrdMajorVNUM(XrdVNUMBER) >= 5
// If we generated our own request ID, return it to the client
if (isStagePrepare()) {
// If we return SFS_DATA, the first parameter is the length of the buffer, not the error code
error.setErrInfo(reqid.length() + 1, reqid.c_str());
retc = SFS_DATA;
} else {
if (error_counter > 0) {
if (!ignorePrepareFailures()) {
int err_code;
std::stringstream err_message;
err_message << first_error.getErrText(err_code);
if (error_counter > 1) {
err_message << " (" << (error_counter - 1) <<
" other files also failed with errors)";
}
error.setErrInfo(err_code, err_message.str().c_str());
retc = SFS_ERROR;
}
}
}
#endif
EXEC_TIMING_END("Prepare");
return retc;
}
void PrepareManager::saveBulkRequest()
{
}
bool PrepareManager::ignorePrepareFailures()
{
return false;
}
void PrepareManager::addFileToBulkRequest(std::unique_ptr&& file)
{
//The normal PrepareManager does not have any bulk-request, do nothing
// Sub-classes may decide to implement this member function
}
const int PrepareManager::getPrepareActionsFromOpts(const int pargsOpts) const
{
const int pargsOptsQoS = Prep_PMASK | Prep_SENDAOK | Prep_SENDERR | Prep_SENDACK
| Prep_WMODE | Prep_COLOC | Prep_FRESH;
return pargsOpts & ~pargsOptsQoS;
}
const bool PrepareManager::isStagePrepare() const
{
return mPrepareAction == PrepareAction::STAGE;
}
void PrepareManager::triggerPrepareWorkflow(
std::list>& pathsToPrepare,
const std::string& cmd, const std::string& event, const XrdOucString& reqid,
XrdOucErrInfo& error, const eos::common::VirtualIdentity& vid)
{
for (auto& pathTuple : pathsToPrepare) {
EosCtaReporterPrepareReq eosLog = std::move(std::get<2>(pathTuple));
XrdOucString prep_path = (*std::get<0>(pathTuple) ? *std::get<0>
(pathTuple) : "");
{
const char* inpath = prep_path.c_str();
const char* ininfo = "";
NAMESPACEMAP;
//Valgrind Source and destination overlap in strncpy(...)
if (prep_path.c_str() != path) {
prep_path = path;
}
}
XrdOucString prep_info = std::get<1>(pathTuple) != nullptr ?
(*std::get<1>(pathTuple) ? *std::get<1>(pathTuple) : "") : "";
eos_info("msg=\"about to trigger WFE\" path=\"%s\" info=\"%s\"",
prep_path.c_str(), prep_info.c_str());
XrdOucEnv prep_env(prep_info.c_str());
prep_info = cmd.c_str();
prep_info += "&mgm.event=";
prep_info += event.c_str();
prep_info += "&mgm.workflow=";
if (prep_env.Get("eos.workflow")) {
prep_info += prep_env.Get("eos.workflow");
} else {
prep_info += "default";
}
prep_info += "&mgm.fid=0&mgm.path=";
prep_info += prep_path.c_str();
prep_info += "&mgm.logid=";
prep_info += this->logId;
prep_info += "&mgm.ruid=";
prep_info += (int)vid.uid;
prep_info += "&mgm.rgid=";
prep_info += (int)vid.gid;
prep_info += "&mgm.reqid=";
prep_info += reqid.c_str();
if (prep_env.Get("activity")) {
prep_info += "&activity=";
prep_info += prep_env.Get("activity");
}
XrdSecEntity lClient(vid.prot.c_str());
lClient.name = (char*) vid.name.c_str();
lClient.tident = (char*) vid.tident.c_str();
lClient.host = (char*) vid.host.c_str();
XrdOucString lSec = "&mgm.sec=";
lSec += eos::common::SecEntity::ToKey(&lClient,
"eos").c_str();
prep_info += lSec;
XrdSfsFSctl args;
args.Arg1 = prep_path.c_str();
args.Arg1Len = prep_path.length();
args.Arg2 = prep_info.c_str();
args.Arg2Len = prep_info.length();
auto ret_wfe = mMgmFsInterface->FSctl(SFS_FSCTL_PLUGIN, args, error, &lClient);
// Log errors but continue to process the rest of the files in the list
if (ret_wfe != SFS_DATA) {
std::ostringstream oss;
oss << "Unable to prepare - synchronous prepare workflow error " <<
prep_path.c_str() << "; " << error.getErrText();
eos_err(oss.str().c_str());
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, false)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, false)
.addParam(EosCtaReportParam::PREP_REQ_ERROR, oss.str());
} else {
eosLog
.addParam(EosCtaReportParam::PREP_REQ_SENTTOWFE, true)
.addParam(EosCtaReportParam::PREP_REQ_SUCCESSFUL, true);
}
}
}
std::unique_ptr PrepareManager::queryPrepare(
XrdSfsPrep& pargs, XrdOucErrInfo& error, const XrdSecEntity* client)
{
std::unique_ptr queryPrepareResult(new
QueryPrepareResult());
int retCode = doQueryPrepare(pargs, error, client, *queryPrepareResult);
queryPrepareResult->setReturnCode(retCode);
return queryPrepareResult;
}
std::unique_ptr PrepareManager::queryPrepare(
XrdSfsPrep& pargs, XrdOucErrInfo& error,
const common::VirtualIdentity* vidClient)
{
std::unique_ptr queryPrepareResult(new
QueryPrepareResult());
int retCode = doQueryPrepare(pargs, error, nullptr, *queryPrepareResult,
vidClient);
queryPrepareResult->setReturnCode(retCode);
return queryPrepareResult;
}
int PrepareManager::doQueryPrepare(XrdSfsPrep& pargs, XrdOucErrInfo& error,
const XrdSecEntity* client, QueryPrepareResult& result,
const common::VirtualIdentity* vidClient)
{
EXEC_TIMING_BEGIN("QueryPrepare");
ACCESSMODE_R;
eos_info("cmd=\"_prepare_query\"");
eos::common::VirtualIdentity vid;
if (vidClient != nullptr) {
vid = *vidClient;
} else {
const char* tident = error.getErrUser();
XrdOucTList* optr = pargs.oinfo;
std::string info(optr && optr->text ? optr->text : "");
eos::common::Mapping::IdMap(client, info.c_str(), tident, vid);
}
MAYSTALL;
{
const char* path = "/";
const char* ininfo = "";
MAYREDIRECT;
}
// ID of the original prepare request. We don't need this to look up the list of files in
// the request, as they are provided in the arguments. Anyway we return it in the reply
// as a convenience for the client to track which prepare request the query applies to.
XrdOucString reqid(pargs.reqid);
int path_cnt = 0;
FileCollection filesToQueryCollection;
for (XrdOucTList* pptr = pargs.paths; pptr; pptr = pptr->next) {
if (!pptr->text) {
continue;
}
filesToQueryCollection.addFile(std::make_unique(pptr->text));
++path_cnt;
}
mMgmFsInterface->addStats("QueryPrepare", vid.uid, vid.gid, path_cnt);
auto filesToQuery = filesToQueryCollection.getAllFiles();
std::shared_ptr response = result.getResponse();
// Set the queryPrepareFileResponses for each file in the list
for (auto& file : *filesToQuery) {
response->responses.push_back(QueryPrepareFileResponse(file->getPath()));
auto& rsp = response->responses.back();
auto currentFile = file;
// check if the file exists
XrdOucString prep_path;
{
const char* inpath = rsp.path.c_str();
const char* ininfo = "";
NAMESPACEMAP;
prep_path = path;
}
{
const char* path = rsp.path.c_str();
const char* ininfo = "";
MAYREDIRECT;
}
//Initialization of variables
XrdOucErrInfo xrd_error;
struct stat buf;
eos::IFileMD::XAttrMap xattrs;
XrdSfsFileExistence check;
if (prep_path.length() == 0) {
currentFile->setErrorIfNotAlreadySet("USER ERROR: path empty or uses forbidden characters");
goto logErrorAndContinue;
}
if (mMgmFsInterface->_exists(prep_path.c_str(), check, error, vid, "") ||
check != XrdSfsFileExistIsFile) {
currentFile->setErrorIfNotAlreadySet("USER ERROR: file does not exist or is not accessible to you");
goto logErrorAndContinue;
}
rsp.is_exists = true;
// Check file state (online/offline)
if (mMgmFsInterface->_stat(rsp.path.c_str(), &buf, xrd_error, vid, nullptr,
nullptr, false)) {
currentFile->setErrorIfNotAlreadySet(xrd_error.getErrText());
goto logErrorAndContinue;
}
mMgmFsInterface->_stat_set_flags(&buf);
rsp.is_on_tape = buf.st_rdev & XRDSFS_HASBKUP;
rsp.is_online = !(buf.st_rdev & XRDSFS_OFFLINE);
// Check file status in the extended attributes
if (mMgmFsInterface->_attr_ls(eos::common::Path(prep_path.c_str()).GetPath(),
xrd_error, vid,
nullptr, xattrs) == 0) {
auto xattr_it = xattrs.find(eos::common::RETRIEVE_REQID_ATTR_NAME);
if (xattr_it != xattrs.end()) {
// has file been requested? (not necessarily with this request ID)
rsp.is_requested = !xattr_it->second.empty();
// and is this specific request ID present in the request?
rsp.is_reqid_present = (xattr_it->second.find(reqid.c_str()) != string::npos);
}
xattr_it = xattrs.find(eos::common::RETRIEVE_REQTIME_ATTR_NAME);
if (xattr_it != xattrs.end()) {
rsp.request_time = xattr_it->second;
}
xattr_it = xattrs.find(eos::common::RETRIEVE_ERROR_ATTR_NAME);
if (xattr_it == xattrs.end()) {
// If there is no retrieve error, check for an archive error
xattr_it = xattrs.find(eos::common::ARCHIVE_ERROR_ATTR_NAME);
}
if (xattr_it != xattrs.end()) {
currentFile->setErrorIfNotAlreadySet(xattr_it->second);
}
} else {
// failed to read extended attributes
currentFile->setErrorIfNotAlreadySet(xrd_error.getErrText());
goto logErrorAndContinue;
}
if (mMgmFsInterface->_access(prep_path.c_str(), P_OK, error, vid, "")) {
currentFile->setError("USER ERROR: you don't have prepare permission"s);
goto logErrorAndContinue;
}
logErrorAndContinue:
if (currentFile->getError()) {
rsp.error_text = currentFile->getError().value();
}
}
response->request_id = reqid.c_str();
/*
json_ss << "{"
<< "\"request_id\":\"" << reqid << "\","
<< "\"responses\":[";
bool is_first(true);
for (auto& r : response) {
if (is_first) {
is_first = false;
} else {
json_ss << ",";
}
json_ss << r;
}
json_ss << "]"
<< "}";
*/
result.setQueryPrepareFinished();
EXEC_TIMING_END("QueryPrepare");
return SFS_DATA;
}
EOSBULKNAMESPACE_END