//------------------------------------------------------------------------------ //! @file DrainTransfer.hh //! @author Elvin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2017 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "mgm/Namespace.hh" #include "common/FileId.hh" #include "common/Logging.hh" #include "common/FileSystem.hh" #include "proto/FileMd.pb.h" #include "XrdCl/XrdClCopyProcess.hh" EOSMGMNAMESPACE_BEGIN //! Forward declaration class DrainTransferJob; //------------------------------------------------------------------------------ //! Class DrainProgressHandler used to monitor the progress of the current //! drain transfers but also more importantly to cancel gracefully a running //! transfer. //------------------------------------------------------------------------------ class DrainProgressHandler: public XrdCl::CopyProgressHandler, public eos::common::LogId { friend class DrainTransferJob; public: //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- DrainProgressHandler(): mDoCancel{false}, mProgress{0}, mBytesTransferred{0ull}, mStartTimestampSec(std::chrono::duration_cast (std::chrono::system_clock::now().time_since_epoch()).count()) {} //---------------------------------------------------------------------------- //! Notify when a new job is about to start //! //! @param jobNum the job number of the copy job concerned //! @param jobTotal total number of jobs being processed //! @param source the source url of the current job //! @param destination the destination url of the current job //---------------------------------------------------------------------------- void BeginJob(uint16_t jobNum, uint16_t jobTotal, const XrdCl::URL* source, const XrdCl::URL* destination) override { using namespace std::chrono; mStartTimestampSec = duration_cast (system_clock::now().time_since_epoch()).count(); } //---------------------------------------------------------------------------- //! Notify about the progress of the current job //! //! @param jobNum job number //! @param bytesProcessed bytes processed by the current job //! @param bytesTotal total number of bytes to be processed by job //---------------------------------------------------------------------------- void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal) override { mBytesTransferred = bytesProcessed; mProgress = static_cast((1.0 - (1.0 * (bytesTotal - bytesProcessed) / bytesTotal)) * 100); } //---------------------------------------------------------------------------- //! Determine whether the job should be canceled - this is used internally //! by the XrdCl::CopyProcess. //---------------------------------------------------------------------------- bool ShouldCancel(uint16_t jobNum) override { return mDoCancel; } //---------------------------------------------------------------------------- //! Mark drain job to be cancelled //---------------------------------------------------------------------------- void DoCancel() { mDoCancel = true; } private: std::atomic mDoCancel; ///< Mark if job should be cancelled std::atomic mProgress; ///< Progress percentage std::atomic mBytesTransferred; ///< Amount of data transferred std::atomic mStartTimestampSec; ///< Start timestamp in seconds }; //------------------------------------------------------------------------------ //! Class implementing the third party copy transfer, takes as input the //! file id and the destination filesystem //------------------------------------------------------------------------------ class DrainTransferJob: public eos::common::LogId { public: //! Status of a drain transfer job enum class Status {OK, Running, Failed, Ready}; //---------------------------------------------------------------------------- //! Constructor //! //! @param fid the file id //! @param fsid_src source file system id //! @param fsid_trg target file system id //! @param exclude_srcs set of fs ids which are to be excluded as sources //! @param exclude_dsts set of fs ids which are to be excluded as dest. //! @param drop_src mark if source replica should be dropped if operation //! is successful (default true) //! @param app_tag application tag for easy classification of job types //! @param balance_mode if true force transfer between given source and //! destination file systems //! @param vid virtual identity running the job //! @param repair_excluded if true then try to repair also the excluded srcs //---------------------------------------------------------------------------- DrainTransferJob(eos::common::FileId::fileid_t fid, eos::common::FileSystem::fsid_t fsid_src, eos::common::FileSystem::fsid_t fsid_trg = 0, std::set exclude_srcs = {}, std::set exclude_dsts = {}, bool drop_src = true, const std::string& app_tag = "drain", bool balance_mode = false, eos::common::VirtualIdentity vid = eos::common::VirtualIdentity::Root(), bool repair_excluded = false): mAppTag(app_tag), mFileId(fid), mFsIdSource(fsid_src), mFsIdTarget(fsid_trg), mTxFsIdSource(fsid_src), mStatus(Status::Ready), mRainAttempt(false), mRainReconstruct(false), mDropSrc(drop_src), mBalanceMode(balance_mode), mRepairExcluded(repair_excluded), mVid(vid) { mTriedSrcs.insert(exclude_srcs.begin(), exclude_srcs.end()); mExcludeDsts.insert(mExcludeDsts.begin(), exclude_dsts.begin(), exclude_dsts.end()); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- virtual ~DrainTransferJob() = default; //---------------------------------------------------------------------------- //! Execute a third-party transfer //---------------------------------------------------------------------------- virtual void DoIt() noexcept; //---------------------------------------------------------------------------- //! Cancel ongoing TPC transfer //---------------------------------------------------------------------------- inline void Cancel() { mProgressHandler.DoCancel(); } //---------------------------------------------------------------------------- //! Log error message and save it //! //! @param error error message //---------------------------------------------------------------------------- void ReportError(const std::string& error); //---------------------------------------------------------------------------- //! Set drain transfer status //! //! @param status new drain transfer status //---------------------------------------------------------------------------- inline void SetStatus(DrainTransferJob::Status status) { mStatus = status; } //---------------------------------------------------------------------------- //! Get drain transfer status //---------------------------------------------------------------------------- virtual DrainTransferJob::Status GetStatus() const { return mStatus.load(); } //---------------------------------------------------------------------------- //! Get drain job info based on the requested tags //! //! @param tags set of tags that the requestor would like to get inf about //! //! @param map of tags to corresponding information collected from the job //---------------------------------------------------------------------------- std::list GetInfo(const std::list& tags) const; //---------------------------------------------------------------------------- //! Update MGM stats depending on the type of transfer. The generic //! DrainTransferJob can be used in different components eg. fsck. For the //! time being this will update the MGM statistics only for the drainer. //---------------------------------------------------------------------------- void UpdateMgmStats(); #ifdef IN_TEST_HARNESS public: #else private: #endif //---------------------------------------------------------------------------- //! Struct holding info about a file to be drained //---------------------------------------------------------------------------- struct FileDrainInfo { std::string mFullPath; eos::ns::FileMdProto mProto; }; //---------------------------------------------------------------------------- //! Get file metadata info. Depending on the MGM configuration this will use //! the in-memory approach with namespace locking or the qclient to connect //! directy to QDB without any locking. //! //! @return file drain info object or throws and MDException //---------------------------------------------------------------------------- FileDrainInfo GetFileInfo() const; //---------------------------------------------------------------------------- //! Build TPC source url //! //! @param fdrain file to be drained info //! @param log_id transfer log id //! //! @return XrdCl source URL //---------------------------------------------------------------------------- XrdCl::URL BuildTpcSrc(const FileDrainInfo& fdrain, const std::string& log_id); //---------------------------------------------------------------------------- //! Build TPC destination url //! //! @param fdrain file to be drained info //! @param log_id transfer log id //! //! @return XrdCl destination URL //---------------------------------------------------------------------------- XrdCl::URL BuildTpcDst(const FileDrainInfo& fdrain, const std::string& log_id); //---------------------------------------------------------------------------- //! Select destiantion file system for current transfer //! //! @param fdrain file to drain metadata info //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool SelectDstFs(const FileDrainInfo& fdrain); //---------------------------------------------------------------------------- //! Drain 0-size file //! //! @param fdrain file to drain metadata info //! //! @return drain status //---------------------------------------------------------------------------- Status DrainZeroSizeFile(const FileDrainInfo& fdrain); std::string mAppTag; ///< Application tag for the transfer const eos::common::FileId::fileid_t mFileId; ///< File id to transfer //! Source and destination file system std::atomic mFsIdSource; std::atomic mFsIdTarget; //! Actual file system id used for the current drain transfer, can point to //! the file system of a replica of the file std::atomic mTxFsIdSource; std::string mErrorString; ///< Error message std::atomic mStatus; ///< Status of the drain job std::set mTriedSrcs; ///< Tried src std::vector mExcludeDsts; ///< Excluded dest bool mRainAttempt; ///< Rain transfers are attempted only once bool mRainReconstruct; ///< Mark rain reconstruction bool mDropSrc; ///< Mark if source replicas should be dropped //! In balance mode the source and destination file systems are enforced bool mBalanceMode; bool mRepairExcluded; ///< Mark if mTriedSrcs should be included in recfs for rain layouts DrainProgressHandler mProgressHandler; ///< TPC progress handler eos::common::VirtualIdentity mVid; /// VID triggering the job }; EOSMGMNAMESPACE_END