//------------------------------------------------------------------------------
// File XrdFstOss.cc
// Author Elvin-Alin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdVersion.hh"
#include "XrdOuc/XrdOucUtils.hh"
#include "XrdOuc/XrdOuca2x.hh"
#include "XrdOuc/XrdOucStream.hh"
#include "fst/XrdFstOss.hh"
#include "fst/checksum/ChecksumPlugins.hh"
#include "fst/XrdFstOssFile.hh"
// Error routing object used for the messages emitted by this plugin. It is
// only declared here; the definition lives in another translation unit.
extern XrdSysError OssEroute;
// Set the version information
XrdVERSIONINFO(XrdOssGetStorageSystem, FstOss);
extern "C"
{
  //----------------------------------------------------------------------------
  // Plugin entry point: create the FST OSS object and initialise it.
  //
  // @param native_oss not used by this implementation
  // @param Logger     logger attached to OssEroute and passed on to Init()
  // @param config_fn  configuration file name passed on to Init()
  // @param parms      not used by this implementation
  //
  // @return the new OSS object, or 0 when Init() returns a non-zero value
  //----------------------------------------------------------------------------
  XrdOss*
  XrdOssGetStorageSystem(XrdOss* native_oss,
                         XrdSysLogger* Logger,
                         const char* config_fn,
                         const char* parms)
  {
    OssEroute.SetPrefix("FstOss_");
    OssEroute.logger(Logger);
    eos::fst::XrdFstOss* oss = new eos::fst::XrdFstOss();

    if (oss->Init(Logger, config_fn)) {
      // Initialisation failed, no storage system is handed out
      return 0;
    }

    return static_cast<XrdOss*>(oss);
  }
}
EOSFSTNAMESPACE_BEGIN
// File descriptor limit used by XrdFstOss::Init() when getrlimit() fails
#define XrdFstOssFDMINLIM 64
// pointer to the current OSS implementation to be used by the oss files;
// XrdFstOss::Init() points it at the object being initialised
XrdFstOss* XrdFstSS = 0;
//------------------------------------------------------------------------------
// Constructor
//
// The fd fence and fd limit start at -1 and all preread settings at 0. The
// page related members are derived from sysconf(_SC_PAGESIZE):
//   mPrPSize - the page size
//   mPrPBits - the page size minus one
//   mPrPMask - the bitwise complement of mPrPBits
//------------------------------------------------------------------------------
XrdFstOss::XrdFstOss() :
  eos::common::LogId(),
  mFdFence(-1),
  mFdLimit(-1),
  mPrBytes(0),
  mPrActive(0),
  mPrDepth(0),
  mPrQSize(0)
{
  eos_debug("Calling the constructor of XrdFstOss.");
  mPrPBits = static_cast<long long>(sysconf(_SC_PAGESIZE));
  // NOTE(review): the cast target is assumed to be int, the declaration of
  // mPrPSize is not visible from this file - confirm against the header
  mPrPSize = static_cast<int>(mPrPBits);
  mPrPBits--;
  mPrPMask = ~mPrPBits;
}
//------------------------------------------------------------------------------
// Destructor - the body performs no explicit cleanup
//------------------------------------------------------------------------------
XrdFstOss::~XrdFstOss() {}
//------------------------------------------------------------------------------
// Init function
//
// Registers this object as the current OSS (XrdFstSS), sets up logging,
// processes the configuration file and establishes the file descriptor limit
// and fence.
//
// @param lp       logger attached to OssEroute
// @param configfn configuration file name, handed to Configure()
//
// @return the value returned by Configure() (non-zero means failure)
//------------------------------------------------------------------------------
int
XrdFstOss::Init(XrdSysLogger* lp, const char* configfn)
{
  XrdFstSS = this;
  // Set logging parameters
  XrdOucString unit = "fstoss@";
  unit += "localhost";
  // Setup the circular in-memory log buffer
  eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
  g_logging.SetLogPriority(LOG_INFO);
  g_logging.SetUnit(unit.c_str());
  eos_debug("info=\"oss logging configured\"");
  // Process the configuration file
  OssEroute.logger(lp);
  const int NoGo = Configure(configfn, OssEroute);
  // Establish the FD limit
  struct rlimit rlim;

  if (getrlimit(RLIMIT_NOFILE, &rlim) < 0) {
    // The format string needs a conversion, otherwise errno is never printed
    eos_warning("can not get resource limits, errno=%d", errno);
    mFdLimit = XrdFstOssFDMINLIM;
  } else {
    mFdLimit = rlim.rlim_cur;
  }

  // An unset or out of range fence defaults to half of the limit
  if (mFdFence < 0 || mFdFence >= mFdLimit) {
    mFdFence = mFdLimit >> 1;
  }

  return NoGo;
}
//------------------------------------------------------------------------------
// Configuration function for the oss plugin
//
// Reads the configuration file and hands every directive whose name starts
// with "oss.preread" to xprerd().
//
// @param configfn configuration file name; when null or empty the defaults
//                 are kept and 0 is returned
// @param Eroute   error object used for messages
//
// @return 0 on success, non-zero if the file can not be opened or if any
//         preread directive is rejected
//------------------------------------------------------------------------------
int
XrdFstOss::Configure(const char* configfn, XrdSysError& Eroute)
{
  char* var;
  int cfgFD;
  int NoGo = 0;
  XrdOucEnv myEnv;
  XrdOucStream Config(&Eroute, getenv("XRDINSTANCE"), &myEnv, "=====> ");

  // If there is no config file, return with the defaults sets
  if (!configfn || !*configfn) {
    Eroute.Say("Config warning: config file not specified; defaults assumed.");
    return NoGo;
  }

  // Try to open the configuration file
  if ((cfgFD = open(configfn, O_RDONLY, 0)) < 0) {
    Eroute.Emsg("Config", errno, "open config file", configfn);
    return 1;
  }

  Config.Attach(cfgFD);

  // Now start reading records until eof
  while ((var = Config.GetMyFirstWord())) {
    if (!strncmp(var, "oss.", 4)) {
      if (!strncmp(var + 4, "preread", 7)) {
        // Accumulate the result so that a later valid directive can not hide
        // the failure of an earlier one
        NoGo |= xprerd(Config, Eroute);
      }
    }
  }

  // mPrBytes is assigned from a long long in xprerd(), so print it as such
  eos_info("preread depth=%i, queue_size=%i and bytes=%lld",
           mPrDepth, mPrQSize, static_cast<long long>(mPrBytes));
  // Closing the stream also closes the descriptor attached to it, therefore
  // cfgFD must not be closed a second time here
  Config.Close();
  return NoGo;
}
//------------------------------------------------------------------------------
// Function xprerd to parse the preread directive
//
// Accepted form, as implemented below:
//   <depth> | on  [limit <bytes>] [qsize <entries>]
// "on" selects a depth of 3. The depth and qsize are parsed in the range
// 0..1024, the limit in the range 0..16777216.
//
// @param Config stream positioned after the directive name
// @param Eroute error object used for messages
//
// @return 0 on success (mPrDepth, mPrQSize and mPrBytes are updated), 1 if
//         the directive is malformed (nothing is updated)
//------------------------------------------------------------------------------
int
XrdFstOss::xprerd(XrdOucStream& Config, XrdSysError& Eroute)
{
  static const long long maxLimit = 16777216LL;
  long long byteLimit = 1048576;
  int depth;
  int queueSize = 128;
  char* word = Config.GetWord();

  // The first token is mandatory: either "on" or a number
  if (!word) {
    Eroute.Emsg("Config", "preread depth not specified");
    return 1;
  }

  if (strcmp(word, "on") == 0) {
    depth = 3;
  } else if (XrdOuca2x::a2i(Eroute, "preread depth", word, &depth, 0, 1024)) {
    return 1;
  }

  // Any remaining tokens are "limit <value>" or "qsize <value>" pairs
  for (word = Config.GetWord(); word; word = Config.GetWord()) {
    if (strcmp(word, "limit") == 0) {
      word = Config.GetWord();

      if (!word) {
        Eroute.Emsg("Config", "preread limit not specified");
        return 1;
      }

      if (XrdOuca2x::a2sz(Eroute, "preread limit", word, &byteLimit, 0,
                          maxLimit)) {
        return 1;
      }

      continue;
    }

    if (strcmp(word, "qsize") == 0) {
      word = Config.GetWord();

      if (!word) {
        Eroute.Emsg("Config", "preread qsize not specified");
        return 1;
      }

      if (XrdOuca2x::a2i(Eroute, "preread qsize", word, &queueSize, 0, 1024)) {
        return 1;
      }

      if (queueSize < depth) {
        Eroute.Emsg("Config", "preread qsize must be >= depth");
        return 1;
      }

      continue;
    }

    Eroute.Emsg("Config", "invalid preread option -", word);
    return 1;
  }

  // A limit below the page size or an empty queue resets the depth to 0
  if (byteLimit < mPrPSize || !queueSize) {
    depth = 0;
  }

  // Scale the queue size by the depth, but never let it drop below the depth
  if (depth) {
    queueSize = queueSize / (depth / 2 + 1);

    if (queueSize < depth) {
      queueSize = depth;
    }
  }

  mPrDepth = depth;
  mPrQSize = queueSize;
  mPrBytes = byteLimit;
  return 0;
}
//------------------------------------------------------------------------------
// New file
//
// @param tident identifier passed on to the XrdFstOssFile constructor
//
// @return a newly allocated XrdFstOssFile, returned as XrdOssDF pointer
//------------------------------------------------------------------------------
XrdOssDF*
XrdFstOss::newFile(const char* tident)
{
  XrdFstOssFile* file = new XrdFstOssFile(tident);
  return static_cast<XrdOssDF*>(file);
}
//------------------------------------------------------------------------------
// New directory
//
// @param tident not used
//
// @return always a null pointer; only a debug message is logged
//------------------------------------------------------------------------------
XrdOssDF*
XrdFstOss::newDir(const char* tident)
{
  XrdOssDF* const no_dir = NULL;
  eos_debug("Calling XrdFstOss::newDir - not used in EOS");
  return no_dir;
}
//------------------------------------------------------------------------------
// Unlink file and its block checksum if needed
//
// @param path path of the entry to remove
// @param opts not used
// @param ep   not used
//
// @return XrdOssOK (or 0 for a removed directory) on success, -errno or the
//         non-zero result of BreakLink() otherwise
//------------------------------------------------------------------------------
int
XrdFstOss::Unlink(const char* path, int opts, XrdOucEnv* ep)
{
  struct stat info;
  // Remove the block checksum file first. The checksum type of the file is not
  // known at this point, the object is only needed to build the xs path.
  Adler xs; // the type does not matter here
  const char* xs_path = xs.MakeBlockXSPath(path);

  if (!Stat(xs_path, &info)) {
    if (!xs.UnlinkXSPath()) {
      eos_debug("info=\"removed block-xs\" path=%s.", path);
    }
  } else {
    eos_debug("error=cannot stat closed file - probably already unlinked: %s",
              xs_path);
  }

  // Work on a private, always terminated copy of the path
  char target[MAXPATHLEN + 1 + 8];
  strncpy(target, path, sizeof(target) - 1);
  target[sizeof(target) - 1] = '\0';

  if (lstat(target, &info)) {
    // A missing entry is not reported here, the unlink below decides
    if (errno != ENOENT) {
      return -errno;
    }
  } else {
    const mode_t type = info.st_mode & S_IFMT;

    if (type == S_IFLNK) {
      // Remove the link target first; give up if that fails
      const int rc = BreakLink(target, info);

      if (rc) {
        return rc;
      }
    } else if (type == S_IFDIR) {
      // Directories are removed with rmdir on a path ending in '/'
      const size_t len = strlen(target);

      if (target[len - 1] != '/') {
        strcpy(target + len, "/");
      }

      return (rmdir(target) ? -errno : 0);
    }
  }

  // Unlink the file
  if (unlink(target)) {
    return -errno;
  }

  return XrdOssOK;
}
//------------------------------------------------------------------------------
// Delete a link file
//
// Reads the target of the symbolic link and unlinks that target. The link
// itself is not touched here.
//
// @param local_path path of the symbolic link
// @param statbuff   filled with the stat information of the target; st_size
//                   is set to 0 when the target can not be stat'ed
//
// @return XrdOssOK on success or when the target is missing, -errno otherwise
//------------------------------------------------------------------------------
int
XrdFstOss::BreakLink(const char* local_path, struct stat& statbuff)
{
  char target[MAXPATHLEN + 64];
  // Read the contents of the link
  const int len = readlink(local_path, target, sizeof(target) - 1);

  if (len < 0) {
    return -errno;
  }

  target[len] = '\0';

  // Return the actual stat information on the target (which might not exist)
  if (stat(target, &statbuff)) {
    statbuff.st_size = 0;
    return XrdOssOK;
  }

  // A target that vanished in the meantime is not an error
  if (!unlink(target) || errno == ENOENT) {
    return XrdOssOK;
  }

  const int rc = -errno;
  OssEroute.Emsg("BreakLink", rc, "unlink symlink target", target);
  return rc;
}
//--------------------------------------------------------------------------
// Chmod on a file
//
// @param path path of the entry
// @param mode mode bits passed to chmod()
// @param eP   not used
//
// @return XrdOssOK on success, -errno otherwise
//--------------------------------------------------------------------------
int
XrdFstOss::Chmod(const char* path, mode_t mode, XrdOucEnv* eP)
{
  if (chmod(path, mode)) {
    return -errno;
  }

  return XrdOssOK;
}
//--------------------------------------------------------------------------
// Create a file named 'path' with 'mode' access mode bits set
//
// @param tident not used
// @param path   path of the file
// @param mode   mode bits passed to open()
// @param env    not used
// @param opts   XRDOSS_new / XRDOSS_mkpath flags; the value shifted right by
//               8 bits is passed to open() as the open flags
//
// @return XrdOssOK on success, otherwise:
//         -ENAMETOOLONG if the path does not fit into MAXPATHLEN
//         -EEXIST       if the file exists and XRDOSS_new is set
//         -EISDIR       if the path is an existing directory
//         -errno        for any other failure
//--------------------------------------------------------------------------
int
XrdFstOss::Create(const char* tident,
                  const char* path,
                  mode_t mode,
                  XrdOucEnv& env,
                  int opts)
{
  const int AMode = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH; // 775
  char local_path[MAXPATHLEN + 1];
  struct stat buf;
  int retc = 0;
  bool missing = false;

  if (strlen(path) >= MAXPATHLEN) {
    return -ENAMETOOLONG;
  }

  strncpy(local_path, path, (size_t)(MAXPATHLEN));
  local_path[MAXPATHLEN] = '\0';

  // Determine the state of the file. We will need this information as we go on
  if (lstat(path, &buf)) {
    retc = errno;
    missing = true;
  } else if (((buf.st_mode & S_IFMT) == S_IFLNK) && stat(path, &buf)) {
    // The path is a symbolic link whose target can not be stat'ed
    if (errno != ENOENT) {
      return -errno;
    }

    OssEroute.Emsg("Create", "removing dangling link", path);

    if (unlink(path)) {
      retc = errno;
    }

    missing = true;
  }

  if (retc && (retc != ENOENT)) {
    return -retc;
  }

  if (!missing) {
    // The file must not exist if it's declared "new". Otherwise, reuse the
    // space
    if (opts & XRDOSS_new) {
      return -EEXIST;
    }

    if ((buf.st_mode & S_IFMT) == S_IFDIR) {
      return -EISDIR;
    }
  } else if (opts & XRDOSS_mkpath) {
    // If the path is to be created, make sure the path exists at this point
    char* name = strrchr(local_path, '/');

    if (name) {
      // Cut the string right after the last '/', build the directories and
      // then restore the character that was overwritten
      ++name;
      const char saved = *name;
      *name = '\0';
      XrdOucUtils::makePath(local_path, AMode);
      *name = saved;
    }
  }

  // Simply open the file in the local filesystem, creating it if need be;
  // the open is retried for as long as it is interrupted by a signal
  int datfd;

  do {
    datfd = open(local_path, opts >> 8, mode);
  } while (datfd < 0 && errno == EINTR);

  if (datfd < 0) {
    return -errno;
  }

  close(datfd);
  return XrdOssOK;
}
//------------------------------------------------------------------------------
// Create directory
//
// All arguments are ignored.
//
// @return always -ENOTSUP
//------------------------------------------------------------------------------
int
XrdFstOss::Mkdir(const char* path,
                 mode_t mode,
                 int mkpath,
                 XrdOucEnv* eP)
{
  // Operation not supported in EOS
  const int not_supported = -ENOTSUP;
  return not_supported;
}
//------------------------------------------------------------------------------
// Delete a directory from the namespace
//
// All arguments are ignored.
//
// @return always -ENOTSUP
//------------------------------------------------------------------------------
int
XrdFstOss::Remdir(const char* path,
                  int opts,
                  XrdOucEnv* eP)
{
  // Operation not supported in EOS
  const int not_supported = -ENOTSUP;
  return not_supported;
}
//------------------------------------------------------------------------------
// Renames a file with name 'old_name' to 'new_name'
//
// @param oldname current path
// @param newname new path; it must contain a '/' and must not exist yet
// @param old_env not used
// @param new_env not used
//
// @return XrdOssOK on success, otherwise:
//         -EEXIST if the new path already exists
//         -EINVAL if the new path contains no '/'
//         the non-zero result of XrdOucUtils::makePath() if the parent
//         directories can not be created
//         -errno  if the old path can not be stat'ed or the rename fails
//------------------------------------------------------------------------------
int
XrdFstOss::Rename(const char* oldname,
                  const char* newname,
                  XrdOucEnv* old_env,
                  XrdOucEnv* new_env)
{
  static const mode_t dirMode = S_IRWXU | S_IRWXG;
  char source[MAXPATHLEN + 8];
  char target[MAXPATHLEN + 8];
  struct stat info;
  // Work on private, always terminated copies of both paths
  strncpy(source, oldname, sizeof(source) - 1);
  source[sizeof(source) - 1] = '\0';
  strncpy(target, newname, sizeof(target) - 1);
  target[sizeof(target) - 1] = '\0';

  // Make sure that the target file does not exist
  if (!lstat(target, &info)) {
    return -EEXIST;
  }

  // We need to create the directory path if it does not exist.
  char* name = strrchr(target, '/');

  if (!name) {
    return -EINVAL;
  }

  // Cut the target right after its last '/', create the directories and then
  // put back the character that was overwritten
  ++name;
  const char saved = *name;
  *name = '\0';
  const int mkrc = XrdOucUtils::makePath(target, dirMode);
  *name = saved;

  if (mkrc) {
    return mkrc;
  }

  // The old path must exist before the rename is attempted
  if (lstat(source, &info) || rename(source, target)) {
    return -errno;
  }

  return XrdOssOK;
}
//------------------------------------------------------------------------------
// Determine if file 'path' actually exists
//
// @param path path of the entry
// @param buff filled with the stat information on success
// @param opts if XRDOSS_updtatm is set and the entry is a regular file, its
//             access time is set to the current time
// @param EnvP not used
//
// @return XrdOssOK on success, -errno otherwise (-ENOMSG if errno is 0)
//------------------------------------------------------------------------------
int
XrdFstOss::Stat(const char* path,
                struct stat* buff,
                int opts,
                XrdOucEnv* EnvP)
{
  char local_path[MAXPATHLEN + 1];
  strncpy(local_path, path, (size_t)(MAXPATHLEN));
  local_path[MAXPATHLEN] = '\0';

  // Stat the file in the local filesystem
  if (stat(local_path, buff)) {
    return (errno ? -errno : -ENOMSG);
  }

  // Update the access time if so requested
  if ((opts & XRDOSS_updtatm) && ((buff->st_mode & S_IFMT) == S_IFREG)) {
    // Only the access time changes, the modification time is written back
    // unchanged. The result of utime is deliberately not checked.
    struct utimbuf stamps;
    stamps.actime = time(0);
    stamps.modtime = buff->st_mtime;
    utime(local_path, &stamps);
  }

  return XrdOssOK;
}
//------------------------------------------------------------------------------
// Truncate a file
//
// @param path path of the file
// @param size new size passed to truncate()
// @param envP not used
//
// @return XrdOssOK on success, -EISDIR if the path is a directory, -errno
//         otherwise
//------------------------------------------------------------------------------
int
XrdFstOss::Truncate(const char* path,
                    unsigned long long size,
                    XrdOucEnv* envP)
{
  char target[MAXPATHLEN + 1];
  strncpy(target, path, sizeof(target) - 1);
  target[sizeof(target) - 1] = '\0';
  struct stat info;

  if (lstat(target, &info)) {
    return -errno;
  }

  const mode_t type = info.st_mode & S_IFMT;

  if (type == S_IFDIR) {
    return -EISDIR;
  }

  if (type == S_IFLNK) {
    // For a symbolic link the target has to be reachable
    struct stat link_target;

    if (stat(target, &link_target)) {
      return -errno;
    }
  }

  return (truncate(target, size) ? -errno : XrdOssOK);
}
//------------------------------------------------------------------------------
// Add new entry to file name <-> blockchecksum map
//
// @param fileName key of the map entry
// @param blockXs  checksum object offered by the caller. If the map already
//                 holds an object with references for this file name, the
//                 offered object is closed and deleted and blockXs is pointed
//                 to the stored one.
// @param isRW     passed to IncrementRef() of the object that ends up being
//                 used
//
// @return the lock stored in the map together with the checksum object
//------------------------------------------------------------------------------
XrdSysRWLock*
XrdFstOss::AddMapping(const std::string& fileName,
                      CheckSum*& blockXs,
                      bool isRW)
{
  XrdSysRWLockHelper wr_lock(mRWMap, 0); // --> wrlock map
  std::pair<XrdSysRWLock*, CheckSum*> pair_value;
  // size() returns an unsigned size type, convert it to match the %i format
  eos_debug("Initial map size: %i and filename: %s",
            static_cast<int>(mMapFileXs.size()), fileName.c_str());

  if (mMapFileXs.count(fileName)) {
    pair_value = mMapFileXs[fileName];
    XrdSysRWLockHelper wr_xslock(pair_value.first, 0); // --> wrlock xs obj

    // If no. ref 0 then the obj is closed and waiting to be deleted so we can
    // add the new one, else return the old one
    if (pair_value.second->GetTotalRef() == 0) {
      pair_value.second->CloseMap();
      delete pair_value.second;
      // The existing lock is kept, only the checksum object is replaced
      pair_value.second = blockXs;
      mMapFileXs[fileName] = pair_value;
      eos_debug("Update old entry, map size: %i. ",
                static_cast<int>(mMapFileXs.size()));
    } else {
      blockXs->CloseMap();
      delete blockXs;
      blockXs = pair_value.second;
    }

    blockXs->IncrementRef(isRW);
    return pair_value.first;
  } else {
    XrdSysRWLock* mutex_xs = new XrdSysRWLock();
    pair_value = std::pair<XrdSysRWLock*, CheckSum*>(mutex_xs, blockXs);
    // Can increment without the lock as no one knows about this obj. yet
    blockXs->IncrementRef(isRW);
    mMapFileXs[fileName] = pair_value;
    eos_debug("Add completely new obj, map size: %i and filename: %s",
              static_cast<int>(mMapFileXs.size()), fileName.c_str());
    return mutex_xs;
  }
}
//------------------------------------------------------------------------------
// Get blockchecksum object for a filname
//------------------------------------------------------------------------------
std::pair
XrdFstOss::GetXsObj(const std::string& fileName, bool isRW)
{
XrdSysRWLockHelper rd_lock(mRWMap); // --> rdlock map
std::pair pair_value;
if (mMapFileXs.count(fileName)) {
pair_value = mMapFileXs[fileName];
XrdSysRWLock* mutex_xs = pair_value.first;
CheckSum* xs_obj = pair_value.second;
// Lock xs obj as multiple threads can update the value here
XrdSysRWLockHelper xs_wrlock(mutex_xs, 0); // --> wrlock xs obj
eos_debug("\nXs obj no ref: %i.\n", xs_obj->GetTotalRef());
if (xs_obj->GetTotalRef() != 0) {
xs_obj->IncrementRef(isRW);
return std::make_pair(mutex_xs, xs_obj);
} else {
// If no refs., it means the obj was closed and waiting to be deleted
return std::make_pair(NULL, NULL);
}
}
return std::make_pair(NULL, NULL);
}
//------------------------------------------------------------------------------
// Drop blockchecksum object for a file name
//------------------------------------------------------------------------------
void
XrdFstOss::DropXs(const std::string& fileName, bool force)
{
XrdSysRWLockHelper wr_lock(mRWMap, 0); // --> wrlock map
std::pair pair_value;
eos_debug("Oss map size before drop: %i.", mMapFileXs.size());
if (mMapFileXs.count(fileName)) {
pair_value = mMapFileXs[fileName];
// If no refs to the checksum, we can safely delete it
pair_value.first->WriteLock(); // --> wrlock xs obj
eos_debug("Xs obj no ref: %i.", pair_value.second->GetTotalRef());
if ((pair_value.second->GetTotalRef() == 0) || force) {
pair_value.second->CloseMap();
pair_value.first->UnLock(); // <-- unlock xs obj
delete pair_value.first;
delete pair_value.second;
mMapFileXs.erase(fileName);
} else {
eos_debug("Do not drop the mapping");
pair_value.first->UnLock(); // <-- unlock xs obj
}
}
eos_debug("Oss map size after drop: %i.", mMapFileXs.size());
}
EOSFSTNAMESPACE_END