//------------------------------------------------------------------------------
//! @file: XrdFstOfsFile.hh
//------------------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2019 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

#ifndef __EOSFST_FSTOFSFILE_HH__
#define __EOSFST_FSTOFSFILE_HH__

#include "fst/Namespace.hh"
#include "fst/storage/Storage.hh"
#include "fst/checksum/CheckSum.hh"
#include "fst/utils/TpcInfo.hh"
#include "common/Fmd.hh"
#include "common/FileId.hh"
#include "common/SymKeys.hh"
#include "XrdVersion.hh"
#include "XrdOfs/XrdOfs.hh"
#include "XrdOfs/XrdOfsTPCInfo.hh"
#include "XrdOuc/XrdOucString.hh"
#include <cmath>
#include <numeric>

namespace eos
{
namespace fst
{
class HttpHandler;
class HttpServer;
class S3Handler;
}
};

namespace eos
{
namespace common
{
class FmdHelper;
}
}

EOSFSTNAMESPACE_BEGIN;

// This defines for reports what is a large seek e.g.
> 128 kB = default RA size #define EOS_FSTOFS_LARGE_SEEKS 128*1024ll // Forward declaration class Layout; class CheckSum; #if XrdMajorVNUM( XrdVNUMBER ) > 4 #define XrdOfsFileBase XrdOfsFileFull #else #define XrdOfsFileBase XrdOfsFile #endif //------------------------------------------------------------------------------ //! Class XrdFstOfsFile //------------------------------------------------------------------------------ class XrdFstOfsFile : public XrdOfsFileBase, public eos::common::LogId { friend class ReplicaParLayout; friend class RainMetaLayout; friend class RaidDpLayout; friend class ReedSLayout; friend class LocalIo; friend class HttpHandler; friend class HttpServer; friend class S3Handler; public: static constexpr uint16_t msDefaultTimeout {300}; static int LayoutReadCB(eos::fst::CheckSum::ReadCallBack::callback_data_t* cbd); static int FileIoReadCB(eos::fst::CheckSum::ReadCallBack::callback_data_t* cbd); //---------------------------------------------------------------------------- //! Constructor //! //! @param user user identity (tident) //! @param MonID monitoring id //---------------------------------------------------------------------------- XrdFstOfsFile(const char* user, int MonID = 0); //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~XrdFstOfsFile(); //---------------------------------------------------------------------------- //! Open file - see XrdSfsInterface.hh for description of parameters //! //! @return One of SFS_OK, SFS_ERROR, SFS_REDIRECT, SFS_STALL, or SFS_STARTED //---------------------------------------------------------------------------- int open(const char* fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity* client, const char* opaque = 0) override; //---------------------------------------------------------------------------- //! 
Read from file //---------------------------------------------------------------------------- XrdSfsXferSize read(XrdSfsFileOffset fileOffset, char* buffer, XrdSfsXferSize buffer_size) override; //---------------------------------------------------------------------------- //! Read AIO - not supported //---------------------------------------------------------------------------- int read(XrdSfsAio* aioparm) override; //---------------------------------------------------------------------------- //! Read file pages into a buffer and return corresponding checksums. //! //! @param offset - The offset where the read is to start. It may be //! unaligned with certain caveats relative to csvec. //! @param buffer - pointer to buffer where the bytes are to be placed. //! @param rdlen - The number of bytes to read. The amount must be an //! integral number of XrdSfsPage::Size bytes. //! @param csvec - A vector of entries to be filled with the cooresponding //! CRC32C checksum for each page. However, if the offset is //! unaligned, then csvec[0] contains the crc for the page //! fragment that brings it to alignment for csvec[1]. //! It must be sized to hold all aligned XrdSys::Pagesize //! crc's plus additional ones for leading and ending page //! fragments, if any. //! @param opts - Processing options (see above). //! //! @return >= 0 The number of bytes that placed in buffer. //! @return SFS_ERROR File could not be read, error holds the reason. //---------------------------------------------------------------------------- XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize rdlen, uint32_t* csvec, uint64_t opts = 0) override; //---------------------------------------------------------------------------- //! Read file pages and checksums using asynchronous I/O - NOT SUPPORTED //! //! @param aioparm - Pointer to async I/O object controlling the I/O. //! @param opts - Processing options (see above). //! //! 
@return SFS_OK Request accepted and will be scheduled. //! @return SFS_ERROR File could not be read, error holds the reason. //----------------------------------------------------------------------------- int pgRead(XrdSfsAio* aioparm, uint64_t opts = 0) override; //---------------------------------------------------------------------------- //! Pre-read blocks into file system cache //---------------------------------------------------------------------------- int read(XrdSfsFileOffset fileOffset, XrdSfsXferSize amount) override; //---------------------------------------------------------------------------- //! Vector read - OFS interface method //! //! @param readV vector read structure //! @param readCount number of entries in the vector read structure //! //! @return number of bytes read upon success, otherwise SFS_ERROR //! //---------------------------------------------------------------------------- XrdSfsXferSize readv(XrdOucIOVec* readV, int readCount) override; //---------------------------------------------------------------------------- //! Write to file //---------------------------------------------------------------------------- XrdSfsXferSize write(XrdSfsFileOffset fileOffset, const char* buffer, XrdSfsXferSize buffer_size) override; //---------------------------------------------------------------------------- //! Write AIO - no supported //---------------------------------------------------------------------------- int write(XrdSfsAio* aioparm) override; //---------------------------------------------------------------------------- //! Write file pages into a file with corresponding checksums. //! //! @param offset - The offset where the write is to start. It may be //! unaligned with certain caveats relative to csvec. //! @param buffer - pointer to buffer containing the bytes to write. //! @param wrlen - The number of bytes to write. If amount is not an //! integral number of XrdSys::PageSize bytes, then this must //! 
be the last write to the file at or above the offset. //! @param csvec - A vector which contains the corresponding CRC32 checksum //! for each page or page fragment. If offset is unaligned //! then csvec[0] is the crc of the leading fragment to //! align the subsequent full page who's crc is in csvec[1]. //! It must be sized to hold all aligned XrdSys::Pagesize //! crc's plus additional ones for leading and ending page //! fragments, if any. //! @param opts - Processing options (see above). //! //! @return >= 0 The number of bytes written. //! @return SFS_ERROR File could not be read, error holds the reason. //---------------------------------------------------------------------------- XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char* buffer, XrdSfsXferSize wrlen, uint32_t* csvec, uint64_t opts = 0) override; //---------------------------------------------------------------------------- //! Write file pages and checksums using asynchronous I/O - NOT SUPPORTED //! //! @param aioparm - Pointer to async I/O object controlling the I/O. //! @param opts - Processing options (see above). //! //! @return SFS_OK Request accepted and will be scheduled. //! @return SFS_ERROR File could not be read, error holds the reason. //---------------------------------------------------------------------------- int pgWrite(XrdSfsAio* aioparm, uint64_t opts = 0) override; //---------------------------------------------------------------------------- //! Get file stat information //! //! @param buf pointer to the structure where info it to be returned //! //! @return One of SFS_OK, SFS_ERROR, SFS_REDIRECT, or SFS_STALL. When //! SFS_OK is returned, buf must hold stat information //---------------------------------------------------------------------------- int stat(struct stat* buf) override; //---------------------------------------------------------------------------- //! Sync file //! //! 
@return One of SFS_OK, SFS_ERROR, SFS_REDIRECT, SFS_STALL, or SFS_STARTED //---------------------------------------------------------------------------- int sync() override; //---------------------------------------------------------------------------- //! Sync AIO //---------------------------------------------------------------------------- int sync(XrdSfsAio* aiop) override; //---------------------------------------------------------------------------- //! Truncate file //! //! @param fsize truncate size of the file //! //! @return One of SFS_OK, SFS_ERROR, SFS_REDIRECT, or SFS_STALL //---------------------------------------------------------------------------- int truncate(XrdSfsFileOffset fileOffset) override; //---------------------------------------------------------------------------- //! Close file //---------------------------------------------------------------------------- int close() override; //---------------------------------------------------------------------------- //! Execute special operation on the file (version 2) //! //! @param cmd - The operation to be performed: //! SFS_FCTL_SPEC1 Perform implementation defined action //! @param alen - Length of data pointed to by args. //! @param args - Data sent with request, zero if alen is zero. //! @param client - Client's identify (see common description). //! //! @return SFS_OK a null response is sent. //! @return SFS_DATA error.code length of the data to be sent. //! error.message contains the data to be sent. //! o/w one of SFS_ERROR, SFS_REDIRECT, or SFS_STALL. //---------------------------------------------------------------------------- int fctl(const int cmd, int alen, const char* args, const XrdSecEntity* client = 0) override; //---------------------------------------------------------------------------- //! 
Return logical path //---------------------------------------------------------------------------- std::string GetPath() const { return mNsPath.c_str(); } //---------------------------------------------------------------------------- //! Return the file system id //---------------------------------------------------------------------------- unsigned long GetFileSystemId() const { return mFsId; } private: #ifdef IN_TEST_HARNESS public: #endif eos::common::SymKey::hmac_t mHmac; std::unique_ptr mOpenOpaque; ///< Open opaque info (encrypted) std::unique_ptr mCapOpaque; ///< Capability opaque info (decrypted) std::string mFstPath; ///< Physical path on the FST off_t mBookingSize; off_t mTargetSize; off_t mMinSize; off_t mMaxSize; bool viaDelete; bool mWrDelete; ///< Flag file to be deleted due to write errors uint64_t mRainSize; ///< Rain file size used during reconstruction XrdOucString mNsPath; /// Logical file path (from the namespace) XrdOucString mLocalPrefix; ///< Prefix on the local storage XrdOucString mRedirectManager; ///< Manager host where we bounce back bool mTapeEnabled; ///< True if tape support is enabled XrdOucString mSecString; ///< string containing security summary std::map mSecMap; ///< map of all sec keys std::string mEtag; ///< Current and new ETag (recomputed in close) unsigned long long mFileId; //! file id unsigned long mFsId; //! file system id unsigned long mLid; //! layout id unsigned long long mCid; //! container id unsigned long long mForcedMtime; unsigned long long mForcedMtime_ms; bool mFusex; //! indicator that we are committing from a fusex client bool mFusexIsUnlinked; //! indicator for an already unlinked file bool closed; //! indicator the file is closed bool mOpened; //! indicator that file is opened bool mHasWrite; //! indicator that file was written/modified bool hasWriteError;// indicator for write errors to avoid message flooding bool hasReadError; //! indicator if a RAIN file could be reconstructed or not bool mIsRW; //! 
indicator that file is opened for rw bool mIsDevNull; ///< If true file act as a sink i.e. /dev/null bool isCreation; //! indicator that a new file is created bool isReplication; //! indicator that the opened file is a replica transfer bool noAtomicVersioning; //! indicate to disable atomic/versioning during commit //! Indicate that the opened file is a file injection where the size and //! checksum must match bool mIsInjection; bool mRainReconstruct; ///< indicator that the opened file is in a RAIN reconstruction process bool deleteOnClose; ///< indicator that the file has to be cleaned on close bool repairOnClose; ///< indicator that the file should get repaired on close bool mIsOCchunk; //! indicator this is an OC chunk upload int writeErrorFlag; //! uses kOFSxx enums to specify an error condition bool mEventOnClose; ///< Indicator to send a specified event to MGM on close bool mSyncOnClose; ///< Indicator to run a fsync on close //! Indicates the workflow to be triggered by an event XrdOucString mEventWorkflow; bool mSyncEventOnClose; //! Indicator to send a specified event to the mgm on close std::string mEventInstance; uint32_t mEventOwnerUid; uint32_t mEventOwnerGid; std::string mEventRequestor; std::string mEventRequestorGroup; std::string mEventAttributes; int mIoPriorityValue; int mIoPriorityClass; bool mIoPriorityErrorReported; std::string mAppRR; enum { kOfsIoError = 1, //! generic IO error kOfsMaxSizeError = 2, //! maximum file size error kOfsDiskFullError = 3, //! disk full error kOfsSimulatedIoError = 4, //! simulated IO error kOfsFsRemovedError = 5, //! filesystem has been unregistered kOfsSimulatedCloseError = 6, //! simulated error when closing the file kOfsSimulatedUnresponsive = 7 // ! simulated unresponsivness of an FST }; ///< In-memory file meta data object std::unique_ptr mFmd; std::unique_ptr mCheckSum; ///< Checksum object // @todo(esindril) this is not properly enforced everywhere ... 
XrdSysMutex mChecksumMutex; ///< Mutex protecting the checksum class std::unique_ptr mLayout; ///< Layout object std::string mTident; ///< Client identity using the file object // File statistics for monitoring purposes //! Largest byte position written of a newly created file unsigned long long mMaxOffsetWritten; unsigned long long mWritePosition; off_t openSize; //! file size when the file was opened off_t closeSize; //! file size when the file was closed struct timeval openTime; //! time when a file was opened struct timeval currentTime; //! time when a write occurs unsigned long long totalBytes; //! total bytes IO unsigned long long msSleep; //! total ms sleeping during io struct timeval closeTime; //! time when a file was closed struct timeval closeStart; //! time when a file close started struct timeval closeStop; //! time when a file close stopped struct timezone tz; //! timezone int mBandwidth; //! bandwidth limitation setting XrdSysMutex vecMutex; //! mutex protecting the rvec/wvec variables //! vector with all read sizes -> to compute sigma,min,max,total std::vector rvec; //! vector with all write sizes -> to compute sigma,min,max,total std::vector wvec; unsigned long long rBytes; //! sum bytes read unsigned long long wBytes; //! sum bytes written unsigned long long sFwdBytes; //! sum bytes seeked forward unsigned long long sBwdBytes; //! sum bytes seeked backward //! sum bytes with large forward seeks (> EOS_FSTOFS_LARGE_SEEKS) unsigned long long sXlFwdBytes; //! sum bytes with large backward seeks (> EOS_FSTOFS_LARGE_SEEKS) unsigned long long sXlBwdBytes; unsigned long rCalls; //! number of read calls unsigned long wCalls; //! number of write calls unsigned long nFwdSeeks; //! number of seeks forward unsigned long nBwdSeeks; //! number of seeks backward unsigned long nXlFwdSeeks; //! number of seeks forward unsigned long nXlBwdSeeks; //! number of seeks backward unsigned long long rOffset; //! 
offset since last read operation on this file unsigned long long wOffset; //! offset since last write operation on this file //! Vector with all readv sizes -> to compute min,max,etc. std::vector monReadvBytes; //! Size of each read call coming from readv requests -> to compute min,max, etc. std::vector monReadSingleBytes; //! Number of individual read op. in each readv call -> to compute min,max, etc. std::vector monReadvCount; struct timeval cTime; ///< current time struct timeval rStart; ///< start of last read (layout) struct timeval rvStart; ///< start of last readv (layout) struct timeval wStart; ///< start of last read (layout) struct timeval lrTime; ///< last read time struct timeval lrvTime; ///< last readv time struct timeval lwTime; ///< last write time struct timeval rTime; ///< sum time to serve read requests in ms struct timeval rvTime; ///< sum time to server readv requests in ms struct timeval wTime; ///< sum time to serve write requests in ms //! Stat struct to check if a file is updated between open-close struct stat updateStat; double timeToOpen; ///< time the open call took double timeToClose; ///< time the close call took double timeToRead; ///< time of all layout reads double timeToReadV; ///< time of all layout readvs double timeToWrite; ///< time of all layout writes //! 
TPC related types and variables enum TpcType_t { kTpcNone = 0, ///< No TPC access kTpcSrcSetup = 1, ///< Access setting up a source TPC session kTpcDstSetup = 2, ///< Access setting up a destination TPC session kTpcSrcRead = 3, ///< Read access from a TPC destination kTpcSrcCanDo = 4, ///< Read access to evaluate if source available }; enum TpcState_t { kTpcIdle = 0, ///< TPC is not enabled (1st sync) kTpcRun = 1, ///< TPC is running (2nd sync) kTpcDone = 2, ///< TPC has finished }; int mTpcThreadStatus; ///< Status of the TPC thread - 0 valid otherwise error pthread_t mTpcThread; ///< Thread doing the TPC transfer TpcState_t mTpcState; ///< TPC transfer status TpcType_t mTpcFlag; ///< TPC access type XrdOfsTPCInfo mTpcInfo; ///< TPC info object used for callback XrdSysMutex mTpcJobMutex; ///< TPC job mutex std::string mTpcKey; ///< TPC key for a tpc file operation TpcInfo mFstTpcInfo; ///< FST TPC info struct bool mIsTpcDst; ///< If true this is a TPC destination, otherwise a source int mTpcRetc; ///< TPC job return code std::atomic mTpcCancel; ///< Mark TPC cancellation request uint16_t mTimeout; ///< timeout for layout operations bool mIsHttp; ///< Mark if this is HTTP acceess //---------------------------------------------------------------------------- //! Get configured minimum file size for which the asynchronous close method //! is called. //! //! @return min file size //---------------------------------------------------------------------------- static uint64_t GetAsyncCloseMinSize(); //---------------------------------------------------------------------------- //! Check if async close is configured //! //! @return true if enabled, otherwise false //---------------------------------------------------------------------------- static bool IsAsyncCloseConfigured(); //---------------------------------------------------------------------------- //! Get hostname from tident. This is used when checking the origin match for //! TPC transfers. 
It only extract the hostname without domain to avoid //! mismatch in cases where the same machine provides both IPV4 and IPV6 //! interfaces. Eg. root.1.2@eosbackup.cern.ch and //! root.1.2@eosbackup.ipv6.cern.ch should match //! //! @param tident xrootd like tident //! @param hostname output parameter holding the hostname //! //! @return true if parsing successful and hostname stores the desired value, //! otherwise false //---------------------------------------------------------------------------- static bool GetHostFromTident(const std::string& tident, std::string& hostname); //---------------------------------------------------------------------------- //! Filter out particular tags from the opaque information //! //! @param opaque opaque information to process //! @param tags set of tags to be filtered out //---------------------------------------------------------------------------- static void FilterTagsInPlace(std::string& opaque, const std::set tags); //---------------------------------------------------------------------------- //! Get TPC key expiration timestamp by combinding the client specified //! tpc.ttl value with the system TPC key minimum validity. The resulting //! value should always be between the system TPC key minimum validity and //! 15 minutes, unless the former the biggern than 15 min and then this will //! apply. //! //! @param tpc_tll client specified key ttl //! @param now_ts used for testing //! //! @return expiration timestamp for the current key //---------------------------------------------------------------------------- static time_t GetTpcKeyExpireTS(std::string_view tpc_ttl, time_t now_ts = 0); //---------------------------------------------------------------------------- //! Close internal method that can be called synchronously (from XRootD) or //! asynchronously from the thread pool for long running close operations. //! //! 
@return SFS_OK if close successful, otherwise SFS_ERROR //---------------------------------------------------------------------------- int _close(); //---------------------------------------------------------------------------- //! Low-level open calling the default XrdOfs plugin and begin called from //! one of the layout implementations. //---------------------------------------------------------------------------- int openofs(const char* fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity* client, const char* opaque = 0); //---------------------------------------------------------------------------- //! Low-level read calling the default XrdOfs plugin //---------------------------------------------------------------------------- XrdSfsXferSize readofs(XrdSfsFileOffset fileOffset, char* buffer, XrdSfsXferSize buffer_size); //---------------------------------------------------------------------------- //! Low-level vector read calling the default XrdOfs plugin //---------------------------------------------------------------------------- XrdSfsXferSize readvofs(XrdOucIOVec* readV, uint32_t readCount); //---------------------------------------------------------------------------- //! Low-level write calling the default XrdOfs plugin //---------------------------------------------------------------------------- XrdSfsXferSize writeofs(XrdSfsFileOffset fileOffset, const char* buffer, XrdSfsXferSize buffer_size); //---------------------------------------------------------------------------- //! Low-level sync calling the default XrdOfs plugin //---------------------------------------------------------------------------- int syncofs(); //---------------------------------------------------------------------------- //! 
Low-level truncate calling the default XrdOfs plugin //---------------------------------------------------------------------------- int truncateofs(XrdSfsFileOffset fileOffset); //---------------------------------------------------------------------------- //! Low-level close calling the default XrdOfs plugin //---------------------------------------------------------------------------- int closeofs(); //---------------------------------------------------------------------------- //! Get physical path on the FST (local) //! //! @return local physical path //---------------------------------------------------------------------------- inline std::string GetFstPath() const { return mFstPath.c_str(); } //---------------------------------------------------------------------------- //! Return the Etag //---------------------------------------------------------------------------- const char* GetETag() const { return mEtag.c_str(); } //---------------------------------------------------------------------------- //! Enforce an mtime on close //---------------------------------------------------------------------------- void SetForcedMtime(unsigned long long mtime, unsigned long long mtime_ms) { mForcedMtime = mtime; mForcedMtime_ms = mtime_ms; } //---------------------------------------------------------------------------- //! Return current mtime while open //---------------------------------------------------------------------------- time_t GetMtime() const; //---------------------------------------------------------------------------- //! Return the file size seen at open time //---------------------------------------------------------------------------- inline off_t GetOpenSize() const { return openSize; } //---------------------------------------------------------------------------- //! 
Return the file id //---------------------------------------------------------------------------- unsigned long long GetFileId() const { return mFileId; } //---------------------------------------------------------------------------- //! Return checksum //---------------------------------------------------------------------------- eos::fst::CheckSum* GetChecksum() const { return mCheckSum.get(); } //---------------------------------------------------------------------------- //! Return FMD checksum //---------------------------------------------------------------------------- std::string GetFmdChecksum() const; //---------------------------------------------------------------------------- //! Check for chunked upload flag //---------------------------------------------------------------------------- bool IsChunkedUpload() const { return mIsOCchunk; } //---------------------------------------------------------------------------- //! Check if the TpcKey is still valid e.g. member of gOFS.TpcMap //---------------------------------------------------------------------------- bool TpcValid() const; //---------------------------------------------------------------------------- //! Process TPC (third-party-copy) opaque information i.e handle tags like //! tpc.key, tpc.dst, tpc.stage etc and also extract and decrypt the cap //! opaque info //! //! @param opaque opaque information //! @param client XrdSecEntity of client //! //! @return SFS_OK if successful, otherwise SFS_ERROR //---------------------------------------------------------------------------- int ProcessTpcOpaque(std::string& opaque, const XrdSecEntity* client); //---------------------------------------------------------------------------- //! Process open opaque information - this comes directly from the client //! or from the MGM redirection but it's not encrypted but sent in plain //! text in the URL //! //! 
@return SFS_OK if successful, otherwise SFS_ERROR //---------------------------------------------------------------------------- int ProcessOpenOpaque(); //---------------------------------------------------------------------------- //! Process cap opaque information - decisions that need to be taken based //! on the encrypted opaque info //! //! @param is_repair_read flag if this is a repair read //! @param vid client virtual identity //! //! @return SFS_OK if successful, otherwise SFS_ERROR //---------------------------------------------------------------------------- int ProcessCapOpaque(bool& is_repair_read, eos::common::VirtualIdentity& vid); //---------------------------------------------------------------------------- //! Process mixed opaque information - decisions that need to be taken based //! on both the ecrypted and un-encrypted opaque info //! //! @return SFS_OK if successful, otherwise SFS_ERROR //---------------------------------------------------------------------------- int ProcessMixedOpaque(); //---------------------------------------------------------------------------- //! Compute time to close a file //---------------------------------------------------------------------------- void CloseTime(); //---------------------------------------------------------------------------- //! Compute total time to serve read requests //---------------------------------------------------------------------------- void AddReadTime(); void AddLayoutReadTime(); //---------------------------------------------------------------------------- //! Compute total time to serve vector read requests //---------------------------------------------------------------------------- void AddReadVTime(); void AddLayoutReadVTime(); //---------------------------------------------------------------------------- //! 
Compute total time to serve write requests //---------------------------------------------------------------------------- void AddWriteTime(); void AddLayoutWriteTime(); //---------------------------------------------------------------------------- //! Compute general statistics on a set of input values //! //! @param vect input collection //! @param min minimum element //! @param max maximum element //! @param sum sum of the elements //! @param avg average value //! @param sigma sigma of the elements //---------------------------------------------------------------------------- template void ComputeStatistics(const std::vector& vect, T& min, T& max, T& sum, double& sigma); //---------------------------------------------------------------------------- //! Create report as a string //---------------------------------------------------------------------------- void MakeReportEnv(XrdOucString& reportString); //---------------------------------------------------------------------------- //! Static method used to start an asynchronous thread which is doing the //! TPC transfer //! //! @param arg XrdFstOfsFile instance object //---------------------------------------------------------------------------- static void* StartDoTpcTransfer(void* arg); //---------------------------------------------------------------------------- //! Do TPC transfer //---------------------------------------------------------------------------- void* DoTpcTransfer(); //---------------------------------------------------------------------------- //! Extract logid from the opaque info i.e. mgm.logid //! //! @param opaque opaque info //---------------------------------------------------------------------------- std::string ExtractLogId(const char* opaque) const; //---------------------------------------------------------------------------- //! Drop all replicas from the MGM //! //! @param fileid file id //! @param path file logical path @todo(esindril) redundant, should drop //! 
@param manager MGM hostname //! //! @return 0 if successful, otherwise error code //---------------------------------------------------------------------------- int DropAllFromMgm(eos::common::FileId::fileid_t fileid, const std::string path, const std::string manager); //---------------------------------------------------------------------------- //! Check if file has been modified while in use //! //! @return -1 if modified, otherwise 0 //---------------------------------------------------------------------------- int ModifiedWhileInUse(); //---------------------------------------------------------------------------- //! Verify checksum //! //! @return true if ok, otherwise false //---------------------------------------------------------------------------- bool VerifyChecksum(); //---------------------------------------------------------------------------- //! Queue file for CTA archiving //! //! @param statinfo The file stat structure //! @param queueing_errmsg Error message from CTA queueing //! @param archive_req_id Output parameter: The archive request ID returned by //! the ProtoEfEndPoint //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool QueueForArchiving(const struct stat& statinfo, std::string& queueing_errmsg, std::string& archive_req_id); //---------------------------------------------------------------------------- //! Notify the workflow protobuf endpoint that the user has closed a file that //! they were writing to //! //! @param file_id The id of the file //! @param file_lid The layout id of the file //! @param file_size The size of the file //! @param file_checksum The checksum of the file //! @param owner_uid The user id of the file owner //! @param owner_gid The group id of the file owner //! @param requestor_name Tha name of the user that closed the file //! @param requestor_groupname The name of the group that closed the file //! 
//! @param instance_name The name of the EOS instance
  //! @param fullpath The full path of the file
  //! @param manager_name The name of the EOS manager
  //! @param xattrs The extended attributes of the file to be passed to the
  //! workflow protobuf endpoint
  //! @param errmsg_wfe Output parameter: Error message back from the workflow
  //! protobuf endpoint
  //! @param archive_req_id Output parameter: The archive request ID returned by
  //! the ProtoEfEndPoint
  //!
  //! @return 0 if successful, error code otherwise
  //!
  //! NOTE(review): std::map is written without template arguments in this
  //! declaration - confirm the key/value types against the implementation
  //----------------------------------------------------------------------------
  int NotifyProtoWfEndPointClosew(uint64_t file_id, uint32_t file_lid,
                                  uint64_t file_size,
                                  const std::string& file_checksum,
                                  uint32_t owner_uid, uint32_t owner_gid,
                                  const std::string& requestor_name,
                                  const std::string& requestor_groupname,
                                  const std::string& instance_name,
                                  const std::string& fullpath,
                                  const std::string& manager_name,
                                  const std::map& xattrs,
                                  std::string& errmsg_wfe,
                                  std::string& archive_req_id);

  //----------------------------------------------------------------------------
  //! Send archive failed event to the manager
  //!
  //! @param fid The file identifier
  //! @param errmsg The error message to be enclosed in the archive failed event
  //!
  //! @return SFS_OK if successful
  //----------------------------------------------------------------------------
  int SendArchiveFailedToManager(const uint64_t fid, const std::string& errmsg);

  //----------------------------------------------------------------------------
  //! Decide if close should be done synchronously. There are cases when close
  //! should happen in the same thread eg. read, http tx, sink writes etc.
  //!
@return true if close is synchronous, otherwise false //---------------------------------------------------------------------------- bool DoSyncClose(); }; //------------------------------------------------------------------------------ // Compute general statistics on a set of input values //------------------------------------------------------------------------------ template void XrdFstOfsFile::ComputeStatistics(const std::vector& vect, T& min, T& max, T& sum, double& sigma) { double avg, sum2; max = sum = sum2 = avg = sigma = 0; min = 0xffffffff; sum = std::accumulate(vect.begin(), vect.end(), static_cast(0)); avg = vect.size() ? (1.0 * sum / vect.size()) : 0; for (auto it = vect.begin(); it != vect.end(); ++it) { if (*it > max) { max = *it; } if (*it < min) { min = *it; } sum2 += std::pow((*it - avg), 2); } sigma = vect.size() ? (sqrt(sum2 / vect.size())) : 0; if (min == 0xffffffff) { min = 0; } } EOSFSTNAMESPACE_END #endif