//------------------------------------------------------------------------------ // File: XrdIo.cc // Author: Elvin-Alin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include #include #include "fst/io/xrd/XrdIo.hh" #include "fst/io/ChunkHandler.hh" #include "fst/io/VectChunkHandler.hh" #include "fst/io/AsyncMetaHandler.hh" #include "common/FileMap.hh" #include "common/Logging.hh" #include "XrdCl/XrdClDefaultEnv.hh" #include "XrdCl/XrdClBuffer.hh" #include "XrdCl/XrdClConstants.hh" #include "XrdSfs/XrdSfsInterface.hh" // Linux compat for Apple #ifdef __APPLE__ #ifndef EREMOTEIO #define EREMOTEIO 121 #endif #endif namespace { //---------------------------------------------------------------------------- //! InitReadahead //! //! @return true if readahead is enabled, otherwise false //---------------------------------------------------------------------------- bool InitReadahead() { char* ptr = getenv("EOS_FST_XRDIO_READAHEAD"); return (ptr ? strtol(ptr, 0, 10) ? true : false : false); } //---------------------------------------------------------------------------- //! InitReadaheadForceDisable //! //! @return true if readahead is force disabled, otherwise false //---------------------------------------------------------------------------- bool InitReadaheadForceDisable() { char* ptr = getenv("EOS_FST_XRDIO_READAHEAD_FORCE_DISABLE"); return (ptr ? strtol(ptr, 0, 10) ? true : false : false); } //---------------------------------------------------------------------------- //! InitInitNumRdAheadBlocks //! //! @return number of blocks that should be read ahead //---------------------------------------------------------------------------- uint32_t InitNumRdAheadBlocks() { char* ptr = getenv("EOS_FST_XRDIO_READAHEAD_BLOCKS"); // default is 2 if envar is not set return (ptr ? strtoul(ptr, 0, 10) : 2ul); } //---------------------------------------------------------------------------- //! InitBlocksize //! //! @return read-ahead block size //---------------------------------------------------------------------------- int32_t InitBlocksize() { char* ptr = getenv("EOS_FST_XRDIO_BLOCK_SIZE"); //default is 1M if the envar is not set return (ptr ? strtol(ptr, 0, 10) : 1024 * 1024); } const bool sReadaheadForceDisable = InitReadaheadForceDisable(); const bool sReadahead = InitReadahead(); const int32_t sBlockSize = InitBlocksize(); const uint32_t sNumRdAheadBlocks = InitNumRdAheadBlocks(); eos::common::BufferManager gBuffMgr(2 * eos::common::GB); } EOSFSTNAMESPACE_BEGIN // Static variables eos::common::XrdConnPool XrdIo::mXrdConnPool; namespace { std::string getAttrUrl(std::string path) { size_t qfind = path.rfind("?"); size_t rfind = path.rfind("/", qfind); if (rfind != std::string::npos) { path.insert(rfind + 1, "."); } path += ".xattr"; return path; } } //------------------------------------------------------------------------------ // Constuctor for ReadaheadBlock //------------------------------------------------------------------------------ ReadaheadBlock::ReadaheadBlock(uint64_t blocksize, eos::common::BufferManager* buf_mgr, SimpleHandler* hd): mBufMgr(buf_mgr) { if (mBufMgr) { mBuffer = mBufMgr->GetBuffer(blocksize); } else { mBuffer = std::make_shared(blocksize); } if (mBuffer == nullptr) { throw std::bad_alloc(); } if (hd) { mHandler.reset(hd); } else { mHandler = std::make_unique(); } } //------------------------------------------------------------------------------ // Get pointer to the underlying data buffer //------------------------------------------------------------------------------ char* ReadaheadBlock::GetDataPtr() { return mBuffer->GetDataPtr(); } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ ReadaheadBlock::~ReadaheadBlock() { if (mBufMgr) { mBufMgr->Recycle(mBuffer); } } //------------------------------------------------------------------------------ // Handle asynchronous open responses //------------------------------------------------------------------------------ void AsyncIoOpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus* status, XrdCl::AnyObject* response, XrdCl::HostList* hostList) { delete hostList; // Response shoud be nullptr in general if (response) { delete response; } mFileIO->mXrdFile->GetProperty("LastURL", mFileIO->mLastTriedUrl); if (status->IsOK()) { // Store the last URL we are connected after open mFileIO->mXrdFile->GetProperty("LastURL", mFileIO->mLastUrl); mFileIO->mIsOpen = true; } mLayoutOpenHandler->HandleResponseWithHosts(status, 0, 0); delete this; } //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ XrdIo::XrdIo(std::string path) : FileIo(path, "XrdIo"), mDoReadahead(sReadahead), mNumRdAheadBlocks(sNumRdAheadBlocks), mBlocksize(sBlockSize), mXrdFile(NULL), mMetaHandler(new AsyncMetaHandler()), mXrdIdHelper(nullptr), mPrefetchOffset(0ull), mPrefetchHits(0ull), mPrefetchBlocks(0ull) { // Set the TimeoutResolution to 1 XrdCl::Env* env = XrdCl::DefaultEnv::GetEnv(); env->PutInt("TimeoutResolution", 1); size_t qpos; // Opaque info can be part of the 'path' if (((qpos = mFilePath.find("?")) != std::string::npos)) { mOpaque = mFilePath.substr(qpos + 1); mFilePath.erase(qpos); } else { mOpaque = ""; } // Set url for xattr requests mAttrUrl = getAttrUrl(mFilePath.c_str()); setAttrSync(false);// by default sync attributes lazyly mAttrLoaded = false; mAttrDirty = false; } //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ XrdIo::~XrdIo() { if (mIsOpen) { fileClose(); } while (!mQueueBlocks.empty()) { ReadaheadBlock* ptr_readblock = mQueueBlocks.front(); mQueueBlocks.pop(); delete ptr_readblock; } while (!mMapBlocks.empty()) { delete mMapBlocks.begin()->second; mMapBlocks.erase(mMapBlocks.begin()); } delete mMetaHandler; // deal with asynchrnous dirty attributes if (!mAttrSync && mAttrDirty) { std::string lMap = mFileMap.Trim(); if (!XrdIo::Upload(mAttrUrl, lMap)) { mAttrDirty = false; } else { eos_static_err("msg=\"unable to upload to remote file map\" url=\"%s\"", mAttrUrl.c_str()); } } if (mXrdFile) { delete mXrdFile; } } //------------------------------------------------------------------------------ // Open file - synchronously //------------------------------------------------------------------------------ int XrdIo::fileOpen(XrdSfsFileOpenMode flags, mode_t mode, const std::string& opaque, uint16_t timeout) { const char* val = nullptr; mWriteStatus = XrdCl::XRootDStatus(); if (!opaque.empty()) { if (mOpaque.empty()) { mOpaque = opaque; } else { mOpaque = mOpaque + "&" + opaque; } } XrdOucEnv env_opaque(mOpaque.c_str()); // Decide if readahead is used and the block size if (!sReadaheadForceDisable && (val = env_opaque.Get("fst.readahead")) && (strncmp(val, "true", 4) == 0)) { eos_debug("%s", "msg=\"enabling the readahead\""); mDoReadahead = true; val = 0; if ((val = env_opaque.Get("fst.blocksize"))) { mBlocksize = static_cast(atoll(val)); } } if (mXrdFile) { delete mXrdFile; mXrdFile = NULL; } mXrdFile = new XrdCl::File(); // Final path + opaque info used in the open mTargetUrl.FromString(BuildRequestUrl()); mXrdIdHelper.reset(new eos::common::XrdConnIdHelper(mXrdConnPool, mTargetUrl)); if (mXrdIdHelper->HasNewConnection()) { eos_info("xrd_connection_id=%s", mTargetUrl.GetHostId().c_str()); } // Disable recovery on read and write if (!mXrdFile->SetProperty("ReadRecovery", "false") || !mXrdFile->SetProperty("WriteRecovery", "false")) { eos_warning("%s", "msg=failed to set XrdCl::File properties read recovery and write " "recovery to false\""); } XrdCl::OpenFlags::Flags flags_xrdcl = eos::common::LayoutId::MapFlagsSfs2XrdCl( flags); XrdCl::Access::Mode mode_xrdcl = eos::common::LayoutId::MapModeSfs2XrdCl(mode); XrdCl::XRootDStatus status = mXrdFile->Open(mTargetUrl.GetURL().c_str(), flags_xrdcl, mode_xrdcl, timeout); mXrdFile->GetProperty("LastURL", mLastTriedUrl); if (!status.IsOK()) { mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; eos_err("error= \"open failed url=%s, errno=%i, errc=%i, msg=%s\"", mTargetUrl.GetURL().c_str(), mLastErrNo, mLastErrCode, mLastErrMsg.c_str()); if (!mLastErrNo) { eos_warning("%s", "msg=\"error encountered despite errno=0; setting errno=22\""); mLastErrNo = EINVAL; } errno = mLastErrNo; return SFS_ERROR; } else { errno = 0; mIsOpen = true; } // Save the last URL we are connected after open mXrdFile->GetProperty("LastURL", mLastUrl); return SFS_OK; } //------------------------------------------------------------------------------ // Open file asynchronously //------------------------------------------------------------------------------ std::future XrdIo::fileOpenAsync(XrdSfsFileOpenMode flags, mode_t mode, const std::string& opaque, uint16_t timeout) { using eos::common::LayoutId; std::promise open_promise; std::future open_future = open_promise.get_future(); if (!opaque.empty()) { if (mOpaque.empty()) { mOpaque = opaque; } else { mOpaque = mOpaque + "&" + opaque; } } XrdOucEnv env_opaque(mOpaque.c_str()); const char* val = nullptr; // Decide if readahead is used and the block size if (!sReadaheadForceDisable && (val = env_opaque.Get("fst.readahead")) && (strncmp(val, "true", 4) == 0)) { eos_debug("%s", "msg=\"enabling the readahead\""); mDoReadahead = true; val = 0; if ((val = env_opaque.Get("fst.blocksize"))) { mBlocksize = static_cast(atoll(val)); } } if (mXrdFile) { delete mXrdFile; mXrdFile = NULL; } mXrdFile = new XrdCl::File(); // Final path + opaque info used in the open mTargetUrl.FromString(BuildRequestUrl()); mXrdIdHelper.reset(new eos::common::XrdConnIdHelper(mXrdConnPool, mTargetUrl)); if (mXrdIdHelper->HasNewConnection()) { eos_info("xrd_connection_id=%s", mTargetUrl.GetHostId().c_str()); } if (!mXrdFile->SetProperty("ReadRecovery", "false") || !mXrdFile->SetProperty("WriteRecovery", "false")) { eos_warning("%s", "msg=\"failed to disable file read and write recovery\""); } XrdIoHandler* open_handler = new XrdIoHandler(std::move(open_promise), XrdIoHandler::OpType::Open); XrdCl::OpenFlags::Flags flags_xrdcl = LayoutId::MapFlagsSfs2XrdCl(flags); XrdCl::Access::Mode mode_xrdcl = LayoutId::MapModeSfs2XrdCl(mode); XrdCl::XRootDStatus status = mXrdFile->Open(mTargetUrl.GetURL().c_str(), flags_xrdcl, mode_xrdcl, open_handler, timeout); if (!status.IsOK()) { open_handler->HandleResponse(new XrdCl::XRootDStatus(status), nullptr); } return open_future; } //------------------------------------------------------------------------------ // Read from file - sync //------------------------------------------------------------------------------ int64_t XrdIo::fileRead(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout) { eos_debug("offset=%llu length=%llu", static_cast(offset), static_cast(length)); uint32_t bytes_read = 0; if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::XRootDStatus status = mXrdFile->Read(static_cast(offset), static_cast(length), buffer, bytes_read, timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } return bytes_read; } //------------------------------------------------------------------------------ // Read with prefetching //------------------------------------------------------------------------------ int64_t XrdIo::fileReadPrefetch(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout) { eos_debug("offset=%lli length=%i", offset, length); if (!mXrdFile) { errno = EIO; return SFS_ERROR; } if (!mDoReadahead) { eos_debug("%s", "msg=\"readahead is disabled\""); return fileRead(offset, buffer, length, timeout); } int64_t fread = 0; // direct reads int64_t nread = 0; // total read for current request XrdSysMutexHelper lock(mPrefetchMutex); char* ptr_buff = buffer; while (length) { auto iter = FindBlock(offset); if (iter == mMapBlocks.end()) { RecycleBlocks(iter); // Read directly the current block and prefetch the next one fread = fileRead(offset, ptr_buff, length); if (offset && (offset != eos::common::LayoutId::OssXsBlockSize)) { eos_info("msg=\"disable readahead\" offset=%lli", offset); mDoReadahead = false; } if ((fread == length) && mDoReadahead) { if (!PrefetchBlock(offset + length, timeout)) { eos_err("msg=\"failed to send prefetch request\" offset=%lli", offset + length); mDoReadahead = false; } } nread += fread; return nread; } // Update prefetch statistics if (iter->first != mPrefetchOffset) { mPrefetchOffset = iter->first; ++mPrefetchBlocks; } SimpleHandler* sh = iter->second->mHandler.get(); uint64_t shift = offset - iter->first; RecycleBlocks(iter); PrefetchBlock(mMapBlocks.rbegin()->first + mBlocksize); if (!sh->WaitOK()) { // Error while prefetching, remove block from map eos_err("%s", "msg=\"prefetching failed, disable it and clean blocks\""); mDoReadahead = false; RecycleBlocks(mMapBlocks.end()); fread = fileRead(offset, ptr_buff, length); nread += fread; return nread; } eos_debug("msg=\"read from prefetched block\" blk_off=%lld, req_off= %lld", iter->first, offset); if (sh->GetRespLength() <= 0) { // The request got a response but it read 0 bytes eos_debug("%s", "msg=\"response contains 0 bytes\""); return nread; } uint32_t aligned_length = sh->GetRespLength() - shift; uint64_t read_length = ((uint32_t) length < aligned_length) ? length : aligned_length; ptr_buff = static_cast(memcpy(ptr_buff, iter->second->GetDataPtr() + shift, read_length)); ptr_buff += read_length; offset += read_length; length -= read_length; nread += read_length; // If prefetch block smaller than mBlocksize and current offset at the end // of the prefetch block then we reached the end of file if ((sh->GetRespLength() != mBlocksize) && ((uint64_t) offset >= iter->first + sh->GetRespLength())) { break; } } ++mPrefetchHits; return nread; } //------------------------------------------------------------------------------ // Vector read - sync //------------------------------------------------------------------------------ int64_t XrdIo::fileReadV(XrdCl::ChunkList& chunkList, uint16_t timeout) { eos_debug("read count=%i", chunkList.size()); int64_t nread = 0; if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::VectorReadInfo* vReadInfo = 0; XrdCl::XRootDStatus status = mXrdFile->VectorRead(chunkList, 0, vReadInfo, timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } nread = vReadInfo->GetSize(); delete vReadInfo; return nread; } //------------------------------------------------------------------------------ // Vector read - async //------------------------------------------------------------------------------ int64_t XrdIo::fileReadVAsync(XrdCl::ChunkList& chunkList, uint16_t timeout) { if (!mXrdFile) { errno = EIO; return SFS_ERROR; } // Get vector handler and send async request VectChunkHandler* vhandler = 0; XrdCl::XRootDStatus status; eos_debug("read count=%i", chunkList.size()); vhandler = mMetaHandler->Register(chunkList, NULL, false); if (!vhandler) { eos_err("%s", "msg=\"unable to get vector handler\""); return SFS_ERROR; } int64_t nread = vhandler->GetLength(); status = mXrdFile->VectorRead(chunkList, static_cast(0), static_cast(vhandler), timeout); if (!status.IsOK()) { // TODO: for the time being we call this ourselves but this should be // dropped once XrdCl will call the handler for a request as it knows it // has already failed mMetaHandler->HandleResponse(&status, vhandler); mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } return nread; } //------------------------------------------------------------------------------ // Write to file - sync //------------------------------------------------------------------------------ int64_t XrdIo::fileWrite(XrdSfsFileOffset offset, const char* buffer, XrdSfsXferSize length, uint16_t timeout) { eos_debug("offset=%llu length=%llu", static_cast(offset), static_cast(length)); if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::XRootDStatus status = mXrdFile->Write(static_cast(offset), static_cast(length), buffer, timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } return length; } //------------------------------------------------------------------------------ // Write to file - async //------------------------------------------------------------------------------ int64_t XrdIo::fileWriteAsync(XrdSfsFileOffset offset, const char* buffer, XrdSfsXferSize length, uint16_t timeout) { eos_static_debug("offset=%llu length=%i", offset, length); if (!mXrdFile) { errno = EIO; return SFS_ERROR; } if (!mWriteStatus.IsOK()) { // if there was any async write error, we always return it again return SFS_ERROR; } ChunkHandler* handler = mMetaHandler->Register(offset, length, (char*)buffer, true); // If previous write requests failed then we won't get a new handler // and we return directly an error if (!handler) { return SFS_ERROR; } // Obs: Use the handler buffer for write requests XrdCl::XRootDStatus status = mXrdFile->Write(static_cast(offset), static_cast(length), handler->GetBuffer(), handler, timeout); if (!status.IsOK()) { // remember write failures 'forever' mWriteStatus = status; mMetaHandler->HandleResponse(&status, handler); return SFS_ERROR; } return length; } //------------------------------------------------------------------------------ // Write to file - async //------------------------------------------------------------------------------ std::future XrdIo::fileWriteAsync(const char* buffer, XrdSfsFileOffset offset, XrdSfsXferSize length) { eos_static_debug("offset=%llu length=%i", offset, length); std::promise wr_promise; std::future wr_future = wr_promise.get_future(); if (!mXrdFile) { errno = EIO; wr_promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, errno)); return wr_future; } XrdIoHandler* wr_handler = nullptr; try { wr_handler = new XrdIoHandler(std::move(wr_promise), XrdIoHandler::OpType::Write, &gBuffMgr, buffer, length); } catch (const BufferAllocateException& e) { errno = ENOMEM; eos_err("msg=\"%s\" offset=%lli, length=%li", e.what(), offset, length); wr_promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, errno)); return wr_future; } XrdCl::XRootDStatus status = mXrdFile->Write(static_cast(offset), static_cast(length), wr_handler->GetDataPtr(), wr_handler); if (!status.IsOK()) { wr_handler->HandleResponse(new XrdCl::XRootDStatus(status), nullptr); } return wr_future; } //------------------------------------------------------------------------------ // Wait for async IO //------------------------------------------------------------------------------ int XrdIo::fileWaitAsyncIO() { bool async_ok = true; { XrdSysMutexHelper scope_lock(mPrefetchMutex); // Wait for any requests on the fly and then close while (!mMapBlocks.empty()) { SimpleHandler* shandler = mMapBlocks.begin()->second->mHandler.get(); if (shandler->HasRequest()) { async_ok = shandler->WaitOK(); } delete mMapBlocks.begin()->second; mMapBlocks.erase(mMapBlocks.begin()); } } // Wait for any async requests before closing if (mMetaHandler) { if (mMetaHandler->WaitOK() != XrdCl::errNone) { eos_err("error=async requests failed for file path=%s", mFilePath.c_str()); async_ok = false; } } if (async_ok) { return 0; } else { errno = EIO; return -1; } } //------------------------------------------------------------------------------ // Truncate file //------------------------------------------------------------------------------ int XrdIo::fileTruncate(XrdSfsFileOffset offset, uint16_t timeout) { if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::XRootDStatus status = mXrdFile->Truncate(static_cast(offset), timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } return SFS_OK; } //------------------------------------------------------------------------------ // Truncate asynchronous //------------------------------------------------------------------------------ std::future XrdIo::fileTruncateAsync(XrdSfsFileOffset offset, uint16_t timeout) { eos_static_debug("offset=%llu", offset); std::promise tr_promise; std::future tr_future = tr_promise.get_future(); if (!mXrdFile) { errno = EIO; tr_promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errUnknown, EIO)); return tr_future; } XrdIoHandler* tr_handler = new XrdIoHandler(std::move(tr_promise), XrdIoHandler::OpType::Truncate); XrdCl::XRootDStatus status = mXrdFile->Truncate(static_cast(offset), tr_handler, timeout); if (!status.IsOK()) { errno = status.errNo; tr_handler->HandleResponse(new XrdCl::XRootDStatus(status), nullptr); } return tr_future; } //------------------------------------------------------------------------------ // Sync file to disk //------------------------------------------------------------------------------ int XrdIo::fileSync(uint16_t timeout) { if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::XRootDStatus status = mXrdFile->Sync(timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } return SFS_OK; } //------------------------------------------------------------------------------ // Get stats about the file //------------------------------------------------------------------------------ int XrdIo::fileStat(struct stat* buf, uint16_t timeout) { if (!mXrdFile) { eos_err("%s", "msg=\"underlying XrdClFile object doesn't exist\""); errno = EIO; return SFS_ERROR; } int rc = SFS_ERROR; XrdCl::StatInfo* stat = 0; XrdCl::XRootDStatus status = mXrdFile->Stat(true, stat, timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; eos_info("errcode=%i, errno=%i, errmsg=%s", mLastErrCode, mLastErrNo, mLastErrMsg.c_str()); } else { buf->st_dev = static_cast(atoi(stat->GetId().c_str())); buf->st_mode = static_cast(stat->GetFlags()); buf->st_size = static_cast(stat->GetSize()); buf->st_mtime = static_cast(stat->GetModTime()); rc = SFS_OK; } if (stat) { delete stat; } return rc; } //------------------------------------------------------------------------------ // Execute implementation dependant commands //------------------------------------------------------------------------------ int XrdIo::fileFctl(const std::string& cmd, uint16_t timeout) { if (!mXrdFile) { eos_info("underlying XrdClFile object doesn't exist"); errno = EIO; return SFS_ERROR; } XrdCl::Buffer arg; XrdCl::Buffer* response = 0; (void) arg.FromString(cmd); XrdCl::XRootDStatus status = mXrdFile->Fcntl(arg, response, timeout); delete response; return status.status; } //------------------------------------------------------------------------------ // Close file //------------------------------------------------------------------------------ int XrdIo::fileClose(uint16_t timeout) { if (!mXrdFile) { errno = EIO; return SFS_ERROR; } XrdCl::XRootDStatus okstatus; mWriteStatus = okstatus; bool async_ok = true; mIsOpen = false; if (fileWaitAsyncIO()) { async_ok = false; } XrdCl::XRootDStatus status = mXrdFile->Close(timeout); if (!status.IsOK()) { errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return SFS_ERROR; } // If any of the async requests failed then we have an error if (!async_ok) { return SFS_ERROR; } return SFS_OK; } //------------------------------------------------------------------------------ // Remove file //------------------------------------------------------------------------------ int XrdIo::fileRemove(uint16_t timeout) { if (!mXrdFile) { errno = EIO; return SFS_ERROR; } // Send opaque coamand to file object to mark it for deletion XrdCl::Buffer arg; XrdCl::Buffer* response = 0; (void) arg.FromString("delete"); XrdCl::XRootDStatus status = mXrdFile->Fcntl(arg, response, timeout); delete response; if (!status.IsOK()) { eos_err("failed to mark the file for deletion:%s", mFilePath.c_str()); return SFS_ERROR; } return SFS_OK; } //------------------------------------------------------------------------------ // Check for existence //------------------------------------------------------------------------------ int XrdIo::fileExists() { XrdCl::URL xUrl(mFilePath); XrdCl::FileSystem fs(xUrl); XrdCl::StatInfo* stat; XrdCl::XRootDStatus status = fs.Stat(xUrl.GetPath(), stat); errno = 0; if (!status.IsOK()) { if (status.errNo == kXR_NotFound) { errno = ENOENT; mLastErrMsg = "no such file or directory"; mLastErrCode = status.code; mLastErrNo = status.errNo; } else { errno = EIO; mLastErrMsg = "failed to check for existence"; mLastErrCode = status.code; mLastErrNo = status.errNo; } return SFS_ERROR; } if (stat) { delete stat; return SFS_OK; } else { errno = ENODATA; return SFS_ERROR; } } //------------------------------------------------------------------------------ // Delete file by path //------------------------------------------------------------------------------ int XrdIo::fileDelete(const char* url) { XrdCl::URL xUrl(url); std::string attrurl = getAttrUrl(url); XrdCl::URL xAttrUrl(attrurl); XrdCl::FileSystem fs(xUrl); XrdCl::XRootDStatus status = fs.Rm(xUrl.GetPath()); XrdCl::XRootDStatus status_attr = fs.Rm(xAttrUrl.GetPath()); errno = 0; if (!status.IsOK()) { eos_err("error=failed to delete file - %s", url); mLastErrMsg = "failed to delete file"; mLastErrCode = status.code; mLastErrNo = status.errNo; errno = EIO; return SFS_ERROR; } return true; } //------------------------------------------------------------------------------ // Read from file asynchronously // @note The buffer given by the user is not neccessarily populated with // any meaningful data when this function returns. The user should call // fileWaitAsyncIO to enforce this guarantee. //------------------------------------------------------------------------------ int64_t XrdIo::fileReadAsync(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout) { // @todo(esindril) fall back to sync mode for the time being return fileRead(offset, buffer, length, timeout); } //------------------------------------------------------------------------------ // Try to find a block in cache which contains the required offset //------------------------------------------------------------------------------ PrefetchMap::iterator XrdIo::FindBlock(uint64_t offset) { if (mMapBlocks.empty()) { return mMapBlocks.end(); } PrefetchMap::iterator iter = mMapBlocks.lower_bound(offset); if ((iter != mMapBlocks.end()) && (iter->first == offset)) { // Found exactly the block needed return iter; } else { if (iter == mMapBlocks.begin()) { // Only blocks with bigger offsets, return pointer to end of the map return mMapBlocks.end(); } else { // Check if the previous block, we know the map is not empty iter--; if ((iter->first <= offset) && (offset < (iter->first + mBlocksize))) { return iter; } else { return mMapBlocks.end(); } } } } //------------------------------------------------------------------------------ // Prefetch block using the readahead mechanism //------------------------------------------------------------------------------ bool XrdIo::PrefetchBlock(int64_t offset, uint16_t timeout) { ReadaheadBlock* block {nullptr}; eos_debug("msg=\"try to prefetch\" offset=%lli length=%i", offset, mBlocksize); // Block is already prefetched if (FindBlock(offset) != mMapBlocks.end()) { return true; } if (mQueueBlocks.empty()) { if (mMapBlocks.size() < mNumRdAheadBlocks) { try { block = new ReadaheadBlock(mBlocksize, &gBuffMgr); } catch (const std::bad_alloc& e) { eos_static_err("%s", "msg=\"failed to allocate a prefetch block\""); return false; } } else { return false; } } else { block = mQueueBlocks.front(); mQueueBlocks.pop(); } block->mHandler->Update(offset, mBlocksize); XrdCl::XRootDStatus status = mXrdFile->Read(offset, mBlocksize, block->GetDataPtr(), block->mHandler.get(), timeout); if (!status.IsOK()) { // Create tmp status which is deleted in the HandleResponse method XrdCl::XRootDStatus* tmp_status = new XrdCl::XRootDStatus(status); block->mHandler->HandleResponse(tmp_status, NULL); mQueueBlocks.push(block); return false; } else { mMapBlocks.insert(std::make_pair(offset, block)); } return true; } //------------------------------------------------------------------------------ // Recycle blocks from the map that are not useful since the current offset // is already grater then their offset //------------------------------------------------------------------------------ void XrdIo::RecycleBlocks(std::map::iterator iter) { for (auto it = mMapBlocks.begin(); it != iter; ++it) { // Remove all elements from map so that we can align with the new // requests and prefetch a new block. But first we need to collect any // responses which are in-flight as otherwise these response might // arrive later on, when we are expecting replies for other blocks SimpleHandler* sh = it->second->mHandler.get(); if (sh->HasRequest()) { // Not interested in the result - discard it sh->WaitOK(); } mQueueBlocks.push(it->second); } mMapBlocks.erase(mMapBlocks.begin(), iter); } //------------------------------------------------------------------------------ // Get pointer to async meta handler object //------------------------------------------------------------------------------ void* XrdIo::fileGetAsyncHandler() { return static_cast(mMetaHandler); } //------------------------------------------------------------------------------ // Run a space query command as statfs //------------------------------------------------------------------------------ int XrdIo::Statfs(struct statfs* sfs) { XrdCl::URL xUrl(mFilePath); XrdCl::FileSystem fs(xUrl); XrdCl::Buffer* response = 0; XrdCl::Buffer arg(xUrl.GetPath().size()); arg.FromString(xUrl.GetPath()); XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::Space, arg, response, (uint16_t) 15); errno = 0; if (!status.IsOK()) { eos_err("msg=\"failed to statfs remote XRootD\" url=\"%s\"", mFilePath.c_str()); mLastErrMsg = "failed to statfs remote XRootD"; mLastErrCode = status.code; mLastErrNo = status.errNo; errno = EREMOTEIO; return errno; } if (response) { // oss.cgroup=default&oss.space=469799256416256&oss.free=468894771826688& // oss.maxf=68719476736&oss.used=904484589568&oss.quota=469799256416256 XrdOucEnv spaceEnv(response->ToString().c_str()); unsigned long long free_bytes = 0; unsigned long long total_bytes = 0; if (spaceEnv.Get("oss.free")) { free_bytes = strtoull(spaceEnv.Get("oss.free"), 0, 10); } else { errno = EINVAL; return errno; } if (spaceEnv.Get("oss.space")) { total_bytes = strtoull(spaceEnv.Get("oss.space"), 0, 10); } else { errno = EINVAL; return errno; } #ifdef __APPLE__ sfs->f_iosize = 4096; sfs->f_bsize = sfs->f_iosize; sfs->f_blocks = (fsblkcnt_t)(total_bytes / sfs->f_iosize); sfs->f_bavail = (fsblkcnt_t)(free_bytes / sfs->f_iosize); #else sfs->f_frsize = 4096; sfs->f_bsize = sfs->f_frsize; sfs->f_blocks = (fsblkcnt_t)(total_bytes / sfs->f_frsize); sfs->f_bavail = (fsblkcnt_t)(free_bytes / sfs->f_frsize); #endif sfs->f_bfree = sfs->f_bavail; sfs->f_files = 1000000; sfs->f_ffree = 1000000; delete response; return 0; } else { errno = EREMOTEIO; return errno; } } //------------------------------------------------------------------------------ // **** Attribute Interface **** //------------------------------------------------------------------------------ //------------------------------------------------------------------------------ // Set attr //------------------------------------------------------------------------------ int XrdIo::attrSet(const char* name, const char* value, size_t len) { if (!mAttrSync && mAttrLoaded) { std::string key = name; std::string val; val.assign(value, len); if (val == "#__DELETE_ATTR_#") { mFileMap.Remove(key); } else { // just modify mFileMap.Set(key, val); } mAttrDirty = true; return 0; } std::string lBlob; // download if (!XrdIo::Download(mAttrUrl, lBlob) || errno == ENOENT) { mAttrLoaded = true; if (mFileMap.Load(lBlob)) { std::string key = name; std::string val; if (val == "#__DELETE_ATTR_#") { mFileMap.Remove(key); } else { val.assign(value, len); mFileMap.Set(key, val); } mAttrDirty = true; if (mAttrSync) { std::string lMap = mFileMap.Trim(); if (!XrdIo::Upload(mAttrUrl, lMap)) { mAttrDirty = false; return SFS_OK; } else { eos_static_err("msg=\"unable to upload to remote file map\" url=\"%s\"", mAttrUrl.c_str()); } } } else { eos_static_err("msg=\"unable to parse remote file map\" url=\"%s\"", mAttrUrl.c_str()); errno = EINVAL; } } else { eos_static_err("msg=\"unable to download remote file map\" url=\"%s\"", mAttrUrl.c_str()); } return SFS_ERROR; } //------------------------------------------------------------------------------ // Set a string attribute (name has to start with 'user.' !!!) //------------------------------------------------------------------------------ int XrdIo::attrSet(string name, std::string value) { return attrSet(name.c_str(), value.c_str(), value.length()); } //------------------------------------------------------------------------------ // Get a binary attribute by name (name has to start with 'user.' !!!) //------------------------------------------------------------------------------ int XrdIo::attrGet(const char* name, char* value, size_t& size) { errno = 0; if (!mAttrSync && mAttrLoaded) { std::string val = mFileMap.Get(name); size_t len = val.length() + 1; if (len > size) { len = size; } memcpy(value, val.c_str(), len); eos_static_info("key=%s value=%s", name, value); return 0; } std::string lBlob; if (!XrdIo::Download(mAttrUrl, lBlob) || errno == ENOENT) { mAttrLoaded = true; if (mFileMap.Load(lBlob)) { std::string val = mFileMap.Get(name); size_t len = val.length() + 1; if (len > size) { len = size; } memcpy(value, val.c_str(), len); eos_static_info("key=%s value=%s", name, value); return SFS_OK; } } else { eos_static_err("msg=\"unable to download remote file map\" url=\"%s\"", mAttrUrl.c_str()); } return SFS_ERROR; } ///------------------------------------------------------------------------------ // Get a string attribute by name (name has to start with 'user.' !!!) //------------------------------------------------------------------------------ int XrdIo::attrGet(string name, std::string& value) { errno = 0; if (!mAttrSync && mAttrLoaded) { value = mFileMap.Get(name); return SFS_OK; } std::string lBlob; if (!XrdIo::Download(mAttrUrl, lBlob) || errno == ENOENT) { mAttrLoaded = true; if (mFileMap.Load(lBlob)) { value = mFileMap.Get(name); return SFS_OK; } } else { eos_static_err("msg=\"unable to download remote file map\" url=\"%s\"", mAttrUrl.c_str()); } return SFS_ERROR; } //------------------------------------------------------------------------------ // Delete a binary attribute by name //------------------------------------------------------------------------------ int XrdIo::attrDelete(const char* name) { errno = 0; return attrSet(name, "#__DELETE_ATTR_#"); } //------------------------------------------------------------------------------ // List all attributes for the associated path //------------------------------------------------------------------------------ int XrdIo::attrList(std::vector& list) { if (!mAttrSync && mAttrLoaded) { std::map lMap = mFileMap.GetMap(); for (auto it = lMap.begin(); it != lMap.end(); ++it) { list.push_back(it->first); } return 0; } std::string lBlob; if (!XrdIo::Download(mAttrUrl, lBlob) || errno == ENOENT) { mAttrLoaded = true; if (mFileMap.Load(lBlob)) { std::map lMap = mFileMap.GetMap(); for (auto it = lMap.begin(); it != lMap.end(); ++it) { list.push_back(it->first); } return 0; } } else { eos_static_err("msg=\"unable to download remote file map\" url=\"%s\"", mAttrUrl.c_str()); } return -1; } //-------------------------------------------------------------------------- // **** Traversing filesystem/storage routines **** //-------------------------------------------------------------------------- //-------------------------------------------------------------------------- // Open a cursor to traverse a storage system //-------------------------------------------------------------------------- FileIo::FtsHandle* XrdIo::ftsOpen() { XrdCl::URL url(mFilePath.c_str()); XrdCl::FileSystem fs(url); std::vector files; std::vector directories; XrdCl::XRootDStatus status = XrdIo::GetDirList(&fs, url, &files, &directories); if (!status.IsOK()) { eos_err("error=listing remote XrdClFile - %s", status.ToString().c_str()); errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return 0; } FtsHandle* handle = new FtsHandle(mFilePath.c_str()); if (!handle) { return 0; } for (auto it = files.begin(); it != files.end(); ++it) { XrdOucString fname = it->c_str(); // Skip attribute files if (fname.beginswith(".") && fname.endswith(".xattr")) { continue; } handle->found_files.push_back(mFilePath + *it); } for (auto it = directories.begin(); it != directories.end(); ++it) { eos_info("adding dir=%s deepness=%d", (mFilePath + *it + "/").c_str(), handle->deepness); handle->found_dirs[0].push_back(mFilePath + *it + "/"); } return (FileIo::FtsHandle*)(handle); } //------------------------------------------------------------------------------ // Return the next path related to a traversal cursor obtained with ftsOpen //------------------------------------------------------------------------------ std::string XrdIo::ftsRead(FileIo::FtsHandle* fts_handle) { FtsHandle* handle = (FtsHandle*) fts_handle; if (!handle->found_files.size()) { do { XrdCl::XRootDStatus status; std::vector files; std::vector directories; auto dit = handle->found_dirs[handle->deepness].begin(); bool found = true; while (dit == handle->found_dirs[handle->deepness].end()) { // move to next level handle->deepness++; handle->found_dirs.resize(handle->deepness + 1); if (!handle->found_dirs[handle->deepness].size()) { found = false; break; } else { dit = handle->found_dirs[handle->deepness].begin(); } } if (!found) { break; } eos_info("searching at deepness=%d directory=%s", handle->deepness, dit->c_str()); std::string surl_dir = *dit; XrdCl::URL url(surl_dir); XrdCl::FileSystem fs(url); status = XrdIo::GetDirList(&fs, url, &files, &directories); if (!status.IsOK()) { eos_err("error=listing remote XrdClFile - %s", status.ToString().c_str()); errno = status.errNo; mLastErrMsg = status.ToString().c_str(); mLastErrCode = status.code; mLastErrNo = status.errNo; return ""; } else { handle->found_dirs[handle->deepness].erase(dit); } std::string new_file{""}; std::string new_dir{""}; for (auto it = files.begin(); it != files.end(); ++it) { XrdOucString fname = it->c_str(); if (fname.beginswith(".") && fname.endswith(".xattr")) { continue; } new_file = surl_dir + *it; eos_info("adding file=%s", new_file.c_str()); handle->found_files.push_back(new_file); } for (auto it = directories.begin(); it != directories.end(); ++it) { new_dir = surl_dir + *it + "/"; eos_info("adding dir=%s deepness=%d", new_dir.c_str(), handle->deepness + 1); handle->found_dirs[handle->deepness + 1].push_back(new_dir); } } while (!handle->found_files.size()); } if (handle->found_files.size()) { std::string new_path = handle->found_files.front(); handle->found_files.pop_front(); return new_path; } return ""; } //------------------------------------------------------------------------------ // Close a traversal cursor //------------------------------------------------------------------------------ int XrdIo::ftsClose(FileIo::FtsHandle* fts_handle) { FtsHandle* handle = (FtsHandle*) fts_handle; handle->found_files.clear(); handle->found_dirs.resize(1); handle->found_dirs[0].resize(1); handle->deepness = 0; return 0; } //------------------------------------------------------------------------------ // Download a remote file into a string object //------------------------------------------------------------------------------ int XrdIo::Download(std::string url, std::string& download) { errno = 0; static int s_blocksize = 65536; XrdIo io(url.c_str()); off_t offset = 0; std::string opaque; if (!io.fileOpen(0, 0, opaque, 10)) { ssize_t rbytes = 0; download.resize(s_blocksize); do { rbytes = io.fileRead(offset, (char*) download.c_str(), s_blocksize, 30); if (rbytes == s_blocksize) { download.resize(download.size() + 65536); } if (rbytes > 0) { offset += rbytes; } } while (rbytes == s_blocksize); io.fileClose(); download.resize(offset); return 0; } if (errno == 3011) { return 0; } return -1; } //------------------------------------------------------------------------------ // Upload a string object into a remote file //------------------------------------------------------------------------------ int XrdIo::Upload(std::string url, std::string& upload) { errno = 0; XrdIo io(url.c_str()); std::string opaque; int rc = 0; if (!io.fileOpen(SFS_O_WRONLY | SFS_O_CREAT, S_IRWXU | S_IRGRP | SFS_O_MKPTH, opaque, 10)) { eos_static_info("opened %s", url.c_str()); if ((io.fileWrite(0, upload.c_str(), upload.length(), 30)) != (ssize_t) upload.length()) { eos_static_err("failed to write %d", upload.length()); rc = -1; } else { eos_static_info("uploaded %d\n", upload.length()); } io.fileClose(); } else { eos_static_err("failed to open %s", url.c_str()); rc = -1; } return rc; } //------------------------------------------------------------------------------ // Get a list of files and a list of directories inside a remote directory //------------------------------------------------------------------------------ XrdCl::XRootDStatus XrdIo::GetDirList(XrdCl::FileSystem* fs, const XrdCl::URL& url, std::vector* files, std::vector* directories) { eos_info("url=%s", url.GetURL().c_str()); using namespace XrdCl; DirectoryList* list; XrdCl::XRootDStatus status; status = fs->DirList(url.GetPath(), DirListFlags::Stat, list); if (!status.IsOK()) { return status; } for (DirectoryList::Iterator it = list->Begin(); it != list->End(); ++it) { if ((*it)->GetStatInfo()->TestFlags(StatInfo::IsDir)) { std::string directory = (*it)->GetName(); directories->push_back(directory); } else { std::string file = (*it)->GetName(); files->push_back(file); } } return XRootDStatus(); } //------------------------------------------------------------------------------ // Process opaque info //------------------------------------------------------------------------------ std::string XrdIo::BuildRequestUrl() const { using namespace std::chrono; // Add extra capability expiration time based on the XRD_STREAMTIMEOUT value uint64_t xrdcl_streamtimeout = XrdCl::DefaultStreamTimeout; std::string env_val; if (XrdCl::DefaultEnv::GetEnv()->GetString("StreamTimeout", env_val)) { try { xrdcl_streamtimeout = std::stoull(env_val); } catch (...) {} } auto now = system_clock::now(); auto valid_sec = time_point_cast(now).time_since_epoch().count() + xrdcl_streamtimeout - 1; std::ostringstream oss; oss << mFilePath << "?" << "fst.valid=" << valid_sec << "&" << mOpaque; return oss.str(); } EOSFSTNAMESPACE_END