// ----------------------------------------------------------------------
// File: XrdMgmOfs.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "common/Mapping.hh"
#include "common/FileId.hh"
#include "common/LayoutId.hh"
#include "common/Path.hh"
#include "common/SecEntity.hh"
#include "common/StackTrace.hh"
#include "common/ParseUtils.hh"
#include "common/StringTokenizer.hh"
#include "common/Strerror_r_wrapper.hh"
#include "mgm/Access.hh"
#include "mgm/FileSystem.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/XrdMgmOfsFile.hh"
#include "mgm/XrdMgmOfsSecurity.hh"
#include "mgm/Stat.hh"
#include "mgm/Policy.hh"
#include "mgm/Quota.hh"
#include "mgm/Acl.hh"
#include "mgm/Workflow.hh"
#include "mgm/proc/ProcInterface.hh"
#include "mgm/tracker/ReplicationTracker.hh"
#include "mgm/Recycle.hh"
#include "mgm/Macros.hh"
#include "mgm/ZMQ.hh"
#include "mgm/tgc/MultiSpaceTapeGc.hh"
#include "mgm/utils/AttrHelper.hh"
#include "mgm/XattrLock.hh"
#include "mgm/placement/FsScheduler.hh"
#include "namespace/utils/Attributes.hh"
#include "namespace/Prefetcher.hh"
#include "namespace/Resolver.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdSec/XrdSecInterface.hh"
#include "XrdSec/XrdSecEntityAttr.hh"
#include "XrdSfs/XrdSfsAio.hh"
#include "common/Constants.hh"
#include "XrdOuc/XrdOucPgrwUtils.hh"
#ifdef __APPLE__
#define ECOMM 70
#endif
#ifndef S_IAMB
#define S_IAMB 0x1FF
#endif
namespace
{
//----------------------------------------------------------------------------
//! Thrown if a disk location could not be found
//----------------------------------------------------------------------------
struct DiskLocationNotFound: public std::runtime_error {
using std::runtime_error::runtime_error;
};
//----------------------------------------------------------------------------
//! @param locations locations to be searched
//! @return first location that is a disk as opposed to tape
//! @throw DiskLocationNotFound if a disk location could not be found
//----------------------------------------------------------------------------
eos::IFileMD::location_t
getFirstDiskLocation(const eos::IFileMD::LocationVector& locations)
{
if (locations.empty()) {
throw DiskLocationNotFound("Failed to find disk location");
}
if (EOS_TAPE_FSID != locations.at(0)) {
return locations.at(0);
}
if (2 > locations.size()) {
throw DiskLocationNotFound("Failed to find disk location");
}
return locations.at(1);
}
}
/******************************************************************************/
/* MGM File Interface */
/******************************************************************************/
/* copied for "eos_static_..." */
static int
emsg(XrdOucErrInfo& error, int ec, const char* txt, const char* txt2)
{
// Get the reason for the error
if (ec < 0) {
ec = -ec;
}
char* etext = strerror(ec);
char sbuff[1024];
char ebuff[64];
if (etext == NULL) {
etext = ebuff;
snprintf(ebuff, sizeof(ebuff), "error code %d", ec);
}
snprintf(sbuff, sizeof(sbuff), "create_cow: unable to %s %s: %s", txt, txt2,
etext);
eos_static_err(sbuff);
error.setErrInfo(ec, sbuff);
return SFS_ERROR;
}
/*
* Auxiliary routine: creates the copy-on-write clone an intermediate directories
* cow_type:
* 0 = copy (for file updates, two files exist)
* 1 = rename (for a "deletes", clone's contents survive under different name)
* 2 = hardlink (file untouched but a new name is created, e.g. for recycle)
*
* returns:
* - error code if the clone could not be created
* - -1 if the file is not to be cloned
*/
int
XrdMgmOfsFile::create_cow(int cowType,
std::shared_ptr dmd, std::shared_ptr fmd,
eos::common::VirtualIdentity& vid, XrdOucErrInfo& error)
{
char sbuff[1024];
uint64_t cloneId = fmd->getCloneId();
if (cloneId == 0 or not fmd->getCloneFST().empty()) {
return -1;
}
eos_static_info("Creating cow clone (type %d) for %s fxid:%lx cloneId %lld",
cowType, fmd->getName().c_str(), fmd->getId(), cloneId);
snprintf(sbuff, sizeof(sbuff), "%s/clone/%ld", gOFS->MgmProcPath.c_str(),
cloneId);
std::shared_ptr cloneMd, dirMd;
try {
cloneMd = gOFS->eosView->getContainer(sbuff);
} catch (eos::MDException& e) {
eos_static_debug("caught exception %d %s path %s\n", e.getErrno(),
e.getMessage().str().c_str(), sbuff);
return emsg(error, ENOENT /*EEXIST*/, "open file ()", sbuff);
}
if (!dmd) {
return emsg(error, ENOENT, "determine parent", fmd->getName().c_str());
}
/* set up directory for clone */
int tlen = strlen(sbuff);
snprintf(sbuff + tlen, sizeof(sbuff) - tlen, "/%lx", dmd->getId());
try {
dirMd = gOFS->eosView->getContainer(sbuff);
} catch (eos::MDException& e) {
dirMd = gOFS->eosView->createContainer(sbuff, true);
dirMd->setMode(dmd->getMode());
eos::IFileMD::XAttrMap xattrs = dmd->getAttributes();
for (const auto& a : xattrs) {
if (a.first == "sys.acl" || a.first == "user.acl" ||
a.first == "sys.eval.useracl") {
dirMd->setAttribute(a.first, a.second);
}
}
}
/* create the clone */
if (cowType ==
XrdMgmOfsFile::cowDelete) { /* basically a "mv" */
dmd->removeFile(fmd->getName());
snprintf(sbuff, sizeof(sbuff), "%lx", fmd->getId());
fmd->setName(sbuff);
fmd->setCloneId(0); /* don't ever cow this again! */
dirMd->addFile(fmd.get());
gOFS->eosFileService->updateStore(fmd.get());
} else { /* cowType == cowUpdate or cowType == cowUnlink */
std::shared_ptr gmd;
eos::IFileMD::ctime_t ttime;
tlen = strlen(sbuff);
snprintf(sbuff + tlen, sizeof(sbuff) - tlen, "/%lx", fmd->getId());
gmd = gOFS->eosView->createFile(sbuff, vid.uid, vid.gid);
gmd->setAttribute("sys.clone.targetFid", sbuff + tlen + 1);
gmd->setSize(fmd->getSize());
if (cowType ==
XrdMgmOfsFile::cowUpdate) { /* prepare a "cp --reflink" (to be performed on the FSTs) */
fmd->getCTime(ttime);
gmd->setCTime(ttime);
fmd->getMTime(ttime);
gmd->setMTime(ttime);
gmd->setCUid(fmd->getCUid());
gmd->setCGid(fmd->getCGid());
gmd->setFlags(fmd->getFlags());
gmd->setLayoutId(fmd->getLayoutId());
gmd->setChecksum(fmd->getChecksum());
gmd->setContainerId(dirMd->getId());
for (unsigned int i = 0; i < fmd->getNumLocation(); i++) {
gmd->addLocation(fmd->getLocation(i));
}
} else if (cowType == cowUnlink) {
int nlink = (fmd->hasAttribute(XrdMgmOfsFile::k_nlink)) ?
std::stoi(fmd->getAttribute(XrdMgmOfsFile::k_nlink)) + 1 : 1;
fmd->setAttribute(k_nlink, std::to_string(nlink));
gOFS->eosFileService->updateStore(fmd.get());
uint64_t hlTarget = eos::common::FileId::FidToInode(fmd->getId());
gmd->setAttribute(XrdMgmOfsFile::k_mdino, std::to_string(hlTarget));
eos_static_debug("create_cow Unlink %s (%ld) -> %s (%ld)",
gmd->getName().c_str(), gmd->getSize(),
fmd->getName().c_str(), fmd->getSize());
}
gOFS->eosFileService->updateStore(gmd.get());
fmd->setCloneFST(eos::common::FileId::Fid2Hex(gmd->getId()));
gOFS->eosFileService->updateStore(fmd.get());
}
gOFS->eosDirectoryService->updateStore(dirMd.get());
gOFS->FuseXCastRefresh(dirMd->getIdentifier(), dirMd->getParentIdentifier());
gOFS->FuseXCastRefresh(cloneMd->getIdentifier(),
cloneMd->getParentIdentifier());
return 0;
}
/*----------------------------------------------------------------------------*
* special handling of hard links
* returns:
* 0 = continue deleting fmd
* 1 = do nothing
*
*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::handleHardlinkDelete(std::shared_ptr cmd,
std::shared_ptr fmd,
eos::common::VirtualIdentity& vid)
{
if (!cmd) {
return 0;
}
long nlink =
-2; /* assume this has nothing to do with hard links */
if (fmd->hasAttribute(XrdMgmOfsFile::k_mdino)) {
/* this is a hard link, decrease reference count on underlying file */
uint64_t hlTgt = std::stoull(fmd->getAttribute(XrdMgmOfsFile::k_mdino));
uint64_t clock;
/* gmd = the hard link target */
std::shared_ptr gmd = gOFS->eosFileService->getFileMD(
eos::common::FileId::InodeToFid(hlTgt), &clock);
nlink = std::stol(gmd->getAttribute(XrdMgmOfsFile::k_nlink)) - 1;
if (nlink > 0) {
gmd->setAttribute(XrdMgmOfsFile::k_nlink, std::to_string(nlink));
} else {
gmd->removeAttribute(XrdMgmOfsFile::k_nlink);
}
gOFS->eosFileService->updateStore(gmd.get());
eos_static_info("hlnk update target %s for %s nlink %ld",
gmd->getName().c_str(), fmd->getName().c_str(), nlink);
if (nlink <= 0) {
if (gmd->getName().substr(0, 13) == "...eos.ino...") {
eos_static_info("hlnk unlink target %s for %s nlink %ld",
gmd->getName().c_str(), fmd->getName().c_str(), nlink);
uint64_t cloneId = gmd->getCloneId();
if (cloneId != 0 and
gmd->getCloneFST().empty()) { /* this file needs to be cloned */
XrdOucErrInfo error;
std::shared_ptr dmd;
try {
dmd = gOFS->eosDirectoryService->getContainerMD(gmd->getContainerId());
} catch (eos::MDException& e) {
}
XrdMgmOfsFile::create_cow(XrdMgmOfsFile::cowDelete, dmd, gmd, vid, error);
return 1;
} else { /* delete hard link target */
cmd->removeFile(gmd->getName());
gmd->unlinkAllLocations();
gmd->setContainerId(0);
}
gOFS->eosFileService->updateStore(gmd.get());
}
}
} else if (fmd->hasAttribute(
XrdMgmOfsFile::k_nlink)) { /* a hard link target */
nlink = std::stol(fmd->getAttribute(XrdMgmOfsFile::k_nlink));
eos_static_info("hlnk rm target nlink %ld", nlink);
if (nlink > 0) {
// hard links exist, just rename the file so the inode does not disappear
char nameBuf[1024];
uint64_t ino = eos::common::FileId::FidToInode(fmd->getId());
snprintf(nameBuf, sizeof(nameBuf), "...eos.ino...%lx", ino);
std::string nameBufs(nameBuf);
fmd->setAttribute(XrdMgmOfsFile::k_nlink, std::to_string(nlink));
eos_static_info("hlnk unlink rename %s=>%s new nlink %d",
fmd->getName().c_str(), nameBufs.c_str(), nlink);
cmd->removeFile(nameBufs); // if the target exists, remove it!
gOFS->eosView->renameFile(fmd.get(), nameBufs);
return 1;
}
/* no other links exist, continue deleting the target like a simple file */
}
eos_static_debug("hard link nlink %ld, delete %s", nlink,
fmd->getName().c_str());
return 0;
}
//------------------------------------------------------------------------------
// Get the application name if specified
//------------------------------------------------------------------------------
const std::string
XrdMgmOfsFile::GetApplicationName(XrdOucEnv* open_opaque,
const XrdSecEntity* client)
{
// Application name derived from the following in order of priority:
// 1. eos.app=
// 2. XRD_APPNAME= env variable or -DSAppName for xrdcp commands
const std::string eos_tag = "eos.app";
const std::string xrd_tag = "xrd.appname";
std::string app_name;
const char* val = nullptr;
if (open_opaque && (val = open_opaque->Get(eos_tag.c_str()))) {
app_name = val;
} else {
if (client) {
if (!client->eaAPI->Get(xrd_tag, app_name)) {
app_name.clear();
}
}
}
return app_name;
}
//------------------------------------------------------------------------------
// Get POSIX open flags from the given XRootD open mode
//------------------------------------------------------------------------------
int
XrdMgmOfsFile::GetPosixOpenFlags(XrdSfsFileOpenMode open_mode)
{
int open_flags = 0;
if (open_mode & SFS_O_CREAT) {
open_mode = SFS_O_CREAT;
} else if (open_mode & SFS_O_TRUNC) {
open_mode = SFS_O_TRUNC;
}
switch (open_mode & (SFS_O_RDONLY | SFS_O_WRONLY | SFS_O_RDWR |
SFS_O_CREAT | SFS_O_TRUNC)) {
case SFS_O_CREAT:
open_flags = O_CREAT | O_EXCL | O_RDWR;
break;
case SFS_O_TRUNC:
open_flags = O_CREAT | O_TRUNC | O_RDWR;
break;
case SFS_O_RDONLY:
open_flags = O_RDONLY;
break;
case SFS_O_WRONLY:
open_flags = O_WRONLY;
break;
case SFS_O_RDWR:
open_flags = O_RDWR;
break;
default:
open_flags = O_RDONLY;
break;
}
return open_flags;
}
//------------------------------------------------------------------------------
// Get XRootD acceess operation bases on the given open flags
//------------------------------------------------------------------------------
Access_Operation
XrdMgmOfsFile::GetXrdAccessOperation(int open_flags)
{
Access_Operation op;
if (open_flags & O_CREAT) {
op = AOP_Create;
} else {
if (open_flags == O_RDONLY) {
op = AOP_Read;
} else {
op = AOP_Update;
}
}
return op;
}
/*----------------------------------------------------------------------------*/
/*
* @brief open a given file with the indicated mode
*
* @param inpath path to open
* @param open_mode SFS_O_RDONLY,SFS_O_WRONLY,SFS_O_RDWR,SFS_O_CREAT,SFS_TRUNC
* @param Mode posix access mode bits to be assigned
* @param client XRootD authentication object
* @param ininfo CGI
* @return SFS_OK on succes, otherwise SFS_ERROR on error or redirection
*
* Mode may also contain SFS_O_MKPATH if one desires to automatically create
* all missing directories for a file (if possible).
*
*/
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::open(eos::common::VirtualIdentity* invid,
const char* inpath,
XrdSfsFileOpenMode open_mode,
mode_t Mode,
const XrdSecEntity* client,
const char* ininfo)
{
using eos::common::LayoutId;
static const char* epname = "open";
const char* tident = error.getErrUser();
eos::IFileMD::XAttrMap attrmapF;
errno = 0;
eos::common::Timing tm("Open");
COMMONTIMING("begin", &tm);
EXEC_TIMING_BEGIN("Open");
XrdOucString spath = inpath;
XrdOucString sinfo = ininfo;
SetLogId(logId, tident);
int open_flags = GetPosixOpenFlags(open_mode);
bool isRW = ((open_flags == O_RDONLY) ? false : true);
bool isRewrite = ((open_flags & O_CREAT) ? false : true);
Access_Operation acc_op = GetXrdAccessOperation(open_flags);
{
EXEC_TIMING_BEGIN("IdMap");
if (spath.beginswith("/zteos64:")) {
sinfo += "&authz=";
sinfo += spath.c_str() + 1;
ininfo = sinfo.c_str();
}
if (!invid) {
eos::common::Mapping::IdMap(client, ininfo, tident, vid,
gOFS->mTokenAuthz, acc_op, spath.c_str());
} else {
vid = *invid;
}
EXEC_TIMING_END("IdMap");
}
gOFS->MgmStats.Add("IdMap", vid.uid, vid.gid, 1);
COMMONTIMING("IdMap", &tm);
SetLogId(logId, vid, tident);
NAMESPACEMAP;
BOUNCE_ILLEGAL_NAMES;
BOUNCE_NOT_ALLOWED;
spath = path;
COMMONTIMING("Bounce", &tm);
if (!spath.beginswith("/proc/") && spath.endswith("/")) {
return Emsg(epname, error, EISDIR,
"open - you specified a directory as target file name", path);
}
bool isCreation = false;
// flag indicating parallel IO access
bool isPio = false;
// flag indicating access with reconstruction
bool isPioReconstruct = false;
// flag indicating FUSE file access
bool isFuse = false;
// flag indiciating an atomic upload where a file get's a hidden unique name and is renamed when it is closed
bool isAtomicUpload = false;
// flag indicating an atomic file name
bool isAtomicName = false;
// flag indicating a new injection - upload of a file into a stub without physical location
bool isInjection = false;
// flag indicating to drop the current disk replica in the policy space
bool isRepair = false;
// flag indicating a read for repair (meaningfull only on the FST)
bool isRepairRead = false;
// chunk upload ID
XrdOucString ocUploadUuid = "";
// Set of filesystem IDs to reconstruct
std::set pio_reconstruct_fs;
// list of filesystem IDs usable for replacement of RAIN file
std::vector pio_replacement_fs;
// tried hosts CGI
std::string tried_cgi;
// versioning CGI
std::string versioning_cgi;
// file size
uint64_t fmdsize = 0;
// io priority string
std::string ioPriority;
XrdOucString pinfo = (ininfo ? ininfo : "");
eos::common::StringConversion::MaskTag(pinfo, "cap.msg");
eos::common::StringConversion::MaskTag(pinfo, "cap.sym");
eos::common::StringConversion::MaskTag(pinfo, "authz");
if (isRW) {
eos_info("op=write trunc=%d path=%s info=%s",
open_mode & SFS_O_TRUNC, path, pinfo.c_str());
} else {
eos_info("op=read path=%s info=%s", path, pinfo.c_str());
}
ACCESSMODE_R;
if (isRW) {
SET_ACCESSMODE_W;
}
if (ProcInterface::IsProcAccess(path)) {
if (ProcInterface::IsWriteAccess(path, pinfo.c_str())) {
SET_ACCESSMODE_W;
}
} else {
if (getenv("EOS_HA_REDIRECT_READS")) {
SET_ACCESSMODE_R_MASTER;
}
}
MAYSTALL;
MAYREDIRECT;
XrdOucString currentWorkflow = "default";
unsigned long long byfid = 0;
unsigned long long bypid = 0;
COMMONTIMING("fid::fetch", &tm);
/* check paths starting with fid: fxid: ino: ... */
if (spath.beginswith("fid:") || spath.beginswith("fxid:") ||
spath.beginswith("ino:")) {
WAIT_BOOT;
// reference by fid+fsid
byfid = eos::Resolver::retrieveFileIdentifier(spath).getUnderlyingUInt64();
try {
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, byfid);
eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
fmd = gOFS->eosFileService->getFileMD(byfid);
spath = gOFS->eosView->getUri(fmd.get()).c_str();
bypid = fmd->getContainerId();
eos_info("msg=\"access by inode\" ino=%s path=%s", path, spath.c_str());
path = spath.c_str();
} catch (eos::MDException& e) {
eos_debug("caught exception %d %s\n", e.getErrno(),
e.getMessage().str().c_str());
MAYREDIRECT_ENOENT;
MAYSTALL_ENOENT;
return Emsg(epname, error, ENOENT,
"open - you specified a not existing inode number", path);
}
}
COMMONTIMING("fid::fetched", &tm);
openOpaque = new XrdOucEnv(ininfo);
// Handle (delegated) tpc redirection for writes
if (isRW && RedirectTpcAccess()) {
return SFS_REDIRECT;
}
const std::string app_name = GetApplicationName(openOpaque, client);
// Decide if this is a FUSE access
if (!app_name.empty() &&
((app_name == "fuse") || (app_name == "xrootdfs") ||
(app_name.find("fuse::") == 0))) {
isFuse = true;
}
{
// handle io priority
const char* val = 0;
if ((val = openOpaque->Get("eos.iopriority"))) {
if (vid.hasUid(11)) { // operator role
// admin members can set iopriority
ioPriority = val;
} else {
eos_info("msg=\"suppressing IO priority setting '%s', no operator role\"",
val);
}
}
}
{
// Handle obfuscation and encryption
const char* val = 0;
if ((val = openOpaque->Get("eos.obfuscate"))) {
try {
mEosObfuscate = std::strtoul(val, 0, 10);
} catch (...) {
eos_warning("msg=\"ignore invalid eos.obfuscate\" value=\"%s\"", val);
}
}
if ((val = openOpaque->Get("eos.key"))) {
mEosKey = val;
if (mEosObfuscate == 0) {
mEosObfuscate = 1;
}
}
}
{
// figure out if this is an OC upload
const char* val = 0;
if ((val = openOpaque->Get("oc-chunk-uuid"))) {
ocUploadUuid = val;
}
}
{
// populate tried hosts from the CGI
const char* val = 0;
if ((val = openOpaque->Get("tried"))) {
tried_cgi = val;
tried_cgi += ",";
}
}
{
// extract the workflow name from the CGI
const char* val = 0;
if ((val = openOpaque->Get("eos.workflow"))) {
currentWorkflow = val;
}
}
{
// populate versioning cgi from the CGI
const char* val = 0;
if ((val = openOpaque->Get("eos.versioning"))) {
versioning_cgi = val;
}
}
if (!isFuse && isRW) {
// resolve symbolic links
try {
std::string s_path = path;
spath = gOFS->eosView->getRealPath(s_path).c_str();
eos_info("msg=\"rewrote symlinks\" sym-path=%s realpath=%s", s_path.c_str(),
spath.c_str());
path = spath.c_str();
} catch (eos::MDException& e) {
eos_debug("caught exception %d %s\n",
e.getErrno(),
e.getMessage().str().c_str());
// will throw the error later
}
}
// ---------------------------------------------------------------------------
// PIO MODE CONFIGURATION
// ---------------------------------------------------------------------------
// PIO mode return's a vector of URLs to a client and the client contact's
// directly these machines and run's the RAIN codec on client side.
// The default mode return's one gateway machine and this machine run's the
// RAIN codec.
// On the fly reconstruction is done using PIO mode when the reconstruction
// action is defined ('eos.pio.action=reconstruct'). The client can specify
// a list of filesystem's which should be excluded. In case they are used
// in the layout the stripes on the explicitly referenced filesystems and
// all other unavailable filesystems get reconstructed into stripes on
// new machines.
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// discover PIO mode
// ---------------------------------------------------------------------------
XrdOucString sPio = (openOpaque) ? openOpaque->Get("eos.cli.access") : "";
if (sPio == "pio") {
isPio = true;
}
// Discover PIO reconstruction mode
XrdOucString sPioRecover = (openOpaque) ?
openOpaque->Get("eos.pio.action") : "";
if (sPioRecover == "reconstruct") {
isPioReconstruct = true;
}
{
// Discover PIO reconstruction filesystems (stripes to be replaced)
std::string sPioRecoverFs = (openOpaque) ?
(openOpaque->Get("eos.pio.recfs") ? openOpaque->Get("eos.pio.recfs") : "")
: "";
std::vector fsToken;
eos::common::StringConversion::Tokenize(sPioRecoverFs, fsToken, ",");
if (openOpaque->Get("eos.pio.recfs") && fsToken.empty()) {
return Emsg(epname, error, EINVAL, "open - you specified a list of"
" reconstruction filesystems but the list is empty", path);
}
for (size_t i = 0; i < fsToken.size(); i++) {
errno = 0;
unsigned int rfs = (unsigned int) strtol(fsToken[i].c_str(), 0, 10);
XrdOucString srfs = "";
srfs += (int) rfs;
if (errno || (srfs != fsToken[i].c_str())) {
return Emsg(epname, error, EINVAL, "open - you specified a list of "
"reconstruction filesystems but the list contains non "
"numerical or illegal id's", path);
}
// store in the reconstruction filesystem list
pio_reconstruct_fs.insert(rfs);
}
}
int rcode = SFS_ERROR;
XrdOucString redirectionhost = "invalid?";
XrdOucString targethost = "";
int targetport = atoi(gOFS->MgmOfsTargetPort.c_str());
int targethttpport = gOFS->mHttpdPort;
int ecode = 0;
unsigned long fmdlid = 0;
unsigned long long cid = 0;
// Proc filter
if (ProcInterface::IsProcAccess(path)) {
if (gOFS->mExtAuthz &&
(vid.prot != "sss") &&
(vid.prot != "gsi") &&
(vid.prot != "krb5") &&
(vid.host != "localhost") &&
(vid.host != "localhost.localdomain")) {
return Emsg(epname, error, EPERM, "execute proc command - you don't have"
" the requested permissions for that operation (1)", path);
}
gOFS->MgmStats.Add("OpenProc", vid.uid, vid.gid, 1);
if (!ProcInterface::Authorize(path, ininfo, vid, client)) {
return Emsg(epname, error, EPERM, "execute proc command - you don't have "
"the requested permissions for that operation (2)", path);
} else {
mProcCmd = ProcInterface::GetProcCommand(tident, vid, path, ininfo, logId);
if (mProcCmd) {
eos_static_info("proccmd=%s", mProcCmd->GetCmd(ininfo).c_str());
mProcCmd->SetLogId(logId, vid, tident);
mProcCmd->SetError(&error);
rcode = mProcCmd->open(path, ininfo, vid, &error);
// If we need to stall the client then save the IProcCommand object and
// add it to the map for when the client comes back.
if (rcode > 0) {
if (mProcCmd->GetCmd(ininfo) != "proto") {
return rcode;
}
if (!ProcInterface::SaveSubmittedCmd(tident, std::move(mProcCmd))) {
eos_err("failed to save submitted command object");
return Emsg(epname, error, EINVAL, "save sumitted command object "
"for path ", path);
}
// Now the mProcCmd object is null and moved to the global map
}
return rcode;
} else {
return Emsg(epname, error, ENOMEM, "allocate proc command object for ",
path);
}
}
}
gOFS->MgmStats.Add("Open", vid.uid, vid.gid, 1);
bool dotFxid = spath.beginswith("/.fxid:");
if (dotFxid) {
byfid = eos::Resolver::retrieveFileIdentifier(spath).getUnderlyingUInt64();
try {
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, byfid);
eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
fmd = gOFS->eosFileService->getFileMD(byfid);
spath = gOFS->eosView->getUri(fmd.get()).c_str();
bypid = fmd->getContainerId();
eos_info("msg=\"access by inode\" ino=%s path=%s", path, spath.c_str());
path = spath.c_str();
} catch (eos::MDException& e) {
eos_debug("caught exception %d %s\n", e.getErrno(),
e.getMessage().str().c_str());
return Emsg(epname, error, ENOENT,
"open - you specified a not existing fxid", path);
}
}
COMMONTIMING("authorize", &tm);
AUTHORIZE(client, openOpaque, acc_op,
((acc_op == AOP_Create) ? "create" : "open"), inpath, error);
COMMONTIMING("authorized", &tm);
eos::common::Path cPath(path);
// indicate the scope for a possible token
TOKEN_SCOPE;
isAtomicName = cPath.isAtomicFile();
// prevent any access to a recycling bin for writes
if (isRW && cPath.GetFullPath().beginswith(Recycle::gRecyclingPrefix.c_str())) {
return Emsg(epname, error, EPERM,
"open file - nobody can write to a recycling bin",
cPath.GetParentPath());
}
std::shared_ptr dmd;
// check if we have to create the full path
if (Mode & SFS_O_MKPTH) {
eos_debug("%s", "msg=\"SFS_O_MKPTH was requested\"");
XrdSfsFileExistence file_exists;
std::shared_ptr _fmd;
int ec = gOFS->_exists(cPath.GetParentPath(), file_exists,
error, vid, dmd, _fmd, 0);
// check if that is a file
if ((!ec) && (file_exists != XrdSfsFileExistNo) &&
(file_exists != XrdSfsFileExistIsDirectory)) {
return Emsg(epname, error, ENOTDIR,
"open file - parent path is not a directory",
cPath.GetParentPath());
}
// if it does not exist try to create the path!
if ((!ec) && (file_exists == XrdSfsFileExistNo)) {
ec = gOFS->_mkdir(cPath.GetParentPath(), Mode, error, vid, ininfo);
if (ec) {
gOFS->MgmStats.Add("OpenFailedPermission", vid.uid, vid.gid, 1);
return SFS_ERROR;
}
}
}
bool isSharedFile = gOFS->VerifySharePath(path, openOpaque);
if (gOFS->is_squashfs_access(path, vid)) {
isSharedFile = true;
}
COMMONTIMING("path-computed", &tm);
// Get the directory meta data if it exists
eos::IContainerMD::XAttrMap attrmap;
Acl acl;
Workflow workflow;
bool stdpermcheck = false;
int versioning = 0;
uid_t d_uid = vid.uid;
gid_t d_gid = vid.gid;
std::string creation_path = path;
{
// This is probably one of the hottest code paths in the MGM, we definitely
// want prefetching here.
if (!byfid) {
if (!(open_flags & O_EXCL)) {
// if we want to create, why would we prefetch file md?
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, cPath.GetPath());
} else {
eos::Prefetcher::prefetchContainerMDAndWait(gOFS->eosView,
cPath.GetParentPath());
}
}
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
try {
if (byfid) {
dmd = gOFS->eosDirectoryService->getContainerMD(bypid);
} else if (!dmd) {
dmd = gOFS->eosView->getContainer(cPath.GetParentPath());
}
// get the attributes out
eos::listAttributes(gOFS->eosView, dmd.get(), attrmap, false);
// extract workflows
workflow.Init(&attrmap);
if (dmd) {
try {
std::string filePath = cPath.GetPath();
std::string fileName = cPath.GetName();
if (ocUploadUuid.length()) {
eos::common::Path aPath(cPath.GetAtomicPath(attrmap.count("sys.versioning"),
ocUploadUuid));
filePath = aPath.GetPath();
fileName = aPath.GetName();
}
if ((fmd = dmd->findFile(fileName))) {
/* in case of a hard link, may need to switch to target */
/* A hard link to another file */
if (fmd->hasAttribute(XrdMgmOfsFile::k_mdino)) {
std::shared_ptr gmd;
uint64_t mdino;
if (eos::common::StringToNumeric(fmd->getAttribute(XrdMgmOfsFile::k_mdino),
mdino)) {
gmd = gOFS->eosFileService->getFileMD(
eos::common::FileId::InodeToFid(mdino));
eos_info("hlnk switched from %s (%#lx) to file %s (%#lx)",
fmd->getName().c_str(), fmd->getId(),
gmd->getName().c_str(), gmd->getId());
fmd = gmd;
} else {
//Conversion from string to inode number failed, log the error and return an error to the client
return Emsg(epname, error, ENOENT,
"convert the inode extended attribute to a number", path);
}
}
if (fmd->isLink()) {
// we have to get it by path
fmd = gOFS->eosView->getFile(filePath);
}
uint64_t dmd_id = fmd->getContainerId();
// If fmd is resolved via a symbolic link, we have to find the
// 'real' parent directory
if (dmd_id != dmd->getId()) {
// retrieve the 'real' parent
try {
dmd = gOFS->eosDirectoryService->getContainerMD(dmd_id);
} catch (const eos::MDException& e) {
// this looks like corruption, but will return in ENOENT for the parent
dmd.reset();
errno = ENOENT;
}
}
// check for O_EXCL here to save some time
if (open_flags & O_EXCL) {
gOFS->MgmStats.Add("OpenFailedExists", vid.uid, vid.gid, 1);
return Emsg(epname, error, EEXIST, "create file - (O_EXCL)", path);
}
}
} catch (eos::MDException& e) {
fmd.reset();
}
if (!fmd) {
if (dmd && dmd->findContainer(cPath.GetName())) {
errno = EISDIR;
} else {
errno = ENOENT;
}
} else {
mFid = fmd->getId();
fmdlid = fmd->getLayoutId();
cid = fmd->getContainerId();
fmdsize = fmd->getSize();
}
if (dmd) {
d_uid = dmd->getCUid();
d_gid = dmd->getCGid();
}
} else {
fmd.reset();
}
} catch (eos::MDException& e) {
dmd.reset();
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
};
COMMONTIMING("container::fetched", &tm);
// Check permissions
if (!dmd) {
int save_errno = errno;
MAYREDIRECT_ENOENT;
if (cPath.GetSubPath(2)) {
eos_info("info=\"checking l2 path\" path=%s", cPath.GetSubPath(2));
// Check if we have a redirection setting at level 2 in the namespace
try {
dmd = gOFS->eosView->getContainer(cPath.GetSubPath(2));
// get the attributes out
eos::listAttributes(gOFS->eosView, dmd.get(), attrmap, false);
} catch (eos::MDException& e) {
dmd.reset();
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=%s\n",
e.getErrno(), e.getMessage().str().c_str());
}
// ---------------------------------------------------------------------
if (attrmap.count("sys.redirect.enoent")) {
// there is a redirection setting here
redirectionhost = "";
redirectionhost = attrmap["sys.redirect.enoent"].c_str();
int portpos = 0;
if ((portpos = redirectionhost.find(":")) != STR_NPOS) {
XrdOucString port = redirectionhost;
port.erase(0, portpos + 1);
ecode = atoi(port.c_str());
redirectionhost.erase(portpos);
} else {
ecode = 1094;
}
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
rcode = SFS_REDIRECT;
gOFS->MgmStats.Add("RedirectENOENT", vid.uid, vid.gid, 1);
XrdOucString predirectionhost = redirectionhost.c_str();
eos::common::StringConversion::MaskTag(predirectionhost, "cap.msg");
eos::common::StringConversion::MaskTag(predirectionhost, "cap.sym");
eos::common::StringConversion::MaskTag(pinfo, "authz");
eos_info("info=\"redirecting\" hostport=%s:%d", predirectionhost.c_str(),
ecode);
return rcode;
}
}
// put back original errno
errno = save_errno;
gOFS->MgmStats.Add("OpenFailedENOENT", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", path);
}
bool sticky_owner;
attr::checkDirOwner(attrmap, d_uid, d_gid, vid, sticky_owner, path);
// -------------------------------------------------------------------------
// ACL and permission check
// -------------------------------------------------------------------------
if (dotFxid and (not vid.sudoer) and (vid.uid != 0)) {
/* restricted: this could allow access to a file hidden by the hierarchy */
eos_debug(".fxid=%d uid %d sudoer %d", dotFxid, vid.uid, vid.sudoer);
errno = EPERM;
return Emsg(epname, error, errno, "open file - open by fxid denied", path);
}
if (fmd) {
eos::listAttributes(gOFS->eosView, fmd.get(), attrmapF, false);
}
acl.SetFromAttrMap(attrmap, vid, &attrmapF);
eos_info("acl=%d r=%d w=%d wo=%d egroup=%d shared=%d mutable=%d facl=%d",
acl.HasAcl(), acl.CanRead(), acl.CanWrite(), acl.CanWriteOnce(),
acl.HasEgroup(), isSharedFile, acl.IsMutable(),
acl.EvalUserAttrFile());
if (acl.HasAcl()) {
if ((vid.uid != 0) && (!vid.sudoer) &&
(isRW ? (acl.CanNotWrite() && acl.CanNotUpdate()) : acl.CanNotRead())) {
eos_debug("uid %d sudoer %d isRW %d CanNotRead %d CanNotWrite %d CanNotUpdate %d",
vid.uid, vid.sudoer, isRW, acl.CanNotRead(), acl.CanNotWrite(),
acl.CanNotUpdate());
errno = EPERM;
gOFS->MgmStats.Add("OpenFailedPermission", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file - forbidden by ACL", path);
}
if (isRW) {
// Update case - unless SFS_O_TRUNC is specified then this is a normal write
if (fmd && ((open_flags & O_TRUNC) == 0)) {
eos_debug("CanUpdate %d CanNotUpdate %d stdpermcheck %d file uid/gid = %d/%d",
acl.CanUpdate(), acl.CanNotUpdate(), stdpermcheck, fmd->getCUid(),
fmd->getCGid());
if (acl.CanNotUpdate() || (acl.CanNotWrite() && !acl.CanUpdate())) {
// the ACL has !u set - we don't allow to do file updates
gOFS->MgmStats.Add("OpenFailedNoUpdate", vid.uid, vid.gid, 1);
return Emsg(epname, error, EPERM, "update file - fobidden by ACL",
path);
}
stdpermcheck = (!acl.CanUpdate());
} else {
// Write case
if (!(acl.CanWrite() || acl.CanWriteOnce())) {
// We have to check the standard permissions
stdpermcheck = true;
}
}
} else {
// read case
if (!acl.CanRead()) {
// we have to check the standard permissions
stdpermcheck = true;
}
}
} else {
stdpermcheck = true;
}
if (isRW && !acl.IsMutable() && vid.uid && !vid.sudoer) {
// immutable directory
errno = EPERM;
gOFS->MgmStats.Add("OpenFailedPermission", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file - directory immutable", path);
}
// check publicaccess level
if (!gOFS->allow_public_access(path, vid)) {
return Emsg(epname, error, EACCES, "access - public access level restriction",
path);
}
int taccess = -1;
if ((!isSharedFile || isRW) && stdpermcheck
&& (!(taccess = dmd->access(vid.uid, vid.gid,
(isRW) ? W_OK | X_OK : R_OK | X_OK)))) {
eos_debug("fCUid %d dCUid %d uid %d isSharedFile %d isRW %d stdpermcheck %d access %d",
fmd ? fmd->getCUid() : 0, dmd->getCUid(), vid.uid, isSharedFile, isRW,
stdpermcheck, taccess);
if (!((vid.uid == DAEMONUID) && (isPioReconstruct))) {
// we don't apply this permission check for reconstruction jobs issued via the daemon account
errno = EPERM;
gOFS->MgmStats.Add("OpenFailedPermission", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", path);
}
}
if (sticky_owner) {
eos_info("msg=\"client acting as directory owner\" path=\"%s\" uid=\"%u=>%u\" gid=\"%u=>%u\"",
path, vid.uid, vid.gid, d_uid, d_gid);
vid.uid = d_uid;
vid.gid = d_gid;
}
// If a file has the sys.proc attribute, it will be redirected as a command
if (fmd != nullptr && fmd->hasAttribute("sys.proc")) {
ns_rd_lock.Release();
return open("/proc/user/", open_mode, Mode, client,
fmd->getAttribute("sys.proc").c_str());
}
}
// check for versioning depth, cgi overrides sys & user attributes
versioning = attr::getVersioning(attrmap, versioning_cgi);
// check for atomic uploads only in non fuse clients
isAtomicUpload = !isFuse &&
attr::checkAtomicUpload(attrmap, openOpaque->Get("eos.atomic"));
// check for injection in non fuse clients with cgi
isInjection = !isFuse && openOpaque->Get("eos.injection");
if (openOpaque->Get("eos.repair")) {
isRepair = true;
}
if (openOpaque->Get("eos.repairread")) {
isRepairRead = true;
}
// Short-cut to block multi-source access to EC files
if (IsRainRetryWithExclusion(isRW, fmdlid)) {
return Emsg(epname, error, ENETUNREACH, "open file - "
"multi-source reading on EC file blocked for ", path);
}
// ---------------------------------------------------------------------------
// attribute lock logic, don't allow file opens which have an attr lock
// ---------------------------------------------------------------------------
XattrLock alock(attrmapF);
if (alock.foreignLock(vid, isRW)) {
return Emsg(epname, error, EPERM,
"open file - file has a valid extended attribute lock ", path);
}
if (isRW) {
// Allow updates of 0-size RAIN files so that we are able to write from the
// FUSE mount with lazy-open mode enabled.
if (!getenv("EOS_ALLOW_RAIN_RWM") && isRewrite && (vid.uid > 3) &&
(fmdsize != 0) && (LayoutId::IsRain(fmdlid))) {
// Unpriviledged users are not allowed to open RAIN files for update
gOFS->MgmStats.Add("OpenFailedNoUpdate", vid.uid, vid.gid, 1);
return Emsg(epname, error, EPERM, "update RAIN layout file - "
"you have to be a priviledged user for updates");
}
if (!isInjection && (open_flags & O_TRUNC) && fmd) {
// check if this directory is write-once for the mapped user
if (acl.HasAcl()) {
if (acl.CanWriteOnce()) {
gOFS->MgmStats.Add("OpenFailedNoUpdate", vid.uid, vid.gid, 1);
// this is a write once user
return Emsg(epname, error, EEXIST,
"overwrite existing file - you are write-once user");
} else {
if ((!stdpermcheck) && (!acl.CanWrite())) {
return Emsg(epname, error, EPERM,
"overwrite existing file - you have no write permission");
}
}
}
if (versioning) {
if (isAtomicUpload) {
// atomic uploads need just to purge version to max-1, the version is created on commit
// purge might return an error if the file was not yet existing/versioned
gOFS->PurgeVersion(cPath.GetVersionDirectory(), error, versioning - 1);
errno = 0;
} else {
// handle the versioning for a specific file ID
if (gOFS->Version(mFid, error, vid, versioning)) {
return Emsg(epname, error, errno, "version file", path);
}
}
} else {
// drop the old file (for non atomic uploads) and create a new truncated one
if ((!isAtomicUpload) && gOFS->_rem(path, error, vid, ininfo, false, false)) {
return Emsg(epname, error, errno, "remove file for truncation", path);
}
}
if (!ocUploadUuid.length()) {
fmd.reset();
} else {
eos_info("%s", "msg=\"keep attached to existing fmd in chunked upload\"");
}
gOFS->MgmStats.Add("OpenWriteTruncate", vid.uid, vid.gid, 1);
} else {
if (isInjection && !fmd) {
errno = ENOENT;
return Emsg(epname, error, errno, "inject into a non-existing file", path);
}
if (!(fmd) && ((open_flags & O_CREAT))) {
gOFS->MgmStats.Add("OpenWriteCreate", vid.uid, vid.gid, 1);
} else {
if (acl.HasAcl()) {
if (acl.CanWriteOnce()) {
// this is a write once user
return Emsg(epname, error, EEXIST,
"overwrite existing file - you are write-once user");
} else {
if ((!stdpermcheck) && (!acl.CanWrite()) && (!acl.CanUpdate())) {
return Emsg(epname, error, EPERM,
"overwrite existing file - you have no write permission");
}
}
}
gOFS->MgmStats.Add("OpenWrite", vid.uid, vid.gid, 1);
}
}
// -------------------------------------------------------------------------
// write case
// -------------------------------------------------------------------------
if (!fmd) {
if (!(open_flags & O_CREAT)) {
// Open for write for non existing file without creation flag
return Emsg(epname, error, ENOENT, "open file without creation flag", path);
} else {
// creation of a new file or isOcUpload
COMMONTIMING("write::begin", &tm);
{
// -------------------------------------------------------------------
std::shared_ptr ref_fmd;
eos::common::RWMutexWriteLock ns_wr_lock(gOFS->eosViewRWMutex);
try {
// we create files with the uid/gid of the parent directory
if (isAtomicUpload) {
creation_path = cPath.GetAtomicPath(versioning, ocUploadUuid);
eos_info("atomic-path=%s", creation_path.c_str());
try {
ref_fmd = gOFS->eosView->getFile(path);
} catch (eos::MDException& e) {
// empty
}
}
// Avoid any race condition when opening for creation O_EXCL
if (open_flags & O_EXCL) {
if (isAtomicUpload) {
try {
fmd = gOFS->eosView->getFile(creation_path);
} catch (eos::MDException& e1) {
// empty
}
}
if (fmd) {
gOFS->MgmStats.Add("OpenFailedExists", vid.uid, vid.gid, 1);
return Emsg(epname, error, EEXIST, "create file - (O_EXCL)", path);
}
}
{
// a faster replacement for createFile view view
auto file = gOFS->eosFileService->createFile(0);
if (!file) {
eos_static_crit("File creation failed for %s", creation_path.c_str());
throw_mdexception(EIO, "File creation failed");
}
eos::common::Path cPath(creation_path.c_str());
std::string fileName = cPath.GetName();
file->setName(fileName);
file->setCUid(vid.uid);
file->setCGid(vid.gid);
file->setCTimeNow();
file->setATimeNow(0);
file->setMTimeNow();
file->clearChecksum(0);
dmd->addFile(file.get());
fmd = file;
}
if ((mEosObfuscate > 0) ||
(attrmap.count("sys.file.obfuscate") &&
(attrmap["sys.file.obfuscate"] == "1"))) {
std::string skey = eos::common::SymKey::RandomCipher(mEosKey);
// attach an obfucation key
fmd->setAttribute("user.obfuscate.key", skey);
if (mEosKey.length()) {
fmd->setAttribute("user.encrypted", "1");
}
attrmapF["user.obfuscate.key"] = skey;
}
if (ocUploadUuid.length()) {
fmd->setFlags(0);
} else {
fmd->setFlags(Mode & (S_IRWXU | S_IRWXG | S_IRWXO));
}
// For versions copy xattrs over from the original file
if (versioning) {
static std::set skip_tag {"sys.eos.btime", "sys.fs.tracking", eos::common::EOS_DTRACE_ATTR, eos::common::EOS_VTRACE_ATTR, "sys.tmp.atomic"};
for (const auto& xattr : attrmapF) {
if (skip_tag.find(xattr.first) == skip_tag.end()) {
fmd->setAttribute(xattr.first, xattr.second);
}
}
}
fmd->setAttribute("sys.utrace", logId);
fmd->setAttribute("sys.vtrace", vid.getTrace());
if (ref_fmd) {
// If we have a target file we tag the latest atomic upload name
// on a temporary attribute
ref_fmd->setAttribute("sys.tmp.atomic", fmd->getName());
if (acl.EvalUserAttrFile()) {
// we inherit existing ACLs during (atomic) versioning
ref_fmd->setAttribute("user.acl", acl.UserAttrFile());
ref_fmd->setAttribute("sys.eval.useracl", "1");
}
}
mFid = fmd->getId();
fmdlid = fmd->getLayoutId();
// oc chunks start with flags=0
cid = fmd->getContainerId();
auto cmd = dmd; // we have this already
cmd->setMTimeNow();
eos::ContainerIdentifier cmd_id = cmd->getIdentifier();
eos::ContainerIdentifier cmd_pid = cmd->getParentIdentifier();
gOFS->mReplicationTracker->Create(fmd);
ns_wr_lock.Release();
cmd->notifyMTimeChange(gOFS->eosDirectoryService);
gOFS->eosView->updateContainerStore(cmd.get());
gOFS->eosView->updateFileStore(fmd.get());
if (ref_fmd) {
gOFS->eosView->updateFileStore(ref_fmd.get());
}
gOFS->FuseXCastRefresh(cmd_id, cmd_pid);
} catch (eos::MDException& e) {
fmd.reset();
errno = e.getErrno();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
};
// -------------------------------------------------------------------
} // end ns_lock
COMMONTIMING("write::end", &tm);
if (!fmd) {
// creation failed
gOFS->MgmStats.Add("OpenFailedCreate", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "create file", path);
}
isCreation = true;
// -------------------------------------------------------------------------
}
} else {
// we attached to an existing file
if (open_flags & O_EXCL) {
gOFS->MgmStats.Add("OpenFailedExists", vid.uid, vid.gid, 1);
return Emsg(epname, error, EEXIST, "create file (O_EXCL)", path);
}
}
} else {
if (!fmd) {
// check if there is a redirect or stall for missing entries
MAYREDIRECT_ENOENT;
MAYSTALL_ENOENT;
if (auto redirect_kv = attrmap.find("sys.redirect.enoent");
redirect_kv != attrmap.end()) {
// there is a redirection setting here
redirectionhost = "";
redirectionhost = redirect_kv->second.c_str();
int portpos = 0;
if ((portpos = redirectionhost.find(":")) != STR_NPOS) {
XrdOucString port = redirectionhost;
port.erase(0, portpos + 1);
ecode = atoi(port.c_str());
redirectionhost.erase(portpos);
} else {
ecode = 1094;
}
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
rcode = SFS_REDIRECT;
gOFS->MgmStats.Add("RedirectENOENT", vid.uid, vid.gid, 1);
return rcode;
}
gOFS->MgmStats.Add("OpenFailedENOENT", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", path);
}
if (isSharedFile) {
gOFS->MgmStats.Add("OpenShared", vid.uid, vid.gid, 1);
} else {
gOFS->MgmStats.Add("OpenRead", vid.uid, vid.gid, 1);
}
}
// ---------------------------------------------------------------------------
// flush synchronization logic, don't open a file which is currently flushing
// ---------------------------------------------------------------------------
if (gOFS->zMQ->gFuseServer.Flushs().hasFlush(eos::common::FileId::FidToInode(
mFid))) {
// the first 255ms are covered inside hasFlush, otherwise we stall clients for a sec
return gOFS->Stall(error, 1, "file is currently being flushed");
}
// ---------------------------------------------------------------------------
// construct capability
// ---------------------------------------------------------------------------
XrdOucString capability = "";
if (gOFS->mTapeEnabled) {
capability += "&tapeenabled=1";
}
if (isPioReconstruct) {
capability += "&mgm.access=update";
} else {
if (isRW) {
if (isRewrite) {
capability += "&mgm.access=update";
} else {
capability += "&mgm.access=create";
}
uint64_t cloneId;
if (fmd && (cloneId = fmd->getCloneId()) != 0) {
char sbuff[1024];
std::string cloneFST = fmd->getCloneFST();
if (cloneFST == "") { /* This triggers the copy-on-write */
if (int rc = create_cow(cowUpdate, dmd, fmd, vid, error)) {
return rc;
}
}
eos_debug("file %s cloneid %ld cloneFST %s trunc %d", path, fmd->getCloneId(),
fmd->getCloneFST().c_str(), open_flags & O_TRUNC);
snprintf(sbuff, sizeof(sbuff), "&mgm.cloneid=%ld&mgm.cloneFST=%s", cloneId,
fmd->getCloneFST().c_str());
capability += sbuff;
}
} else {
capability += "&mgm.access=read";
}
}
if (mEosObfuscate && !isFuse) {
// add obfuscation key to redirection capability
if (attrmapF.count("user.obfuscate.key")) {
capability += "&mgm.obfuscate.key=";
capability += attrmapF["user.obfuscate.key"].c_str();
}
// add encryption key to redirection capability
if (mEosKey.length()) {
capability += "&mgm.encryption.key=";
capability += mEosKey.c_str();
}
}
// ---------------------------------------------------------------------------
// forward some allowed user opaque tags
// ---------------------------------------------------------------------------
unsigned long layoutId = (isCreation) ? LayoutId::kPlain : fmdlid;
// the client can force to read a file on a defined file system
unsigned long forcedFsId = 0;
// the client can force to place a file in a specified group of a space
long forced_group = -1;
// this is the filesystem defining the client access point in the selection
// vector - for writes it is always 0, for reads it comes out of the
// FileAccess function
unsigned long fsIndex = 0;
XrdOucString space = "default";
unsigned long new_lid = 0;
eos::mgm::Scheduler::tPlctPolicy plctplcy;
std::string targetgeotag;
std::string bandwidth;
std::string ioprio;
std::string iotype;
bool schedule = false;
uint64_t atimeage = 0;
// select space and layout according to policies
COMMONTIMING("Policy::begin", &tm);
Policy::GetLayoutAndSpace(path, attrmap, vid, new_lid, space, *openOpaque,
forcedFsId, forced_group, bandwidth, schedule,
ioprio, iotype, isRW, true, &atimeage);
COMMONTIMING("Policy::end", &tm);
// do a local redirect here if there is only one replica attached
if (!isRW && !isPio && (fmd->getNumLocation() == 1) &&
Policy::RedirectLocal(path, attrmap, vid, layoutId, space, *openOpaque)) {
XrdCl::URL url(std::string("root://localhost//") + std::string(
path ? path : "/dummy/") + std::string("?") + std::string(
ininfo ? ininfo : ""));
std::string localhost = "localhost";
if (gOFS->Tried(url, localhost, "*")) {
gOFS->MgmStats.Add("OpenFailedRedirectLocal", vid.uid, vid.gid, 1);
eos_info("msg=\"local-redirect disabled - forwarding to FST\" path=\"%s\" info=\"%s\"",
path, ininfo);
} else {
eos::common::FileSystem::fs_snapshot_t local_snapshot;
unsigned int local_id = fmd->getLocation(0);
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
eos::mgm::FileSystem* local_fs = FsView::gFsView.mIdView.lookupByID(local_id);
local_fs->SnapShotFileSystem(local_snapshot);
}
// compute the local path
std::string local_path = eos::common::FileId::FidPrefix2FullPath(
eos::common::FileId::Fid2Hex(fmd->getId()).c_str(),
local_snapshot.mPath.c_str());
eos_info("msg=\"local-redirect screening - forwarding to local\" local-path=\"%s\" path=\"%s\" info=\"%s\"",
local_path.c_str(), path, ininfo);
redirectionhost = "file://localhost";
redirectionhost += local_path.c_str();
ecode = -1;
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
rcode = SFS_REDIRECT;
gOFS->MgmStats.Add("OpenRedirectLocal", vid.uid, vid.gid, 1);
eos_info("local-redirect=\"%s\"", redirectionhost.c_str());
return rcode;
}
}
if (ioPriority.length()) {
ioprio = ioPriority;
capability += "&mgm.iopriority=";
capability += ioPriority.c_str();
} else {
if (ioprio.length()) {
capability += "&mgm.iopriority=";
capability += ioprio.c_str();
}
}
if (schedule) {
capability += "&mgm.schedule=1";
}
if (iotype.length()) {
capability += "&mgm.iotype=";
capability += iotype.c_str();
}
if (fmd && atimeage) {
static std::set skip_tag {"balancer", "groupdrainer", "groupbalancer", "geobalancer", "drainer", "converter", "fsck"};
if (app_name.empty() || (skip_tag.find(app_name) == skip_tag.end())) {
// do a potential atime update, we don't need a name
try {
if (fmd->setATimeNow(atimeage)) {
gOFS->eosView->updateFileStore(fmd.get());
}
} catch (eos::MDException& e) {
errno = e.getErrno();
std::string errmsg = e.getMessage().str();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
gOFS->MgmStats.Add("OpenFailedQuota", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file and update atime for reading",
errmsg.c_str());
}
}
}
// get placement policy
Policy::GetPlctPolicy(path, attrmap, vid, *openOpaque, plctplcy, targetgeotag);
unsigned long long ext_mtime_sec = 0;
unsigned long long ext_mtime_nsec = 0;
unsigned long long ext_ctime_sec = 0;
unsigned long long ext_ctime_nsec = 0;
std::string ext_etag;
std::map ext_xattr_map;
if (openOpaque->Get("eos.ctime")) {
std::string str_ctime = openOpaque->Get("eos.ctime");
size_t pos = str_ctime.find('.');
if (pos == std::string::npos) {
ext_ctime_sec = strtoull(str_ctime.c_str(), 0, 10);
ext_ctime_nsec = 0;
} else {
ext_ctime_sec = strtoull(str_ctime.substr(0, pos).c_str(), 0, 10);
ext_ctime_nsec = strtoull(str_ctime.substr(pos + 1).c_str(), 0, 10);
}
}
if (openOpaque->Get("eos.mtime")) {
std::string str_mtime = openOpaque->Get("eos.mtime");
size_t pos = str_mtime.find('.');
if (pos == std::string::npos) {
ext_mtime_sec = strtoull(str_mtime.c_str(), 0, 10);
ext_mtime_nsec = 0;
} else {
ext_mtime_sec = strtoull(str_mtime.substr(0, pos).c_str(), 0, 10);
ext_mtime_nsec = strtoull(str_mtime.substr(pos + 1).c_str(), 0, 10);
}
}
if (openOpaque->Get("eos.etag")) {
ext_etag = openOpaque->Get("eos.etag");
}
if (openOpaque->Get("eos.xattr")) {
std::vector xattr_keys;
eos::common::StringConversion::GetKeyValueMap(openOpaque->Get("eos.xattr"),
ext_xattr_map, "=", "#", &xattr_keys);
for (auto it = xattr_keys.begin(); it != xattr_keys.end(); ++it) {
if (it->substr(0, 5) != "user.") {
ext_xattr_map.erase(*it);
}
}
}
if ((!isInjection) && (isCreation || (open_flags & O_TRUNC))) {
eos_info("blocksize=%llu lid=%x", LayoutId::GetBlocksize(new_lid), new_lid);
layoutId = new_lid;
{
std::shared_ptr fmdnew;
if (!byfid) {
try {
fmdnew = dmd->findFile(fileName);
} catch (eos::MDException& e) {
if ((!isAtomicUpload) && (fmdnew != fmd)) {
// file has been recreated in the meanwhile
return Emsg(epname, error, EEXIST, "open file (file recreated)", path);
}
}
}
// Set the layout and commit new meta data
fmd->setLayoutId(layoutId);
if (isFuse && (open_flags & O_TRUNC)) {
std::string s;
if (fmd->hasAttribute("sys.fusex.state")) {
s = fmd->getAttribute("sys.fusex.state");
}
s += "T";
fmd->setAttribute("sys.fusex.state",
eos::common::StringConversion::ReduceString(s).c_str());
}
// if specified set an external modification/creation time
if (ext_mtime_sec) {
eos::IFileMD::ctime_t mtime;
mtime.tv_sec = ext_mtime_sec;
mtime.tv_nsec = ext_mtime_nsec;
fmd->setMTime(mtime);
} else {
fmd->setMTimeNow();
}
if (ext_ctime_sec) {
eos::IFileMD::ctime_t ctime;
ctime.tv_sec = ext_ctime_sec;
ctime.tv_nsec = ext_ctime_nsec;
fmd->setCTime(ctime);
}
if (isCreation) {
// store the birth time as an extended attribute
eos::IFileMD::ctime_t ctime;
fmd->getCTime(ctime);
char btime[256];
snprintf(btime, sizeof(btime), "%lu.%lu", ctime.tv_sec, ctime.tv_nsec);
fmd->setAttribute("sys.eos.btime", btime);
} else {
fmd->setATimeNow(0);
}
// if specified set an external temporary ETAG
if (ext_etag.length()) {
fmd->setAttribute("sys.tmp.etag", ext_etag);
}
for (auto it = ext_xattr_map.begin(); it != ext_xattr_map.end(); ++it) {
fmd->setAttribute(it->first, it->second);
}
if (acl.EvalUserAttrFile()) {
// we inherit existing ACLs during (atomic) versioning
fmd->setAttribute("user.acl", acl.UserAttrFile());
fmd->setAttribute("sys.eval.useracl", "1");
}
try {
eos::common::RWMutexWriteLock ns_wr_lock(gOFS->eosViewRWMutex);
eos::FileIdentifier fmd_id = fmd->getIdentifier();
std::shared_ptr cmd =
gOFS->eosDirectoryService->getContainerMD(cid);
eos::ContainerIdentifier cmd_id = cmd->getIdentifier();
eos::ContainerIdentifier pcmd_id = cmd->getParentIdentifier();
cmd->setMTimeNow();
if (isCreation || (!fmd->getNumLocation())) {
eos::IQuotaNode* ns_quota = gOFS->eosView->getQuotaNode(cmd.get());
if (ns_quota) {
ns_quota->addFile(fmd.get());
}
}
ns_wr_lock.Release();
COMMONTIMING("filemd::update", &tm);
gOFS->eosView->updateFileStore(fmd.get());
cmd->notifyMTimeChange(gOFS->eosDirectoryService);
gOFS->eosView->updateContainerStore(cmd.get());
gOFS->FuseXCastRefresh(fmd_id, cmd_id);
gOFS->FuseXCastRefresh(cmd_id, pcmd_id);
COMMONTIMING("fusex::bc", &tm);
} catch (eos::MDException& e) {
errno = e.getErrno();
std::string errmsg = e.getMessage().str();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
gOFS->MgmStats.Add("OpenFailedQuota", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", errmsg.c_str());
}
}
}
// 0-size files can be read from the MGM if this is not FUSE access
// atomic files are only served from here and also rain files are skipped
if (!isRW && !fmd->getSize() && (!isFuse || isAtomicName)) {
if (isAtomicName || (!LayoutId::IsRain(layoutId))) {
eos_info("msg=\"0-size file read from the MGM\" path=%s", path);
mIsZeroSize = true;
return SFS_OK;
}
}
// @todo(esindril) the tag is wrong should actually be mgm.uid
capability += "&mgm.ruid=";
capability += (int) vid.uid;
capability += "&mgm.rgid=";
capability += (int) vid.gid;
// @todo(esindril) not used and should be removed
capability += "&mgm.uid=99";
capability += "&mgm.gid=99";
capability += "&mgm.path=";
{
// an '&' will create a failure on the FST
XrdOucString safepath = spath.c_str();
eos::common::StringConversion::SealXrdPath(safepath);
capability += safepath;
}
capability += "&mgm.manager=";
capability += gOFS->ManagerId.c_str();
capability += "&mgm.fid=";
std::string hex_fid;
if (!isRW) {
const char* val;
if ((val = openOpaque->Get("eos.clonefst")) && (strlen(val) < 32)) {
hex_fid = fmd->getCloneFST();
eos_debug("open read eos.clonefst %s hex_fid %s", val, hex_fid.c_str());
if (hex_fid != val) {
return Emsg(epname, error, EINVAL, "open - invalid clonefst argument", path);
}
}
}
if (hex_fid.empty()) {
hex_fid = eos::common::FileId::Fid2Hex(mFid);
}
capability += hex_fid.c_str();
XrdOucString sizestring;
capability += "&mgm.cid=";
capability += eos::common::StringConversion::GetSizeString(sizestring, cid);
// add the mgm.sec information to the capability
capability += "&mgm.sec=";
capability += eos::common::SecEntity::ToKey(client, app_name.c_str()).c_str();
if (attrmap.count("user.tag")) {
capability += "&mgm.container=";
capability += attrmap["user.tag"].c_str();
}
// Size which will be reserved with a placement of one replica for the file
unsigned long long bookingsize = 0;
bool hasClientBookingSize = false;
unsigned long long targetsize = 0;
unsigned long long minimumsize = 0;
unsigned long long maximumsize = 0;
if (attrmap.count("sys.forced.bookingsize")) {
// we allow only a system attribute not to get fooled by a user
bookingsize = strtoull(attrmap["sys.forced.bookingsize"].c_str(), 0, 10);
} else {
if (attrmap.count("user.forced.bookingsize")) {
bookingsize = strtoull(attrmap["user.forced.bookingsize"].c_str(), 0, 10);
} else {
bookingsize = 1024ll; // 1k as default
if (openOpaque->Get("eos.bookingsize")) {
bookingsize = strtoull(openOpaque->Get("eos.bookingsize"), 0, 10);
hasClientBookingSize = true;
} else {
if (openOpaque->Get("oss.asize")) {
bookingsize = strtoull(openOpaque->Get("oss.asize"), 0, 10);
hasClientBookingSize = true;
}
}
}
}
if (attrmap.count("sys.forced.minsize")) {
minimumsize = strtoull(attrmap["sys.forced.minsize"].c_str(), 0, 10);
}
if (attrmap.count("sys.forced.maxsize")) {
maximumsize = strtoull(attrmap["sys.forced.maxsize"].c_str(), 0, 10);
}
if (openOpaque->Get("oss.asize")) {
targetsize = strtoull(openOpaque->Get("oss.asize"), 0, 10);
}
if (openOpaque->Get("eos.targetsize")) {
targetsize = strtoull(openOpaque->Get("eos.targetsize"), 0, 10);
}
std::string spacename = space.c_str();
auto strategy = gOFS->mFsScheduler->getPlacementStrategy(spacename);
const char* strategy_cstr;
if ((strategy_cstr = openOpaque->Get("eos.schedulingstrategy"))) {
strategy = placement::strategy_from_str(strategy_cstr);
eos_debug("msg=\"using scheduling strategy\" strategy=%s",
strategy_to_str(strategy).c_str());
}
bool use_geoscheduler = strategy ==
placement::PlacementStrategyT::kGeoScheduler;
//eos::mgm::FileSystem* filesystem = 0;
std::vector selectedfs;
std::vector excludefs = GetExcludedFsids();
std::vector proxys;
std::vector firewalleps;
// file systems which are unavailable during a read operation
std::vector unavailfs;
// file systems which have been replaced with a new reconstructed stripe
std::vector replacedfs;
int retc = 0;
bool isRecreation = false;
// Place a new file
if (isCreation || (!fmd->getNumLocation()) || isInjection) {
const char* containertag = 0;
if (attrmap.count("user.tag")) {
containertag = attrmap["user.tag"].c_str();
}
/// ###############
// if the client should go through a firewall entrypoint, try to get it
// if the scheduled fs need to be accessed through a dataproxy, try to get it
// if any of the two fails, the scheduling operation fails
Scheduler::PlacementArguments plctargs;
plctargs.alreadyused_filesystems = &selectedfs;
if (isRepair) {
plctargs.bookingsize = bookingsize ? bookingsize : gOFS->getFuseBookingSize();
} else {
plctargs.bookingsize = isFuse ? gOFS->getFuseBookingSize() : bookingsize;
}
plctargs.dataproxys = &proxys;
plctargs.firewallentpts = &firewalleps;
plctargs.forced_scheduling_group_index = forced_group;
plctargs.grouptag = containertag;
plctargs.lid = layoutId;
plctargs.inode = (ino64_t) fmd->getId();
plctargs.path = path;
plctargs.plctTrgGeotag = &targetgeotag;
plctargs.plctpolicy = plctplcy;
plctargs.exclude_filesystems = &excludefs;
plctargs.selected_filesystems = &selectedfs;
plctargs.spacename = &spacename;
plctargs.truncate = open_flags & O_TRUNC;
plctargs.vid = &vid;
if (!plctargs.isValid()) {
// there is something wrong in the arguments of file placement
return Emsg(epname, error, EINVAL, "open - invalid placement argument", path);
}
if (!use_geoscheduler) {
COMMONTIMING("PlctScheduler::FilePlacement", &tm);
uint64_t n_replicas_ = eos::common::LayoutId::GetStripeNumber(layoutId) + 1;
if (n_replicas_ > std::numeric_limits::max()) {
eos_err("msg=\"too many replicas requested\" n_replicas=%" PRIu64, n_replicas_);
return Emsg(epname, error, EINVAL, "open - too many replicas requested", path);
}
uint8_t n_replicas = static_cast(n_replicas_);
placement::PlacementArguments args{n_replicas, placement::ConfigStatus::kRW, strategy};
if (!excludefs.empty()) {
args.excludefs = excludefs;
}
if (forced_group >= 0) {
args.forced_group_index = forced_group;
}
auto ret = gOFS->mFsScheduler->schedule(spacename,
args);
COMMONTIMING("PlctScheduler::FilePlaced", &tm);
if (ret.is_valid_placement(n_replicas)) {
for (int i = 0; i < n_replicas; i++) {
selectedfs.push_back(ret.ids[i]);
}
// TODO: this should be demoted to DEBUG once we have a proper understanding
eos_info("msg=\"FlatScheduler selected filesystems\" fs=%s",
ret.result_string().c_str());
} else {
// Fallback to classic geoscheduler on failure
eos_err("msg =\"no valid placement found with FlatScheduler\" ret=%d, err_msg=%s",
ret.ret_code, ret.error_string().c_str());
use_geoscheduler = true;
}
}
if (use_geoscheduler) {
COMMONTIMING("Scheduler::FilePlacement", &tm);
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
retc = Quota::FilePlacement(&plctargs);
COMMONTIMING("Scheduler::FilePlaced", &tm);
}
// reshuffle the selectedfs by returning as first entry the lowest if the
// sum of the fsid is odd the highest if the sum is even
if (selectedfs.size() > 0) {
std::vector newselectedfs;
auto result = std::minmax_element(selectedfs.begin(), selectedfs.end());
int sum = std::accumulate(selectedfs.begin(), selectedfs.end(), 0);
if ((sum % 2) == 0) {
newselectedfs.push_back(*result.second);
} else {
newselectedfs.push_back(*result.first);
}
for (const auto& i : selectedfs) {
if (i != newselectedfs.front()) {
newselectedfs.push_back(i);
}
}
selectedfs.swap(newselectedfs);
}
} else {
// Access existing file - fill the vector with the existing locations
for (unsigned int i = 0; i < fmd->getNumLocation(); i++) {
auto loc = fmd->getLocation(i);
if (loc != 0 && loc != eos::common::TAPE_FS_ID) {
selectedfs.push_back(loc);
excludefs.push_back(loc);
}
}
eos::IFileMD::LocationVector unlinked = fmd->getUnlinkedLocations();
for (auto loc : unlinked) {
excludefs.push_back(loc);
}
if (selectedfs.empty()) {
// this file has not a single existing replica
gOFS->MgmStats.Add("OpenFileOffline", vid.uid, vid.gid, 1);
// Fire and forget a sync::offline workflow event
errno = 0;
workflow.SetFile(path, mFid);
const auto workflowType = openOpaque->Get("eos.workflow") != nullptr ?
openOpaque->Get("eos.workflow") : "default";
std::string workflowErrorMsg;
const auto ret_wfe = workflow.Trigger("sync::offline", std::string{workflowType},
vid,
ininfo, workflowErrorMsg);
if (ret_wfe < 0 && errno == ENOKEY) {
eos_debug("msg=\"no workflow defined for sync::offline\"");
} else {
eos_info("msg=\"workflow trigger returned\" retc=%d errno=%d event=\"sync::offline\"",
ret_wfe, errno);
}
return Emsg(epname, error, ENODEV, "open - no disk replica exists", path);
}
/// ###############
// reconstruction opens files in RW mode but we actually need RO mode in this case
/// ###############
// if the client should go through a firewall entrypoint, try to get it
// if the scheduled fs need to be accessed through a dataproxy, try to get it
// if any of the two fails, the scheduling operation fails
Scheduler::AccessArguments acsargs;
acsargs.bookingsize = fmd->getSize();
acsargs.dataproxys = &proxys;
acsargs.firewallentpts = &firewalleps;
acsargs.forcedfsid = forcedFsId;
acsargs.forcedspace = space.c_str();
acsargs.fsindex = &fsIndex;
acsargs.isRW = isPioReconstruct ? false : isRW;
acsargs.lid = layoutId;
acsargs.inode = (ino64_t) fmd->getId();
acsargs.locationsfs = &selectedfs;
acsargs.tried_cgi = &tried_cgi;
acsargs.unavailfs = &unavailfs;
acsargs.vid = &vid;
if (!acsargs.isValid()) {
// there is something wrong in the arguments of file access
return Emsg(epname, error, EINVAL, "open - invalid access argument", path);
}
{
COMMONTIMING("Scheduler::FileAccess", &tm);
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
retc = Scheduler::FileAccess(&acsargs);
COMMONTIMING("Scheduler::FileAccessed", &tm);
}
if (acsargs.isRW) {
// If this is an update, we don't have to send the client to cgi
// excluded locations, we tell that the file is unreachable
for (size_t k = 0; k < selectedfs.size(); k++) {
// if the fs is available
if (std::find(unavailfs.begin(), unavailfs.end(),
selectedfs[k]) != unavailfs.end()) {
eos_info("msg=\"location %d is excluded as an unavailable filesystem"
" - returning ENETUNREACH\"", selectedfs[k]);
retc = ENETUNREACH;
break;
}
}
}
if ((retc == ENETUNREACH) || (retc == EROFS) || isRepair) {
if (isRW && ((((fmd->getSize() == 0) && (bookingsize == 0)) || isRepair))) {
// File-recreation due to offline/full file systems
const char* containertag = 0;
if (attrmap.count("user.tag")) {
containertag = attrmap["user.tag"].c_str();
}
isCreation = true;
/// ###############
// if the client should go through a firewall entrypoint, try to get it
// if the scheduled fs need to be accessed through a dataproxy, try to get it
// if any of the two fails, the scheduling operation fails
Scheduler::PlacementArguments plctargs;
plctargs.alreadyused_filesystems = &excludefs;
plctargs.bookingsize = bookingsize;
plctargs.dataproxys = &proxys;
plctargs.firewallentpts = &firewalleps;
plctargs.forced_scheduling_group_index = forced_group;
plctargs.grouptag = containertag;
plctargs.lid = layoutId;
plctargs.inode = (ino64_t) fmd->getId();
plctargs.path = path;
plctargs.plctTrgGeotag = &targetgeotag;
plctargs.plctpolicy = plctplcy;
plctargs.exclude_filesystems = &excludefs;
plctargs.selected_filesystems = &selectedfs;
std::string spacename = space.c_str();
plctargs.spacename = &spacename;
plctargs.truncate = open_flags & O_TRUNC;
plctargs.vid = &vid;
if (!plctargs.isValid()) {
// there is something wrong in the arguments of file placement
return Emsg(epname, error, EINVAL, "open - invalid placement argument", path);
}
if (!use_geoscheduler) {
COMMONTIMING("PlctScheduler::FilePlacement", &tm);
uint8_t n_replicas = eos::common::LayoutId::GetStripeNumber(layoutId) + 1;
placement::PlacementArguments args{n_replicas, placement::ConfigStatus::kRW, strategy};
if (!excludefs.empty()) {
args.excludefs = excludefs;
}
if (forced_group >= 0) {
args.forced_group_index = forced_group;
}
auto ret = gOFS->mFsScheduler->schedule(spacename,
args);
COMMONTIMING("PlctScheduler::FilePlaced", &tm);
if (ret.is_valid_placement(n_replicas)) {
for (int i = 0; i < n_replicas; i++) {
selectedfs.push_back(ret.ids[i]);
}
} else {
eos_info("msg =\"no valid placement found with FSScheduler\" ret=%d, err_msg=%s",
ret.ret_code, ret.error_string().c_str());
use_geoscheduler = true;
}
}
if (use_geoscheduler) {
COMMONTIMING("Scheduler::FilePlacement", &tm);
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
retc = Quota::FilePlacement(&plctargs);
COMMONTIMING("Scheduler::FilePlaced", &tm);
}
eos_info("msg=\"file-recreation due to offline/full locations\" path=%s retc=%d",
path, retc);
isRecreation = true;
// end scope fs_rd_lock
} else {
// Normal read failed, try to reply with the tiredrc value if this
// exists in the URL otherwise we'll return ENETUNREACH which is a
// client recoverable error.
char* triedrc = openOpaque->Get("triedrc");
if (triedrc) {
int errno_tried = GetTriedrcErrno(triedrc);
if (errno_tried) {
return Emsg(epname, error, errno_tried, "open file", path);
}
}
}
}
if (retc == EXDEV) {
// Indicating that the layout requires the replacement of stripes
retc = 0; // TODO: we currently don't support repair on the fly mode
}
}
LogSchedulingInfo(selectedfs, proxys, firewalleps);
if (retc) {
// If we don't have quota we don't bounce the client back
if ((retc != ENOSPC) && (retc != EDQUOT)) {
// INLINE Workflows
int stalltime = 0;
workflow.SetFile(path, fmd->getId());
std::string errorMsg;
if ((stalltime = workflow.Trigger("open", "enonet", vid, ininfo,
errorMsg)) > 0) {
eos_info("msg=\"triggered ENOENT workflow\" path=%s", path);
return gOFS->Stall(error, stalltime, ""
"File is currently unavailable - triggered workflow!");
}
// check if we have a global redirect or stall for offline files
MAYREDIRECT_ENONET;
MAYSTALL_ENONET;
MAYREDIRECT_ENETUNREACH;
MAYSTALL_ENETUNREACH;
// check if the dir attributes tell us to let clients rebounce
if (attrmap.count("sys.stall.unavailable")) {
int stalltime = atoi(attrmap["sys.stall.unavailable"].c_str());
if (stalltime) {
// stall the client
gOFS->MgmStats.Add("OpenStalled", vid.uid, vid.gid, 1);
eos_info("attr=sys info=\"stalling file since replica's are down\" path=%s rw=%d",
path, isRW);
return gOFS->Stall(error, stalltime,
"Required filesystems are currently unavailable!");
}
}
if (attrmap.count("user.stall.unavailable")) {
int stalltime = atoi(attrmap["user.stall.unavailable"].c_str());
if (stalltime) {
// stall the client
gOFS->MgmStats.Add("OpenStalled", vid.uid, vid.gid, 1);
eos_info("attr=user info=\"stalling file since replica's are down\" path=%s rw=%d",
path, isRW);
return gOFS->Stall(error, stalltime,
"Required filesystems are currently unavailable!");
}
}
if ((attrmap.count("sys.redirect.enonet"))) {
// there is a redirection setting here if files are unaccessible
redirectionhost = "";
redirectionhost = attrmap["sys.redirect.enonet"].c_str();
int portpos = 0;
if ((portpos = redirectionhost.find(":")) != STR_NPOS) {
XrdOucString port = redirectionhost;
port.erase(0, portpos + 1);
ecode = atoi(port.c_str());
redirectionhost.erase(portpos);
} else {
ecode = 1094;
}
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
rcode = SFS_REDIRECT;
gOFS->MgmStats.Add("RedirectENONET", vid.uid, vid.gid, 1);
return rcode;
}
if (!gOFS->mMaster->IsMaster() && gOFS->mMaster->IsRemoteMasterOk()) {
// Redirect ENONET to the actual master
int port {0};
std::string hostname;
std::string master_id = gOFS->mMaster->GetMasterId();
if (!eos::common::ParseHostNamePort(master_id, hostname, port)) {
eos_err("msg=\"failed parsing remote master info\", id=%s",
master_id.c_str());
return Emsg(epname, error, retc, "open file - failed parsing remote "
"master info", path);
}
redirectionhost = hostname.c_str();
ecode = port;
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
rcode = SFS_REDIRECT;
gOFS->MgmStats.Add("RedirectENONET", vid.uid, vid.gid, 1);
return rcode;
}
gOFS->MgmStats.Add("OpenFileOffline", vid.uid, vid.gid, 1);
} else {
// Remove the created file from the namespace as root since somebody could
// have a no-delete ACL. Do this only if there are no replicas already
// attached to the file md entry. If there are, this means the current
// thread was blocked in scheduling and a retry of the client went
// through successfully. If we delete the entry we end up with data lost.
if (isCreation) {
bool do_remove = false;
try {
eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, path);
eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
auto tmp_fmd = gOFS->eosView->getFile(path);
if (tmp_fmd->getNumLocation() == 0) {
do_remove = true;
}
} catch (eos::MDException& e) {}
if (do_remove) {
eos::common::VirtualIdentity vidroot = eos::common::VirtualIdentity::Root();
gOFS->_rem(cPath.GetPath(), error, vidroot, 0, false, false, false);
}
}
gOFS->MgmStats.Add("OpenFailedQuota", vid.uid, vid.gid, 1);
}
if (isRW) {
if (retc == ENOSPC) {
return Emsg(epname, error, retc, "get free physical space", path);
}
if (retc == EDQUOT) {
return Emsg(epname, error, retc,
"get quota space - quota not defined or exhausted", path);
}
return Emsg(epname, error, retc, "access quota space", path);
} else {
return Emsg(epname, error, retc, "open file ", path);
}
} else {
if (isRW) {
// we want to define the order of chunks during creation, so we attach also rain layouts
if (isCreation && hasClientBookingSize && ((bookingsize == 0) ||
ocUploadUuid.length() || (LayoutId::IsRain(layoutId)))) {
// ---------------------------------------------------------------------
// if this is a creation we commit the scheduled replicas NOW
// we do the same for chunked/parallel uploads
// ---------------------------------------------------------------------
{
// get an empty file checksum
std::string binchecksum = LayoutId::GetEmptyFileChecksum(layoutId);
eos::Buffer cx;
cx.putData(binchecksum.c_str(), binchecksum.size());
// FUSEX repair access needs to retrieve the file by fid
// TODO: Refactor isCreation and isRecreation code paths
//eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex);
// -------------------------------------------------------------------
try {
std::string locations;
if (fmd->hasAttribute("sys.fs.tracking")) {
locations = fmd->getAttribute("sys.fs.tracking");
}
if (isRecreation) {
fmd->unlinkAllLocations();
locations += "=";
}
if (isRecreation) {
std::string s;
if (fmd->hasAttribute("sys.fusex.state")) {
s = fmd->getAttribute("sys.fusex.state");
}
s += "Z";
fmd->setAttribute("sys.fusex.state",
eos::common::StringConversion::ReduceString(s).c_str());
}
for (auto& fsid : selectedfs) {
fmd->addLocation(fsid);
locations += "+";
locations += std::to_string(fsid);
}
fmd->setAttribute("sys.fs.tracking",
eos::common::StringConversion::ReduceString(locations).c_str());
fmd->setChecksum(cx);
gOFS->eosView->updateFileStore(fmd.get());
} catch (eos::MDException& e) {
errno = e.getErrno();
std::string errmsg = e.getMessage().str();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
gOFS->MgmStats.Add("OpenFailedQuota", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", errmsg.c_str());
}
// -------------------------------------------------------------------
}
mIsZeroSize = true;
}
if (isFuse && !isCreation) {
// ---------------------------------------------------------------------
// if we come from fuse for an update
// consistently redirect to the highest fsid having if possible the same
// geotag as the client
// ---------------------------------------------------------------------
if (byfid) {
// the new FUSE client needs to have the replicas attached after the
// first open call
std::string locations;
try {
if (fmd->hasAttribute("sys.fs.tracking")) {
locations = fmd->getAttribute("sys.fs.tracking");
}
for (auto& fsid : selectedfs) {
fmd->addLocation(fsid);
locations += "+";
locations += std::to_string(fsid);
}
gOFS->eosView->updateFileStore(fmd.get());
} catch (eos::MDException& e) {
errno = e.getErrno();
std::string errmsg = e.getMessage().str();
eos_debug("msg=\"exception\" ec=%d emsg=\"%s\"\n",
e.getErrno(), e.getMessage().str().c_str());
gOFS->MgmStats.Add("OpenFailedQuota", vid.uid, vid.gid, 1);
return Emsg(epname, error, errno, "open file", errmsg.c_str());
}
}
eos::common::FileSystem::fsid_t fsid = 0;
fsIndex = 0;
std::string fsgeotag;
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
for (size_t k = 0; k < selectedfs.size(); k++) {
auto filesystem = FsView::gFsView.mIdView.lookupByID(selectedfs[k]);
fsgeotag = "";
if (filesystem) {
fsgeotag = filesystem->GetString("stat.geotag");
}
// if the fs is available
if (std::find(unavailfs.begin(), unavailfs.end(), selectedfs[k]) ==
unavailfs.end()) {
// take the highest fsid with the same geotag if possible
if ((vid.geolocation.empty() ||
(fsgeotag.find(vid.geolocation) != std::string::npos)) &&
(selectedfs[k] > fsid)) {
fsIndex = k;
fsid = selectedfs[k];
}
}
}
} // fs_rd_lock scope
// if the client has a geotag which does not match any of the fs's
if (!fsIndex) {
fsid = 0;
for (size_t k = 0; k < selectedfs.size(); k++)
if (selectedfs[k] > fsid) {
fsIndex = k;
fsid = selectedfs[k];
}
}
// EOS-2787
// reshuffle the selectedfs to set if available the highest with matching geotag in front
if (fsid) {
std::vector newselectedfs;
newselectedfs.push_back(fsid);
for (const auto& i : selectedfs) {
if (i != newselectedfs.front()) {
newselectedfs.push_back(i);
}
}
selectedfs.swap(newselectedfs);
fsIndex = 0;
}
}
} else {
if (!fmd->getSize()) {
// 0-size files can be read from the MGM if this is not FUSE access and
// also if this is not a rain file
if (!isFuse && !LayoutId::IsRain(layoutId)) {
mIsZeroSize = true;
return SFS_OK;
}
}
}
}
// If this is a RAIN layout, we want a nice round-robin for the entry
// server since it has the burden of encoding and traffic fan-out
if (isRW && LayoutId::IsRain(layoutId)) {
fsIndex = mFid % selectedfs.size();
eos_static_info("selecting entry-server fsIndex=%lu fsid=%lu fxid=%lx mod=%lu",
fsIndex, selectedfs[fsIndex], mFid, selectedfs.size());
}
// Get the redirection host from the selected entry in the vector
if (!selectedfs[fsIndex]) {
eos_err("%s", "msg=\"0 filesystem in selection\"");
return Emsg(epname, error, ENETUNREACH, "received filesystem id 0", path);
}
XrdOucString piolist = "";
XrdOucString infolog = "";
std::string fs_hostport, fs_host, fs_port, fs_http_port, fs_prefix,
fs_host_alias, fs_port_alias;
uint32_t fs_id;
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
auto filesystem = FsView::gFsView.mIdView.lookupByID(selectedfs[fsIndex]);
if (!filesystem) {
return Emsg(epname, error, ENETUNREACH,
"received non-existent filesystem", path);
}
fs_hostport = filesystem->GetString("hostport");
fs_host = filesystem->GetString("host");
fs_port = filesystem->GetString("port");
fs_host_alias = filesystem->GetString("stat.alias.host");
fs_port_alias = filesystem->GetString("stat.alias.port");
// allow FST host alias
if (fs_host_alias.length()) {
fs_host = fs_host_alias;
if (fs_port_alias.length()) {
fs_port = fs_port_alias;
}
fs_hostport = fs_host + std::string(":") + fs_port;
eos_info("redirection-alias=\"%s:%s\"", fs_host_alias.c_str(),
fs_port_alias.c_str());
}
fs_http_port = filesystem->GetString("stat.http.port");
fs_prefix = filesystem->GetPath();
fs_id = filesystem->GetId();
} // fs_rd_lock scope
// Set the FST gateway for clients who are geo-tagged with default
if ((firewalleps.size() > fsIndex) && (proxys.size() > fsIndex)) {
// Do this with forwarding proxy syntax only if the firewall entrypoint is
// different from the endpoint
if (!(firewalleps[fsIndex].empty()) &&
((!proxys[fsIndex].empty() && firewalleps[fsIndex] != proxys[fsIndex]) ||
(firewalleps[fsIndex] != fs_hostport))) {
// Build the URL for the forwarding proxy and must have the following
// redirection proxy:port?eos.fstfrw=endpoint:port/abspath
auto idx = firewalleps[fsIndex].rfind(':');
if (idx != std::string::npos) {
targethost = firewalleps[fsIndex].substr(0, idx).c_str();
targetport = atoi(firewalleps[fsIndex].substr(idx + 1,
std::string::npos).c_str());
targethttpport = 8001;
} else {
targethost = firewalleps[fsIndex].c_str();
targetport = 0;
targethttpport = 8001;
}
std::ostringstream oss;
oss << targethost << "?" << "eos.fstfrw=";
// Check if we have to redirect to the fs host or to a proxy
if (proxys[fsIndex].empty()) {
oss << fs_host << ":" << fs_port;
} else {
oss << proxys[fsIndex];
}
redirectionhost = oss.str().c_str();
redirectionhost += "&";
} else {
if (proxys[fsIndex].empty()) { // there is no proxy to use
targethost = fs_host.c_str();
targetport = atoi(fs_port.c_str());
targethttpport = atoi(fs_http_port.c_str());
// default xrootd & http port
if (!targetport) {
targetport = 1095;
}
if (!targethttpport) {
targethttpport = 8001;
}
} else { // we have a proxy to use
auto idx = proxys[fsIndex].rfind(':');
if (idx != std::string::npos) {
targethost = proxys[fsIndex].substr(0, idx).c_str();
targetport = atoi(proxys[fsIndex].substr(idx + 1, std::string::npos).c_str());
targethttpport = 8001;
} else {
targethost = proxys[fsIndex].c_str();
targetport = 0;
targethttpport = 0;
}
}
redirectionhost = targethost;
redirectionhost += "?";
}
if (!proxys[fsIndex].empty()) {
if (!(fs_prefix.empty())) {
XrdOucString s = "mgm.fsprefix";
s += "=";
s += fs_prefix.c_str();
s.replace(":", "#COL#");
redirectionhost += s;
}
}
} else {
// There is no proxy or firewall entry point to use
targethost = fs_host.c_str();
targetport = atoi(fs_port.c_str());
targethttpport = atoi(fs_http_port.c_str());
redirectionhost = targethost;
redirectionhost += "?";
}
// ---------------------------------------------------------------------------
// Rebuild the layout ID (for read it should indicate only the number of
// available stripes for reading);
// For 'pio' mode we hand out plain layouts to the client and add the IO
// layout as an extra field
// ---------------------------------------------------------------------------
// Get the unique set of file systems
std::set ufs(selectedfs.begin(), selectedfs.end());
ufs.insert(pio_reconstruct_fs.begin(), pio_reconstruct_fs.end());
// If file system 0 sentinel is present then it must be removed
ufs.erase(0u);
new_lid = LayoutId::GetId(
isPio ? LayoutId::kPlain :
LayoutId::GetLayoutType(layoutId),
(isPio ? LayoutId::kNone :
LayoutId::GetChecksum(layoutId)),
isPioReconstruct ? static_cast(ufs.size()) : static_cast
(selectedfs.size()),
LayoutId::GetBlocksizeType(layoutId),
LayoutId::GetBlockChecksum(layoutId));
// For RAIN layouts we need to keep the original number of stripes since this
// is used to compute the different groups and block sizes in the FSTs
if ((LayoutId::IsRain(layoutId))) {
LayoutId::SetStripeNumber(new_lid,
LayoutId::GetStripeNumber(layoutId));
}
capability += "&mgm.lid=";
capability += static_cast(new_lid);
// space to be prebooked/allocated
capability += "&mgm.bookingsize=";
if (isPioReconstruct) {
// For pio reconstruct the booking size needs to be 0,
// the recovery will fail on non xfs filesystem otherwise.
capability += "0";
} else {
capability +=
eos::common::StringConversion::GetSizeString(sizestring, bookingsize);
}
if (minimumsize) {
capability += "&mgm.minsize=";
capability += eos::common::StringConversion::GetSizeString(sizestring,
minimumsize);
}
if (maximumsize) {
capability += "&mgm.maxsize=";
capability += eos::common::StringConversion::GetSizeString(sizestring,
maximumsize);
}
// Expected size of the target file on close
if (targetsize) {
capability += "&mgm.targetsize=";
capability += eos::common::StringConversion::GetSizeString(sizestring,
targetsize);
}
if (LayoutId::GetLayoutType(layoutId) == LayoutId::kPlain) {
capability += "&mgm.fsid=";
capability += (int) fs_id;
}
if (isRepairRead) {
capability += "&mgm.repairread=1";
}
if (mIsZeroSize) {
capability += "&mgm.zerosize=1";
}
// Add the store flag for RAIN reconstruct jobs
if (isPioReconstruct) {
capability += "&mgm.rain.store=1";
// Append also the mgm.rain.size since we can't deduce at the FST during
// the recovery step and we need it for the stat information
capability += "&mgm.rain.size=";
capability += std::to_string(fmdsize).c_str();
}
if (bandwidth.length() && (bandwidth != "0")) {
capability += "&mgm.iobw=";
capability += bandwidth.c_str();
}
if ((LayoutId::GetLayoutType(layoutId) == LayoutId::kReplica) ||
(LayoutId::IsRain(layoutId))) {
capability += "&mgm.fsid=";
capability += (int) fs_id;
eos::mgm::FileSystem* repfilesystem = 0;
replacedfs.resize(selectedfs.size());
// If replacement has been specified try to get new locations for
// reconstruction or for missing stripes
if (isPioReconstruct && !(pio_reconstruct_fs.empty())) {
const char* containertag = 0;
if (attrmap.count("user.tag")) {
containertag = attrmap["user.tag"].c_str();
}
// Get the scheduling group of one of the stripes
if (fmd->getNumLocation() == 0) {
eos_err("msg=\"no locations available for file\"");
return Emsg(epname, error, EIO, "get any locations for file", path);
}
eos::common::FileSystem::fs_snapshot_t orig_snapshot;
unsigned int orig_id = fmd->getLocation(0);
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
// Note this is a eos::common::filesystem not a mgm one
auto orig_fs = FsView::gFsView.mIdView.lookupByID(orig_id);
if (!orig_fs) {
return Emsg(epname, error, EINVAL, "reconstruct filesystem", path);
}
orig_fs->SnapShotFileSystem(orig_snapshot);
} // fs_rd_lock
forced_group = orig_snapshot.mGroupIndex;
// Add new stripes if file doesn't have the nomial number
auto stripe_diff = (LayoutId::GetStripeNumber(fmd->getLayoutId()) + 1) -
selectedfs.size();
// Create a plain layout with the number of replacement stripes to be
// scheduled in the file placement routine
unsigned long plain_lid = new_lid;
if (pio_reconstruct_fs.find(0) != pio_reconstruct_fs.end()) {
LayoutId::SetStripeNumber(plain_lid, stripe_diff - 1);
} else {
LayoutId::SetStripeNumber(plain_lid,
pio_reconstruct_fs.size() - 1 + stripe_diff);
}
eos_info("msg=\"nominal stripes:%d reconstructed stripes=%d group_idx=%d\"",
LayoutId::GetStripeNumber(new_lid) + 1,
LayoutId::GetStripeNumber(plain_lid) + 1,
forced_group);
// Compute the size of the stripes to be placed
unsigned long num_data_stripes = LayoutId::GetStripeNumber(layoutId) + 1 -
LayoutId::GetRedundancyStripeNumber(layoutId);
uint64_t plain_book_sz = (uint64_t)std::ceil((float)fmd->getSize() /
LayoutId::GetBlocksize(layoutId));
plain_book_sz = std::ceil((float) plain_book_sz / std::pow(num_data_stripes,
2)) *
num_data_stripes * LayoutId::GetBlocksize(layoutId) + LayoutId::OssXsBlockSize;
eos_info("msg=\"plain booking size is %llu", plain_book_sz);
eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root();
// Attempt to use a firewall entrypoint or a dataproxy if required, if any
// of the two fail, then scheduling fails
Scheduler::PlacementArguments plctargs;
plctargs.alreadyused_filesystems = &selectedfs;
plctargs.bookingsize = plain_book_sz;
plctargs.dataproxys = &proxys;
plctargs.firewallentpts = &firewalleps;
plctargs.forced_scheduling_group_index = forced_group;
plctargs.grouptag = containertag;
plctargs.lid = plain_lid;
plctargs.inode = (ino64_t) fmd->getId();
plctargs.path = path;
plctargs.plctTrgGeotag = &targetgeotag;
plctargs.plctpolicy = plctplcy;
plctargs.exclude_filesystems = &excludefs;
plctargs.selected_filesystems = &pio_replacement_fs;
std::string spacename = space.c_str();
plctargs.spacename = &spacename;
plctargs.truncate = false;
plctargs.vid = &rootvid;
if (!plctargs.isValid()) {
return Emsg(epname, error, EIO, "open - invalid placement argument", path);
}
COMMONTIMING("Scheduler::FilePlacement", &tm);
{
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
retc = Quota::FilePlacement(&plctargs);
}
COMMONTIMING("Scheduler::FilePlaced", &tm);
LogSchedulingInfo(selectedfs, proxys, firewalleps);
if (retc) {
gOFS->MgmStats.Add("OpenFailedReconstruct", rootvid.uid, rootvid.gid, 1);
return Emsg(epname, error, retc, "schedule stripes for reconstruction", path);
}
for (const auto& elem : pio_replacement_fs) {
eos_debug("msg=\"reconstruction scheduled on new fs\" fsid=%lu num=%lu",
elem, pio_replacement_fs.size());
}
auto selection_diff = (LayoutId::GetStripeNumber(fmd->getLayoutId()) + 1)
- selectedfs.size();
eos_info("msg=\"fs selection summary\" nominal=%d actual=%d diff=%d",
(LayoutId::GetStripeNumber(fmd->getLayoutId()) + 1),
selectedfs.size(), selection_diff);
// If there are stripes missing then fill them in from the replacements
if (pio_replacement_fs.size() < selection_diff) {
eos_err("msg=\"not enough replacement fs\" need=%lu have=%lu",
selection_diff, pio_replacement_fs.size());
return Emsg(epname, error, retc, "schedule enough stripes for reconstruction",
path);
}
for (size_t i = 0; i < selection_diff; ++i) {
selectedfs.push_back(pio_replacement_fs.back());
pio_replacement_fs.pop_back();
}
}
replacedfs.resize(selectedfs.size());
{
// Put all the replica urls into the capability,
// this is all under a view lock
eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
for (unsigned int i = 0; i < selectedfs.size(); ++i) {
if (!selectedfs[i]) {
eos_err("%s", "msg=\"fsid 0 in replica vector\"");
}
// Logic to discover filesystems to be reconstructed
bool replace = false;
if (isPioReconstruct) {
replace = (pio_reconstruct_fs.find(selectedfs[i]) != pio_reconstruct_fs.end());
}
if (replace) {
// If we don't find any replacement
if (pio_replacement_fs.empty()) {
return Emsg(epname, error, EIO, "get replacement file system", path);
}
// Take one replacement filesystem from the replacement list
replacedfs[i] = selectedfs[i];
selectedfs[i] = pio_replacement_fs.back();
pio_replacement_fs.pop_back();
eos_info("msg=\"replace fs\" old-fsid=%u new-fsid=%u", replacedfs[i],
selectedfs[i]);
} else {
// There is no replacement happening
replacedfs[i] = 0;
}
repfilesystem = FsView::gFsView.mIdView.lookupByID(selectedfs[i]);
if (!repfilesystem) {
// Don't fail IO on a shadow file system but throw a critical error
// message
eos_crit("msg=\"Unable to get replica filesystem information\" "
"path=\"%s\" fsid=%d", path, selectedfs[i]);
continue;
}
if (replace) {
fsIndex = i;
// Set the FST gateway if this is available otherwise the actual FST
if ((firewalleps.size() > fsIndex) && (proxys.size() > fsIndex) &&
!(firewalleps[fsIndex].empty()) &&
((!proxys[fsIndex].empty() && firewalleps[fsIndex] != proxys[fsIndex]) ||
(firewalleps[fsIndex] != repfilesystem->GetString("hostport")))) {
// Build the URL for the forwarding proxy and must have the following
// redirection proxy:port?eos.fstfrw=endpoint:port/abspath
auto idx = firewalleps[fsIndex].rfind(':');
if (idx != std::string::npos) {
targethost = firewalleps[fsIndex].substr(0, idx).c_str();
targetport = atoi(firewalleps[fsIndex].substr(idx + 1,
std::string::npos).c_str());
targethttpport = 8001;
} else {
targethost = firewalleps[fsIndex].c_str();
targetport = 0;
targethttpport = 0;
}
std::ostringstream oss;
oss << targethost << "?"
<< "eos.fstfrw=";
// check if we have to redirect to the fs host or to a proxy
if (proxys[fsIndex].empty()) {
oss << repfilesystem->GetString("host").c_str() << ":"
<< repfilesystem->GetString("port").c_str();
} else {
oss << proxys[fsIndex];
}
redirectionhost = oss.str().c_str();
} else {
if ((proxys.size() > fsIndex) && !proxys[fsIndex].empty()) {
// We have a proxy to use
(void) proxys[fsIndex].c_str();
auto idx = proxys[fsIndex].rfind(':');
if (idx != std::string::npos) {
targethost = proxys[fsIndex].substr(0, idx).c_str();
targetport = atoi(proxys[fsIndex].substr(idx + 1, std::string::npos).c_str());
targethttpport = 8001;
} else {
targethost = proxys[fsIndex].c_str();
targetport = 0;
targethttpport = 0;
}
} else {
// There is no proxy to use
targethost = repfilesystem->GetString("host").c_str();
targetport = atoi(repfilesystem->GetString("port").c_str());
targethttpport = atoi(repfilesystem->GetString("stat.http.port").c_str());
}
redirectionhost = targethost;
redirectionhost += "?";
}
// point at the right vector entry
fsIndex = i;
}
capability += "&mgm.url";
capability += (int) i;
capability += "=root://";
XrdOucString replicahost = "";
int replicaport = 0;
// -----------------------------------------------------------------------
// Logic to mask 'offline' filesystems
// -----------------------------------------------------------------------
for (unsigned int k = 0; k < unavailfs.size(); ++k) {
if (selectedfs[i] == unavailfs[k]) {
replicahost = "__offline_";
break;
}
}
if ((proxys.size() > i) && !proxys[i].empty()) {
// We have a proxy to use
auto idx = proxys[i].rfind(':');
if (idx != std::string::npos) {
replicahost = proxys[i].substr(0, idx).c_str();
replicaport =
atoi(proxys[i].substr(idx + 1, std::string::npos).c_str());
} else {
replicahost = proxys[i].c_str();
replicaport = 0;
}
} else {
// There is no proxy to use
replicahost += repfilesystem->GetString("host").c_str();
replicaport = atoi(repfilesystem->GetString("port").c_str());
}
capability += replicahost;
capability += ":";
capability += replicaport;
capability += "//";
// add replica fsid
capability += "&mgm.fsid";
capability += (int)i;
capability += "=";
capability += (int)repfilesystem->GetId();
if ((proxys.size() > i) && !proxys[i].empty()) {
std::string fsprefix = repfilesystem->GetPath();
if (!fsprefix.empty()) {
XrdOucString s = "mgm.fsprefix";
s += (int)i;
s += "=";
s += fsprefix.c_str();
s.replace(":", "#COL#");
capability += s;
}
}
if (isPio) {
if (replacedfs[i]) {
// Add the drop message to the replacement capability
capability += "&mgm.drainfsid";
capability += (int)i;
capability += "=";
capability += (int)replacedfs[i];
}
piolist += "pio.";
piolist += (int)i;
piolist += "=";
piolist += replicahost;
piolist += ":";
piolist += replicaport;
piolist += "&";
}
eos_debug("msg=\"redirection url\" %d => %s", i, replicahost.c_str());
infolog += "target[";
infolog += (int)i;
infolog += "]=(";
infolog += replicahost.c_str();
infolog += ",";
infolog += (int)repfilesystem->GetId();
infolog += ") ";
}
} // fs_rd_lock
}
// ---------------------------------------------------------------------------
// Encrypt capability
// ---------------------------------------------------------------------------
XrdOucEnv incapability(capability.c_str());
eos::common::SymKey* symkey = eos::common::gSymKeyStore.GetCurrentKey();
eos_debug("capability=%s\n", capability.c_str());
int caprc = 0;
XrdOucEnv* capabilityenvRaw = nullptr;
if ((caprc = eos::common::SymKey::CreateCapability(&incapability,
capabilityenvRaw,
symkey, gOFS->mCapabilityValidity))) {
return Emsg(epname, error, caprc, "sign capability", path);
}
std::unique_ptr capabilityenv(capabilityenvRaw);
int caplen = 0;
if (isPio) {
redirectionhost = piolist;
redirectionhost += "mgm.lid=";
redirectionhost += static_cast(layoutId);
redirectionhost += "&mgm.logid=";
redirectionhost += this->logId;
redirectionhost += capabilityenv->Env(caplen);
} else {
redirectionhost += capabilityenv->Env(caplen);
redirectionhost += "&mgm.logid=";
redirectionhost += this->logId;
if (openOpaque->Get("eos.blockchecksum")) {
redirectionhost += "&mgm.blockchecksum=";
redirectionhost += openOpaque->Get("eos.blockchecksum");
} else {
if ((!isRW) && (LayoutId::GetLayoutType(layoutId) == LayoutId::kReplica)) {
redirectionhost += "&mgm.blockchecksum=ignore";
}
}
if (openOpaque->Get("eos.checksum") || openOpaque->Get("eos.cloneid")) {
redirectionhost += "&mgm.checksum=";
redirectionhost += openOpaque->Get("eos.checksum");
}
if (openOpaque->Get("eos.mtime")) {
redirectionhost += "&mgm.mtime=0";
}
// For the moment we redirect only on storage nodes
redirectionhost += "&mgm.replicaindex=";
redirectionhost += (int) fsIndex;
redirectionhost += "&mgm.replicahead=";
redirectionhost += (int) fsIndex;
}
if (vid.prot == "https") {
struct stat buf;
std::string etag;
eos::common::VirtualIdentity rootvid = eos::common::VirtualIdentity::Root();
// get the current ETAG
gOFS->_stat(path, &buf, error, rootvid, "", &etag);
redirectionhost += "&mgm.etag=";
if (!etag.length()) {
redirectionhost += "undef";
} else {
redirectionhost += etag.c_str();
}
}
// add the MGM hex id for this file
redirectionhost += "&mgm.id=";
redirectionhost += hex_fid.c_str();
if (isFuse) {
redirectionhost += "&mgm.mtime=0";
} else {
if (!isRW) {
eos::IFileMD::ctime_t mtime;
try {
fmd->getMTime(mtime);
redirectionhost += "&mgm.mtime=";
std::string smtime;
smtime += std::to_string(mtime.tv_sec);
redirectionhost += smtime.c_str();
} catch (eos::MDException& ex) {
}
}
}
// Also trigger synchronous create workflow event if it's defined
if (isCreation) {
errno = 0;
workflow.SetFile(path, mFid);
auto workflowType = openOpaque->Get("eos.workflow") != nullptr ?
openOpaque->Get("eos.workflow") : "default";
std::string errorMsg;
auto ret_wfe = workflow.Trigger("sync::create", std::string{workflowType}, vid,
ininfo, errorMsg);
if (ret_wfe < 0 && errno == ENOKEY) {
eos_debug("msg=\"no workflow defined for sync::create\"");
} else {
eos_info("msg=\"workflow trigger returned\" retc=%d errno=%d", ret_wfe, errno);
if (ret_wfe != 0) {
// Remove the file from the namespace in this case
try {
eos::common::RWMutexWriteLock lock(gOFS->eosViewRWMutex);
gOFS->eosView->removeFile(fmd.get());
} catch (eos::MDException& ex) {
eos_err("Failed to remove file from namespace in case of create workflow error. Reason: %s",
ex.what());
}
return Emsg(epname, error, ret_wfe, errorMsg.c_str(),
path);
}
}
}
// add workflow cgis, has to come after create workflow
workflow.SetFile(path, mFid);
if (isRW) {
redirectionhost += workflow.getCGICloseW(currentWorkflow.c_str(), vid).c_str();
} else {
redirectionhost += workflow.getCGICloseR(currentWorkflow.c_str()).c_str();
}
// Notify tape garbage collector if tape support is enabled
if (gOFS->mTapeEnabled) {
try {
eos::common::RWMutexReadLock tgc_ns_rd_lock(gOFS->eosViewRWMutex);
const auto tgcFmd = gOFS->eosFileService->getFileMD(mFid);
const bool isATapeFile = tgcFmd->hasAttribute("sys.archive.file_id");
tgc_ns_rd_lock.Release();
if (isATapeFile) {
if (isRW) {
const std::string tgcSpace = nullptr != space.c_str() ? space.c_str() : "";
gOFS->mTapeGc->fileOpenedForWrite(tgcSpace, mFid);
} else {
const auto fsId = getFirstDiskLocation(selectedfs);
const std::string tgcSpace = FsView::gFsView.mIdView.lookupSpaceByID(fsId);
gOFS->mTapeGc->fileOpenedForRead(tgcSpace, mFid);
}
}
} catch (...) {
// Ignore any garbage collection exceptions
}
}
// Always redirect
if ((vid.prot == "https") || (vid.prot == "http")) {
ecode = targethttpport;
} else {
ecode = targetport;
}
rcode = SFS_REDIRECT;
XrdOucString predirectionhost = redirectionhost.c_str();
eos::common::StringConversion::MaskTag(predirectionhost, "cap.msg");
eos::common::StringConversion::MaskTag(predirectionhost, "cap.sym");
if (isRW) {
eos_info("op=write path=%s info=%s %s redirection=%s xrd_port=%d "
"http_port=%d", path, pinfo.c_str(), infolog.c_str(),
predirectionhost.c_str(), targetport, targethttpport);
} else {
eos_info("op=read path=%s info=%s %s redirection=%s xrd_port=%d "
"http_port=%d", path, pinfo.c_str(), infolog.c_str(),
predirectionhost.c_str(), targetport, targethttpport);
}
EXEC_TIMING_END("Open");
COMMONTIMING("end", &tm);
char clientinfo[1024];
snprintf(clientinfo, sizeof(clientinfo),
"open:rt=%.02f io:bw=%s io:sched=%d io:type=%s io:prio=%s io:redirect=%s:%d",
__exec_time__, bandwidth.length() ? bandwidth.c_str() : "inf", schedule,
iotype.length() ? iotype.c_str() : "buffered",
ioprio.length() ? ioprio.c_str() : "default", targethost.c_str(), ecode);
std::string sclientinfo(clientinfo);
std::string zclientinfo;
eos::common::SymKey::ZBase64(sclientinfo, zclientinfo);
redirectionhost += "&eos.clientinfo=";
redirectionhost += zclientinfo.c_str();
if (!gOFS->SetRedirectionInfo(error, redirectionhost.c_str(), ecode)) {
eos_err("msg=\"failed setting redirection\" path=\"%s\"", path);
return SFS_ERROR;
}
eos_info("path=%s %s duration=%0.03fms timing=%s",
path, clientinfo, tm.RealTime(), tm.Dump().c_str());
return rcode;
}
//----------------------------------------------------------------------------
// Read a partial result of a 'proc' interface command
//----------------------------------------------------------------------------
XrdSfsXferSize
XrdMgmOfsFile::read(XrdSfsFileOffset offset,
char* buff,
XrdSfsXferSize blen)
{
static const char* epname = "read";
if (mIsZeroSize) {
return 0;
}
if (mProcCmd) {
return mProcCmd->read(offset, buff, blen);
}
return Emsg(epname, error, EOPNOTSUPP, "read", fileName.c_str());
}
//------------------------------------------------------------------------------
// Read file pages into a buffer and return corresponding checksums
//------------------------------------------------------------------------------
XrdSfsXferSize
XrdMgmOfsFile::pgRead(XrdSfsFileOffset offset, char* buffer,
XrdSfsXferSize rdlen, uint32_t* csvec, uint64_t opts)
{
XrdSfsXferSize bytes;
if ((bytes = read(offset, buffer, rdlen)) <= 0) {
return bytes;
}
// Generate the crc's
XrdOucPgrwUtils::csCalc(buffer, offset, bytes, csvec);
return bytes;
}
/*----------------------------------------------------------------------------*/
/*
* @brief close a file object
*
* @return SFS_OK
*
* The close on the MGM is called only for files opened using the 'proc' e.g.
* EOS shell comamnds. By construction failures can happen only during the open
* of a 'proc' file e.g. the close always succeeds!
*/
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::close()
{
oh = -1;
if (mProcCmd) {
mProcCmd->close();
return SFS_OK;
}
return SFS_OK;
}
/*----------------------------------------------------------------------------*/
/*
* @brief stat the size of an open 'proc' command/file
*
* @param buf stat struct where to store information
* @return SFS_OK if open proc file otherwise SFS_ERROR
*
* For 'proc' files the result is created during the file open call.
* The stat function will fill the size of the created result into the stat
* buffer.
*/
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::stat(struct stat* buf)
{
static const char* epname = "stat";
if (mIsZeroSize) {
memset(buf, 0, sizeof(struct stat));
return 0;
}
if (mProcCmd) {
return mProcCmd->stat(buf);
}
return Emsg(epname, error, EOPNOTSUPP, "stat", fileName.c_str());
}
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::sync()
/*----------------------------------------------------------------------------*/
/*
* sync an open file - no implemented (no use case)
*
* @return SFS_ERROR and EOPNOTSUPP
*/
/*----------------------------------------------------------------------------*/
{
static const char* epname = "sync";
return Emsg(epname, error, EOPNOTSUPP, "sync", fileName.c_str());
}
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::sync(XrdSfsAio* aiop)
/*----------------------------------------------------------------------------*/
/*
* aio sync an open file - no implemented (no use case)
*
* @return SFS_ERROR and EOPNOTSUPP
*/
/*----------------------------------------------------------------------------*/
{
static const char* epname = "sync";
// Execute this request in a synchronous fashion
//
return Emsg(epname, error, EOPNOTSUPP, "sync", fileName.c_str());
}
/*----------------------------------------------------------------------------*/
int
XrdMgmOfsFile::truncate(XrdSfsFileOffset flen)
/*----------------------------------------------------------------------------*/
/*
* truncate an open file - no implemented (no use case)
*
* @return SFS_ERROR and EOPNOTSUPP
*/
/*----------------------------------------------------------------------------*/
{
static const char* epname = "trunc";
return Emsg(epname, error, EOPNOTSUPP, "truncate", fileName.c_str());
}
/*----------------------------------------------------------------------------*/
XrdMgmOfsFile::~XrdMgmOfsFile()
/*----------------------------------------------------------------------------*/
/*
* @brief destructor
*
* Cleans-up the file object on destruction
*/
/*----------------------------------------------------------------------------*/
{
if (oh > 0) {
close();
}
if (openOpaque) {
delete openOpaque;
openOpaque = 0;
}
}
//------------------------------------------------------------------------------
/*
* @brief create an error message for a file object
*
* @param pfx message prefix value
* @param einfo error text/code object
* @param ecode error code
* @param op name of the operation performed
* @param target target of the operation e.g. file name etc.
*
* @return SFS_ERROR in all cases
*
* This routines prints also an error message into the EOS log.
*/
//------------------------------------------------------------------------------
int
XrdMgmOfsFile::Emsg(const char* pfx,
XrdOucErrInfo& einfo,
int ecode,
const char* op,
const char* target)
{
char etext[128], buffer[4096];
// Get the reason for the error
if (ecode < 0) {
ecode = -ecode;
}
if (eos::common::strerror_r(ecode, etext, sizeof(etext))) {
sprintf(etext, "reason unknown (%d)", ecode);
}
// Format the error message
snprintf(buffer, sizeof(buffer), "Unable to %s %s; %s", op, target, etext);
eos_err("Unable to %s %s; %s", op, target, etext);
// Place the error message in the error object and return
einfo.setErrInfo(ecode, buffer);
return SFS_ERROR;
}
//------------------------------------------------------------------------------
// Check if this is a client retry with exclusion of some diskserver. This
// happens usually for CMS workflows. To distinguish such a scenario from
// a legitimate retry due to a recoverable error, we need to search for the
// "tried=" opaque tag without a corresponding "triedrc=" tag.
//------------------------------------------------------------------------------
bool
XrdMgmOfsFile::IsRainRetryWithExclusion(bool is_rw, unsigned long lid) const
{
if (!is_rw && eos::common::LayoutId::IsRain(lid)) {
char* tried_info = openOpaque->Get("tried");
if ((tried_info == nullptr) || strlen(tried_info) == 0) {
return false;
}
// Don't exclude if tried information contains a globally unique cluster
// ID which has the form: +
bool exclude = false;
auto endpoints = eos::common::StringTokenizer::split
>(tried_info, ',');
for (const auto& ep : endpoints) {
if (!ep.empty() && (ep[0] != '+')) {
exclude = true;
break;
}
}
if (openOpaque->Get("triedrc") == nullptr) {
return exclude;
}
}
return false;
}
//------------------------------------------------------------------------------
// Parse the triedrc opaque info and return the corresponding error number
//------------------------------------------------------------------------------
int
XrdMgmOfsFile::GetTriedrcErrno(const std::string& input) const
{
if (input.empty()) {
return 0;
}
std::vector vect_err;
eos::common::StringConversion::Tokenize(input, vect_err, ",");
for (const auto& elem : vect_err) {
if (elem == "enoent") {
return ENOENT;
} else if (elem == "ioerr") {
return EIO;
} else if (elem == "fserr") {
return EFAULT;
} else if (elem == "srverr") {
return EFAULT;
}
}
return 0;
}
//------------------------------------------------------------------------------
// Handle (delegated) TPC redirection
//------------------------------------------------------------------------------
bool
XrdMgmOfsFile::RedirectTpcAccess()
{
if (!gOFS->mTpcRedirect) {
return false;
}
const char* tpc_key = openOpaque->Get("tpc.key");
if (tpc_key == nullptr) {
return false;
}
bool is_delegated_tpc = (strncmp(tpc_key, "delegate", 8) == 0);
// Support the tpc.dlgon=1 marker for XRootD client >= 4.11.2
const char* dlg_marker = openOpaque->Get("tpc.dlgon");
if (dlg_marker) {
is_delegated_tpc = is_delegated_tpc || (strncmp(dlg_marker, "1", 1) == 0);
}
auto it = gOFS->mTpcRdrInfo.find(is_delegated_tpc);
// If rdr info not present or if host is empty then skip
if ((it == gOFS->mTpcRdrInfo.end()) || (it->second.first.empty())) {
return false;
}
error.setErrInfo(it->second.second, it->second.first.c_str());
eos_info("msg=\"tpc %s redirect\" rdr_host=%s rdr_port=%i",
is_delegated_tpc ? "delegated" : "undelegated",
it->second.first.c_str(), it->second.second);
return true;
}
//------------------------------------------------------------------------------
// Dump scheduling info
//------------------------------------------------------------------------------
void
XrdMgmOfsFile::LogSchedulingInfo(const std::vector& selected_fs,
const std::vector& proxy_eps,
const std::vector& fwall_eps) const
{
eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
if (g_logging.gLogMask & LOG_MASK(LOG_DEBUG)) {
std::ostringstream oss;
oss << "selectedfs: ";
for (const auto& elem : selected_fs) {
oss << elem << " ";
}
oss << "proxys: ";
for (const auto& elem : proxy_eps) {
oss << elem << " ";
}
oss << "firewallentrypoints: ";
for (const auto& elem : fwall_eps) {
oss << elem << " ";
}
eos_debug("msg=\"scheduling info %s\"", oss.str().c_str());
}
}
//------------------------------------------------------------------------------
// Get file system ids excluded from scheduling
//------------------------------------------------------------------------------
std::vector
XrdMgmOfsFile::GetExcludedFsids() const
{
std::vector fsids;
std::string sfsids;
if (openOpaque) {
sfsids = (openOpaque->Get("eos.excludefsid") ?
openOpaque->Get("eos.excludefsid") : "");
}
if (sfsids.empty()) {
return fsids;
}
auto lst_ids = eos::common::StringTokenizer::split>
(sfsids, ',');
for (const auto& sid : lst_ids) {
try {
fsids.push_back(std::stoul(sid));
} catch (...) {}
}
return fsids;
}