//------------------------------------------------------------------------------
//! @file ProcDirectoryBulkRequestDAO.hh
//! @author Cedric Caffy - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2017 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "ProcDirectoryBulkRequestDAO.hh"
#include "mgm/Stat.hh"
#include "mgm/bulk-request/exception/PersistencyException.hh"
#include
#include
#include "namespace/interface/IView.hh"
#include "mgm/proc/ProcCommand.hh"
#include "mgm/bulk-request/BulkRequestFactory.hh"
EOSBULKNAMESPACE_BEGIN
ProcDirectoryBulkRequestDAO::ProcDirectoryBulkRequestDAO(XrdMgmOfs* fileSystem,
const ProcDirectoryBulkRequestLocations& procDirectoryBulkRequestLocations):
mFileSystem(fileSystem),
mProcDirectoryBulkRequestLocations(procDirectoryBulkRequestLocations),
mVid(common::VirtualIdentity::Root())
{
}
void ProcDirectoryBulkRequestDAO::saveBulkRequest(const StageBulkRequest*
bulkRequest)
{
std::string directoryBulkReqPath = generateBulkRequestProcPath(bulkRequest);
try {
if (bulkRequest->getFiles()->size() == 0) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::saveBulkRequest(), unable to persist the bulk-request id="
<< bulkRequest->getId() << " because it does not contain any files";
throw PersistencyException(oss.str());
}
//The bulk-request directory will have one extended attribute per file belonging to the bulk-request
//The persistency consists of creating the directory, and set the extended attribute representing each file
//The key of the extended attribute will be the fid of the file, the value will be the eventual error that
//a file can have (prepare submission error...)
eos_debug("msg=\"Persistence of the bulk request %s : creating the directory %s\"",
bulkRequest->getId().c_str(),
directoryBulkReqPath.c_str());
createBulkRequestDirectory(bulkRequest, directoryBulkReqPath);
eos_debug("msg=\"Persistence of the bulk request %s : creating the xattrs map from the bulk-request paths\"",
bulkRequest->getId().c_str());
eos::IContainerMD::XAttrMap xattrs;
generateXattrsMapFromBulkRequest(bulkRequest, xattrs);
eos_debug("msg=\"Persistence of the bulk request %s : persisting the bulk-request information in the directory %s\"",
bulkRequest->getId().c_str(), directoryBulkReqPath.c_str());
persistBulkRequestDirectory(directoryBulkReqPath, xattrs);
} catch (const PersistencyException& ex) {
cleanAfterExceptionHappenedDuringBulkRequestSave(directoryBulkReqPath);
throw ex;
}
}
void ProcDirectoryBulkRequestDAO::saveBulkRequest(const CancellationBulkRequest*
bulkRequest)
{
cancelStageBulkRequest(bulkRequest);
}
void ProcDirectoryBulkRequestDAO::cancelStageBulkRequest(
const CancellationBulkRequest* bulkRequest)
{
std::string bulkRequestProcPath = generateBulkRequestProcPath(bulkRequest);
if (bulkRequest->getFiles()->size() == 0) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::cancelStageBulkRequest(), unable to cancel the bulk-request id="
<< bulkRequest->getId() << " because it does not contain any files";
throw PersistencyException(oss.str());
}
eos::IContainerMD::XAttrMap xattrs;
generateXattrsMapFromBulkRequest(bulkRequest, xattrs);
persistBulkRequestDirectory(bulkRequestProcPath, xattrs);
}
void ProcDirectoryBulkRequestDAO::generateXattrsMapFromBulkRequest(
const BulkRequest* bulkRequest, eos::IContainerMD::XAttrMap& xattrs)
{
// Set last access time of the bulk-request directory
std::time_t now = std::time(nullptr);
std::string nowStr = std::to_string(now);
xattrs[LAST_ACCESS_TIME_XATTR_NAME] = nowStr;
}
void ProcDirectoryBulkRequestDAO::generateXattrsMapFromBulkRequest(
const StageBulkRequest* bulkRequest, eos::IContainerMD::XAttrMap& xattrs)
{
generateXattrsMapFromBulkRequest(static_cast(bulkRequest),
xattrs);
xattrs[ISSUER_UID_XATTR_NAME] = std::to_string(bulkRequest->getIssuerVid().uid);
xattrs[CREATION_TIME_XATTR_NAME] = std::to_string(
bulkRequest->getCreationTime());
std::map> filesWithMDFutures;
const auto files = bulkRequest->getFiles();
for (auto& file : *files) {
std::string path = file->getPath();
std::pair> itemToInsert(*file,
mFileSystem->eosView->getFileFut(path , false));
filesWithMDFutures.emplace(std::move(itemToInsert));
}
for (auto& fileMd : filesWithMDFutures) {
fileMd.second.wait();
}
for (auto& fileWithMDFuture : filesWithMDFutures) {
const std::string currentFilePath = fileWithMDFuture.first.getPath();
std::shared_ptr file;
std::string fid;
try {
eos::common::RWMutexReadLock nsLock(mFileSystem->eosViewRWMutex);
file = mFileSystem->eosView->getFile(currentFilePath);
fid = std::to_string(file->getId());
} catch (const eos::MDException& ex) {
//The file does not exist, we will store the path with URL encoding
std::string encodedFilePath =
eos::common::StringConversion::curl_default_escaped(currentFilePath);
// curl encoding does not convert dots '.', so we need to do this explicitly
eos::common::StringConversion::ReplaceStringInPlace(encodedFilePath, ".",
"%2E");
fid = encodedFilePath;
} catch (const std::exception& ex) {
std::ostringstream errMsg;
errMsg << "In ProcDirectoryBulkRequestDAO::generateXattrsMapFromBulkRequest(), got a standard exception trying to get informations about the file "
<< currentFilePath << " ExceptionWhat=\"" << ex.what() << "\"";
throw PersistencyException(errMsg.str());
}
//If a potential error has been set for this file, adding it in the value corresponding
//to this file's extended attribute
xattrs[FILE_ID_XATTR_KEY_PREFIX + fid] = "";
auto error = fileWithMDFuture.first.getError();
if (error) {
xattrs[FILE_ID_XATTR_KEY_PREFIX + fid] = *error;
}
}
}
void ProcDirectoryBulkRequestDAO::persistBulkRequestDirectory(
const std::string& directoryBulkReqPath,
const eos::IContainerMD::XAttrMap& xattrs)
{
std::shared_ptr bulkReqDirMd;
{
eos::common::RWMutexWriteLock nsLock(mFileSystem->eosViewRWMutex);
try {
bulkReqDirMd = mFileSystem->eosView->getContainer(directoryBulkReqPath);
for (auto& xattr : xattrs) {
bulkReqDirMd->setAttribute(xattr.first, xattr.second);
}
mFileSystem->eosView->updateContainerStore(bulkReqDirMd.get());
} catch (const eos::MDException& ex) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::persistBulkRequestDirectory(): unable to persist the bulk-request in the directory "
<< directoryBulkReqPath
<< "ExceptionWhat=\"" << ex.what() << "\"";
throw PersistencyException(oss.str());
}
}
}
void ProcDirectoryBulkRequestDAO::createBulkRequestDirectory(
const BulkRequest* bulkRequest, const std::string& bulkReqProcPath)
{
XrdOucErrInfo error;
int directoryCreationRetCode = mFileSystem->_mkdir(bulkReqProcPath.c_str(),
S_IFDIR | S_IRWXU, error, mVid);
if (directoryCreationRetCode != SFS_OK) {
std::ostringstream errMsg;
errMsg << "In ProcDirectoryBulkRequestDAO::createBulkRequestDirectory(), could not create the directory to save the bulk-request id="
<< bulkRequest->getId()
<< " XrdOfsErrMsg=\"" << error.getErrText() << "\"";
throw PersistencyException(errMsg.str());
}
}
std::string ProcDirectoryBulkRequestDAO::generateBulkRequestProcPath(
const BulkRequest* bulkRequest)
{
return generateBulkRequestProcPath(bulkRequest->getId(),
bulkRequest->getType());
}
std::string ProcDirectoryBulkRequestDAO::generateBulkRequestProcPath(
const std::string& bulkRequestId, const BulkRequest::Type& type)
{
return mProcDirectoryBulkRequestLocations.getDirectoryPathWhereBulkRequestCouldBeSaved(
type) + bulkRequestId;
}
/*
void ProcDirectoryBulkRequestDAO::insertBulkRequestFilesToBulkRequestDirectory(const BulkRequest * bulkRequest, const std::string & bulkReqProcPath) {
const auto & files = *bulkRequest->getFiles();
//Map of files associated to the future object for the in-memory prefetching of the file informations
std::map> filesWithMDFutures;
for(auto & file : files){
std::string path = file.first;
std::pair> itemToInsert(*file.second,mFileSystem->eosView->getFileFut(path , false));
filesWithMDFutures.emplace(std::move(itemToInsert));
}
for(auto & fileMd: filesWithMDFutures){
fileMd.second.wait();
}
for(auto & fileWithMDFuture : filesWithMDFutures){
const std::string & currentFilePath = fileWithMDFuture.first.getPath();
std::ostringstream pathOfFileToTouch;
pathOfFileToTouch << bulkReqProcPath << "/";
std::shared_ptr file;
try {
eos::common::RWMutexReadLock nsLock(mFileSystem->eosViewRWMutex);
file = mFileSystem->eosView->getFile(currentFilePath);
pathOfFileToTouch << file->getId();
} catch (const eos::MDException &ex){
//The file does not exist, we will store the path under the same format as it is in the recycle bin
std::string newFilePath = currentFilePath;
common::StringConversion::ReplaceStringInPlace(newFilePath,"/","#:#");
pathOfFileToTouch << newFilePath;
} catch(const std::exception &ex){
std::ostringstream errMsg;
errMsg << "In ProcDirectoryBulkRequestDAO::insertBulkRequestFilesToBulkRequestDirectory(), got a standard exception trying to get informations about the file "
<< currentFilePath << " ExceptionWhat=\"" << ex.what() << "\"";
throw PersistencyException(errMsg.str());
}
{
//Low level system call
try {
std::shared_ptr fmd;
{
eos::common::RWMutexWriteLock(mFileSystem->eosViewRWMutex);
auto fmd = mFileSystem->eosView->createFile(pathOfFileToTouch.str(),
vid.uid, vid.gid);
fmd->setSize(0);
if (fileWithMDFuture.first.getError()) {
fmd->setAttribute(ERROR_MSG_XATTR_NAME,
fileWithMDFuture.first.getError().value());
}
mFileSystem->eosView->updateFileStore(fmd.get());
}
} catch (const eos::MDException &ex) {
std::ostringstream errMsg;
errMsg << "In ProcDirectoryBulkRequestDAO::insertBulkRequestFilesToBulkRequestDirectory(), could not create the file to save the file "
<< pathOfFileToTouch.str() << " that belongs to the bulk-request id="
<< bulkRequest->getId() << " ExceptionWhat=\"" << ex.what() << "\"";
throw PersistencyException(errMsg.str());
}
}
}
updateLastAccessTime(bulkReqProcPath);
}
*/
void ProcDirectoryBulkRequestDAO::cleanAfterExceptionHappenedDuringBulkRequestSave(
const std::string& bulkReqProcPath) noexcept
{
try {
deleteDirectory(bulkReqProcPath);
} catch (const PersistencyException& ex) {
std::ostringstream debugMsg;
debugMsg <<
"In ProcDirectoryBulkRequestDAO::cleanAfterExceptionHappenedDuringBulkRequestSave() "
<< "unable to clean the directory " << bulkReqProcPath << "ErrorMsg=\"" <<
ex.what() << "\"";
eos_debug(debugMsg.str().c_str());
}
}
void ProcDirectoryBulkRequestDAO::deleteDirectory(const std::string& path)
{
if (existsAndIsDirectory(path)) {
// execute a proc command
ProcCommand Cmd;
XrdOucString info;
// we do a recursive deletion
info = "mgm.cmd=rm&mgm.option=r&mgm.retc=1&mgm.path=";
info += path.c_str();
XrdOucErrInfo lError;
int result = Cmd.open("/proc/user", info.c_str(), mVid, &lError);
Cmd.close();
if (result == SFS_ERROR) {
throw PersistencyException(lError.getErrText());
}
}
}
std::unique_ptr ProcDirectoryBulkRequestDAO::getBulkRequest(
const std::string& id, const BulkRequest::Type& type)
{
std::unique_ptr bulkRequest = nullptr;
std::string bulkRequestProcPath = this->generateBulkRequestProcPath(id, type);
try {
if (existsAndIsDirectory(bulkRequestProcPath)) {
// Directory exists, the bulk-request can be fetched
// Update the last access time of the bulk-request directory
updateLastAccessTime(bulkRequestProcPath);
//Get all the extended attributes of the directory
eos::IContainerMD::XAttrMap xattrs;
fetchExtendedAttributes(bulkRequestProcPath, xattrs);
switch (type) {
case BulkRequest::PREPARE_STAGE: {
bulkRequest = initializeStageBulkRequestFromXattrs(id, xattrs);
break;
}
default:
std::stringstream ss;
ss << "The bulk-request has a type (" << BulkRequest::bulkRequestTypeToString(
type) << ") that cannot be persisted";
throw PersistencyException(ss.str());
}
}
} catch (const PersistencyException& ex) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::getBulkRequest(): unable to get the bulk request from the persistency layer "
<< "ErrorMsg=\"" << ex.what() << "\"";
throw PersistencyException(oss.str());
}
return bulkRequest;
}
bool ProcDirectoryBulkRequestDAO::existsAndIsDirectory(const std::string&
dirPath)
{
XrdOucErrInfo error;
XrdSfsFileExistence fileExistence;
int retCode = mFileSystem->_exists(dirPath.c_str(), fileExistence, error, mVid);
if (retCode != SFS_OK) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::existsAndIsDirectory(), could not get information about the existence of the directory "
<< dirPath << " XrdOfsErrMsg=\"" << error.getErrText() << "\"";
eos_err(oss.str().c_str());
throw PersistencyException(oss.str());
}
return fileExistence == XrdSfsFileExistIsDirectory;
}
std::unique_ptr
ProcDirectoryBulkRequestDAO::initializeStageBulkRequestFromXattrs(
const std::string& requestId, const eos::IContainerMD::XAttrMap& xattrs)
{
common::VirtualIdentity vid;
time_t creationTime;
try {
vid.uid = ::strtoul(xattrs.at(ISSUER_UID_XATTR_NAME).c_str(), nullptr, 0);
creationTime = ::strtoul(xattrs.at(CREATION_TIME_XATTR_NAME).c_str(), nullptr,
0);
} catch (const std::out_of_range& ex) {
throw PersistencyException("Unable to fetch the attributes to create the stage bulk-request");
}
std::unique_ptr stageBulkRequest =
BulkRequestFactory::createStageBulkRequest(requestId, vid, creationTime);
fillBulkRequestFromXattrs(stageBulkRequest.get(), xattrs);
return stageBulkRequest;
}
void ProcDirectoryBulkRequestDAO::fillBulkRequestFromXattrs(
bulk::BulkRequest* bulkRequest, const eos::IContainerMD::XAttrMap& xattrs)
{
XrdOucErrInfo errFind;
XrdOucString stdErr;
std::map>
filesInBulkReqProcDirWithFuture;
std::vector fileWithPaths;
for (auto& fileIdInfos : xattrs) {
std::string fileIdOrObfuscatedPath = fileIdInfos.first;
std::optional currentFileError;
size_t pos = fileIdOrObfuscatedPath.find(FILE_ID_XATTR_KEY_PREFIX, 0);
if (pos != std::string::npos) {
//We have a file, get its potential error
fileIdOrObfuscatedPath.erase(0, FILE_ID_XATTR_KEY_PREFIX.length());
//The error is stored in the value assciated to the key FILE_ID_XATTR_KEY_PREFIX
if (!fileIdInfos.second.empty()) {
currentFileError = fileIdInfos.second;
}
} else {
continue;
}
//The files in the bulk-request proc directory will be wrapped into a ProcDirBulkRequestFile object.
ProcDirBulkRequestFile file(fileIdOrObfuscatedPath);
if (currentFileError) {
file.setError(*currentFileError);
}
try {
//The file name is normally a fid. But if the file submitted before did not exist, the path will be stored in another format (e.g: URL encoding)
eos::common::FileId::fileid_t fid = std::stoull(file.getName());
file.setFileId(fid);
initiateFileMDFetch(file, filesInBulkReqProcDirWithFuture);
} catch (std::invalid_argument& ex) {
// The current file is not a fid, it is therefore a file that has the URL encoding
// It may also have the old format #:#eos#:#test#:#testFile.txt (#:# replaced by '/')
std::string filePathCopy = file.getName();
if (filePathCopy.find("#:#") != std::string::npos) {
// TODO: Remove this once no more bulk requests exist with the format #:#eos#:#test#:#testFile.txt
common::StringConversion::ReplaceStringInPlace(filePathCopy, "#:#", "/");
} else {
filePathCopy = eos::common::StringConversion::curl_default_unescaped(
filePathCopy);
}
std::unique_ptr bulkRequestFile = std::make_unique(filePathCopy);
bulkRequestFile->setError(file.getError());
bulkRequest->addFile(std::move(bulkRequestFile));
}
}
getFilesPathAndAddToBulkRequest(filesInBulkReqProcDirWithFuture, bulkRequest);
}
void ProcDirectoryBulkRequestDAO::getDirectoryContent(const std::string& path,
std::map>& directoryContent)
{
XrdOucErrInfo error;
XrdOucString stdErr;
if (mFileSystem->_find(path.c_str(), error, stdErr, mVid,
directoryContent) == SFS_ERROR) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::getDirectoryContent(), could not list the content of the directory "
<< path << " XrdOfsErrMsg=" << error.getErrText();
eos_err(oss.str().c_str());
throw PersistencyException(oss.str());
}
//Drop the top directory: it does not belong to its content
directoryContent.erase(path);
}
void ProcDirectoryBulkRequestDAO::fetchExtendedAttributes(
const std::string& path, eos::IContainerMD::XAttrMap& xattrs)
{
XrdOucErrInfo error;
if (mFileSystem->_attr_ls(path.c_str(), error, mVid, nullptr,
xattrs) == SFS_ERROR) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::fetchExtendedAttributes() Unable to get the extended attribute of the file "
<< path << " XrdOfsErrMsg=" << error.getErrText();
eos_err(oss.str().c_str());
throw PersistencyException(oss.str());
}
}
void ProcDirectoryBulkRequestDAO::initiateFileMDFetch(const
ProcDirBulkRequestFile& file,
std::map>& filesWithFuture)
{
std::pair> fileWithFuture(
file, mFileSystem->eosFileService->getFileMDFut(file.getFileId().value()));
filesWithFuture.emplace(std::move(fileWithFuture));
}
void ProcDirectoryBulkRequestDAO::getFilesPathAndAddToBulkRequest(
std::map>& filesWithFuture,
BulkRequest* bulkRequest)
{
eos::common::RWMutexReadLock lock(mFileSystem->eosViewRWMutex);
for (auto& fileWithFuture : filesWithFuture) {
try {
fileWithFuture.second.wait();
std::shared_ptr fmd = mFileSystem->eosFileService->getFileMD(
fileWithFuture.first.getFileId().value());
std::unique_ptr bulkReqFile = std::make_unique
(mFileSystem->eosView->getUri(fmd.get()));
bulkReqFile->setError(fileWithFuture.first.getError());
bulkRequest->addFile(std::move(bulkReqFile));
} catch (const eos::MDException& ex) {
//We could not get any information about this file (might have been deleted for example)
//log this as a warning and remove this file
std::stringstream ss;
ss << "In ProcDirectoryBulkRequestDAO::getFilesPathAndAddToBulkRequest(), unable to get the metadata of the file id="
<< fileWithFuture.first.getFileId().value()
<< " ErrorMsg=\"" << ex.what() << "\"";
eos_warning(ss.str().c_str());
}
}
}
uint64_t ProcDirectoryBulkRequestDAO::deleteBulkRequestNotQueriedFor(
const BulkRequest::Type& type, const std::chrono::seconds& seconds)
{
std::string bulkRequestsPath =
mProcDirectoryBulkRequestLocations.getDirectoryPathWhereBulkRequestCouldBeSaved(
type);
std::map> allBulkRequestDirectories;
getDirectoryContent(bulkRequestsPath, allBulkRequestDirectories);
//Now get the last access time of each directory
std::set bulkRequestDirectoriesToDelete;
uint64_t nbDeletedBulkRequests = 0;
for (auto& kv : allBulkRequestDirectories) {
eos::IContainerMD::XAttrMap xattrs;
fetchExtendedAttributes(kv.first, xattrs);
try {
std::string lastAccessTimeStr = xattrs.at(LAST_ACCESS_TIME_XATTR_NAME);
std::time_t lastAccessTime = std::atoi(lastAccessTimeStr.c_str());
time_t elapsedTimeBetweenNowAndLastAccessTime = std::time(
nullptr) - lastAccessTime;
if (elapsedTimeBetweenNowAndLastAccessTime > seconds.count()) {
deleteDirectory(kv.first);
nbDeletedBulkRequests++;
eos_info("msg=\"Deleted a bulk request from the /proc/ persistency\" path=\"%s\"",
kv.first.c_str());
}
} catch (const std::out_of_range&) {
//The extended attribute LAST_ACCESS_TIME_XATTR_NAME was not found, log an error
eos_err("In ProcDirectoryBulkRequestDAO::deleteBulkRequestNotQueriedFor(), the directory %s does not have the %s extended attribute set. "
"Unable to know if it can be deleted or not.", kv.first.c_str(),
LAST_ACCESS_TIME_XATTR_NAME);
}
}
return nbDeletedBulkRequests;
}
void ProcDirectoryBulkRequestDAO::setExtendedAttribute(const std::string& path,
const std::string& xattrName, const std::string& xattrValue)
{
XrdOucErrInfo error;
int retAttrSet = mFileSystem->_attr_set(path.c_str(), error, mVid, nullptr,
xattrName.c_str(), xattrValue.c_str());
if (retAttrSet != SFS_OK) {
std::ostringstream oss;
oss << "In ProcDirectoryBulkRequestDAO::setExtendedAttribute(), could not set the extended attribute "
<< xattrName << " to the file path "
<< path << " XrdOfsErrMsg=\"" << error.getErrText() << "\"";
eos_err(oss.str().c_str());
throw PersistencyException(oss.str());
}
}
void ProcDirectoryBulkRequestDAO::updateLastAccessTime(const std::string& path)
{
std::time_t now = std::time(nullptr);
std::string nowStr = std::to_string(now);
setExtendedAttribute(path, LAST_ACCESS_TIME_XATTR_NAME, nowStr);
}
bool ProcDirectoryBulkRequestDAO::exists(const std::string& bulkRequestId,
const BulkRequest::Type& type)
{
std::string bulkRequestPath = generateBulkRequestProcPath(bulkRequestId, type);
return existsAndIsDirectory(bulkRequestPath);
}
void ProcDirectoryBulkRequestDAO::deleteBulkRequest(const BulkRequest*
bulkRequest)
{
std::string bulkRequestPath = generateBulkRequestProcPath(bulkRequest);
deleteDirectory(bulkRequestPath);
}
EOSBULKNAMESPACE_END