//------------------------------------------------------------------------------
// File: XrdFstOfsFile.cc
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied waDon'trranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "fst/XrdFstOfsFile.hh"
#include "fst/XrdFstOfs.hh"
#include "fst/Config.hh"
#include "common/Constants.hh"
#include "common/Path.hh"
#include "common/http/OwnCloud.hh"
#include "common/StringTokenizer.hh"
#include "common/StringUtils.hh"
#include "common/SecEntity.hh"
#include "common/CtaCommon.hh"
#include "common/IoPriority.hh"
#include "common/Timing.hh"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
#include "fst/layout/Layout.hh"
#include "fst/layout/LayoutPlugin.hh"
#include "fst/checksum/ChecksumPlugins.hh"
#include "fst/storage/FileSystem.hh"
#include "XrdOss/XrdOssApi.hh"
#include "fst/io/FileIoPluginCommon.hh"
#include "namespace/utils/Etag.hh"
#include "XrdOuc/XrdOucPgrwUtils.hh"
extern XrdOss* XrdOfsOss;
EOSFSTNAMESPACE_BEGIN
constexpr uint16_t XrdFstOfsFile::msDefaultTimeout;
thread_local int t_iopriority = 0;
//------------------------------------------------------------------------------
// Get TPC key expiration timestamp
//------------------------------------------------------------------------------
time_t
XrdFstOfsFile::GetTpcKeyExpireTS(std::string_view tpc_ttl, time_t now_ts)
{
using namespace std::chrono;
time_t now = time(nullptr);
if (now_ts) {
now = now_ts;
}
time_t expire_ts = now + gOFS.mTpcKeyMinValidity.count();
if (!tpc_ttl.empty()) {
unsigned int ttl_val = 0ul;
if (eos::common::StringToNumeric(tpc_ttl, ttl_val)) {
if ((ttl_val >= gOFS.mTpcKeyMinValidity.count()) &&
(ttl_val <= gOFS.mTpcKeyMaxValidity.count())) {
expire_ts = now + ttl_val;
} else {
if (ttl_val < gOFS.mTpcKeyMinValidity.count()) {
expire_ts = now + gOFS.mTpcKeyMinValidity.count();
} else {
expire_ts = now + gOFS.mTpcKeyMaxValidity.count();
}
}
}
}
eos_static_debug("msg=\"tpc key validity\" seconds=%u", expire_ts - now);
return expire_ts;
}
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
XrdFstOfsFile::XrdFstOfsFile(const char* user, int MonID) :
XrdOfsFileBase(user, MonID), eos::common::LogId(),
mOpenOpaque(nullptr), mCapOpaque(nullptr), mFstPath(""), mBookingSize(0),
mTargetSize(0), mMinSize(0), mMaxSize(0), viaDelete(false),
mWrDelete(false), mRainSize(0), mNsPath(""), mLocalPrefix(""),
mRedirectManager(""), mTapeEnabled(false), mSecString(""), mEtag(""),
mFileId(0), mFsId(0), mLid(0), mCid(0), mForcedMtime(1), mForcedMtime_ms(0),
mFusex(false), mFusexIsUnlinked(false), closed(false), mOpened(false),
mHasWrite(false), hasWriteError(false), hasReadError(false), mIsRW(false),
mIsDevNull(false), isCreation(false), isReplication(false),
noAtomicVersioning(false),
mIsInjection(false), mRainReconstruct(false), deleteOnClose(false),
repairOnClose(false), mIsOCchunk(false), writeErrorFlag(false),
mEventOnClose(false), mSyncOnClose(false), mEventWorkflow(""),
mSyncEventOnClose(false), mFmd(nullptr), mCheckSum(nullptr),
mLayout(nullptr), mMaxOffsetWritten(0ull),
mWritePosition(0ull), openSize(0),
closeSize(0), mTpcThreadStatus(EINVAL), mTpcState(kTpcIdle),
mTpcFlag(kTpcNone), mTpcKey(""), mIsTpcDst(false), mTpcRetc(0),
mTpcCancel(false), mIsHttp(false)
{
rBytes = wBytes = sFwdBytes = sBwdBytes = sXlFwdBytes
= sXlBwdBytes = rOffset = wOffset = 0;
rStart.tv_sec = wStart.tv_sec = rvStart.tv_sec = rTime.tv_sec = lrTime.tv_sec =
rvTime.tv_sec = lrvTime.tv_sec = 0;
rStart.tv_usec = wStart.tv_usec = rvStart.tv_usec = rTime.tv_usec =
lrTime.tv_usec = rvTime.tv_usec = lrvTime.tv_usec = 0;
wTime.tv_sec = lwTime.tv_sec = cTime.tv_sec = 0;
wTime.tv_usec = lwTime.tv_usec = cTime.tv_usec = 0;
rCalls = wCalls = nFwdSeeks = nBwdSeeks = nXlFwdSeeks = nXlBwdSeeks = 0;
closeTime.tv_sec = closeTime.tv_usec = 0;
currentTime.tv_sec = openTime.tv_usec = 0;
openTime.tv_sec = openTime.tv_usec = 0;
totalBytes = 0;
msSleep = 0;
mBandwidth = 0;
timeToOpen = 0;
timeToClose = 0;
timeToRead = 0;
timeToReadV = 0;
timeToWrite = 0;
tz.tz_dsttime = tz.tz_minuteswest = 0;
mIoPriorityValue = 0;
mIoPriorityClass = IOPRIO_CLASS_NONE;
mIoPriorityErrorReported = false;
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
XrdFstOfsFile::~XrdFstOfsFile()
{
viaDelete = true;
if (!closed) {
close();
}
}
//------------------------------------------------------------------------------
// Open file
//------------------------------------------------------------------------------
int
XrdFstOfsFile::open(const char* path, XrdSfsFileOpenMode open_mode,
mode_t create_mode, const XrdSecEntity* client,
const char* opaque)
{
EPNAME("open");
eos::common::Timing tm("open");
COMMONTIMING("begin", &tm);
const char* tident = error.getErrUser();
SetLogId(ExtractLogId(opaque).c_str(), client, tident);
mTident = error.getErrUser();
char* val = 0;
mIsRW = false;
int retc = SFS_OK;
int envlen = 0;
mNsPath = path;
gettimeofday(&openTime, &tz);
bool hasCreationMode = (open_mode & (SFS_O_CREAT | SFS_O_TRUNC));
bool isRepairRead = false;
// Mask some opaque parameters to shorten the logging
XrdOucString maskOpaque = opaque ? opaque : "";
eos::common::StringConversion::MaskTag(maskOpaque, "cap.sym");
eos::common::StringConversion::MaskTag(maskOpaque, "cap.msg");
eos::common::StringConversion::MaskTag(maskOpaque, "authz");
eos_info("path=%s info=%s open_mode=%x", mNsPath.c_str(),
maskOpaque.c_str(), open_mode);
// Process and filter open opaque information
std::string in_opaque = (opaque ? opaque : "");
in_opaque += "&mgm.path=";
in_opaque += mNsPath.c_str();
//----------------------------------------------------------------------------
// @todo (esindril): This should be dropped after Sept 2018 since it's
// just a temporary fix for an issue on the eos fuse.
//----------------------------------------------------------------------------
FilterTagsInPlace(in_opaque, {"xrdcl.secuid", "xrdcl.secgid"});
// Process TPC information - after this mOpenOpaque and mCapOpaque will be
// properly populated and decrypted.
int tpc_retc = ProcessTpcOpaque(in_opaque, client);
if (tpc_retc == SFS_ERROR) {
eos_err("%s", "msg=\"failed while processing TPC/open opaque\"");
return SFS_ERROR;
} else if (tpc_retc >= SFS_STALL) {
return tpc_retc; // this is stall time in seconds
}
if (ProcessOpenOpaque()) {
eos_err("%s", "msg=\"failed while processing open opaque info\"");
return SFS_ERROR;
}
eos::common::VirtualIdentity vid;
if (ProcessCapOpaque(isRepairRead, vid)) {
eos_err("%s", "msg=\"failed while processing cap opaque info\"");
return SFS_ERROR;
}
if (ProcessMixedOpaque()) {
eos_err("%s", "msg=\"failed while processing mixed opaque info\"");
return SFS_ERROR;
}
// For RAIN layouts if the opaque information contains the tag mgm.rain.store=1
// the corrupted files are recovered back on disk. There is no other way to make
// the distinction between an open for write and open for recovery
if (mCapOpaque && (val = mCapOpaque->Get("mgm.rain.store"))) {
if (strncmp(val, "1", 1) == 0) {
eos_info("%s", "msg=\"enabling RAIN store recovery\"");
open_mode = SFS_O_RDWR;
mRainReconstruct = true;
mHasWrite = true;
// Get logical file size
if ((val = mCapOpaque->Get("mgm.rain.size"))) {
try {
mRainSize = std::stoull(val);
} catch (...) {
// ignore
}
} else {
eos_warning("%s", "msg=\"unknown RAIN file size during reconstruction\"");
}
}
}
if ((open_mode & (SFS_O_WRONLY | SFS_O_RDWR | SFS_O_CREAT | SFS_O_TRUNC))) {
mIsRW = true;
}
// File is supposed to act as a sink, used for draining
if (mNsPath == "/replicate:0") {
if (mIsRW) {
eos_info("%s", "msg=\"file fxid=0 acting as a sink i.e. /dev/null\"");
mIsDevNull = true;
return SFS_OK;
} else {
eos_info("%s", "msg=\"sink file i.e. /dev/null can only be opened for RW\"");
return gOFS.Emsg(epname, error, EIO, "open - sink file can only be "
"opened RW mode", mNsPath.c_str());
}
}
COMMONTIMING("path::print", &tm);
eos_info("ns_path=%s fst_path=%s", mNsPath.c_str(), mFstPath.c_str());
if (mNsPath.beginswith("/replicate:")) {
if (gOFS.openedForWriting.isOpen(mFsId, mFileId)) {
eos_err("msg=\"forbid replica open, file %s opened in RW mode",
mNsPath.c_str());
return gOFS.Emsg(epname, error, ETXTBSY, "open - cannot replicate: file "
"is opened in RW mode", mNsPath.c_str());
}
isReplication = true;
}
// Check if this is an open for HTTP
if ((!mIsRW) && ((std::string(client->tident) == "http"))) {
if (gOFS.openedForWriting.isOpen(mFsId, mFileId)) {
eos_err("msg=\"forbid replica open for synchronization,file %s opened "
"in RW mode", mNsPath.c_str());
return gOFS.Emsg(epname, error, ETXTBSY, "open - cannot synchronize "
"file opened in RW mode", mNsPath.c_str());
}
}
// Get the layout object
mLayout.reset(eos::fst::LayoutPlugin::GetLayoutObject
(this, mLid, client, &error, mFstPath.c_str(), msDefaultTimeout,
mRainReconstruct));
if (mLayout == nullptr) {
int envlen;
eos_err("msg=\"unable to handle layout for %s\"", mCapOpaque->Env(envlen));
return gOFS.Emsg(epname, error, EINVAL, "open - illegal layout specified ",
mCapOpaque->Env(envlen));
}
mLayout->SetLogId(logId, client, tident);
errno = 0;
if ((mRainReconstruct && (mTpcFlag == kTpcSrcCanDo)) ||
(mTpcFlag == kTpcSrcSetup)) {
eos_info("%s", "msg=kTpcSrcSetup return SFS_OK");
return SFS_OK;
}
COMMONTIMING("creation::barrier", &tm);
OpenFileTracker::CreationBarrier creationSerialization(gOFS.runningCreation,
mFsId, mFileId);
COMMONTIMING("layout::exists", &tm);
if ((retc = mLayout->GetFileIo()->fileExists())) {
// We have to distinguish if an Exists call fails or return ENOENT, otherwise
// we might trigger an automatic clean-up of a file !!!
if (errno != ENOENT) {
return gOFS.Emsg(epname, error, EIO, "open - unable to check for existence"
" of file ", mCapOpaque->Env(envlen));
}
if (mIsRW || (mCapOpaque->Get("mgm.zerosize"))) {
if (!mIsRW && mCapOpaque->Get("mgm.zerosize")) {
// this commit should not call the versioning/atomic functionality
noAtomicVersioning = true;
}
// File does not exist, keep the create flag for writers and readers with 0-size at MGM
mIsRW = true;
isCreation = true;
openSize = 0;
mWritePosition = 0;
// Used to indicate if a file was written in the meanwhile by someone else
updateStat.st_mtime = 0;
open_mode |= SFS_O_CREAT;
create_mode |= SFS_O_MKPTH;
eos_debug("adding creation flag because of %d %d", retc, errno);
} else {
// The open will fail but the client will get a recoverable error,
// therefore it will try to read again from the other replicas.
eos_warning("open for read, local file does not exists");
return gOFS.Emsg(epname, error, ENOENT, "open, file does not exist ",
mCapOpaque->Env(envlen));
}
} else {
eos_debug("removing creation flag because of %d %d", retc, errno);
// Remove the creat flag
if (open_mode & SFS_O_CREAT) {
open_mode -= SFS_O_CREAT;
}
}
if (!isCreation) {
creationSerialization.Release();
}
// Capability access distinction
if (mIsRW) {
if (isCreation) {
if (mCapOpaque->Get("mgm.zerosize") == nullptr) {
if (!mCapOpaque->Get("mgm.access")
|| ((strcmp(mCapOpaque->Get("mgm.access"), "create")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "write")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "update")))) {
return gOFS.Emsg(epname, error, EPERM, "open - capability does not "
"allow to create/write/update this file", path);
}
}
} else {
if (!mCapOpaque->Get("mgm.access")
|| ((strcmp(mCapOpaque->Get("mgm.access"), "create")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "write")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "update")))) {
return gOFS.Emsg(epname, error, EPERM, "open - capability does not "
"allow to update/write/create this file", path);
}
}
} else {
if (!mCapOpaque->Get("mgm.access")
|| ((strcmp(mCapOpaque->Get("mgm.access"), "read")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "create")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "write")) &&
(strcmp(mCapOpaque->Get("mgm.access"), "update")))) {
return gOFS.Emsg(epname, error, EPERM, "open - capability does not allow "
"to read this file", path);
}
}
// Get IO priority
if (mCapOpaque->Get("mgm.iopriority")) {
std::string key;
std::string value;
std::string kv = mCapOpaque->Get("mgm.iopriority");
if (eos::common::StringConversion::SplitKeyValue(kv, key, value, ":")) {
mIoPriorityClass = ioprio_class(key);
mIoPriorityValue = ioprio_value(value);
}
}
if (mCapOpaque->Get("mgm.iobw")) {
mBandwidth = strtoull(mCapOpaque->Get("mgm.iobw"), 0, 10);
eos_info("msg=\"bandwidth limited\" bw=%d", mBandwidth);
}
// Bookingsize is only needed for file creation
if (mIsRW && isCreation) {
const char* sbookingsize = 0;
const char* stargetsize = 0;
if (!(sbookingsize = mCapOpaque->Get("mgm.bookingsize"))) {
return gOFS.Emsg(epname, error, EINVAL, "open - no booking size in capability",
mNsPath.c_str());
} else {
mBookingSize = strtoull(mCapOpaque->Get("mgm.bookingsize"), 0, 10);
if (errno == ERANGE) {
eos_err("invalid bookingsize in capability bookingsize=%s", sbookingsize);
return gOFS.Emsg(epname, error, EINVAL,
"open - invalid bookingsize in capability", mNsPath.c_str());
}
}
if ((stargetsize = mCapOpaque->Get("mgm.targetsize"))) {
mTargetSize = strtoull(mCapOpaque->Get("mgm.targetsize"), 0, 10);
if (errno == ERANGE) {
eos_err("invalid targetsize in capability targetsize=%s", stargetsize);
return gOFS.Emsg(epname, error, EINVAL,
"open - invalid targetsize in capability", mNsPath.c_str());
}
}
}
// Check if the booking size violates the min/max-size criteria
if (mBookingSize && mMaxSize) {
if (mBookingSize > mMaxSize) {
eos_err("invalid bookingsize specified - violates maximum file size criteria");
return gOFS.Emsg(epname, error, ENOSPC, "open - bookingsize violates "
"maximum allowed filesize", mNsPath.c_str());
}
}
if (mBookingSize && mMinSize) {
if (mBookingSize < mMinSize) {
eos_err("invalid bookingsize specified - violates minimum file size criteria");
return gOFS.Emsg(epname, error, ENOSPC, "open - bookingsize violates "
"minimum allowed filesize", mNsPath.c_str());
}
}
if (gOFS.mSimFmdOpenErr) {
return gOFS.Emsg(epname, error, ENOENT, "open - no FMD record found, "
"simulated error");
}
COMMONTIMING("clone::fst", &tm);
char* sCloneFST = mCapOpaque->Get("mgm.cloneFST");
int clone_create_rc = 1;
if (sCloneFST) {
std::string mc_fst_path = eos::common::FileId::FidPrefix2FullPath(sCloneFST,
mLocalPrefix.c_str());
struct stat clone_stat;
int clonerc = ::stat(mc_fst_path.c_str(), &clone_stat) ? errno : 0;
eos_info("fstpath=%s clonepath=%s clonerc=%d len=%d", mFstPath.c_str(),
mc_fst_path.c_str(), clonerc, clonerc ? -1 : clone_stat.st_size);
/* clone handling:
* if read-write and clone does not exist, create it
* if read-only switch to clone if it exists
* (note: if several clones were allowed, we'd might have to search!)
*/
if (mIsRW && clonerc != 0) { /* for RW, only if clone not yet created */
if (open_mode & SFS_O_TRUNC) {
/* rename data file to clone, it will be re-created */
clone_create_rc = ::rename(mFstPath.c_str(), mc_fst_path.c_str()) ? errno : 0;
eos_info("copy-on-write: rename %s %s rc=%d", mFstPath.c_str(),
mc_fst_path.c_str(), clone_create_rc);
} else {
/* copy data file to clone before modyfying */
char sbuff[1024];
snprintf(sbuff, sizeof(sbuff),
"cp --preserve=xattr,ownership,mode --reflink=auto %s %s",
mFstPath.c_str(), mc_fst_path.c_str());
clone_create_rc = system(sbuff);
eos_info("copy-on-write: %s rc=%d", sbuff, clone_create_rc);
}
}
}
XrdOucString oss_opaque = "";
oss_opaque += "&mgm.lid=";
oss_opaque += std::to_string(mLid).c_str();
oss_opaque += "&mgm.bookingsize=";
oss_opaque += std::to_string(mBookingSize).c_str();
if (!(val = mCapOpaque->Get("mgm.iotype"))) {
// provided by a client
if ((val = mOpenOpaque->Get("eos.iotype"))) {
oss_opaque += "&mgm.ioflag=";
oss_opaque += val;
if (std::string(val) == "csync") {
// cannot be done in the OSS
mSyncOnClose = true;
}
}
} else {
// forced by the MGM configuration
oss_opaque += "&mgm.ioflag=";
oss_opaque += val;
if (std::string(val) == "csync") {
// cannot be done in the OSS
mSyncOnClose = true;
}
}
// Open layout implementation
eos_info("fst_path=%s open-mode=%x create-mode=%x layout-name=%s oss-opaque=%s",
mFstPath.c_str(), open_mode, create_mode, mLayout->GetName(),
oss_opaque.c_str());
COMMONTIMING("layout::open", &tm);
int rc = mLayout->Open(open_mode, create_mode, oss_opaque.c_str());
COMMONTIMING("layout::opened", &tm);
if (rc) {
// If we have local errors in open we don't disable the filesystem -
// this is done by the Scrub thread if necessary!
if (mLayout->IsEntryServer() && (!isReplication)) {
eos_warning("msg=\"open error return recoverable error "
"EIO(kXR_IOError)\" fid=%08llx", mFileId);
// Clean-up before re-bouncing
if (hasCreationMode && !mRainReconstruct && !mIsInjection) {
DropAllFromMgm(mFileId, path, mRedirectManager.c_str());
}
}
return gOFS.Emsg(epname, error, EIO, "open - failed open");
}
COMMONTIMING("get::localfmd", &tm);
mFmd = gOFS.mFmdHandler->LocalGetFmd(mFileId, mFsId, isRepairRead, mIsRW,
vid.uid, vid.gid, mLid);
COMMONTIMING("resync::localfmd", &tm);
if (mFmd == nullptr) {
if (gOFS.mFmdHandler->ResyncMgm(mFsId, mFileId, mRedirectManager.c_str())) {
eos_info("msg=\"resync ok\" fsid=%lu fxid=%llx", mFsId, mFileId);
mFmd = gOFS.mFmdHandler->LocalGetFmd(mFileId, mFsId, isRepairRead,
mIsRW, vid.uid, vid.gid, mLid);
std::string dummy_xs;
int rc = 0;
if ((rc = gOFS.mFmdHandler->ResyncDisk(mFstPath.c_str(), mFsId, false, 0,
dummy_xs))) {
eos_err("msg=\"failed to resync from disk\" fsid=%lu fxid=%llx path=%s rc=%d",
mFsId, mFileId, mFstPath.c_str(), rc);
} else {
eos_info("msg=\"resync from disk\" path=%s", mFstPath.c_str());
}
} else {
eos_err("msg=\"resync failed\" fsid=%lu fxid=%08llx", mFsId, mFileId);
}
}
if (mFmd == nullptr) {
eos_err("msg=\"no FMD record found\" fsid=%lu fxid=%08llx", mFsId, mFileId);
if ((!mIsRW) || (mLayout->IsEntryServer() && (!isReplication))) {
eos_warning("failed to get FMD record return recoverable error ENOENT(kXR_NotFound)");
if (hasCreationMode && !mRainReconstruct && !mIsInjection) {
// clean-up before re-bouncing
DropAllFromMgm(mFileId, path, mRedirectManager.c_str());
}
}
// Return an error that can be recovered at the MGM
return gOFS.Emsg(epname, error, ENOENT, "open - no FMD record found");
}
// Update the fmd information for any clone objects
if (sCloneFST) {
if (mIsRW && (clone_create_rc == 0)) {
// Populate local DB (future reads need it)
unsigned long long clFid = eos::common::FileId::Hex2Fid(sCloneFST);
auto lfmd = gOFS.mFmdHandler->LocalGetFmd(clFid, mFsId, false, mIsRW,
vid.uid, vid.gid, mLid);
if (lfmd == nullptr) {
// We have an invalid FMD, drop and try again!
gOFS.mFmdHandler->LocalDeleteFmd(clFid, mFsId);
lfmd = gOFS.mFmdHandler->LocalGetFmd(clFid, mFsId, false, mIsRW,
vid.uid, vid.gid, mLid);
// FIXME: maybe we don't need to exit here?
if (!lfmd) {
return gOFS.Emsg(epname, error, ENOENT, "open unable to create FMD");
}
}
lfmd->mProtoFmd.set_checksum(mFmd->mProtoFmd.checksum());
lfmd->mProtoFmd.set_diskchecksum(mFmd->mProtoFmd.diskchecksum());
lfmd->mProtoFmd.set_mgmchecksum(mFmd->mProtoFmd.mgmchecksum());
if (!gOFS.mFmdHandler->Commit(lfmd.get())) {
eos_err("copy-on-write unable to commit meta data to local database");
(void) gOFS.Emsg(epname, this->error, EIO,
"copy-on-write - unable to commit meta data", mNsPath.c_str());
}
eos_debug("fid %lld cs %s diskcs %s mgmcs %s", lfmd->mProtoFmd.fid(),
lfmd->mProtoFmd.checksum().c_str(), lfmd->mProtoFmd.diskchecksum().c_str(),
lfmd->mProtoFmd.mgmchecksum().c_str());
} else {
eos_debug("fid %lld cs %s diskcs %s mgmcs %s", mFmd->mProtoFmd.fid(),
mFmd->mProtoFmd.checksum().c_str(), mFmd->mProtoFmd.diskchecksum().c_str(),
mFmd->mProtoFmd.mgmchecksum().c_str());
}
}
if (isCreation) {
creationSerialization.Release();
}
COMMONTIMING("layout::stat", &tm);
if (isReplication && !isCreation) {
mLayout->Stat(&updateStat);
}
if (isCreation && mBookingSize) {
COMMONTIMING("full::mutex", &tm);
// Check if the file system is full
XrdSysMutexHelper lock(gOFS.Storage->mFsFullMapMutex);
if (gOFS.Storage->mFsFullMap[mFsId]) {
if (mLayout->IsEntryServer() && (!isReplication)) {
writeErrorFlag = kOfsDiskFullError;
mLayout->Remove();
eos_warning("not enough space return recoverable error ENODEV(kXR_FSError)");
if (hasCreationMode && !mRainReconstruct && !mIsInjection) {
// clean-up before re-bouncing
DropAllFromMgm(mFileId, path, mRedirectManager.c_str());
}
// Return an error that can be recovered at the MGM
return gOFS.Emsg(epname, error, ENODEV, "open - not enough space");
}
return gOFS.Emsg(epname, error, ENOSPC, "create file - disk space "
"(headroom) exceeded fn=", mFstPath.c_str());
}
COMMONTIMING("layout::fallocate", &tm);
rc = mLayout->Fallocate(mBookingSize);
COMMONTIMING("layout::fallocated", &tm);
if (rc) {
eos_crit("msg=\"file allocation failed\" retc=%d errno=%d size=%llu",
rc, errno, mBookingSize);
if (mLayout->IsEntryServer() && (!isReplication)) {
mLayout->Remove();
eos_warning("msg=\"not enough space i.e file allocation failed, return "
"recoverable error ENODEV(kXR_FSError)\" fxid=%08llx",
mFileId);
if (hasCreationMode && !mRainReconstruct && !mIsInjection) {
// clean-up before re-bouncing
DropAllFromMgm(mFileId, path, mRedirectManager.c_str());
}
// Return an error that can be recovered at the MGM
return gOFS.Emsg(epname, error, ENODEV, "open - file allocation failed");
}
mLayout->Remove();
return gOFS.Emsg(epname, error, ENOSPC, "open - cannot allocate "
"required space", mNsPath.c_str());
}
}
if (isCreation) {
gOFS.mFmdHandler->Commit(mFmd.get());
}
if (!isCreation) {
COMMONTIMING("layout::stat", &tm);
// Get the real size of the file, not the local stripe size!
struct stat statinfo {};
if ((retc = mLayout->Stat(&statinfo))) {
return gOFS.Emsg(epname, error, EIO,
"open - cannot stat layout to determine file size", mNsPath.c_str());
}
// We feed the layout size, not the physical on disk!
eos_info("msg=\"layout size\" disk_size=%zu db_size= %llu",
statinfo.st_size, mFmd->mProtoFmd.size());
openSize = mFmd->mProtoFmd.size();
mWritePosition = openSize;
if (!eos::common::LayoutId::IsRain(mLayout->GetLayoutId())) {
// If replica layout and physical size of replica difference from the
// fmd_size it means the file is being written to, so we save the actual
// size from disk.
if ((off_t) statinfo.st_size != (off_t) mFmd->mProtoFmd.size()) {
openSize = statinfo.st_size;
mWritePosition = openSize;
}
}
// Preset with the last known checksum
if (mCheckSum && mIsRW && !IsChunkedUpload()) {
eos_info("msg=\"checksum reset init\" file-xs=%s",
mFmd->mProtoFmd.checksum().c_str());
mCheckSum->ResetInit(0, openSize, mFmd->mProtoFmd.checksum().c_str());
}
}
// For RAIN layouts we enable full file checksum only at the entry server for
// write operations. For the rest of the cases we rely on the block and parity
// checking.
if (eos::common::LayoutId::IsRain(mLid) && !(mIsRW &&
mLayout->IsEntryServer())) {
mCheckSum.reset(nullptr);
}
// Set the eos lfn as extended attribute
std::unique_ptr io
(FileIoPlugin::GetIoObject(mLayout->GetLocalReplicaPath(), this));
COMMONTIMING("fileio::object", &tm);
if (mIsRW) {
if (mNsPath.beginswith("/replicate:") || mNsPath.beginswith("/fusex-open")) {
if (mCapOpaque->Get("mgm.path")) {
XrdOucString unsealedpath = mCapOpaque->Get("mgm.path");
XrdOucString sealedpath = path;
if (io->attrSet(std::string("user.eos.lfn"),
std::string(unsealedpath.c_str()))) {
eos_err("unable to set extended attribute errno=%d", errno);
}
} else {
eos_err("msg=\"no lfn in replication capability\"");
}
} else {
if (io->attrSet(std::string("user.eos.lfn"), std::string(mNsPath.c_str()))) {
eos_err("unable to set extended attribute errno=%d", errno);
}
}
}
// For reading of replica file check for xs errors
if (!mIsRW) {
if ((eos::common::LayoutId::GetLayoutType(mLid) ==
eos::common::LayoutId::kReplica) &&
gOFS.mFmdHandler->FileHasXsError(mLayout->GetLocalReplicaPath(), mFsId)) {
eos_err("msg=\"open failed due to checksum mismatch\" path=%s",
mNsPath.c_str());
return gOFS.Emsg(epname, error, EIO, "open - replica checksum mismatch",
mNsPath.c_str());
}
}
COMMONTIMING("open::accounting", &tm);
if (mIsRW) {
gOFS.openedForWriting.up(mFsId, mFileId);
} else {
gOFS.openedForReading.up(mFsId, mFileId);
}
mOpened = true;
COMMONTIMING("end", &tm);
timeToOpen = tm.RealTime();
// report slow open as errors if longer than 1000ms
if (timeToOpen > 1000) {
eos_err("slow open operation: open-duration=%.03fms path='%s' fxid=%08llx %s",
timeToOpen,
mNsPath.c_str(), mFileId, tm.Dump().c_str());
}
eos_info("open-duration=%.03fms path='%s' fxid=%08llx %s", timeToOpen,
mNsPath.c_str(), mFileId, tm.Dump().c_str());
return SFS_OK;
}
//------------------------------------------------------------------------------
// Pre-read into file system cache
//------------------------------------------------------------------------------
int
XrdFstOfsFile::read(XrdSfsFileOffset fileOffset, XrdSfsXferSize amount)
{
int rc = XrdOfsFile::read(fileOffset, amount);
eos_debug("rc=%d offset=%lu size=%llu", rc, fileOffset, amount);
return rc;
}
//------------------------------------------------------------------------------
// Read AIO - not supported
//------------------------------------------------------------------------------
int
XrdFstOfsFile::read(XrdSfsAio* aioparm)
{
return gOFS.Emsg("read_aio", error, ENOTSUP, "read aio - operation not "
"supported");
}
//------------------------------------------------------------------------------
// Read from file
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::read(XrdSfsFileOffset fileOffset, char* buffer,
XrdSfsXferSize buffer_size)
{
gettimeofday(&rStart, &tz);
// use RR scheduling if there is a round-robin app name
std::mutex* mutex = 0;
if (!mAppRR.empty()) {
if (mIsRW) {
mutex = gOFS.openedForWriting.scheduleRR(mFsId, mAppRR);
} else {
mutex = gOFS.openedForReading.scheduleRR(mFsId, mAppRR);
}
}
auto lockScope = (mutex == nullptr) ?
std::unique_lock() :
std::unique_lock(*mutex);
eos_debug("fileOffset=%lli, buffer_size=%i", fileOffset, buffer_size);
// EPNAME("read");
if (mTpcFlag == kTpcSrcRead) {
if (!(rCalls % 10)) {
if (!TpcValid()) {
eos_err("msg=\"tcp interrupted by control-c - cancel tcp read\" key=%s",
mTpcKey.c_str());
return gOFS.Emsg("read", error, EINTR, "read - tpc transfer interrupted"
" by client disconnect", FName());
}
}
}
if (mBandwidth) {
gettimeofday(¤tTime, &tz);
float abs_time = static_cast((currentTime.tv_sec -
openTime.tv_sec) * 1000 +
(currentTime.tv_usec - openTime.tv_usec) / 1000);
//........................................................................
// Regulate the io - sleep as desired
//........................................................................
float exp_time = totalBytes / mBandwidth / 1000.0;
if (abs_time < exp_time) {
msSleep += (exp_time - abs_time);
std::int64_t thisSleep = msSleep;
std::this_thread::sleep_for(std::chrono::milliseconds(thisSleep));
}
}
int rc = mLayout->Read(fileOffset, buffer, buffer_size);
eos_debug("layout read %d checkSum %d", rc, mCheckSum.get());
/* maintaining a checksum is tricky if there have been writes,
* but the read + append case can be supported in "Add" */
if ((rc > 0) && (mCheckSum) && (!mHasWrite)) {
XrdSysMutexHelper cLock(mChecksumMutex);
mCheckSum->Add(buffer, static_cast(rc),
static_cast(fileOffset));
}
if (rc > 0) {
// if required, unobfuscate a buffer server side
if (mLayout->IsEntryServer() && mHmac.key.length()) {
eos::common::SymKey::UnobfuscateBuffer(const_cast(buffer), rc,
fileOffset, mHmac);
}
if (mLayout->IsEntryServer() || eos::common::LayoutId::IsRain(mLid)) {
XrdSysMutexHelper vecLock(vecMutex);
rvec.push_back(rc);
}
rOffset = fileOffset + rc;
totalBytes += rc;
}
if (rc < 0) {
// Here we might take some other action
int envlen = 0;
eos_crit("block-read error=%d offset=%llu len=%llu file=%s",
error.getErrInfo(),
static_cast(fileOffset),
static_cast(buffer_size),
FName(), mCapOpaque ? mCapOpaque->Env(envlen) : FName());
// Used to understand if a reconstruction of a RAIN file worked
hasReadError = true;
}
eos_debug("rc=%d offset=%lu size=%llu", rc, fileOffset,
static_cast(buffer_size));
if ((fileOffset + buffer_size) >= openSize) {
if (mCheckSum && (!mHasWrite)) {
/* even if there were only reads up to here the file may still be modified if opened R/W. As
* VerifyChecksum "finalises" the context, this has to be handled in write anyway;
* but not finalising now speeds up the slightly less marginal case of "write a lot" +
* "read a little" + "write a little" (seen in "git") */
if (!mCheckSum->NeedsRecalculation()) {
// If this is the last read of sequential reading, we can verify
// the checksum now (unless we're writing as well)
if (VerifyChecksum()) {
return gOFS.Emsg("read", error, EIO, "read file - wrong file "
"checksum fn=", FName());
}
}
}
}
AddLayoutReadTime();
return rc;
}
//----------------------------------------------------------------------------
// Read file pages into a buffer and return corresponding checksums
//----------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::pgRead(XrdSfsFileOffset offset, char* buffer,
XrdSfsXferSize rdlen, uint32_t* csvec, uint64_t opts)
{
eos_debug("offset=%lli len=%i", offset, rdlen);
XrdSfsXferSize bytes;
// Read the data into the buffer
if ((bytes = read(offset, buffer, rdlen)) <= 0) {
return bytes;
}
// Generate the crc's
XrdOucPgrwUtils::csCalc(buffer, offset, bytes, csvec);
return bytes;
}
//------------------------------------------------------------------------------
// Read file pages and checksums using asynchronous I/O - NOT SUPPORTED
//------------------------------------------------------------------------------
int
XrdFstOfsFile::pgRead(XrdSfsAio* aioparm, uint64_t opts)
{
return gOFS.Emsg("pgRead_aio", error, ENOTSUP, "pgRead aio - operation not "
"supported");
}
//------------------------------------------------------------------------------
// Vector read
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::readv(XrdOucIOVec* readV, int readCount)
{
eos_debug("msg=\"readv request\" count=%i", readCount);
gettimeofday(&rvStart, &tz);
std::string output_init, output_final;
auto print_readv_request = [](XrdOucIOVec * readv, int num_chunks) {
std::ostringstream oss;
for (int i = 0; i < num_chunks; ++i) {
oss << "index=" << i
<< " offset=" << readv[i].offset
<< " length=" << readv[i].size
<< std::endl;
}
return oss.str();
};
if (EOS_LOGS_DEBUG) {
output_init = print_readv_request(readV, readCount);
eos_debug("msg=\"initial readv request\" obj=%p content=\"%s\"",
readV, output_init.c_str());
}
// Copy the XrdOucIOVec structure to XrdCl::ChunkList
uint32_t total_read = 0;
XrdCl::ChunkList chunkList;
chunkList.reserve(readCount);
for (int i = 0; i < readCount; ++i) {
total_read += (uint32_t)readV[i].size;
chunkList.push_back(XrdCl::ChunkInfo((uint64_t)readV[i].offset,
(uint32_t)readV[i].size,
(void*)readV[i].data));
}
int64_t rv = mLayout->ReadV(chunkList, total_read);
totalBytes += rv;
if (EOS_LOGS_DEBUG) {
output_final = print_readv_request(readV, readCount);
eos_debug("msg=\"final readv request\" obj=%p content=\"%s\"",
readV, output_final.c_str());
if (output_init != output_final) {
eos_crit("%s", "msg=\"readv object corrupted\"");
}
}
AddLayoutReadVTime();
return rv;
}
//------------------------------------------------------------------------------
// Write to file
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::write(XrdSfsFileOffset fileOffset, const char* buffer,
XrdSfsXferSize buffer_size)
{
gettimeofday(&wStart, &tz);
if (gOFS.mSimUnresponsive) {
eos_warning("simulating unresponsiveness in write delaying by 120s");
std::this_thread::sleep_for(std::chrono::seconds(120));
}
if (mIsDevNull) {
eos_debug("offset=%llu, length=%li discarded for sink file", fileOffset,
buffer_size);
mMaxOffsetWritten = fileOffset + buffer_size;
AddLayoutWriteTime();
return buffer_size;
}
{
// use global RR serialization (we just use fsid 0 for that)
std::mutex* mutex = 0;
if (!mAppRR.empty()) {
if (mIsRW) {
mutex = gOFS.openedForWriting.scheduleRR(0, mAppRR);
} else {
mutex = gOFS.openedForReading.scheduleRR(0, mAppRR);
}
}
auto lockScope = (mutex == nullptr) ?
std::unique_lock() :
std::unique_lock(*mutex);
}
// use RR scheduling if there is a round-robin app name per filesystem
std::mutex* mutex = 0;
if (!mAppRR.empty()) {
if (mIsRW) {
mutex = gOFS.openedForWriting.scheduleRR(mFsId, mAppRR);
} else {
mutex = gOFS.openedForReading.scheduleRR(mFsId, mAppRR);
}
}
auto lockScope = (mutex == nullptr) ?
std::unique_lock() :
std::unique_lock(*mutex);
if (mBandwidth) {
gettimeofday(¤tTime, &tz);
float abs_time = static_cast((currentTime.tv_sec -
openTime.tv_sec) * 1000 +
(currentTime.tv_usec - openTime.tv_usec) / 1000);
//........................................................................
// Regulate the io - sleep as desired
//........................................................................
float exp_time = totalBytes / mBandwidth / 1000.0;
if (abs_time < exp_time) {
msSleep += (exp_time - abs_time);
std::int64_t thisSleep = msSleep;
std::this_thread::sleep_for(std::chrono::milliseconds(thisSleep));
}
}
// if the write position moves the checksum is dirty
if (mCheckSum) {
if (mWritePosition != (unsigned long long)fileOffset) {
mCheckSum->SetDirty();
}
// store next write position
mWritePosition = fileOffset + buffer_size;
}
// if required, obfuscate a buffer server side
if (mLayout->IsEntryServer() && mHmac.key.length()) {
eos::common::SymKey::ObfuscateBuffer(const_cast(buffer),
const_cast(buffer), buffer_size, fileOffset, mHmac);
}
int rc = mLayout->Write(fileOffset, const_cast(buffer), buffer_size);
// If we see a remote IO error, we don't fail, we just call repair afterwards,
// only for replica layouts and not for FuseX clients
if ((rc < 0) && isCreation && (!mFusex) &&
(mLayout->GetErrObj()->getErrInfo() == EREMOTEIO) &&
(eos::common::LayoutId::GetLayoutType(mLid) ==
eos::common::LayoutId::kReplica)) {
repairOnClose = true;
rc = buffer_size;
}
// Evt. add checksum
if (rc > 0) {
if (mCheckSum) {
XrdSysMutexHelper cLock(mChecksumMutex);
mCheckSum->Add(buffer, static_cast(rc),
static_cast(fileOffset));
}
totalBytes += rc;
if (static_cast(fileOffset + buffer_size) >
static_cast(mMaxOffsetWritten)) {
mMaxOffsetWritten = (fileOffset + buffer_size);
}
}
eos_debug("rc=%d offset=%lu size=%lu", rc, fileOffset,
static_cast(buffer_size));
if (rc < 0) {
int envlen = 0;
if (!hasWriteError || EOS_LOGS_DEBUG) {
eos_crit("block-write error=%d offset=%llu len=%llu file=%s",
mLayout->GetErrObj()->getErrInfo(),
static_cast(fileOffset),
static_cast(buffer_size),
FName(), mCapOpaque ? mCapOpaque->Env(envlen) : FName());
}
hasWriteError = true;
} else {
mHasWrite = true;
if (mLayout->IsEntryServer() || isReplication) {
XrdSysMutexHelper lock(vecMutex);
wvec.push_back(rc);
}
}
if (rc < 0) {
int envlen = 0;
// Indicate the deletion flag for write errors
mWrDelete = true;
XrdOucString errdetail;
if (isCreation) {
XrdOucString newerr;
// Add to the error message that this file has been removed after the error,
// which happens for creations
newerr = error.getErrText();
if (writeErrorFlag == kOfsSimulatedIoError) {
// Simulated IO error
errdetail += " => file has been removed because of a simulated IO error";
} else {
if (writeErrorFlag == kOfsDiskFullError) {
// Disk full error
errdetail +=
" => file has been removed because the target filesystem was full";
} else {
if (writeErrorFlag == kOfsMaxSizeError) {
// Maximum file size error
errdetail += " => file has been removed because the maximum target "
"filesize defined for that subtree was exceeded (maxsize=";
char smaxsize[16];
snprintf(smaxsize, sizeof(smaxsize) - 1, "%llu", (unsigned long long) mMaxSize);
errdetail += smaxsize;
errdetail += " bytes)";
} else {
if (writeErrorFlag == kOfsIoError) {
// Generic IO error
errdetail +=
" => file has been removed due to an IO error on the target filesystem";
} else {
errdetail += " => file has been removed due to an IO error (unspecified)";
}
}
}
}
newerr += errdetail.c_str();
error.setErrData(newerr.c_str());
}
eos_err("block-write error=%d offset=%llu len=%llu file=%s error=\"%s\"",
error.getErrInfo(),
(unsigned long long) fileOffset,
(unsigned long long) buffer_size, FName(),
mCapOpaque ? mCapOpaque->Env(envlen) : FName(),
errdetail.c_str());
}
AddLayoutWriteTime();
return rc;
}
//------------------------------------------------------------------------------
// Write AIO - no supported
//------------------------------------------------------------------------------
int
XrdFstOfsFile::write(XrdSfsAio* aioparm)
{
return gOFS.Emsg("write_aio", error, ENOTSUP, "write aio - operation not "
"supported");
}
//----------------------------------------------------------------------------
//! Write file pages into a file with corresponding checksums.
//----------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::pgWrite(XrdSfsFileOffset offset, char* buffer,
XrdSfsXferSize wrlen, uint32_t* csvec, uint64_t opts)
{
eos_debug("offset=%lli len=%i", offset, wrlen);
// If we have a checksum vector and verify is on, do verification.
if (opts & Verify) {
XrdOucPgrwUtils::dataInfo dInfo(buffer, csvec, offset, wrlen);
off_t badoff;
int badlen;
if (!XrdOucPgrwUtils::csVer(dInfo, badoff, badlen)) {
char eMsg[512];
snprintf(eMsg, sizeof(eMsg), "Checksum error at offset %lld",
(long long) badoff);
error.setErrInfo(EDOM, eMsg);
return SFS_ERROR;
}
}
return write(offset, buffer, wrlen);
}
//------------------------------------------------------------------------------
// Write file pages and checksums using asynchronous I/O - NOT SUPPORTED
//------------------------------------------------------------------------------
int
XrdFstOfsFile::pgWrite(XrdSfsAio* aioparm, uint64_t opts)
{
return gOFS.Emsg("pgWrite_aio", error, ENOTSUP, "pgWrite aio - operation not "
"supported");
}
//------------------------------------------------------------------------------
// Get file stat information
//------------------------------------------------------------------------------
int
XrdFstOfsFile::stat(struct stat* buf)
{
EPNAME("stat");
int rc = SFS_OK;
if (mIsDevNull) {
buf->st_size = mMaxOffsetWritten;
return rc;
}
if (mRainReconstruct) {
buf->st_size = mRainSize;
return rc;
}
if (mLayout) {
if ((rc = mLayout->Stat(buf)))
rc = gOFS.Emsg(epname, error, EIO, "stat - cannot stat layout to determine"
" file size ", mNsPath.c_str());
} else {
rc = gOFS.Emsg(epname, error, ENXIO, "stat - no layout to determine file size ",
mNsPath.c_str());
}
// store the file id as inode number
if (!rc) {
buf->st_ino = eos::common::FileId::FidToInode(mFileId);
}
// we store the mtime.ns time in st_dev ... sigh@Xrootd ...
#ifdef __APPLE__
unsigned long nsec = buf->st_mtimespec.tv_nsec;
#else
unsigned long nsec = buf->st_mtim.tv_nsec;
#endif
// mask for 10^9
nsec &= 0x7fffffff;
// enable bit 32 as indicator
nsec |= 0x80000000;
// overwrite st_dev
buf->st_dev = nsec;
#ifdef __APPLE__
eos_info("path=%s fxid=%08llx size=%lu mtime=%lu.%lu", mNsPath.c_str(), mFileId,
(unsigned long) buf->st_size, buf->st_mtimespec.tv_sec,
buf->st_dev & 0x7ffffff);
#else
eos_info("path=%s fxid=%08llx size=%lu mtime=%lu.%lu", mNsPath.c_str(), mFileId,
(unsigned long) buf->st_size, buf->st_mtim.tv_sec, buf->st_dev & 0x7ffffff);
#endif
return rc;
}
//------------------------------------------------------------------------------
// Sync file
//------------------------------------------------------------------------------
int
XrdFstOfsFile::sync()
{
static const int cbWaitTime = 3600;
// TPC transfer
if (mTpcFlag == kTpcDstSetup) {
XrdSysMutexHelper scope_lock(&mTpcJobMutex);
if (mTpcState == kTpcIdle) {
eos_info("msg=\"tpc enabled -> 1st sync\"");
mTpcThreadStatus = XrdSysThread::Run(&mTpcThread,
XrdFstOfsFile::StartDoTpcTransfer,
static_cast(this), XRDSYSTHREAD_HOLD,
"TPC Transfer Thread");
if (mTpcThreadStatus == 0) {
mTpcState = kTpcRun;
scope_lock.UnLock();
return SFS_OK;
} else {
eos_err("msg=\"failed to start TPC job thread\"");
mTpcState = kTpcDone;
if (mTpcInfo.Key) {
free(mTpcInfo.Key);
}
mTpcInfo.Key = strdup("Copy failed, could not start job");
return mTpcInfo.Fail(&error, "could not start job", ECANCELED);
}
} else if (mTpcState == kTpcRun) {
eos_info("msg=\"tpc running -> 2nd sync\"");
if (mTpcInfo.SetCB(&error)) {
return SFS_ERROR;
}
error.setErrCode(cbWaitTime);
mTpcInfo.Engage();
return SFS_STARTED;
} else if (mTpcState == kTpcDone) {
eos_info("msg=\"tpc already finished, retc=%i\"", mTpcRetc);
if (mTpcRetc) {
error.setErrInfo(mTpcRetc, (mTpcInfo.Key ? mTpcInfo.Key : "failed tpc"));
return SFS_ERROR;
} else {
return SFS_OK;
}
} else {
eos_err("msg=\"unknown tpc state\"");
error.setErrInfo(EINVAL, "unknown TPC state");
return SFS_ERROR;
}
} else {
// Standard file sync
return mLayout->Sync();
}
}
//------------------------------------------------------------------------------
// Sync AIO - no supported
//------------------------------------------------------------------------------
int
XrdFstOfsFile::sync(XrdSfsAio* aiop)
{
return gOFS.Emsg("sync_aio", error, ENOTSUP, "sync aio - operation not "
"supported");
}
//------------------------------------------------------------------------------
// Truncate file
//------------------------------------------------------------------------------
int
XrdFstOfsFile::truncate(XrdSfsFileOffset fsize)
{
eos_info("openSize=%llu fsize=%llu ", openSize, fsize);
if (mIsDevNull) {
return SFS_OK;
}
if (fsize != openSize) {
if (mCheckSum) {
if (mWritePosition != (unsigned long long)fsize) {
mCheckSum->SetDirty();
}
}
}
int rc = mLayout->Truncate(fsize);
if (!rc) {
if (fsize != openSize) {
mHasWrite = true;
}
mWritePosition = fsize;
}
return rc;
}
//------------------------------------------------------------------------------
// Close file
//------------------------------------------------------------------------------
int
XrdFstOfsFile::close()
{
gettimeofday(&closeStart, &tz);
if (gOFS.mSimUnresponsive) {
eos_warning("%s", "msg=\"simulating unresponsiveness unsing 120s delay\"");
std::this_thread::sleep_for(std::chrono::seconds(120));
}
// Reset the error.getErrInfo() value to 0 since this was hijacked by the
// XrdXrootdFile object to store the actual file descriptor corresponding to
// the current object. This was confusing when logging the error.getErrInfo()
// value at the end of the close.
error.setErrCode(0);
static bool async_close_cfg = IsAsyncCloseConfigured();
if (!async_close_cfg || DoSyncClose()) {
return _close();
}
// Delegate close to a different thread while the client is waiting for the
// callback (SFS_STARTED). This only happens for written files with size
// bigger than min size bytes.
eos_info("msg=\"close delegated to async thread\" fxid=%08llx "
"ns_path=\"%s\" fs_path=\"%s\"", mFileId, mNsPath.c_str(),
mFstPath.c_str());
// Create a close callback and put the client in waiting mode
auto closeCb = std::make_shared();
closeCb->Init(&error);
error.setErrInfo(1800, "delay client up to 30 minutes");
gOFS.mCloseThreadPool.PushTask([&, closeCb]() -> void {
eos_info("msg=\"doing close in the async thread\" fxid=%08llx", mFileId);
int rc = _close();
auto fileId = mFileId;
// During Reply() we expect the enclosing XrdFstOfsFile to be destroyed,
// so we don't refer to anything captured by reference once done
int reply_rc = closeCb->Reply(rc, (rc ? error.getErrInfo() : 0),
(rc ? error.getErrText() : ""));
if (reply_rc == 0)
{
eos_err("%s", "msg=\"callback reply failed\" fid=%llu", fileId);
}
});
return SFS_STARTED;
}
//------------------------------------------------------------------------------
// Close file - internal method
//------------------------------------------------------------------------------
int
XrdFstOfsFile::_close()
{
EPNAME("_close");
int rc = 0; // return code
int brc = 0; // return code before 'close' has been called
bool checksumerror = false;
bool targetsizeerror = false;
bool minimumsizeerror = false;
bool queueingerror = false;
bool commited_to_mgm = false;
bool consistencyerror = false;
bool atomicoverlap = false;
std::string queueing_errmsg;
std::string archive_req_id;
// Any close on a file opened in TPC mode invalidates tpc keys
if (mTpcKey.length()) {
{
XrdSysMutexHelper tpcLock(gOFS.TpcMapMutex);
if (gOFS.TpcMap[mIsTpcDst].count(mTpcKey)) {
eos_info("msg=\"remove tpc key\" key=%s", mTpcKey.c_str());
gOFS.TpcMap[mIsTpcDst].erase(mTpcKey);
try {
gOFS.TpcMap[mIsTpcDst].resize(0);
} catch (const std::length_error& e) {}
}
}
if (mTpcFlag == kTpcDstSetup) {
if (!mTpcThreadStatus) {
int retc = XrdSysThread::Join(mTpcThread, NULL);
eos_debug("TPC job join returned %i", retc);
} else {
eos_warning("TPC job was never started successfully");
}
}
}
if (mIsDevNull) {
eos_debug("msg=\"closing sink file i.e. /dev/null\"");
closed = true;
return SFS_OK;
}
// Set default workflow if nothing is specified
if (mEventWorkflow.length() == 0) {
mEventWorkflow = "default";
}
// We enter the close logic only once since there can be an explicit close or
// a close via the destructor
if (mOpened && (!closed) && mFmd) {
// Check if the file close comes from a client disconnect e.g. the destructor
const std::string hex_fid = eos::common::FileId::Fid2Hex(mFmd->mProtoFmd.fid());
XrdOucString capOpaqueString = "/?mgm.pcmd=drop";
XrdOucString OpaqueString = "";
OpaqueString += "&mgm.fsid=";
OpaqueString += (int) mFmd->mProtoFmd.fsid();
OpaqueString += "&mgm.fid=";
OpaqueString += hex_fid.c_str();
XrdOucEnv Opaque(OpaqueString.c_str());
capOpaqueString += OpaqueString;
eos_info("viaDelete=%d writeDelete=%d", viaDelete, mWrDelete);
bool issinglewriter = (gOFS.openedForWriting.getUseCount(mFmd->mProtoFmd.fsid(),
mFmd->mProtoFmd.fid()) <= 1);
if ((viaDelete || mWrDelete) &&
((isCreation || (isReplication && mIsRW) || mIsInjection ||
IsChunkedUpload()) && (!mFusex))) {
if (issinglewriter) {
// It is closed by the destructor e.g. no proper close
// or the specified checksum does not match the computed one
if (viaDelete) {
eos_info("msg=\"(unpersist): deleting file\" reason=\"client disconnect\""
" fsid=%lu fxid=%08llx", mFmd->mProtoFmd.fsid(), mFmd->mProtoFmd.fid());
}
if (mWrDelete) {
eos_info("msg=\"(unpersist): deleting file\" reason=\"write/policy error\""
" fsid=%lu fxid=%08llx", mFmd->mProtoFmd.fsid(), mFmd->mProtoFmd.fid());
}
// Delete the file - set the file to be deleted
deleteOnClose = true;
mLayout->Remove();
if (mLayout->IsEntryServer() && (!isReplication) && (!mIsInjection) &&
(!mRainReconstruct)) {
capOpaqueString += "&mgm.dropall=1";
}
// Delete the replica in the MGM
XrdOucErrInfo lerror;
if (gOFS.CallManager(&lerror, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueString)) {
eos_warning("(unpersist): unable to drop file id %s fsid %u at manager %s",
hex_fid.c_str(), mFmd->mProtoFmd.fid(), mCapOpaque->Get("mgm.manager"));
}
} else {
eos_info("msg=\"(unpersist): suppressing delete on close\" reason=\"several writers\""
" fsid=%lu fxid=%08llx", mFmd->mProtoFmd.fsid(), mFmd->mProtoFmd.fid());
}
} else {
// Check if this was a newly created file
if (isCreation) {
// If we had space allocation we have to truncate the allocated space to
// the real size of the file
if (eos::common::LayoutId::IsRain(mLayout->GetLayoutId())) {
// the entry server has to truncate only if this is not a recovery action
if (mLayout->IsEntryServer() && !mRainReconstruct) {
eos_info("msg=\"truncate RAIN layout\" truncate-offset=%llu",
mMaxOffsetWritten);
mLayout->Truncate(mMaxOffsetWritten);
}
//@note: there is a small probability here to have a race condition when
// computing the checksum for RAIN file in non-streaming mode. We should
// first collect all write replies and then re-read the file for the xs.
} else {
if ((long long) mMaxOffsetWritten > (long long) openSize) {
// Check if we have to deallocate something for this file transaction
if ((mBookingSize) && (mBookingSize > (long long) mMaxOffsetWritten)) {
eos_info("deallocationg %llu bytes", mBookingSize - mMaxOffsetWritten);
mLayout->Truncate(mMaxOffsetWritten);
// We have evt. to deallocate blocks which have not been written
mLayout->Fdeallocate(mMaxOffsetWritten, mBookingSize);
}
}
}
}
checksumerror = VerifyChecksum();
targetsizeerror = (mTargetSize) ? (mTargetSize != (off_t) mMaxOffsetWritten) :
false;
if (isCreation) {
// Check that the minimum file size policy is met!
minimumsizeerror = (mMinSize) ? ((off_t) mMaxOffsetWritten < mMinSize) : false;
if (minimumsizeerror) {
eos_warning("written file %s is smaller than required minimum file "
"size=%llu written=%llu", mNsPath.c_str(), mMinSize,
mMaxOffsetWritten);
}
}
if (eos::common::LayoutId::IsRain(mLayout->GetLayoutId())) {
// For RAID-like layouts don't do this check
targetsizeerror = false;
minimumsizeerror = false;
}
eos_debug("checksumerror=%i targetsizerror=%i "
"mMaxOffsetWritten=%zu targetsize=%lli",
checksumerror, targetsizeerror, mMaxOffsetWritten, mTargetSize);
// ---- add error simulation for checksum errors on read
if ((!mIsRW) && gOFS.mSimXsReadErr) {
checksumerror = true;
eos_warning("%s", "msg=\"simulating checksum errors on read\"");
}
// ---- add error simulation for checksum errors on write
if (mIsRW && gOFS.mSimXsWriteErr) {
checksumerror = true;
eos_warning("%s", "msg=\"simulating checksum errors on write\"");
}
if (mIsRW && (checksumerror || targetsizeerror || minimumsizeerror)) {
// Checksum error: checksum was preset and does not match
// Target size error: target size was preset and does not match
// Minimum size error: target minimum size was preset and does not match
// Set the file to be deleted
deleteOnClose = true;
mLayout->Remove();
if (mLayout->IsEntryServer() && (!isReplication) && (!mIsInjection) &&
(!mRainReconstruct)) {
capOpaqueString += "&mgm.dropall=1";
}
// Delete the replica in the MGM
XrdOucErrInfo lerror;
if (gOFS.CallManager(&lerror, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueString)) {
eos_warning("(unpersist): unable to drop file id %s fsid %u at manager %s",
hex_fid.c_str(), mFmd->mProtoFmd.fid(), mCapOpaque->Get("mgm.manager"));
}
}
// First we assume that, if we have writes, we update it
closeSize = openSize;
if ((!checksumerror) && (!minimumsizeerror) &&
(!targetsizeerror) &&
(mHasWrite || isCreation) &&
(!mRainReconstruct || !hasReadError)) {
// Commit meta data
struct stat statinfo;
if ((rc = mLayout->Stat(&statinfo))) {
rc = gOFS.Emsg(epname, error, EIO, "close - cannot stat closed layout"
" to determine file size", mNsPath.c_str());
}
if (!rc) {
// Attempt archive queueing if tape support enabled
if (mTapeEnabled && isCreation && mSyncEventOnClose &&
mLayout->IsEntryServer() &&
mEventWorkflow != common::RETRIEVE_WRITTEN_WORKFLOW_NAME) {
// Queueing error: queueing for archive failed
queueingerror = !QueueForArchiving(statinfo, queueing_errmsg, archive_req_id);
if (queueingerror) {
deleteOnClose = true;
mLayout->Remove();
if (mLayout->IsEntryServer()) {
capOpaqueString += "&mgm.dropall=1";
}
// Delete the replica in the MGM
XrdOucErrInfo lerror;
if (gOFS.CallManager(&lerror, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueString)) {
eos_warning("(unpersist): unable to drop file id %s fsid %u at manager %s",
hex_fid.c_str(), mFmd->mProtoFmd.fid(), mCapOpaque->Get("mgm.manager"));
}
}
}
if (!queueingerror && ((statinfo.st_size == 0) || mHasWrite)) {
// Update size
closeSize = statinfo.st_size;
mFmd->mProtoFmd.set_size(statinfo.st_size);
mFmd->mProtoFmd.set_disksize
(eos::common::LayoutId::ExpectedStripeSize(mLid, statinfo.st_size));
// Reset the diskchecksum after an update otherwise we might falsely report
// a checksum error. The diskchecksum will be updated by the scanner.
mFmd->mProtoFmd.set_diskchecksum("");
mFmd->mProtoFmd.set_mgmsize(eos::common::FmdHelper::UNDEF);
mFmd->mProtoFmd.set_mgmchecksum(""); // now again empty
mFmd->mProtoFmd.set_layouterror(0); // reset layout errors
mFmd->mProtoFmd.set_locations(""); // reset locations
mFmd->mProtoFmd.set_filecxerror(0);
mFmd->mProtoFmd.set_blockcxerror(0);
mFmd->mProtoFmd.set_locations(""); // reset locations
mFmd->mProtoFmd.set_mtime(statinfo.st_mtime);
#ifdef __APPLE__
mFmd->mProtoFmd.set_mtime_ns(0);
#else
mFmd->mProtoFmd.set_mtime_ns(statinfo.st_mtim.tv_nsec);
#endif
// Set the container id
mFmd->mProtoFmd.set_cid(mCid);
if (mCapOpaque->Get("mgm.source.lid")) {
try {
std::string data = mCapOpaque->Get("mgm.source.lid");
eos::common::LayoutId::layoutid_t src_lid = std::stoul(data);
mFmd->mProtoFmd.set_lid(src_lid);
// For RAIN files size in local db is the size of the logical
// file and not the size of the current stripe on disk
if (mCapOpaque->Get("mgm.bookingsize") && isReplication &&
eos::common::LayoutId::IsRain(src_lid)) {
data = mCapOpaque->Get("mgm.bookingsize");
mFmd->mProtoFmd.set_size(std::stoull(data));
}
} catch (...) {
eos_err("msg=\"failure to convert layout id or bookingsize\" "
"lid=\"%s\" bookingsize=\"%s\"",
mCapOpaque->Get("mgm.source.lid"),
mCapOpaque->Get("mgm.bookingsize"));
}
}
if (mCapOpaque->Get("mgm.source.ruid")) {
mFmd->mProtoFmd.set_uid(atoi(mCapOpaque->Get("mgm.source.ruid")));
}
if (mCapOpaque->Get("mgm.source.rgid")) {
mFmd->mProtoFmd.set_gid(atoi(mCapOpaque->Get("mgm.source.rgid")));
}
// Commit local
try {
if (!gOFS.mFmdHandler->Commit(mFmd.get())) {
eos_err("msg=\"unable to commit meta data to local database\" "
"fxid=%08llx", mFileId);
(void) gOFS.Emsg(epname, error, EIO, "close - unable to "
"commit meta data", mNsPath.c_str());
}
} catch (const std::length_error& e) {}
// Commit to central mgm cache
int envlen = 0;
XrdOucString capOpaqueFile = "";
XrdOucString mTimeString = "";
capOpaqueFile += "/?";
capOpaqueFile += mCapOpaque->Env(envlen);
capOpaqueFile += "&mgm.pcmd=commit";
capOpaqueFile += "&mgm.size=";
char filesize[1024];
sprintf(filesize, "%li", (int64_t)mFmd->mProtoFmd.size());
capOpaqueFile += filesize;
if (mCheckSum) {
capOpaqueFile += "&mgm.checksum=";
capOpaqueFile += mCheckSum->GetHexChecksum();
}
capOpaqueFile += "&mgm.mtime=";
capOpaqueFile += eos::common::StringConversion::GetSizeString(mTimeString,
(mForcedMtime != 1) ? mForcedMtime : (unsigned long long)
mFmd->mProtoFmd.mtime());
capOpaqueFile += "&mgm.mtime_ns=";
capOpaqueFile += eos::common::StringConversion::GetSizeString(mTimeString,
(mForcedMtime != 1) ? mForcedMtime_ms : (unsigned long long)
mFmd->mProtoFmd.mtime_ns());
if (mFusex) {
capOpaqueFile += "&mgm.fusex=1";
}
if (mHasWrite) {
capOpaqueFile += "&mgm.modified=1";
}
if (noAtomicVersioning) {
// prevent atomic/versioning on commmit
capOpaqueFile += "&mgm.commit.verify=1";
}
capOpaqueFile += "&mgm.add.fsid=";
capOpaqueFile += (int) mFmd->mProtoFmd.fsid();
// If is set, we can issue a drop replica
if (mCapOpaque->Get("mgm.drainfsid")) {
capOpaqueFile += "&mgm.drop.fsid=";
capOpaqueFile += mCapOpaque->Get("mgm.drainfsid");
}
if (mRainReconstruct) {
// Indicate that this is a commit of a RAIN reconstruction
if (mLayout->IsEntryServer()) {
capOpaqueFile += "&mgm.reconstruction=1";
if (!hasReadError && mOpenOpaque->Get("eos.pio.recfs")) {
capOpaqueFile += "&mgm.drop.fsid=";
capOpaqueFile += mOpenOpaque->Get("eos.pio.recfs");
}
}
} else {
if (mLayout->IsEntryServer() && !isReplication && !mIsInjection) {
// The entry server commits size and checksum
capOpaqueFile += "&mgm.commit.size=1";
if (mCheckSum) {
capOpaqueFile += "&mgm.commit.checksum=1";
}
} else {
bool issinglewriter = (gOFS.openedForWriting.getUseCount(mFmd->mProtoFmd.fsid(),
mFmd->mProtoFmd.fid()) <= 1);
if (issinglewriter && mCheckSum) {
// if we computed a checksum, we verify it IF there is only a single writer, if there are several writers we have a significant inconsistency window during commit between replicas
capOpaqueFile += "&mgm.replication=1&mgm.verify.checksum=1";
} else {
if (issinglewriter) {
// if we didn't compute a checksum, we disable checksum verification and we only indicate replication if there is only one active writer
capOpaqueFile += "&mgm.replication=1&mgm.verify.checksum=0";
}
}
}
}
// The log ID to the commit
capOpaqueFile += "&mgm.logid=";
capOpaqueFile += logId;
// Evt. tag as an OC-Chunk commit
if (IsChunkedUpload()) {
// Add the chunk information
int envlen;
capOpaqueFile += eos::common::OwnCloud::FilterOcQuery(mOpenOpaque->Env(envlen));
}
rc = gOFS.CallManager(&error, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueFile,
nullptr, 0, true);
if (rc) {
if ((error.getErrInfo() == EIDRM) || (error.getErrInfo() == EBADE) ||
(error.getErrInfo() == EBADR) || (error.getErrInfo() == EREMCHG)) {
if (error.getErrInfo() == EIDRM) {
// This file has been deleted in the meanwhile ... we can
// unlink that immediately
eos_info("info=\"unlinking fxid=%08llx path=%s - "
"file has been already unlinked from the namespace\"",
mFmd->mProtoFmd.fid(), mNsPath.c_str());
mFusexIsUnlinked = true;
}
if (error.getErrInfo() == EBADE) {
eos_err("info=\"unlinking fxid=%08llx path=%s - "
"file size of replica does not match reference\"",
mFmd->mProtoFmd.fid(), mNsPath.c_str());
consistencyerror = true;
}
if (error.getErrInfo() == EBADR) {
eos_err("info=\"unlinking fxid=%08llx path=%s - "
"checksum of replica does not match reference\"",
mFmd->mProtoFmd.fid(), mNsPath.c_str());
consistencyerror = true;
}
if (error.getErrInfo() == EREMCHG) {
eos_err("info=\"unlinking fxid=%08llx path=%s - "
"overlapping atomic upload - discarding this one\"",
mFmd->mProtoFmd.fid(), mNsPath.c_str());
atomicoverlap = true;
}
deleteOnClose = true;
} else {
eos_crit("commit returned an uncatched error msg=%s [probably timeout]"
" - closing transaction to keep the file save - rc=%d",
error.getErrText(), rc);
}
} else {
commited_to_mgm = true;
}
}
}
}
}
// Recompute our ETag
eos::calculateEtag(mCheckSum != nullptr, mFmd->mProtoFmd, mEtag);
int closerc = 0; // return of the close
brc = rc; // return before the close
rc |= ModifiedWhileInUse();
if (mSyncOnClose) {
eos_info("syncing layout for iotype=csync");
rc |= mLayout->Sync();
}
closerc = mLayout->Close();
if (gOFS.mSimCloseErr) {
// simulate an error during the close call
closerc = 1;
}
rc |= closerc;
closed = true;
if (closerc || (mRainReconstruct && hasReadError)) {
// For RAIN layouts if there is an error on close when writing then we
// delete the whole file. If we do RAIN reconstruction we cleanup this
// local replica which was not committed.
if (eos::common::LayoutId::IsRain(mLayout->GetLayoutId())) {
deleteOnClose = true;
} else {
// Some (remote) replica didn't make it through ... trigger an auto-repair
if (!deleteOnClose) {
repairOnClose = true;
}
}
}
{
XrdSysMutexHelper scope_lock(gOFS.OpenFidMutex);
if (mIsRW) {
if ((mIsInjection || isCreation || IsChunkedUpload()) && (!rc) &&
(gOFS.openedForWriting.getUseCount(mFmd->mProtoFmd.fsid(),
mFmd->mProtoFmd.fid() > 1))) {
// indicate that this file was closed properly and disable further delete on close
gOFS.WNoDeleteOnCloseFid[mFmd->mProtoFmd.fsid()][mFmd->mProtoFmd.fid()] = true;
}
gOFS.openedForWriting.down(mFmd->mProtoFmd.fsid(), mFmd->mProtoFmd.fid());
} else {
gOFS.openedForReading.down(mFmd->mProtoFmd.fsid(), mFmd->mProtoFmd.fid());
}
if (!gOFS.openedForWriting.isOpen(mFmd->mProtoFmd.fsid(),
mFmd->mProtoFmd.fid())) {
// When the last writer is gone we can remove the prohibiting entry
gOFS.WNoDeleteOnCloseFid[mFmd->mProtoFmd.fsid()].erase(mFmd->mProtoFmd.fid());
gOFS.WNoDeleteOnCloseFid[mFmd->mProtoFmd.fsid()].resize(0);
}
}
gettimeofday(&closeTime, &tz);
if (!deleteOnClose && mIsRW) {
// Store in the WrittenFilesQueue
gOFS.WrittenFilesQueueMutex.Lock();
gOFS.WrittenFilesQueue.push(*mFmd.get());
gOFS.WrittenFilesQueueMutex.UnLock();
}
// Check if the target filesystem has been put into some non-operational mode
// in the meanwhile, it makes no sense to try to commit in this case
{
eos::common::RWMutexReadLock lock(gOFS.Storage->mFsMutex);
if (gOFS.Storage->mFsMap.count(mFsId) &&
gOFS.Storage->mFsMap[mFsId]->GetConfigStatus() <
eos::common::ConfigStatus::kDrain) {
eos_notice("msg=\"failing transfer because filesystem has non-"
"operational state\" path=%s state=%s", mNsPath.c_str(),
eos::common::FileSystem::GetConfigStatusAsString
(gOFS.Storage->mFsMap[mFsId]->GetConfigStatus()));
deleteOnClose = true;
}
}
{
// Check if the delete on close has been prohibited for this file id
XrdSysMutexHelper scope_lock(gOFS.OpenFidMutex);
if (gOFS.WNoDeleteOnCloseFid[mFsId].count(mFileId)) {
eos_notice("msg=\"prohibiting delete on close since we had a "
"sussessfull put but still an unacknowledged open\" path=%s",
mNsPath.c_str());
deleteOnClose = false;
}
}
// Prepare a report and add to the report queue
if (mTpcFlag != kTpcSrcCanDo) {
// We don't want a report for the source tpc setup. The kTpcSrcRead
// stage actually uses the opaque info from kTpcSrcSetup and that's
// why we also generate a report at this stage.
XrdOucString reportString = "";
gettimeofday(&closeStop, &tz);
CloseTime();
MakeReportEnv(reportString);
eos_static_info("msg=\"%s\"", reportString.c_str());
if (eos::common::LayoutId::IsRain(mLid) &&
mLayout && !mLayout->IsEntryServer()) {
// RAIN non-entry stripes do not report any statistics
} else {
gOFS.ReportQueueMutex.Lock();
gOFS.ReportQueue.push(reportString);
gOFS.ReportQueueMutex.UnLock();
}
}
if (deleteOnClose && (!mFusex) &&
(mIsInjection || isCreation || IsChunkedUpload())) {
rc = SFS_ERROR;
eos_info("info=\"deleting on close\" fn=%s fstpath=%s", mNsPath.c_str(),
mFstPath.c_str());
int retc = gOFS._rem(mNsPath.c_str(), error, 0, mCapOpaque.get(),
mFstPath.c_str(), mFileId, mFsId, true);
if (retc) {
eos_debug(" returned retc=%d", retc);
}
if (commited_to_mgm) {
// If we committed the replica and an error happened remote, we have
// to unlink it again
const std::string hex_fid = eos::common::FileId::Fid2Hex(mFileId);
XrdOucString capOpaqueString = "/?mgm.pcmd=drop";
XrdOucString OpaqueString = "";
OpaqueString += "&mgm.fsid=";
OpaqueString += (int) mFsId;
OpaqueString += "&mgm.fid=";
OpaqueString += hex_fid.c_str();
// If deleteOnClose at the gateway then we drop all replicas
if (mLayout->IsEntryServer() && (!isReplication) && (!mIsInjection) &&
(!mRainReconstruct)) {
OpaqueString += "&mgm.dropall=1";
}
XrdOucEnv Opaque(OpaqueString.c_str());
capOpaqueString += OpaqueString;
// Delete the replica in the MGM - use local error object are we're not
// interested in propagatin the error info to the file object
XrdOucErrInfo lerror;
int rcode = gOFS.CallManager(&lerror, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueString);
if (rcode && (rcode != EIDRM)) {
eos_warning("(unpersist): unable to drop file id %s fsid %u at manager %s",
hex_fid.c_str(), mFileId, mCapOpaque->Get("mgm.manager"));
}
eos_info("info=\"removing on manager\" manager=%s fxid=%08llx fsid=%d "
"fn=%s fstpath=%s rc=%d", mCapOpaque->Get("mgm.manager"),
mFileId, (int) mFsId, mCapOpaque->Get("mgm.path"),
mFstPath.c_str(), rcode);
}
if (minimumsizeerror) {
// Minimum size criteria not fullfilled
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned "
"because it is smaller than the required minimum file size"
" in that directory", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"minimum file size criteria\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
} else if (checksumerror) {
// Checksum error
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned "
"because of a checksum error ", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"checksum error\"", mCapOpaque->Get("mgm.path"), mFstPath.c_str());
} else if (writeErrorFlag == kOfsSimulatedIoError) {
// Simulated write error
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned "
"because of a simulated IO error ", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"simulated IO error\"", mCapOpaque->Get("mgm.path"), mFstPath.c_str());
} else if (writeErrorFlag == kOfsMaxSizeError) {
// Maximum size criteria not fullfilled
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned "
"because you exceeded the maximum file size settings for "
"this namespace branch", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"maximum file size criteria\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
} else if (writeErrorFlag == kOfsDiskFullError) {
// Disk full detected during write
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned"
" because the target disk filesystem got full and you "
"didn't use reservation", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"filesystem full\"", mCapOpaque->Get("mgm.path"), mFstPath.c_str());
} else if (writeErrorFlag == kOfsIoError) {
// Generic IO error on the underlying device
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned because"
" of an IO error during a write operation", mNsPath.c_str());
eos_crit("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"write IO error\"", mCapOpaque->Get("mgm.path"), mFstPath.c_str());
} else if (writeErrorFlag == kOfsFsRemovedError) {
// Filesystem has been unregistered
gOFS.Emsg(epname, error, EIO, "store file - file has been cleaned because"
" the target filesystem has been unregistered", mNsPath.c_str());
eos_crit("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"FS removed\"", mCapOpaque->Get("mgm.path"), mFstPath.c_str());
} else if (targetsizeerror) {
// Target size is different from the uploaded file size
gOFS.Emsg(epname, error, EIO, "store file - file has been "
"cleaned because the stored file does not match "
"the provided targetsize", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"target size mismatch\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
} else if (consistencyerror) {
gOFS.Emsg(epname, error, EIO, "store file - file has been "
"cleaned because the stored file does not match "
"the reference meta-data size/checksum", mNsPath.c_str());
eos_crit("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"meta-data size/checksum mismatch\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
} else if (atomicoverlap) {
gOFS.Emsg(epname, error, EIO, "store file - file has been "
"cleaned because of an overlapping atomic upload "
"and we are not the last uploader", mNsPath.c_str());
eos_crit("info=\"deleting on close\" fn=%s fstpath=%s reason="
"\"suppressed atomic uploadh\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
} else if (queueingerror) {
std::string message =
SSTR("store file - file has been cleaned because of a queueing "
<< "to archive error; reason=\"" << queueing_errmsg << "\"");
gOFS.Emsg(epname, error, EIO, message.c_str(), mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s reason=\"%s\"",
mCapOpaque->Get("mgm.path"), mFstPath.c_str(),
queueing_errmsg.c_str());
} else {
// Client has disconnected and file is cleaned-up
gOFS.Emsg(epname, error, EIO, "store file - file has been "
"cleaned because of a client disconnect", mNsPath.c_str());
eos_warning("info=\"deleting on close\" fn=%s fstpath=%s "
"reason=\"client disconnect\"", mCapOpaque->Get("mgm.path"),
mFstPath.c_str());
}
} else {
if (checksumerror) {
// Checksum error detected
rc = gOFS.Emsg(epname, error, EIO, "verify checksum - checksum error fn=",
mNsPath.c_str());
int envlen = 0;
eos_crit("msg=\"file checksum error detected\" info=\"%s\"",
mCapOpaque->Env(envlen));
}
}
if ((!IsChunkedUpload()) && repairOnClose) {
// Do an upcall to the MGM and ask to adjust the replica of the uploaded file
XrdOucString OpaqueString = "/?mgm.pcmd=adjustreplica&mgm.path=";
OpaqueString += mCapOpaque->Get("mgm.path");
eos_info("msg=\"repair on close\" path=%s", mCapOpaque->Get("mgm.path"));
if (gOFS.CallManager(&error, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), OpaqueString)) {
eos_err("msg=\"failed adjustreplica\" path=%s", mNsPath.c_str());
gOFS.Emsg(epname, error, EIO, "create all replicas - uploaded file is "
"at risk - only one replica has been successfully stored for fn=",
mNsPath.c_str());
} else {
if (!brc) {
// Reset the return code and clean error message
error.setErrInfo(0, "");
rc = 0;
}
}
eos_warning("msg=\"executed adjustreplica, file is at low risk due to "
"missing replicas\" path=%s", mNsPath.c_str());
}
// Trigger an MGM event from the entry point
if (!rc && (mEventOnClose || mSyncEventOnClose) && mLayout->IsEntryServer()) {
XrdOucString capOpaqueFile = "";
XrdOucString eventType = "";
capOpaqueFile += "/?";
int envlen = 0;
capOpaqueFile += mCapOpaque->Env(envlen);
capOpaqueFile += "&mgm.pcmd=event";
if (mIsRW) {
eventType = mSyncEventOnClose ? "sync::closew" : "closew";
} else {
eventType = "closer";
}
capOpaqueFile += "&mgm.event=";
capOpaqueFile += eventType;
// The log ID to the commit
capOpaqueFile += "&mgm.logid=";
capOpaqueFile += logId;
capOpaqueFile += "&mgm.ruid=";
capOpaqueFile += mCapOpaque->Get("mgm.ruid");
capOpaqueFile += "&mgm.rgid=";
capOpaqueFile += mCapOpaque->Get("mgm.rgid");
capOpaqueFile += "&mgm.sec=";
capOpaqueFile += mCapOpaque->Get("mgm.sec");
if (mEventWorkflow.length()) {
capOpaqueFile += "&mgm.workflow=";
capOpaqueFile += mEventWorkflow.c_str();
}
if (!archive_req_id.empty()) {
capOpaqueFile += "&mgm.archive_req_id=";
capOpaqueFile += archive_req_id.c_str();
}
eos_info("msg=\"notify\" event=\"%s\" workflow=\"%s\"", eventType.c_str(),
mEventWorkflow.c_str());
rc = gOFS.CallManager(&error, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), capOpaqueFile,
nullptr, 30, mSyncEventOnClose, false);
}
// Mask close error for fusex, if the file had been removed already
if (mFusexIsUnlinked && mFusex) {
error.setErrCode(0);
rc = 0;
}
}
eos_info("msg=\"done close\" rc=%i errc=%d", rc, error.getErrInfo());
return rc;
}
//------------------------------------------------------------------------------
// Implementation dependant commands
//------------------------------------------------------------------------------
int
XrdFstOfsFile::fctl(const int cmd, int alen, const char* args,
const XrdSecEntity* client)
{
eos_debug("cmd=%i, args=%s", cmd, args);
if (cmd == SFS_FCTL_SPEC1) {
if (strncmp(args, "delete", alen) == 0) {
eos_warning("Setting deletion flag for file %s", mFstPath.c_str());
// This indicates to delete the file during the close operation
viaDelete = true;
return SFS_OK;
} else if (strncmp(args, "nochecksum", alen) == 0) {
int retc = SFS_OK;
eos_warning("Setting nochecksum flag for file %s", mFstPath.c_str());
mCheckSum.reset(nullptr);
// Propagate command to all the replicas/stripes
if (mLayout) {
retc = mLayout->Fctl(std::string(args), client);
}
return retc;
}
}
error.setErrInfo(ENOTSUP, "fctl command not supported");
return SFS_ERROR;
}
//------------------------------------------------------------------------------
// Low-level open calling the default XrdOfs plugin
//------------------------------------------------------------------------------
int
XrdFstOfsFile::openofs(const char* path,
XrdSfsFileOpenMode open_mode,
mode_t create_mode,
const XrdSecEntity* client,
const char* opaque)
{
int retc = 0;
while ((retc = XrdOfsFile::open(path, open_mode, create_mode, client,
opaque)) > 0) {
eos_static_notice("msg\"xrootd-lock-table busy - snoozing & retry\" "
"delay=%d errno=%d", retc, errno);
std::this_thread::sleep_for(std::chrono::seconds(retc));
}
return retc;
}
//------------------------------------------------------------------------------
// Low-level read calling the default XrdOfs plugin
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::readofs(XrdSfsFileOffset fileOffset, char* buffer,
XrdSfsXferSize buffer_size)
{
// EPNAME("read");
gettimeofday(&cTime, &tz);
rCalls++;
if (!getenv("EOS_FST_NO_IOPRIORITY")) {
if (ioprio_begin(IOPRIO_WHO_PROCESS, IOPRIO_PRIO_VALUE(mIoPriorityClass,
mIoPriorityValue), t_iopriority)) {
if (!mIoPriorityErrorReported) {
eos_warning("failed to set IO priority to %d:%d - errno=%d\n", mIoPriorityClass,
mIoPriorityValue, errno);
}
}
}
int rc = XrdOfsFile::read(fileOffset, buffer, buffer_size);
eos_debug("read %llu %llu %i rc=%d", this, fileOffset, buffer_size, rc);
if (!getenv("EOS_FST_NO_IOPRIORITY")) {
t_iopriority = ioprio_end(IOPRIO_WHO_PROCESS,
IOPRIO_PRIO_VALUE(mIoPriorityClass, mIoPriorityValue));
}
if (gOFS.mSimIoReadErr) {
if ((gOFS.mSimErrIoReadOff == 0) ||
(gOFS.mSimErrIoReadOff <= (uint64_t)fileOffset)) {
return gOFS.Emsg("readofs", error, EIO, "read file - simulated IO error fn=",
mNsPath.c_str());
}
}
if (mFsId) {
if (!gOFS.Storage->mFsMap.count(mFsId)) {
return gOFS.Emsg("readeofs", error, EBADF,
"read file - filesystem has been unregistered");
}
}
// Account seeks for monitoring
if (rOffset != static_cast(fileOffset)) {
if (rOffset < static_cast(fileOffset)) {
nFwdSeeks++;
sFwdBytes += (fileOffset - rOffset);
} else {
nBwdSeeks++;
sBwdBytes += (rOffset - fileOffset);
}
if ((rOffset + (EOS_FSTOFS_LARGE_SEEKS)) < (static_cast
(fileOffset))) {
sXlFwdBytes += (fileOffset - rOffset);
nXlFwdSeeks++;
}
if ((static_cast(rOffset) > (EOS_FSTOFS_LARGE_SEEKS)) &&
(rOffset - (EOS_FSTOFS_LARGE_SEEKS)) > (static_cast
(fileOffset))) {
sXlBwdBytes += (rOffset - fileOffset);
nXlBwdSeeks++;
}
}
gettimeofday(&lrTime, &tz);
AddReadTime();
return rc;
}
//------------------------------------------------------------------------------
// Low-level vector read calling the default XrdOfs plugin
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::readvofs(XrdOucIOVec* readV, uint32_t readCount)
{
eos_debug("read count=%i", readCount);
gettimeofday(&cTime, &tz);
XrdSfsXferSize sz = XrdOfsFile::readv(readV, readCount);
gettimeofday(&lrvTime, &tz);
AddReadVTime();
// Collect monitoring info only if sz is > 0
if (sz > 0) {
XrdSysMutexHelper scope_lock(vecMutex);
for (uint32_t i = 0; i < readCount; ++i) {
monReadSingleBytes.push_back(readV[i].size);
}
monReadvBytes.push_back(sz);
monReadvCount.push_back(readCount);
}
return sz;
}
//------------------------------------------------------------------------------
// Low-level write calling the default XrdOfs plugin
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdFstOfsFile::writeofs(XrdSfsFileOffset fileOffset, const char* buffer,
XrdSfsXferSize buffer_size)
{
if (gOFS.mSimIoWriteErr) {
if ((gOFS.mSimErrIoWriteOff == 0) ||
(gOFS.mSimErrIoWriteOff <= (uint64_t)fileOffset)) {
writeErrorFlag = kOfsSimulatedIoError;
return gOFS.Emsg("writeofs", error, EIO, "write file - simulated IO error fn=",
mNsPath.c_str());
}
}
if (mFsId) {
if ((mTargetSize && (mTargetSize == mBookingSize)) ||
(mBookingSize >= fileOffset + buffer_size)) {
// Space has been successfully pre-allocated, let client write
} else {
// Check if the file system is full
bool isfull = false;
{
XrdSysMutexHelper lock(gOFS.Storage->mFsFullMapMutex);
isfull = gOFS.Storage->mFsFullMap[mFsId];
}
if (isfull) {
writeErrorFlag = kOfsDiskFullError;
return gOFS.Emsg("writeofs", error, ENOSPC, "write file - disk space "
"(headroom) exceeded fn=", mCapOpaque ?
(mCapOpaque->Get("mgm.path") ? mCapOpaque->Get("mgm.path") :
FName()) : FName());
}
}
// check if the filesystem was unregistered in the meanwhile
eos::common::RWMutexReadLock lock(gOFS.Storage->mFsMutex);
if (!gOFS.Storage->mFsMap.count(mFsId)) {
writeErrorFlag = kOfsFsRemovedError;
return gOFS.Emsg("writeofs", error, EBADF,
"write file - filesystem has been unregistered");
}
}
if (mMaxSize) {
// Check that the user didn't exceed the maximum file size policy
if ((fileOffset + buffer_size) > mMaxSize) {
writeErrorFlag = kOfsMaxSizeError;
return gOFS.Emsg("writeofs", error, ENOSPC, "write file - your file "
"exceeds the maximum file size setting of bytes<=",
mCapOpaque ? (mCapOpaque->Get("mgm.maxsize") ?
mCapOpaque->Get("mgm.maxsize") : "") : "undef");
}
}
gettimeofday(&cTime, &tz);
wCalls++;
int rc;
if (!getenv("EOS_FST_NO_IOPRIORITY")) {
if (ioprio_begin(IOPRIO_WHO_PROCESS, IOPRIO_PRIO_VALUE(mIoPriorityClass,
mIoPriorityValue), t_iopriority)) {
if (!mIoPriorityErrorReported) {
eos_warning("failed to set IO priority to %d:%d - errno=%d\n", mIoPriorityClass,
mIoPriorityValue, errno);
mIoPriorityErrorReported = true;
}
}
}
if (gOFS.mSimDiskWriting) {
// don't write to the disks
XrdFstOfsFile::truncateofs(fileOffset);
rc = buffer_size;
} else {
rc = XrdOfsFile::write(fileOffset, buffer, buffer_size);
}
if (!getenv("EOS_FST_NO_IOPRIORITY")) {
t_iopriority = ioprio_end(IOPRIO_WHO_PROCESS,
IOPRIO_PRIO_VALUE(mIoPriorityClass, mIoPriorityValue));
}
if (rc != buffer_size) {
// Tag an io error
writeErrorFlag = kOfsIoError;
}
// Account seeks for monitoring
if (wOffset != static_cast(fileOffset)) {
if (wOffset < static_cast(fileOffset)) {
nFwdSeeks++;
sFwdBytes += (fileOffset - wOffset);
} else {
nBwdSeeks++;
sBwdBytes += (wOffset - fileOffset);
}
if ((wOffset + (EOS_FSTOFS_LARGE_SEEKS)) < (static_cast
(fileOffset))) {
sXlFwdBytes += (fileOffset - wOffset);
nXlFwdSeeks++;
}
if ((static_cast(wOffset) > (EOS_FSTOFS_LARGE_SEEKS)) &&
(wOffset - (EOS_FSTOFS_LARGE_SEEKS)) > (static_cast
(fileOffset))) {
sXlBwdBytes += (wOffset - fileOffset);
nXlBwdSeeks++;
}
}
if (rc > 0) {
wOffset = fileOffset + rc;
}
gettimeofday(&lwTime, &tz);
AddWriteTime();
return rc;
}
//------------------------------------------------------------------------------
// Low-level sync calling the default XrdOfs plugin
//------------------------------------------------------------------------------
int
XrdFstOfsFile::syncofs()
{
return XrdOfsFile::sync();
}
//------------------------------------------------------------------------------
// Low-level truncate calling the default XrdOfs plugin
//------------------------------------------------------------------------------
int
XrdFstOfsFile::truncateofs(XrdSfsFileOffset fileOffset)
{
// Truncation moves the max offset written
eos_debug("value=%llu", (unsigned long long) fileOffset);
mMaxOffsetWritten = fileOffset;
struct stat buf;
// stat the current file size
// if the file has the proper size we don't truncate
if (!::stat(mFstPath.c_str(), &buf)) {
// if the file has the proper size we don't truncate
if (buf.st_size == fileOffset) {
return SFS_OK;
}
}
return XrdOfsFile::truncate(fileOffset);
}
//------------------------------------------------------------------------------
// Low-level close calling the default XrdOfs plugin
//------------------------------------------------------------------------------
int
XrdFstOfsFile::closeofs()
{
return XrdOfsFile::close();
}
//------------------------------------------------------------------------------
// Return FMD checksum
//------------------------------------------------------------------------------
std::string
XrdFstOfsFile::GetFmdChecksum() const
{
if (mFmd) {
return mFmd->mProtoFmd.checksum();
} else {
return std::string();
}
}
//------------------------------------------------------------------------------
// Verify if a TPC key is still valid
//------------------------------------------------------------------------------
bool
XrdFstOfsFile::TpcValid() const
{
XrdSysMutexHelper scope_lock(gOFS.TpcMapMutex);
if (mTpcKey.length() && gOFS.TpcMap[mIsTpcDst].count(mTpcKey)) {
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Process open opaque information - this can come directly from the client
// or from the MGM redirection and it's not encrypted but sent in plain
// text in the URL
//------------------------------------------------------------------------------
int
XrdFstOfsFile::ProcessOpenOpaque()
{
using namespace std::chrono;
EPNAME("open");
if (!mOpenOpaque) {
eos_warning("msg=\"no open opaque info to process\"");
return SFS_OK;
}
char* val = nullptr;
// Handle various tags which are sent in plain text e.g. mgm.etag
// Extract ETag from the redirection URL if available
if ((val = mOpenOpaque->Get("mgm.etag"))) {
mEtag = val;
}
// mgm.mtime=0 we set the mtime externaly. This indicates that during commit,
// it should not update the mtime as in the case of FUSE clients which will
// call utimes.
if ((val = mOpenOpaque->Get("mgm.mtime"))) {
time_t mtime = (time_t)strtoull(val, 0, 10);
if (mtime == 0) {
mForcedMtime = 0;
mForcedMtime_ms = 0;
} else {
mForcedMtime = mtime;
mForcedMtime_ms = 0;
}
}
// mgm.fusex=1 - Suppress the file close broadcast to the fusex network
// during the file close
if ((val = mOpenOpaque->Get("mgm.fusex"))) {
mFusex = true;
}
// Handle workflow events
if ((val = mOpenOpaque->Get("mgm.event"))) {
std::string event = val;
if (event == "closew") {
mEventOnClose = true;
} else if (event == "sync::closew") {
mSyncEventOnClose = true;
}
val = mOpenOpaque->Get("mgm.workflow");
mEventWorkflow = (val ? val : "");
val = mOpenOpaque->Get("mgm.instance");
mEventInstance = val ? val : "";
val = mOpenOpaque->Get("mgm.owner_uid");
mEventOwnerUid = val ? std::stoul(val) : 99;
val = mOpenOpaque->Get("mgm.owner_gid");
mEventOwnerGid = val ? std::stoul(val) : 99;
val = mOpenOpaque->Get("mgm.requestor");
mEventRequestor = val ? val : "";
val = mOpenOpaque->Get("mgm.requestorgroup");
mEventRequestorGroup = val ? val : "";
val = mOpenOpaque->Get("mgm.attributes");
mEventAttributes = val ? val : "";
}
if ((val = mOpenOpaque->Get("eos.injection"))) {
mIsInjection = true;
}
// enable round-robin scheduling per application/fsid on request
if ((val = mOpenOpaque->Get("eos.schedule"))) {
mAppRR = mSecMap["app"];
}
// Tag as an OC chunk upload
if (eos::common::OwnCloud::isChunkUpload(*mOpenOpaque.get())) {
mIsOCchunk = true;
}
if ((val = mOpenOpaque->Get("x-upload-range"))) {
// for partial range uploads via HTTP we run the same buisness logic like
// for OC chunk uploads
mIsOCchunk = true;
}
// Check if transfer is still valid to avoid any open replays
if ((val = mOpenOpaque->Get("fst.valid"))) {
try {
std::string sval = val;
int64_t valid_sec = std::stoll(sval);
auto now = system_clock::now();
auto now_sec = time_point_cast(now).time_since_epoch().count();
if (valid_sec < now_sec) {
eos_err("msg=\"fst validity expired, avoid open replay\"");
return gOFS.Emsg(epname, error, EINVAL, "open - fst validity expired",
mNsPath.c_str());
}
} catch (...) {
// ignore
}
}
return SFS_OK;
}
//------------------------------------------------------------------------------
// Process capability opaque information - this is encrypted information sent
// by the MGM to the FST
//------------------------------------------------------------------------------
int
XrdFstOfsFile::ProcessCapOpaque(bool& is_repair_read,
eos::common::VirtualIdentity& vid)
{
EPNAME("open");
if (!mCapOpaque) {
eos_warning("msg=\"no cap opaque info to process\"");
return SFS_OK;
}
int envlen {0};
XrdOucString maskOpaque = mCapOpaque->Env(envlen);
eos::common::StringConversion::MaskTag(maskOpaque, "mgm.obfuscate.key");
eos::common::StringConversion::MaskTag(maskOpaque, "mgm.encryption.key");
eos_info("capability=%s", maskOpaque.c_str());
char* val = nullptr;
const char* hexfid = 0;
const char* slid = 0;
const char* secinfo = 0;
const char* scid = 0;
const char* smanager = 0;
// Determine whether or not support for tape is enabled in the MGM
if (mCapOpaque->Get("tapeenabled")) {
mTapeEnabled = true;
}
// Handle file id info
if (!(hexfid = mCapOpaque->Get("mgm.fid"))) {
return gOFS.Emsg(epname, error, EINVAL, "open - no file id in capability",
mNsPath.c_str());
}
mFileId = eos::common::FileId::Hex2Fid(hexfid);
// Handle security info
if (!(secinfo = mCapOpaque->Get("mgm.sec"))) {
return gOFS.Emsg(epname, error, EINVAL,
"open - no security information in capability", mNsPath.c_str());
} else {
mSecString = secinfo;
mSecMap = eos::common::SecEntity::KeyToMap(std::string(secinfo));
}
// Handle min size value
if ((val = mCapOpaque->Get("mgm.minsize"))) {
errno = 0;
mMinSize = strtoull(val, 0, 10);
if (errno) {
eos_err("illegal minimum file size specified <%s>- restricting to 1 byte", val);
mMinSize = 1;
}
} else {
mMinSize = 0;
}
// Handle max size value
if ((val = mCapOpaque->Get("mgm.maxsize"))) {
errno = 0;
mMaxSize = strtoull(val, 0, 10);
if (errno) {
eos_err("illegal maximum file size specified <%s>- restricting to 1 byte", val);
mMaxSize = 1;
}
} else {
mMaxSize = 0;
}
// Handle repair read flag
if ((val = mCapOpaque->Get("mgm.repairread"))) {
is_repair_read = true;
}
// Handle layout id
if (!(slid = mCapOpaque->Get("mgm.lid"))) {
return gOFS.Emsg(epname, error, EINVAL, "open - no layout id in capability",
mNsPath.c_str());
}
mLid = atoi(slid);
// Handle container id
if (!(scid = mCapOpaque->Get("mgm.cid"))) {
return gOFS.Emsg(epname, error, EINVAL, "open - no container id in capability",
mNsPath.c_str());
}
mCid = strtoull(scid, 0, 10);
// Handle the redirect manager
if (!(smanager = mCapOpaque->Get("mgm.manager"))) {
return gOFS.Emsg(epname, error, EINVAL, "open - no manager name in capability",
mNsPath.c_str());
}
mRedirectManager = smanager;
{
// evt. update the shared hash manager entry
XrdSysMutexHelper lock(gConfig.Mutex);
XrdOucString ConfigManager = gConfig.Manager;
if (ConfigManager != mRedirectManager) {
eos_warning("msg=\"MGM master seems to have changed - adjusting global "
"config\" old-manager=\"%s\" new-manager=\"%s\"",
ConfigManager.c_str(), mRedirectManager.c_str());
gConfig.Manager = mRedirectManager;
}
}
// Handle virtual identity
vid = eos::common::VirtualIdentity::Nobody();
if ((val = mCapOpaque->Get("mgm.ruid"))) {
vid.uid = atoi(val);
} else {
return gOFS.Emsg(epname, error, EINVAL, "open - sec ruid missing",
mNsPath.c_str());
}
if ((val = mCapOpaque->Get("mgm.rgid"))) {
vid.gid = atoi(val);
} else {
return gOFS.Emsg(epname, error, EINVAL, "open - sec rgid missing",
mNsPath.c_str());
}
if ((val = mCapOpaque->Get("mgm.uid"))) {
vid.allowed_uids.clear();
vid.allowed_uids.insert(atoi(val));
} else {
return gOFS.Emsg(epname, error, EINVAL, "open - sec uid missing",
mNsPath.c_str());
}
if ((val = mCapOpaque->Get("mgm.gid"))) {
vid.allowed_gids.clear();
vid.allowed_gids.insert(atoi(val));
} else {
return gOFS.Emsg(epname, error, EINVAL, "open - sec gid missing",
mNsPath.c_str());
}
// enable round-robin scheduling per application/fsid on request
if ((val = mCapOpaque->Get("mgm.schedule"))) {
mAppRR = mSecMap["app"];
}
std::string obfuscation_key;
std::string encryption_key;
// handle obfuscation keys
if ((val = mCapOpaque->Get("mgm.obfuscate.key"))) {
obfuscation_key = val;
}
// handl encryption keys
if ((val = mCapOpaque->Get("mgm.encryption.key"))) {
encryption_key = val;
}
mHmac.set(obfuscation_key, encryption_key);
SetLogId(logId, vid, mTident.c_str());
return SFS_OK;
}
//----------------------------------------------------------------------------
// Process mixed opaque information - decisions that need to be taken based
// on both the ecrypted and un-encrypted opaque info
//----------------------------------------------------------------------------
int
XrdFstOfsFile::ProcessMixedOpaque()
{
EPNAME("open");
using eos::common::FileId;
// Handle checksum request
std::string opaqueCheckSum;
char* val = nullptr;
if (mOpenOpaque == nullptr || mCapOpaque == nullptr) {
eos_warning("msg=\"open or cap opaque are empty\"");
return SFS_OK;
}
if ((val = mOpenOpaque->Get("mgm.checksum"))) {
opaqueCheckSum = val;
}
// Call the checksum factory function with the selected layout
if (opaqueCheckSum != "ignore") {
mCheckSum = eos::fst::ChecksumPlugins::GetChecksumObject(mLid);
eos_debug("msg=\"checksum requested\" xs_ptr=%p lid=%u mgm.checksum=\"%s\"",
mCheckSum.get(), mLid, opaqueCheckSum.c_str());
}
// Handle file system id and local prefix - If we open a replica we have to
// take the right filesystem id and filesystem prefix for that replica
const char* sfsid = 0;
if (!(sfsid = mCapOpaque->Get("mgm.fsid"))) {
return gOFS.Emsg(epname, error, EINVAL,
"open - no file system id in capability", mNsPath.c_str());
}
if (mOpenOpaque->Get("mgm.replicaindex")) {
XrdOucString replicafsidtag = "mgm.fsid";
replicafsidtag += (int) atoi(mOpenOpaque->Get("mgm.replicaindex"));
if (mCapOpaque->Get(replicafsidtag.c_str())) {
sfsid = mCapOpaque->Get(replicafsidtag.c_str());
}
}
// Extract the local path prefix from the broadcasted configuration
if (mOpenOpaque->Get("mgm.fsprefix")) {
mLocalPrefix = mOpenOpaque->Get("mgm.fsprefix");
mLocalPrefix.replace("#COL#", ":");
} else {
// Extract the local path prefix from the broadcasted configuration!
mFsId = atoi(sfsid ? sfsid : "0");
eos::common::RWMutexReadLock lock(gOFS.Storage->mFsMutex);
if (mFsId && gOFS.Storage->mFsMap.count(mFsId)) {
mLocalPrefix = gOFS.Storage->mFsMap[mFsId]->GetPath().c_str();
}
}
// @note: the localprefix implementation does not work for gateway machines
if (!mLocalPrefix.length()) {
return gOFS.Emsg(epname, error, EINVAL, "open - cannot determine the prefix"
" path to use for the given filesystem id", mNsPath.c_str());
}
mFsId = atoi(sfsid);
mFstPath = FileId::FidPrefix2FullPath(FileId::Fid2Hex(mFileId).c_str(),
mLocalPrefix.c_str());
return SFS_OK;
}
//------------------------------------------------------------------------------
// Process TPC (third-party copy) opaque information i.e handle tags like
// tpc.key, tpc.dst, tpc.stage etc.
//------------------------------------------------------------------------------
int
XrdFstOfsFile::ProcessTpcOpaque(std::string& opaque, const XrdSecEntity* client)
{
EPNAME(__FUNCTION__);
mIsHttp = (client->tident ? (strncmp(client->tident, "http", 4) == 0) : false);
eos::common::StringConversion::ReplaceStringInPlace(opaque, "?", "&");
eos::common::StringConversion::ReplaceStringInPlace(opaque, "&&", "&");
XrdOucEnv env(opaque.c_str());
std::string tpc_stage = env.Get("tpc.stage") ? env.Get("tpc.stage") : "";
std::string tpc_key = env.Get("tpc.key") ? env.Get("tpc.key") : "";
std::string tpc_src = env.Get("tpc.src") ? env.Get("tpc.src") : "";
std::string tpc_dst = env.Get("tpc.dst") ? env.Get("tpc.dst") : "";
std::string tpc_org = env.Get("tpc.org") ? env.Get("tpc.org") : "";
std::string tpc_lfn = env.Get("tpc.lfn") ? env.Get("tpc.lfn") : "";
// Remove any TPC flags from now on
FilterTagsInPlace(opaque, {"tpc.stage", "tpc.key", "tpc.src", "tpc.dst",
"tpc.org", "tpc.lfn"
});
// Determine the TPC step that we are in
if (tpc_stage == "placement") {
mTpcFlag = kTpcSrcCanDo;
mIsTpcDst = false;
} else if ((tpc_stage == "copy") && tpc_key.length() && tpc_dst.length()) {
mTpcFlag = kTpcSrcSetup;
mIsTpcDst = false;
} else if ((tpc_stage == "copy") && tpc_key.length() && tpc_src.length()) {
mTpcFlag = kTpcDstSetup;
mIsTpcDst = true;
} else if (tpc_key.length() && tpc_org.length()) {
// @note(esindril) The above condition should be as follows but for backwards
// compatibility we keep it as it is. Consider changing it after 1st Jan 2024.
// else if ((tpc_stage == "copy") && tpc_key.length() && tpc_org.length()) {
mTpcFlag = kTpcSrcRead;
mIsTpcDst = false;
}
if ((mTpcFlag == kTpcSrcSetup) || (mTpcFlag == kTpcDstSetup)) {
// Create a TPC entry in the TpcMap
XrdSysMutexHelper tpc_lock(gOFS.TpcMapMutex);
if (gOFS.TpcMap[mIsTpcDst].count(tpc_key)) {
return gOFS.Emsg(epname, error, EPERM, "open - tpc key replayed",
mNsPath.c_str());
}
// Compute the tpc origin e.g. :@
// @todo(esindril) Xrootd 4.0
// std::string origin_host = client->addrInfo->Name();
std::string origin_host = client->host ? client->host : "";
std::string origin_tident = client->tident;
origin_tident.erase(origin_tident.find(":"));
tpc_org = origin_tident;
tpc_org += "@";
tpc_org += origin_host;
// Store the TPC initialization
gOFS.TpcMap[mIsTpcDst][tpc_key].key = tpc_key;
gOFS.TpcMap[mIsTpcDst][tpc_key].org = tpc_org;
gOFS.TpcMap[mIsTpcDst][tpc_key].src = tpc_src;
gOFS.TpcMap[mIsTpcDst][tpc_key].dst = tpc_dst;
gOFS.TpcMap[mIsTpcDst][tpc_key].path = mNsPath.c_str();
gOFS.TpcMap[mIsTpcDst][tpc_key].lfn = tpc_lfn;
// Set tpc key expiration time, only relevant for the TPC source
if (!mIsTpcDst) {
std::string_view tpc_ttl = env.Get("tpc.ttl") ? env.Get("tpc.ttl") : "";
gOFS.TpcMap[mIsTpcDst][tpc_key].expires = GetTpcKeyExpireTS(tpc_ttl);
} else {
gOFS.TpcMap[mIsTpcDst][tpc_key].expires = time(nullptr) + 3600 - 60;
}
mFstTpcInfo = gOFS.TpcMap[mIsTpcDst][tpc_key];
mTpcKey = tpc_key;
if (mTpcFlag == kTpcDstSetup) {
if (!tpc_lfn.length()) {
return gOFS.Emsg(epname, error, EINVAL, "open - tpc lfn missing",
mNsPath.c_str());
}
eos_info("msg=\"tpc dst session\" key=%s, org=%s, src=%s path=%s lfn=%s "
"expires=%llu", gOFS.TpcMap[mIsTpcDst][tpc_key].key.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].org.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].src.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].path.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].lfn.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].expires);
} else if (mTpcFlag == kTpcSrcSetup) {
// Store the opaque info but without any tpc.* info
gOFS.TpcMap[mIsTpcDst][tpc_key].opaque = opaque.c_str();
// Store also the decoded capability info
XrdOucEnv tmp_env(opaque.c_str());
XrdOucEnv* cap_env {nullptr};
int caprc = eos::common::SymKey::ExtractCapability(&tmp_env, cap_env);
if (caprc == ENOKEY) {
delete cap_env;
return gOFS.Emsg(epname, error, caprc, "open - missing capability");
} else if (caprc != 0) {
delete cap_env;
return gOFS.Emsg(epname, error, caprc, "open - capability illegal",
mNsPath.c_str());
} else {
int envlen = 0;
gOFS.TpcMap[mIsTpcDst][tpc_key].capability = cap_env->Env(envlen);
delete cap_env;
}
eos_info("msg=\"tpc src session\" key=%s, org=%s, dst=%s path=%s expires=%llu",
gOFS.TpcMap[mIsTpcDst][tpc_key].key.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].org.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].dst.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].path.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].expires);
}
} else if (mTpcFlag == kTpcSrcRead) {
// Verify a TPC entry in the TpcMap since the destination's open can now
// come before the transfer has been setup we have to give some time for
// the TPC client to deposit the key the not so nice side effect is that
// this thread stays busy during that time
bool exists = false;
for (size_t i = 0; i < 150; ++i) {
{
// Briefly take lock and release it
XrdSysMutexHelper tpcLock(gOFS.TpcMapMutex);
if (gOFS.TpcMap[mIsTpcDst].count(tpc_key)) {
exists = true;
break;
}
}
if (!exists) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
XrdSysMutexHelper tpc_lock(gOFS.TpcMapMutex);
time_t now = time(NULL);
if (!gOFS.TpcMap[mIsTpcDst].count(tpc_key)) {
eos_err("msg=\"tpc key not valid\" key=%s", tpc_key.c_str());
return gOFS.Emsg(epname, error, EPERM, "open - tpc key not valid",
mNsPath.c_str());
}
if (gOFS.TpcMap[mIsTpcDst][tpc_key].expires < now) {
eos_err("msg=\"tpc key expired\" key=%s", tpc_key.c_str());
return gOFS.Emsg(epname, error, EPERM, "open - tpc key expired",
mNsPath.c_str());
}
// We trust 'sss' anyway and we miss the host name in the 'sss' entity
std::string sec_prot = client->prot;
if ((sec_prot != "sss")) {
// Extract hostname from tident to avoid IPV4/6 fqdn mismatch errors
std::string exp_org, cur_org;
if (!GetHostFromTident(gOFS.TpcMap[mIsTpcDst][tpc_key].org, exp_org) ||
!GetHostFromTident(tpc_org, cur_org)) {
eos_err("failed to parse host from tpc_org=%s or cached_org=%s",
tpc_org.c_str(), gOFS.TpcMap[mIsTpcDst][tpc_key].org.c_str());
return gOFS.Emsg(epname, error, EPERM, "open - tpc origin parse error",
mNsPath.c_str());
}
if (exp_org != cur_org) {
eos_err("tpc origin missmatch tpc_org=%s, cached_org=%s",
tpc_org.c_str(), gOFS.TpcMap[mIsTpcDst][tpc_key].org.c_str());
return gOFS.Emsg(epname, error, EPERM, "open - tpc origin mismatch",
mNsPath.c_str());
}
}
eos_info("msg=\"tpc read\" key=%s, org=%s, dst=%s path=%s expires=%llu",
gOFS.TpcMap[mIsTpcDst][tpc_key].key.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].org.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].dst.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].path.c_str(),
gOFS.TpcMap[mIsTpcDst][tpc_key].expires);
// Grab the open information
mNsPath = gOFS.TpcMap[mIsTpcDst][tpc_key].path.c_str();
opaque = gOFS.TpcMap[mIsTpcDst][tpc_key].opaque.c_str();
SetLogId(ExtractLogId(opaque.c_str()).c_str());
// Store the provided origin to compare with our local connection
// gOFS.TpcMap[mIsTpcDst][tpc_key].org = tpc_org;
mFstTpcInfo = gOFS.TpcMap[mIsTpcDst][tpc_key];
mTpcKey = tpc_key;
// Save open opaque env
mOpenOpaque.reset(new XrdOucEnv(opaque.c_str()));
if (gOFS.TpcMap[mIsTpcDst][tpc_key].capability.length()) {
mCapOpaque.reset(new XrdOucEnv(
gOFS.TpcMap[mIsTpcDst][tpc_key].capability.c_str()));
} else {
return gOFS.Emsg(epname, error, EINVAL, "open - capability not found "
"for tpc key %s", tpc_key.c_str());
}
}
// Expire keys which are more than one 1 hours expired
if (mTpcFlag > kTpcNone) {
time_t now = time(NULL);
XrdSysMutexHelper tpcLock(gOFS.TpcMapMutex);
auto it = (gOFS.TpcMap[mIsTpcDst]).begin();
auto del = (gOFS.TpcMap[mIsTpcDst]).begin();
while (it != (gOFS.TpcMap[mIsTpcDst]).end()) {
del = it;
it++;
if (now > (del->second.expires + 3600)) {
eos_info("msg=\"expire tpc key\" key=%s", del->second.key.c_str());
gOFS.TpcMap[mIsTpcDst].erase(del);
}
}
}
// For non-TPC transfer, src placement and destination TPCs we need to save
// and decrypt the open and capability opaque info
if ((mTpcFlag == kTpcNone) ||
(mTpcFlag == kTpcDstSetup) ||
(mTpcFlag == kTpcSrcSetup) ||
(mTpcFlag == kTpcSrcCanDo)) {
mOpenOpaque.reset(new XrdOucEnv(opaque.c_str()));
XrdOucEnv* ptr_opaque {nullptr};
int caprc = eos::common::SymKey::ExtractCapability(mOpenOpaque.get(),
ptr_opaque);
mCapOpaque.reset(ptr_opaque);
if (caprc) {
// If we just miss the key, better stall the client
if (caprc == ENOKEY) {
eos_err("msg=\"FST still misses the required capability key\"");
return gOFS.Stall(error, 10, "FST still misses the required capability key");
}
return gOFS.Emsg(epname, error, caprc, "open - capability illegal",
mNsPath.c_str());
}
}
return SFS_OK;
}
//------------------------------------------------------------------------------
// Compute close time
//------------------------------------------------------------------------------
void
XrdFstOfsFile::CloseTime()
{
unsigned long mus = (closeStop.tv_sec - closeStart.tv_sec) * 1000000 +
(closeStop.tv_usec - closeStart.tv_usec);
timeToClose = mus / 1000.0;
}
//------------------------------------------------------------------------------
// Account for total read time
//------------------------------------------------------------------------------
void
XrdFstOfsFile::AddReadTime()
{
unsigned long mus = (lrTime.tv_sec - cTime.tv_sec) * 1000000 +
(lrTime.tv_usec - cTime.tv_usec);
rTime.tv_sec += (mus / 1000000);
rTime.tv_usec += (mus % 1000000);
}
void
XrdFstOfsFile::AddLayoutReadTime()
{
struct timeval nowtime;
gettimeofday(&nowtime, &tz);
timeToRead += ((nowtime.tv_sec - rStart.tv_sec) * 1000) + ((
nowtime.tv_usec - rStart.tv_usec) / 1000.0);
}
//------------------------------------------------------------------------------
// Account for total readv time
//------------------------------------------------------------------------------
void
XrdFstOfsFile::AddReadVTime()
{
unsigned long mus = (lrvTime.tv_sec - cTime.tv_sec) * 1000000 +
(lrvTime.tv_usec - cTime.tv_usec);
rvTime.tv_sec += (mus / 1000000);
rvTime.tv_usec += (mus % 1000000);
}
void
XrdFstOfsFile::AddLayoutReadVTime()
{
struct timeval nowtime;
gettimeofday(&nowtime, &tz);
timeToReadV += ((nowtime.tv_sec - rvStart.tv_sec) * 1000) + ((
nowtime.tv_usec - rvStart.tv_usec) / 1000.0);
}
//------------------------------------------------------------------------------
// Account for total write time
//------------------------------------------------------------------------------
void
XrdFstOfsFile::AddWriteTime()
{
unsigned long mus = ((lwTime.tv_sec - cTime.tv_sec) * 1000000) +
lwTime.tv_usec - cTime.tv_usec;
wTime.tv_sec += (mus / 1000000);
wTime.tv_usec += (mus % 1000000);
}
void
XrdFstOfsFile::AddLayoutWriteTime()
{
struct timeval nowtime;
gettimeofday(&nowtime, &tz);
timeToWrite += ((nowtime.tv_sec - wStart.tv_sec) * 1000) + ((
nowtime.tv_usec - wStart.tv_usec) / 1000.0);
}
//------------------------------------------------------------------------------
// Make report
//------------------------------------------------------------------------------
void
XrdFstOfsFile::MakeReportEnv(XrdOucString& reportString)
{
// compute avg, min, max, sigma for read and written bytes
unsigned long long rmin, rmax, rsum;
unsigned long long rvmin, rvmax, rvsum; // readv bytes
unsigned long long rsmin, rsmax, rssum; // read single bytes
unsigned long rcmin, rcmax, rcsum; // readv count
unsigned long long wmin, wmax, wsum;
double rsigma, rvsigma, rssigma, rcsigma, wsigma;
bool ioprio_default = false;
{
XrdSysMutexHelper vecLock(vecMutex);
ComputeStatistics(rvec, rmin, rmax, rsum, rsigma);
ComputeStatistics(wvec, wmin, wmax, wsum, wsigma);
ComputeStatistics(monReadvBytes, rvmin, rvmax, rvsum, rvsigma);
ComputeStatistics(monReadSingleBytes, rsmin, rsmax, rssum, rssigma);
ComputeStatistics(monReadvCount, rcmin, rcmax, rcsum, rcsigma);
bool sec_tpc = ((mTpcFlag == kTpcDstSetup) || (mTpcFlag == kTpcSrcRead));
char report[16384];
float iot = (float)(((closeTime.tv_sec - openTime.tv_sec) * 1000.0) + ((
closeTime.tv_usec - openTime.tv_usec) / 1000.0));
float rt = ((rTime.tv_sec * 1000.0) + (rTime.tv_usec / 1000.0));
float rvt = ((rvTime.tv_sec * 1000.0) + (rvTime.tv_usec / 1000.0));
float wt = ((wTime.tv_sec * 1000.0) + (wTime.tv_usec / 1000.0));
float idt = iot - timeToOpen - timeToClose - timeToRead - timeToReadV -
timeToWrite;
float usage = 100.0 - (100.0 * idt / iot);
if (rmin == 0xffffffff) {
rmin = 0;
}
if (wmin == 0xffffffff) {
wmin = 0;
}
if (!mIoPriorityClass || mIoPriorityErrorReported) {
mIoPriorityClass = IOPRIO_CLASS_BE;
mIoPriorityValue = 4;
ioprio_default = true;
}
std::string path = (mCapOpaque->Get("mgm.path") ?
mCapOpaque->Get("mgm.path") : mNsPath.c_str());
std::string sanitized_path = eos::common::StringConversion::SealXrdPath(path);
snprintf(report, sizeof(report) - 1,
"log=%s&path=%s&fstpath=%s&ruid=%u&rgid=%u&td=%s&"
"host=%s&lid=%lu&fid=%llu&fsid=%lu&"
"ots=%lu&otms=%lu&"
"cts=%lu&ctms=%lu&"
"nrc=%lu&nwc=%lu&"
"rb=%llu&rb_min=%llu&rb_max=%llu&rb_sigma=%.02f&"
"rv_op=%llu&rvb_min=%llu&rvb_max=%llu&rvb_sum=%llu&rvb_sigma=%.02f&"
"rs_op=%llu&rsb_min=%llu&rsb_max=%llu&rsb_sum=%llu&rsb_sigma=%.02f&"
"rc_min=%lu&rc_max=%lu&rc_sum=%lu&rc_sigma=%.02f&"
"wb=%llu&wb_min=%llu&wb_max=%llu&wb_sigma=%.02f&"
"sfwdb=%llu&sbwdb=%llu&sxlfwdb=%llu&sxlbwdb=%llu&"
"nfwds=%lu&nbwds=%lu&nxlfwds=%lu&nxlbwds=%lu&"
"usage=%.02f&iot=%.03f&idt=%.03f&lrt=%.03f&lrvt=%.03f&lwt=%.03f&ot=%.03f&ct=%.03f&rt=%.02f&rvt=%.02f&wt=%.02f&osize=%llu&csize=%llu&"
"delete_on_close=%d&prio_c=%d&prio_l=%d&prio_d=%d&forced_bw=%d&ms_sleep=%llu&ior_err=%d&iow_err=%d&%s"
, this->logId
, sanitized_path.c_str()
, mFstPath.c_str()
, this->vid.uid, this->vid.gid, mTident.c_str()
, gOFS.mHostName, mLid, mFileId, mFsId
, openTime.tv_sec, (unsigned long) openTime.tv_usec / 1000
, closeTime.tv_sec, (unsigned long) closeTime.tv_usec / 1000
, rCalls, wCalls
, rsum, rmin, rmax, rsigma
, (unsigned long long)monReadvBytes.size(), rvmin, rvmax, rvsum, rvsigma
, (unsigned long long)monReadSingleBytes.size(), rsmin, rsmax, rssum, rssigma
, rcmin, rcmax, rcsum, rcsigma
, wsum
, wmin
, wmax
, wsigma
, sFwdBytes
, sBwdBytes
, sXlFwdBytes
, sXlBwdBytes
, nFwdSeeks
, nBwdSeeks
, nXlFwdSeeks
, nXlBwdSeeks
, usage
, iot
, idt
, timeToRead
, timeToReadV
, timeToWrite
, timeToOpen
, timeToClose
, rt
, rvt
, wt
, (unsigned long long) openSize
, (unsigned long long) closeSize
, (deleteOnClose) ? 1 : 0
, mIoPriorityClass
, mIoPriorityValue
, ioprio_default
, mBandwidth
, msSleep
, hasReadError
, hasWriteError
, eos::common::SecEntity::ToEnv(mSecString.c_str(),
(sec_tpc ? "tpc" : 0)).c_str());
reportString = report;
}
if ((mTpcFlag > kTpcNone) && (mTpcFlag != kTpcSrcCanDo)) {
XrdSysMutexHelper tpc_lock(gOFS.TpcMapMutex);
ostringstream sstpc;
if (mTpcFlag == kTpcDstSetup) {
sstpc << "&tpc.src=" << mFstTpcInfo.src
<< "&tpc.src_lfn=" << mFstTpcInfo.lfn;
} else if ((mTpcFlag == kTpcSrcSetup) || (mTpcFlag == kTpcSrcRead)) {
sstpc << "&tpc.dst=" << mFstTpcInfo.dst
<< "&tpc.src_lfn=" << mFstTpcInfo.path;
}
reportString += sstpc.str().c_str();
}
}
//------------------------------------------------------------------------------
// Drop all replicas from the MGM
//------------------------------------------------------------------------------
int
XrdFstOfsFile::DropAllFromMgm(eos::common::FileId::fileid_t fileid,
const std::string path, const std::string manager)
{
// If we committed the replica and an error happened remote, we have
// to unlink it again
const std::string hex_fid = eos::common::FileId::Fid2Hex(fileid);
XrdOucErrInfo lerror;
XrdOucString capOpaqueString = "/?mgm.pcmd=drop";
XrdOucString OpaqueString = "";
OpaqueString += "&mgm.fid=";
OpaqueString += hex_fid.c_str();
OpaqueString += "&mgm.fsid=anyway";
OpaqueString += "&mgm.dropall=1";
XrdOucEnv Opaque(OpaqueString.c_str());
capOpaqueString += OpaqueString;
// Delete the replica in the MGM
int rcode = gOFS.CallManager(&lerror, path.c_str(), manager.c_str(),
capOpaqueString);
if (rcode && (error.getErrInfo() != EIDRM)) {
eos_warning("(unpersist): unable to drop file id %s fsid %u at manager %s",
hex_fid.c_str(), fileid, manager.c_str());
}
eos_info("info=\"removing on manager\" manager=%s fid=%llu fsid= drop-allrc=%d",
manager.c_str(), (unsigned long long) fileid, rcode);
return rcode;
}
//------------------------------------------------------------------------------
// Check if file has been modified while in use
//------------------------------------------------------------------------------
int
XrdFstOfsFile::ModifiedWhileInUse()
{
int rc = 0;
bool fileExists = true;
struct stat statinfo;
if (mLayout) {
if ((mLayout->Stat(&statinfo))) {
fileExists = false;
}
} else {
if ((XrdOfsOss->Stat(mFstPath.c_str(), &statinfo))) {
fileExists = false;
}
}
// Check if the file could have been changed in the meanwhile ...
if (fileExists && isReplication && (!mIsRW)) {
if (gOFS.openedForWriting.isOpen(mFsId, mFileId)) {
eos_err("file is now open for writing - discarding replication "
"[wopen=%d]", gOFS.openedForWriting.getUseCount(mFsId, mFileId));
gOFS.Emsg("closeofs", error, EIO,
"guarantee correctness - "
"file has been opened for writing during replication",
mNsPath.c_str());
rc = SFS_ERROR;
}
if ((statinfo.st_mtime != updateStat.st_mtime)) {
eos_err("file has been modified during replication");
rc = SFS_ERROR;
gOFS.Emsg("closeofs", error, EIO, "guarantee correctness -"
"file has been modified during replication", mNsPath.c_str());
}
}
return rc;
}
//------------------------------------------------------------------------------
// Layout read callback
//------------------------------------------------------------------------------
int
XrdFstOfsFile::LayoutReadCB(eos::fst::CheckSum::ReadCallBack::callback_data_t*
cbd)
{
return ((Layout*) cbd->caller)->Read(cbd->offset, cbd->buffer, cbd->size);
}
//------------------------------------------------------------------------------
// File read callback
//------------------------------------------------------------------------------
int
XrdFstOfsFile::FileIoReadCB(eos::fst::CheckSum::ReadCallBack::callback_data_t*
cbd)
{
return ((FileIo*) cbd->caller)->fileRead(cbd->offset, cbd->buffer, cbd->size);
}
//------------------------------------------------------------------------------
// Verify checksum method
//------------------------------------------------------------------------------
bool
XrdFstOfsFile::VerifyChecksum()
{
bool checksumerror = false;
int checksumlen = 0;
// Deal with checksums
if (mCheckSum) {
mCheckSum->Finalize();
if (mCheckSum->NeedsRecalculation()) {
if ((!mIsRW) && ((sFwdBytes + sBwdBytes)
|| (mCheckSum->GetMaxOffset() != openSize))) {
// We don't rescan files if they are read non-sequential or only
// partially
eos_debug("info=\"skipping checksum (re-scan) for non-sequential "
"reading ...\"");
mCheckSum.reset(nullptr);
return false;
}
} else {
eos_debug("isrw=%d max-offset=%lld opensize=%lld", mIsRW,
mCheckSum->GetMaxOffset(), openSize);
if (((!mIsRW) && ((mCheckSum->GetMaxOffset() != openSize) ||
(!mCheckSum->GetMaxOffset())))) {
eos_debug("info=\"skipping checksum (re-scan) for access without any IO or "
"partial sequential read IO from the beginning...\"");
mCheckSum.reset(nullptr);
return false;
}
}
// -------------------------------------------------------------------------------------------------------------------
// !!! CAUTION !!!
// be carefule with adler checksum - finalize can remove the dirty flag if all pieces of a file until the max checksum
// offset were written - however if the file size is diffrent from the max checksum offset, the checksum is dirty
// because the ending part of a file was not written
// -------------------------------------------------------------------------------------------------------------------
if ((mIsRW) && mCheckSum->GetMaxOffset() &&
(mCheckSum->GetMaxOffset() != (off_t)mMaxOffsetWritten)) {
// If there was a write which was not extending the file the checksum
// is dirty!
mCheckSum->SetDirty();
}
if (gOFS.openedForWriting.hadMultiOpen(mFsId, mFileId)) {
// If there were several writers on the file, we should set the checksum dirty
mCheckSum->SetDirty();
}
// If checksum is not completely computed
if (mCheckSum->NeedsRecalculation()) {
unsigned long long scansize = 0;
float scantime = 0; // is ms
if (!XrdOfsFile::fctl(SFS_FCTL_GETFD, 0, error)) {
// Rescan the file
eos::fst::CheckSum::ReadCallBack::callback_data_t cbd;
cbd.caller = (void*) mLayout.get();
eos::fst::CheckSum::ReadCallBack cb(LayoutReadCB, cbd);
if (mCheckSum->ScanFile(cb, scansize, scantime)) {
XrdOucString sizestring;
eos_info("info=\"rescanned checksum\" size=%s time=%.02f ms rate=%.02f MB/s %s",
eos::common::StringConversion::GetReadableSizeString(sizestring,
scansize, "B"),
scantime,
1.0 * scansize / 1000 / (scantime ? scantime : 99999999999999LL),
mCheckSum->GetHexChecksum());
} else {
eos_err("Rescanning of checksum failed");
mCheckSum.reset(nullptr);
return false;
}
} else {
eos_err("Couldn't get file descriptor");
mCheckSum.reset(nullptr);
return false;
}
} else {
// This was prefect streaming I/O
if ((!mIsRW) && ((sFwdBytes + sBwdBytes) ||
(mCheckSum->GetMaxOffset() != openSize))) {
eos_info("info=\"skipping checksum (re-scan) since file was not read "
"completely %llu %llu...\"", mCheckSum->GetMaxOffset(), openSize);
mCheckSum.reset(nullptr);
return false;
}
}
if (mIsRW) {
eos_info("(write) checksum type: %s checksum hex: %s requested-checksum hex: %s",
mCheckSum->GetName(), mCheckSum->GetHexChecksum(),
mOpenOpaque->Get("mgm.checksum") ? mOpenOpaque->Get("mgm.checksum") : "-none-");
// Check if the check sum for the file was given at upload time
if (mOpenOpaque->Get("mgm.checksum")) {
XrdOucString opaqueChecksum = mOpenOpaque->Get("mgm.checksum");
XrdOucString hexChecksum = mCheckSum->GetHexChecksum();
if ((opaqueChecksum != "disable") && (opaqueChecksum != hexChecksum)) {
eos_err("requested checksum %s does not match checksum %s of uploaded"
" file", opaqueChecksum.c_str(), hexChecksum.c_str());
mCheckSum.reset(nullptr);
return true;
}
}
mCheckSum->GetBinChecksum(checksumlen);
// Copy checksum into meta data
mFmd->mProtoFmd.set_checksum(mCheckSum->GetHexChecksum());
if (mHasWrite) {
// If we have no write, we don't set this attributes (xrd3cp!)
// set the eos checksum extended attributes
std::unique_ptr io(eos::fst::FileIoPlugin::GetIoObject(
mFstPath.c_str(), this));
if (((eos::common::LayoutId::GetLayoutType(mLid) ==
eos::common::LayoutId::kPlain) ||
(eos::common::LayoutId::GetLayoutType(mLid) ==
eos::common::LayoutId::kReplica))) {
// Don't put file checksum tags for complex layouts like raid6,readdp, archive
if (io->attrSet(std::string("user.eos.checksumtype"),
std::string(mCheckSum->GetName()))) {
eos_err("unable to set extended attribute errno=%d", errno);
}
if (io->attrSet("user.eos.checksum", mCheckSum->GetBinChecksum(checksumlen),
checksumlen)) {
eos_err("unable to set extended attribute errno=%d", errno);
}
}
// Reset any tagged error
if (io->attrSet("user.eos.filecxerror", "0")) {
eos_err("unable to set extended attribute errno=%d", errno);
}
if (io->attrSet("user.eos.blockcxerror", "0")) {
eos_err("unable to set extended attribute errno=%d", errno);
}
}
} else {
// This is a read with checksum check, compare with mFmd
bool isopenforwrite = gOFS.openedForWriting.isOpen(mFsId, mFileId);
if (isopenforwrite) {
eos_info("%s", "msg=\"read disable checksum check, file being written");
return false;
}
std::string computed_xs = mCheckSum->GetHexChecksum();
eos_info("msg=\"read checksum info\" xs_type=%s xs_computed=%s "
"xs_local=%s fxid=%08llx fsid=%lu", mCheckSum->GetName(),
computed_xs.c_str(), mFmd->mProtoFmd.checksum().c_str(),
mFileId, mFsId);
// We might fetch an unitialized value, so that is not to be considered
// a checksum error yet.
if (mFmd->mProtoFmd.checksum() != "none") {
if (computed_xs != mFmd->mProtoFmd.checksum().c_str()) {
checksumerror = true;
}
}
}
}
return checksumerror;
}
//------------------------------------------------------------------------------
// Queue file for CTA archiving
//------------------------------------------------------------------------------
bool
XrdFstOfsFile::QueueForArchiving(const struct stat& statinfo,
std::string& queueing_errmsg,
std::string& archive_req_id)
{
std::string decodedAttributes;
eos::common::SymKey::Base64Decode(mEventAttributes.c_str(), decodedAttributes);
std::map attributes;
eos::common::StringConversion::GetKeyValueMap(decodedAttributes.c_str(),
attributes,
eos::common::WF_CUSTOM_ATTRIBUTES_TO_FST_EQUALS,
eos::common::WF_CUSTOM_ATTRIBUTES_TO_FST_SEPARATOR, nullptr);
std::string mgm_hostname;
if (!gOFS.mMgmAlias.empty()) {
mgm_hostname = gOFS.mMgmAlias;
} else {
const char* ptr = mCapOpaque->Get("mgm.manager");
if (ptr != nullptr) {
mgm_hostname = ptr;
} else {
eos_err("%s", "msg=\"count not determine value of MGM hostname");
return false;
}
}
const int notifyRc = NotifyProtoWfEndPointClosew(mFmd->mProtoFmd.fid(),
mFmd->mProtoFmd.lid(),
statinfo.st_size,
mFmd->mProtoFmd.checksum(),
mEventOwnerUid,
mEventOwnerGid,
mEventRequestor,
mEventRequestorGroup,
mEventInstance,
mCapOpaque->Get("mgm.path"),
mgm_hostname,
attributes,
queueing_errmsg,
archive_req_id);
// Note: error variable defined in XrdSfsFile interface
if (notifyRc == 0) {
error.setErrCode(0);
eos_info("Return code rc=%i errc=%d", SFS_OK, error.getErrInfo());
return true;
} else if (SendArchiveFailedToManager(mFmd->mProtoFmd.fid(), queueing_errmsg)) {
eos_crit("msg=\"Failed to send archive failed event to manager\" "
"queueing_errmsg=\"%s\"", queueing_errmsg.c_str());
}
error.setErrCode(EIO);
eos_info("Return code rc=%i errc=%d", SFS_ERROR, error.getErrInfo());
return false;
}
//----------------------------------------------------------------------------
// Static method used to start an asynchronous thread which is doing the TPC
// transfer
//----------------------------------------------------------------------------
void*
XrdFstOfsFile::StartDoTpcTransfer(void* arg)
{
return reinterpret_cast(arg)->DoTpcTransfer();
}
//------------------------------------------------------------------------------
// Run method for the thread doing the TPC transfer
//------------------------------------------------------------------------------
void*
XrdFstOfsFile::DoTpcTransfer()
{
eos_info("msg=\"tpc now running - 1st sync\"");
std::string src_url = "";
std::string src_cgi = "";
// The sync initiates the third party copy
if (!TpcValid()) {
eos_err("msg=\"tpc session invalidated during sync\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = ECONNABORTED;
mTpcInfo.Reply(SFS_ERROR, ECONNABORTED, "sync TPC session closed by "
"disconnect");
return 0;
}
{
XrdSysMutexHelper tpcLock(gOFS.TpcMapMutex);
// Construct the source URL
src_url = "root://";
src_url += gOFS.TpcMap[mIsTpcDst][mTpcKey].src;
src_url += "/";
src_url += gOFS.TpcMap[mIsTpcDst][mTpcKey].lfn;
src_url += "?fst.readahead=true";
src_cgi = "tpc.key=";
src_cgi += mTpcKey;
src_cgi += "&tpc.org=";
src_cgi += gOFS.TpcMap[mIsTpcDst][mTpcKey].org;
src_cgi += "&tpc.stage=copy";
}
XrdIo tpcIO(src_url);
tpcIO.SetLogId(logId);
eos_info("sync-url=%s sync-cgi=%s", src_url.c_str(), src_cgi.c_str());
if (tpcIO.fileOpen(0, 0, src_cgi)) {
eos_err("msg=\"TPC open failed for url=%s cgi=%s\"", src_url.c_str(),
src_cgi.c_str());
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = EFAULT;
mTpcInfo.Reply(SFS_ERROR, EFAULT,
SSTR("sync - TPC open failed for src_url=" << src_url).c_str());
return 0;
}
if (!TpcValid()) {
tpcIO.fileClose();
eos_err("msg=\"tpc session invalidated during sync\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = ECONNABORTED;
mTpcInfo.Reply(SFS_ERROR, ECONNABORTED,
SSTR("sync - TPC session closed by disconnect src_url="
<< src_url).c_str());
return 0;
}
int64_t rbytes = 0;
int64_t wbytes = 0;
off_t offset = 0;
constexpr uint64_t eight_gb = 8 * (1ULL << 30);
static_assert(eight_gb == 8589934592,
"eight gb is not computed correctly!");
std::unique_ptr< std::vector > buffer
(new std::vector(tpcIO.GetBlockSize()));
eos_info("msg=\"tpc pull\" ");
struct stat st_info;
if (tpcIO.fileStat(&st_info)) {
eos_err("%s", "msg=\"failed to stat remote file\" src_url=%s",
src_url.c_str());
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = EIO;
mTpcInfo.Reply(SFS_ERROR, mTpcRetc, "sync - TPC remote stat failed");
return 0;
}
int64_t file_size = st_info.st_size;
int64_t nread {0ull};
while (offset < file_size) {
// Read the remote file in chunks and check after each chunk if the TPC
// has been aborted already
if (file_size - offset >= tpcIO.GetBlockSize()) {
nread = tpcIO.GetBlockSize();
} else {
nread = file_size - offset;
}
if (getenv("EOS_FST_TPC_READASYNC")) {
rbytes = tpcIO.fileReadPrefetch(offset, &((*buffer)[0]), nread, 30);
} else {
rbytes = tpcIO.fileRead(offset, &((*buffer)[0]), nread);
}
eos_debug("msg=\"tpc read\" rbytes=%lli request=%llu",
rbytes, tpcIO.GetBlockSize());
if ((rbytes == -1) || (rbytes != nread)) {
(void) tpcIO.fileClose();
eos_err("msg=\"tpc transfer terminated - remote read failed\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = EIO;
mTpcInfo.Reply(SFS_ERROR, mTpcRetc,
SSTR("sync - TPC remote read failed src_url="
<< src_url).c_str());
return 0;
}
// Write the buffer out through the local object
wbytes = write(offset, &((*buffer)[0]), rbytes);
if (offset / eight_gb != (offset + rbytes) / eight_gb) {
eos_info("msg=\"tcp write\" offset=%llu", offset);
}
if (rbytes != wbytes) {
(void) tpcIO.fileClose();
eos_err("%s", "msg=\"tpc transfer terminated - local write failed\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = EIO;
mTpcInfo.Reply(SFS_ERROR, mTpcRetc, "sync - TPC local write failed");
return 0;
}
offset += rbytes;
// Got an "ofs.tpc cancel" request from the client who triggered it
if (mTpcCancel) {
eos_err("%s", "msg=\"tpc transfer cancelled by the client\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = ECANCELED;
mTpcInfo.Reply(SFS_ERROR, mTpcRetc,
SSTR("sync - TPC cancelled by client src_url="
<< src_url).c_str());
return 0;
}
// Check validity of the TPC key
if (!TpcValid()) {
(void) tpcIO.fileClose();
eos_err("msg=\"tpc transfer invalidated during sync\"");
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcRetc = ECONNABORTED;
mTpcInfo.Reply(SFS_ERROR, mTpcRetc, "sync - TPC session closed "
"by diconnect");
return 0;
}
}
// Close the remote file
eos_info("msg=\"done tpc transfer, close remote file\" src_url=%s",
src_url.c_str());
XrdCl::XRootDStatus st = tpcIO.fileClose();
XrdSysMutexHelper scope_lock(mTpcJobMutex);
mTpcState = kTpcDone;
mTpcInfo.Reply(SFS_OK, 0, "");
return 0;
}
//------------------------------------------------------------------------------
// Filter out particular tags from the opaque information
//------------------------------------------------------------------------------
void
XrdFstOfsFile::FilterTagsInPlace(std::string& opaque,
const std::set tags)
{
bool found = false;
std::ostringstream oss;
std::list tokens = eos::common::StringTokenizer::split
>(opaque, '&');
for (const auto& token : tokens) {
found = false;
for (const auto& tag : tags) {
if (token.find(tag) == 0) {
found = true;
break;
}
}
if (!found && !token.empty()) {
oss << token << "&";
}
}
opaque = oss.str();
if (!opaque.empty()) {
opaque.pop_back();
}
}
//------------------------------------------------------------------------------
// Return current mtime while open
//------------------------------------------------------------------------------
time_t
XrdFstOfsFile::GetMtime() const
{
if (!mIsRW) {
// this is to report the MGM mtime to http get requests
if (mForcedMtime != 1) {
return mForcedMtime;
}
}
if (mFmd) {
return mFmd->mProtoFmd.mtime();
} else {
return 0;
}
}
//------------------------------------------------------------------------------
// Extract logid from the opaque info
//------------------------------------------------------------------------------
std::string
XrdFstOfsFile::ExtractLogId(const char* opaque) const
{
std::string log_id = "unknown";
if (opaque == nullptr) {
return log_id;
}
std::string sopaque = opaque;
const std::string tag = "mgm.logid=";
size_t pos_begin = sopaque.find(tag);
if (pos_begin != std::string::npos) {
pos_begin += tag.length();
size_t pos_end = sopaque.find('&', pos_begin);
if (pos_end != std::string::npos) {
pos_end -= pos_begin;
}
log_id = sopaque.substr(pos_begin, pos_end);
}
return log_id;
}
//------------------------------------------------------------------------------
// Notify the workflow protobuf endpoint of closew event
//------------------------------------------------------------------------------
int
XrdFstOfsFile::NotifyProtoWfEndPointClosew(uint64_t file_id,
uint32_t file_lid, uint64_t file_size,
const std::string& file_checksum,
uint32_t owner_uid, uint32_t owner_gid,
const std::string& requestor_name,
const std::string& requestor_groupname,
const std::string& instance_name,
const std::string& fullpath,
const std::string& manager_name,
const std::map& xattrs,
std::string& errmsg_wfe,
std::string& archive_req_id)
{
using namespace eos::common;
cta::xrd::Request request;
auto notification = request.mutable_notification();
notification->mutable_cli()->mutable_user()->set_username(requestor_name);
notification->mutable_cli()->mutable_user()->set_groupname(requestor_groupname);
notification->mutable_file()->mutable_owner()->set_uid(owner_uid);
notification->mutable_file()->mutable_owner()->set_gid(owner_gid);
notification->mutable_file()->set_size(file_size);
// Insert a single checksum into the checksum blob
CtaCommon::SetChecksum(notification->mutable_file()->mutable_csb()->add_cs(),
file_lid, file_checksum);
notification->mutable_wf()->set_event(cta::eos::Workflow::CLOSEW);
notification->mutable_wf()->mutable_instance()->set_name(instance_name);
auto xrdname = getenv("XRDNAME");
auto requester_instance = std::string(gOFS.mHostName) + ":" +
(xrdname ? std::string(xrdname) : "NULL");
notification->mutable_wf()->set_requester_instance(requester_instance);
notification->mutable_file()->set_lpath(fullpath);
notification->mutable_file()->set_fid(file_id);
notification->mutable_file()->set_disk_file_id(std::to_string(file_id));
auto fxidString = StringConversion::FastUnsignedToAsciiHex(file_id);
std::string ctaArchiveFileId = "none";
for (const auto& attrPair : xattrs) {
google::protobuf::MapPair attr(attrPair.first,
attrPair.second);
notification->mutable_file()->mutable_xattr()->insert(attr);
if (attrPair.first == "sys.archive.file_id") {
ctaArchiveFileId = attrPair.second;
}
}
// Build query strings
std::ostringstream srcStream;
std::ostringstream reportStream;
std::ostringstream errorReportStream;
srcStream << "root://" << manager_name << "/" << fullpath << "?eos.lfn=fxid:"
<< fxidString;
notification->mutable_wf()->mutable_instance()->set_url(srcStream.str());
reportStream << "eosQuery://" << manager_name
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=sync::archived&mgm.workflow=default&mgm.path=/dummy_path&mgm.ruid=0&mgm.rgid=0"
"&cta_archive_file_id=" << ctaArchiveFileId;
notification->mutable_transport()->set_report_url(reportStream.str());
errorReportStream << "eosQuery://" << manager_name
<< "//eos/wfe/passwd?mgm.pcmd=event&mgm.fid=" << fxidString
<< "&mgm.logid=cta&mgm.event=" << ARCHIVE_FAILED_WORKFLOW_NAME
<< "&mgm.workflow=default&mgm.path=/dummy_path&mgm.ruid=0&mgm.rgid=0"
"&cta_archive_file_id=" << ctaArchiveFileId
<< "&mgm.errmsg=";
notification->mutable_transport()->set_error_report_url(
errorReportStream.str());
// Communication with service
std::string endPoint;
std::string resource;
{
XrdSysMutexHelper lock(gConfig.Mutex);
endPoint = gConfig.ProtoWFEndpoint;
resource = gConfig.ProtoWFResource;
}
if (endPoint.empty() || resource.empty()) {
eos_static_err("%s", "msg=\"you are running proto wf jobs without "
"specifying fstofs.protowfendpoint or "
"fstofs.protowfresource in the FST config file\"");
return ENOTCONN;
}
XrdSsiPb::Config config;
if (getenv("XRDDEBUG")) {
config.set("log", "all");
} else {
config.set("log", "info");
}
config.set("request_timeout", "120");
cta::xrd::Response response;
try {
// Instantiate service object only once, static is also thread-safe
// If static initialization throws an exception, it will be retried next time
static XrdSsiPbServiceType service(endPoint, resource, config);
auto sentAt = std::chrono::steady_clock::now();
service.Send(request, response);
auto receivedAt = std::chrono::steady_clock::now();
auto timeSpent = std::chrono::duration_cast
(receivedAt - sentAt);
eos_static_info("SSI Protobuf time for sync::closew=%ld", timeSpent.count());
} catch (std::runtime_error& err) {
eos_static_err("Could not send request to outside service. Reason: %s",
err.what());
return ENOTCONN;
}
switch (response.type()) {
case cta::xrd::Response::RSP_SUCCESS: {
auto archiveReqIdItor = response.xattr().find("sys.cta.objectstore.id");
if (response.xattr().end() != archiveReqIdItor) {
archive_req_id = archiveReqIdItor->second;
} else {
eos_static_err("msg=\"Failed to extract sys.cta.objectstore.id from response to closew notification to"
" protowfendpoint\" path=\"%s\"", fullpath.c_str());
}
}
return 0;
case cta::xrd::Response::RSP_ERR_CTA:
case cta::xrd::Response::RSP_ERR_USER:
case cta::xrd::Response::RSP_ERR_PROTOBUF:
case cta::xrd::Response::RSP_INVALID:
errmsg_wfe = response.message_txt();
eos_static_err("%s for file %s. Reason: %s",
CtaCommon::ctaResponseCodeToString(response.type()).c_str(),
fullpath.c_str(), response.message_txt().c_str());
return EPROTO;
default:
eos_static_err("Response:\n%s", response.DebugString().c_str());
return EPROTO;
}
}
//------------------------------------------------------------------------------
// Send archive failed event to the manager
//------------------------------------------------------------------------------
int XrdFstOfsFile::SendArchiveFailedToManager(const uint64_t fid,
const std::string& errmsg)
{
const auto fxidString = eos::common::StringConversion::FastUnsignedToAsciiHex(
fid);
std::string encodedErrMsg;
if (!common::SymKey::Base64Encode(errmsg.c_str(), errmsg.length(),
encodedErrMsg)) {
// "Failed to encode message using base64" in base64 is
// RmFpbGVkIHRvIGVuY29kZSBtZXNzYWdlIHVzaW5nIGJhc2U2NA==
encodedErrMsg = "RmFpbGVkIHRvIGVuY29kZSBtZXNzYWdlIHVzaW5nIGJhc2U2NA==";
}
XrdOucString errorReportOpaque = "";
errorReportOpaque += "/?";
errorReportOpaque += "mgm.pcmd=event";
errorReportOpaque += "&mgm.fid=";
errorReportOpaque += fxidString.c_str();
errorReportOpaque += "&mgm.logid=cta";
errorReportOpaque += "&mgm.event=";
errorReportOpaque += common::ARCHIVE_FAILED_WORKFLOW_NAME;
errorReportOpaque += "&mgm.workflow=default";
errorReportOpaque += "&mgm.path=/dummy_path";
errorReportOpaque += "&mgm.ruid=0";
errorReportOpaque += "&mgm.rgid=0";
errorReportOpaque += "&mgm.errmsg=";
errorReportOpaque += encodedErrMsg.c_str();
eos_info("msg=\"sending error message to manager\" path=\"%s\" manager=\"%s\" "
"errorReportOpaque=\"%s\"", mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"), errorReportOpaque.c_str());
return gOFS.CallManager(&error, mCapOpaque->Get("mgm.path"),
mCapOpaque->Get("mgm.manager"),
errorReportOpaque, nullptr, 30, mSyncEventOnClose, false);
}
//------------------------------------------------------------------------------
// Get hostname from tident
//------------------------------------------------------------------------------
bool
XrdFstOfsFile::GetHostFromTident(const std::string& tident,
std::string& hostname)
{
hostname.clear();
size_t pos = tident.find('@');
if ((pos == std::string::npos) || (pos + 1 == tident.length())) {
return false;
}
size_t dot_pos = tident.find('.', pos + 1);
hostname = tident.substr(pos + 1, dot_pos - pos - 1);
return true;
}
//------------------------------------------------------------------------------
// Check if async close is configured
//------------------------------------------------------------------------------
bool
XrdFstOfsFile::IsAsyncCloseConfigured()
{
const char* ptr = getenv("EOS_FST_ASYNC_CLOSE");
if (!ptr || (strncmp(ptr, "1", 1) != 0)) {
return false;
}
return true;
}
//------------------------------------------------------------------------------
// Decide if close should be done synchronously. There are cases when close
// should happen in the same thread eg. read, http tx, sink writes etc.
//----------------------------------------------------------------------------
bool
XrdFstOfsFile::DoSyncClose()
{
static uint64_t min_size_async_close = GetAsyncCloseMinSize();
// Even if async close is enabled there are some cases when close happens in
// the same XRootD thread
if (viaDelete || mWrDelete || mIsDevNull || (mIsRW == false) || mIsHttp ||
(mIsRW && (mMaxOffsetWritten <= min_size_async_close))) {
return true;
}
// For RAIN layouts especially only the entry server should do an async close.
// If all stripes are on the same FST (which is not a good idea) there is a
// risk that the thread pool for handling close requests will deadlock as
// some close requests will wait forever for queued depended close ops.
if (mIsRW && mLayout && !mLayout->IsEntryServer()) {
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Get configured minimum file size for which the asynchronous close method
// is called.
//------------------------------------------------------------------------------
uint64_t
XrdFstOfsFile::GetAsyncCloseMinSize()
{
uint64_t min_size_async_close = 0ull;
const char* ptr = getenv("EOS_FST_ASYNC_CLOSE_MIN_SIZE_BYTES");
if (ptr) {
if (!eos::common::StringToNumeric(std::string(ptr), min_size_async_close,
0ul)) {
eos_static_err("%s", "msg=\"failed to convert "
"EOS_ASYNC_CLOSE_MIN_SIZE_BYTES, using by default 0\"");
}
}
return min_size_async_close;
}
EOSFSTNAMESPACE_END