//------------------------------------------------------------------------------ //! @file XrdIo.hh //! @author Elvin-Alin Sindrilaru - CERN //! @brief Class used for doing remote IO operations unsing the xrd client //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2011 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #ifndef __EOSFST_XRDFILEIO_HH__ #define __EOSFST_XRDFILEIO_HH__ #include "fst/io/FileIo.hh" #include "fst/io/SimpleHandler.hh" #include "common/FileMap.hh" #include "common/XrdConnPool.hh" #include "common/BufferManager.hh" #include "XrdCl/XrdClFile.hh" #include namespace eos { namespace common { class BufferManager; class Buffer; } } EOSFSTNAMESPACE_BEGIN //! Forward declarations class XrdIo; class AsyncMetaHandler; //------------------------------------------------------------------------------ //! Struct that holds a readahead buffer and corresponding handler //------------------------------------------------------------------------------ struct ReadaheadBlock { //---------------------------------------------------------------------------- //! Constuctor //! //! @param blocksize the size of the readahead //! @param buf_mgr buffer manager, if null buffers are allocated on demand //! @param handler pre-allocated handler //---------------------------------------------------------------------------- ReadaheadBlock(uint64_t blocksize, eos::common::BufferManager* buf_mgr = nullptr, SimpleHandler* hd = nullptr); //---------------------------------------------------------------------------- //! Update current request //! //! @param offset offset //! @param length length //! @param isWrite true if write request, otherwise false //---------------------------------------------------------------------------- void Update(uint64_t offset, uint32_t length, bool isWrite) { mHandler->Update(offset, length, isWrite); } //---------------------------------------------------------------------------- //! Get pointer to the underlying data buffer //---------------------------------------------------------------------------- char* GetDataPtr(); //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~ReadaheadBlock(); eos::common::BufferManager* mBufMgr; ///< Buffer manager object std::shared_ptr mBuffer; ///< Current data block std::unique_ptr mHandler; ///< Async handler for the requests }; typedef std::map PrefetchMap; //------------------------------------------------------------------------------ //! Class used for handling asynchronous open responses //------------------------------------------------------------------------------ class AsyncIoOpenHandler: public XrdCl::ResponseHandler, public eos::common::LogId { public: //---------------------------------------------------------------------------- //! Constructor //! //! @param io_file file object //! @param layout_handler handler for the layout object //---------------------------------------------------------------------------- AsyncIoOpenHandler(XrdIo* io_file, XrdCl::ResponseHandler* layout_handler) : mFileIO(io_file), mLayoutOpenHandler(layout_handler) {} //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~AsyncIoOpenHandler() {} //---------------------------------------------------------------------------- //! Called when a response to associated request arrives or an error occurs //! //! @param status status of the request //! @param response an object associated with the response (request dependent) //! @param hostList list of hosts the request was redirected to //--------------------------------------------------------------------------- virtual void HandleResponseWithHosts(XrdCl::XRootDStatus* status, XrdCl::AnyObject* response, XrdCl::HostList* hostList); private: XrdIo* mFileIO; ///< File IO object corresponding to this handler XrdCl::ResponseHandler* mLayoutOpenHandler; ///< Open handler for the layout }; //------------------------------------------------------------------------------ //! Class used for doing remote IO operations using the Xrd client //------------------------------------------------------------------------------ class XrdIo : public FileIo { friend class AsyncIoOpenHandler; public: //---------------------------------------------------------------------------- //! Constructor //! //! @param path path or URI for the file //---------------------------------------------------------------------------- XrdIo(std::string path); //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~XrdIo(); //---------------------------------------------------------------------------- //! Open file - synchronously //! //! @param flags open flags //! @param mode open mode //! @param opaque opaque information //! @param timeout timeout value //! //! @return 0 if successful, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileOpen(XrdSfsFileOpenMode flags, mode_t mode = 0, const std::string& opaque = "", uint16_t timeout = 0) override; //---------------------------------------------------------------------------- //! Open file asynchronously //! //! @param flags open flags //! @param mode open mode //! @param opaque opaque info to be appended to the request //! @param timeout operation timeout //! //! @return future holding the status response //-------------------------------------------------------------------------- std::future fileOpenAsync(XrdSfsFileOpenMode flags, mode_t mode = 0, const std::string& opaque = "", uint16_t timeout = 0) override; //---------------------------------------------------------------------------- //! Read from file - sync //! //! @param offset offset in file //! @param buffer where the data is read //! @param length read length //! @param timeout timeout value //! //! @return number of bytes read or -1 if error //---------------------------------------------------------------------------- int64_t fileRead(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Write to file - sync //! //! @param offset offset //! @param buffer data to be written //! @param length length //! @param timeout timeout value //! //! @return number of bytes written or -1 if error //---------------------------------------------------------------------------- int64_t fileWrite(XrdSfsFileOffset offset, const char* buffer, XrdSfsXferSize length, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Vector read - sync //! //! @param chunkList list of chunks for the vector read //! @param timeout timeout value //! //! @return number of bytes read of -1 if error //---------------------------------------------------------------------------- int64_t fileReadV(XrdCl::ChunkList& chunkList, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Vector read - async //! //! @param chunkList list of chunks for the vector read //! @param timeout timeout value //! //! @return 0(SFS_OK) if request successfully sent, otherwise -1(SFS_ERROR) //---------------------------------------------------------------------------- int64_t fileReadVAsync(XrdCl::ChunkList& chunkList, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Read from file asynchronously //! //! @param offset offset in file //! @param buffer where the data is read //! @param length read length //! @param timeout timeout value //! //! @return number of bytes read or -1 if error //! @note The buffer given by the user is not neccessarily populated with //! any meaningful data when this function returns. The user should call //! fileWaitAsyncIO to enforce this guarantee. //---------------------------------------------------------------------------- int64_t fileReadAsync(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Read from file with prefetching //! //! @param offset offset in file //! @param buffer where the data is read //! @param length read length //! @param timeout timeout value //! //! @return number of bytes read or -1 if error //---------------------------------------------------------------------------- int64_t fileReadPrefetch(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize length, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Write to file - async //! //! @param offset offset //! @param buffer data to be written //! @param length length //! @param timeout timeout value //! //! @return number of bytes written or -1 if error //---------------------------------------------------------------------------- int64_t fileWriteAsync(XrdSfsFileOffset offset, const char* buffer, XrdSfsXferSize length, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Write to file - async //! //! @param offset offset //! @param buffer data to be written //! @param length length //! //! @return future holding the status response //-------------------------------------------------------------------------- std::future fileWriteAsync(const char* buffer, XrdSfsFileOffset offset, XrdSfsXferSize length); //-------------------------------------------------------------------------- //! Wait for all async IO //! //! @return global return code of async IO //-------------------------------------------------------------------------- virtual int fileWaitAsyncIO(); //---------------------------------------------------------------------------- //! Truncate //! //! @param offset truncate file to this value //! @param timeout timeout value //! //! @return 0 if successful, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileTruncate(XrdSfsFileOffset offset, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Truncate asynchronous //! //! @param offset truncate file to this value //! @param timeout timeout value //! //! @return future holding the status response //---------------------------------------------------------------------------- std::future fileTruncateAsync(XrdSfsFileOffset offset, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Allocate file space //! //! @param length space to be allocated //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileFallocate(XrdSfsFileOffset length) { return 0; } //---------------------------------------------------------------------------- //! Deallocate file space //! //! @param fromOffset offset start //! @param toOffset offset end //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileFdeallocate(XrdSfsFileOffset fromOffset, XrdSfsFileOffset toOffset) { return 0; } //---------------------------------------------------------------------------- //! Remove file //! //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileRemove(uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Delete not openedfile //! //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileDelete(const char* url); //---------------------------------------------------------------------------- //! Sync file to disk //! //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileSync(uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Get pointer to async meta handler object //! //! @return pointer to async handler, NULL otherwise //---------------------------------------------------------------------------- void* fileGetAsyncHandler(); //---------------------------------------------------------------------------- //! Check for the existence of a file //! //! @param path to the file //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileExists(); //---------------------------------------------------------------------------- //! Close file //! //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileClose(uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Get stats about the file //! //! @param buf stat buffer //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileStat(struct stat* buf, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Execute implementation dependant commands //! //! @param buf stat buffer //! @param timeout timeout value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int fileFctl(const std::string& cmd, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Set a binary attribute (name has to start with 'user.' !!!) //! //! @param name attribute name //! @param value attribute value //! @param len value length //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrSet(const char* name, const char* value, size_t len); //---------------------------------------------------------------------------- //! Set a binary attribute (name has to start with 'user.' !!!) //! //! @param name attribute name //! @param value attribute value //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrSet(string name, std::string value); //---------------------------------------------------------------------------- //! Get a binary attribute by name //! //! @param name attribute name //! @param value contains attribute value upon success //! @param size the buffer size, after success the value size //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrGet(const char* name, char* value, size_t& size); //---------------------------------------------------------------------------- //! Get a binary attribute by name //! //! @param name attribute name //! @param value contains attribute value upon success //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrGet(string name, std::string& value); //---------------------------------------------------------------------------- //! Delete a binary attribute by name //! //! @param name attribute name //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrDelete(const char* name); //---------------------------------------------------------------------------- //! List all attributes for the associated path //! //! @param list contains all attribute names for the set path upon success //! //! @return 0 on success, -1 otherwise and error code is set //---------------------------------------------------------------------------- int attrList(std::vector& list); //---------------------------------------------------------------------------- //! Set attribute synchronization mode //! //! @param on if true - every set attributes runs 'pull-modify-push', //! otherwise the destructor finished a 'pull-modify-modify-....-push' //! sequence //---------------------------------------------------------------------------- void setAttrSync(bool mode = false) { mAttrSync = mode; } //---------------------------------------------------------------------------- //! Get block size used for read-ahead //! //! @return : default block size //---------------------------------------------------------------------------- int32_t GetBlockSize() { return mBlocksize; } //---------------------------------------------------------------------------- //! Plug-in function to fill a statfs structure about the storage filling //! state //! //! @param path to statfs //! @param statfs return struct //! //! @return 0 if successful otherwise errno //---------------------------------------------------------------------------- int Statfs(struct statfs* statFs); //---------------------------------------------------------------------------- //! Traversing filesystem/storage routines - FTS search handle //---------------------------------------------------------------------------- class FtsHandle { friend class XrdIo; protected: std::vector< std::vector > found_dirs; std::deque< std::string> found_files; size_t deepness; public: FtsHandle(const char* dirp) { found_dirs.resize(1); deepness = 0; }; virtual ~FtsHandle() = default; }; //---------------------------------------------------------------------------- //! Open a curser to traverse a storage system //! //! @param subtree where to start traversing //! //! @return returns implementation dependent handle or 0 in case of error //---------------------------------------------------------------------------- FileIo::FtsHandle* ftsOpen(); //---------------------------------------------------------------------------- //! Return the next path related to a traversal cursor obtained with ftsOpen //! //! @param fts_handle cursor obtained by ftsOpen //! //! @return returns implementation dependent handle or 0 in case of error //---------------------------------------------------------------------------- std::string ftsRead(FileIo::FtsHandle* fts_handle); //---------------------------------------------------------------------------- //! Close a traversal cursor //! //! @param fts_handle cursor to close //! //! @return 0 if fts_handle was an open cursor, otherwise -1 //---------------------------------------------------------------------------- virtual int ftsClose(FileIo::FtsHandle* fts_handle); #ifdef IN_TEST_HARNESS public: #else private: #endif static eos::common::XrdConnPool mXrdConnPool; ///< Xrd connection pool bool mDoReadahead; ///< mark if readahead is enabled const uint32_t mNumRdAheadBlocks; ///< no. of blocks used for readahead int32_t mBlocksize; ///< block size for rd/wr opertations XrdCl::File* mXrdFile; ///< handler to xrd file AsyncMetaHandler* mMetaHandler; ///< async requests meta handler PrefetchMap mMapBlocks; ///< map of block read/prefetched std::queue mQueueBlocks; ///< queue containing available blocks XrdSysMutex mPrefetchMutex; ///< mutex to serialise the prefetch step eos::common::FileMap mFileMap; ///< extended attribute file map std::string mAttrUrl; ///< extended attribute url std::string mOpaque; ///< opaque tags in original url bool mAttrLoaded; ///< mark if remote attributes have been loaded bool mAttrDirty; ///< mark if local attr modfied and not committed bool mAttrSync; ///< mark if attributes are updated synchronously XrdCl::URL mTargetUrl; ///< URL used to avoid physical connection sharing ///< RAAI helper for connection ids std::unique_ptr mXrdIdHelper; XrdCl::XRootDStatus mWriteStatus; uint64_t mPrefetchOffset; ///< Last block offset of a prefetch hit uint64_t mPrefetchHits; ///< Number of prefetch hits uint64_t mPrefetchBlocks; ///< Number of prefetched blocks //---------------------------------------------------------------------------- //! Method used to prefetch the next block using the readahead mechanism //! //! @param offset begin offset of the current block we are reading //! @param timeout timeout value //! //! @return true if prefetch request was sent, otherwise false //---------------------------------------------------------------------------- bool PrefetchBlock(int64_t offset, uint16_t timeout = 0); //---------------------------------------------------------------------------- //! Try to find a block in cache with contains the provided offset //! //! @param offset offset to be searched for //! //! @return iterator to the block containing the offset or if no such block //! is found we return the iterator to the end of the map //---------------------------------------------------------------------------- PrefetchMap::iterator FindBlock(uint64_t offset); //------------------------------------------------------------------------------ //! Recycle blocks from the map that are not useful since the current offset //! is already grater then their offset //! //! @param iter iterator in the map of prefetched blocks //------------------------------------------------------------------------------ void RecycleBlocks(std::map::iterator iter); //---------------------------------------------------------------------------- //! Download a remote file into a string object //! //! @param url from where to download //! @param download string where to place the contents //! //! @return 0 success, otherwise -1 and errno //---------------------------------------------------------------------------- static int Download(std::string url, std::string& download); //---------------------------------------------------------------------------- //! Upload a string object into a remote file //! //! @param url from where to upload //! @param upload string to store into remote file //! //! @return 0 success, otherwise -1 and errno //---------------------------------------------------------------------------- static int Upload(std::string url, std::string& upload); //---------------------------------------------------------------------------- //! Get a directory listing - taken from XrdCl sources //---------------------------------------------------------------------------- XrdCl::XRootDStatus GetDirList(XrdCl::FileSystem* fs, const XrdCl::URL& url, std::vector* files, std::vector* directories); //---------------------------------------------------------------------------- //! Build request URL containing the full endpoint with path and any extra //! opaque information //! //! @param opaque input opaque information //! @param out output containing the path with any additional opaque info //---------------------------------------------------------------------------- std::string BuildRequestUrl() const; //---------------------------------------------------------------------------- //! Disable copy constructor //---------------------------------------------------------------------------- XrdIo(const XrdIo&) = delete; //---------------------------------------------------------------------------- //! Disable assign operator //---------------------------------------------------------------------------- XrdIo& operator = (const XrdIo&) = delete; }; //------------------------------------------------------------------------------ //! Class BufferAllocateException //------------------------------------------------------------------------------ class BufferAllocateException: public std::exception { public: const char* what() const noexcept override { return "failed to allocate buffer"; } }; //------------------------------------------------------------------------------ //! Class XrdIoHandler //------------------------------------------------------------------------------ class XrdIoHandler: public XrdCl::ResponseHandler { public: enum class OpType { None, Write, Truncate, Open }; //---------------------------------------------------------------------------- //! Constructor //! //! @param promise promise object used to notify when the answer arrives //! @param op type of operation for current handler //! @param buf_mgr buffer manager object that supplies/recycles buffers //! @param buffer user buffer object to be dulicated if valid and needed //! @param buffer_len useful contents of the buffer to be used //---------------------------------------------------------------------------- XrdIoHandler(std::promise&& promise, OpType op, eos::common::BufferManager* buf_mgr = nullptr, const char* buffer = nullptr, unsigned long long buffer_len = 0ull): mOperationType(op), mBufMgr(buf_mgr), mBuffer(nullptr) { if (mBufMgr && buffer && buffer_len) { int attempts = 5; do { mBuffer = mBufMgr->GetBuffer(buffer_len); if (mBuffer) { mBuffer->mLength = buffer_len; (void) memcpy(mBuffer->GetDataPtr(), buffer, buffer_len); break; } else { std::this_thread::sleep_for(std::chrono::seconds(1)); } } while (--attempts); if (!mBuffer) { throw BufferAllocateException(); } } // The promise must be swaped after any potential exception is thrown // otherwise the caller will be left with an invalid promise. std::swap(mPromise, promise); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~XrdIoHandler() { if (mBufMgr && mBuffer) { mBufMgr->Recycle(mBuffer); } } //---------------------------------------------------------------------------- //! Handle response //! //! @param pStatus status of the response //! @param pResponse object containing extra info about the response //---------------------------------------------------------------------------- virtual void HandleResponse(XrdCl::XRootDStatus* pStatus, XrdCl::AnyObject* pResponse) { if (pStatus) { mPromise.set_value(*pStatus); delete pStatus; } else { mPromise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errUnknown, EINVAL, "no status returned")); } if (pResponse) { delete pResponse; } delete this; } //---------------------------------------------------------------------------- //! Get pointer to the underlying data buffer //---------------------------------------------------------------------------- char* GetDataPtr() { if (mBuffer) { return mBuffer->GetDataPtr(); } return nullptr; } private: OpType mOperationType; std::promise mPromise; eos::common::BufferManager* mBufMgr; std::shared_ptr mBuffer; }; EOSFSTNAMESPACE_END #endif // __EOSFST_XRDFILEIO_HH__