//------------------------------------------------------------------------------
// File: ConversionJob.hh
// Author: Mihai Patrascoiu - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#pragma once
#include "mgm/Namespace.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/convert/ConversionInfo.hh"
#include "common/FileId.hh"
#include "common/LayoutId.hh"
#include "common/FileSystem.hh"
#include "namespace/interface/IFileMD.hh"
#include "XrdCl/XrdClCopyProcess.hh"
EOSMGMNAMESPACE_BEGIN
//! Forward declaration, needed because ConversionProgressHandler (declared
//! below) befriends ConversionJob before the class itself is defined.
class ConversionJob;
//! States a conversion job can be in. ConversionJob exposes this type as
//! ConversionJob::Status and reports the current value through GetStatus().
enum class ConversionJobStatus { DONE, RUNNING, PENDING, FAILED };
//! Application name of the converter.
//! NOTE(review): presumably the application tag attached to the transfers
//! issued by the converter - confirm against the implementation file.
//! NOTE(review): namespace-scope `static` gives this object internal linkage,
//! so every translation unit including this header gets its own mutable copy.
static std::string EOS_APP_NAME = "eos/converter";
//------------------------------------------------------------------------------
//! @brief Progress handler attached to a running conversion job.
//!
//! Implements the XrdCl::CopyProgressHandler callbacks to record the start
//! time, the number of bytes processed and the completion percentage of the
//! copy. It also carries the cancellation flag that is handed back to the
//! copy process through ShouldCancel(). All state is kept in atomics.
//------------------------------------------------------------------------------
class ConversionProgressHandler : public XrdCl::CopyProgressHandler,
  public eos::common::LogId
{
public:
  //! ConversionJob reads the private progress counters directly
  friend class ConversionJob;

  //----------------------------------------------------------------------------
  //! Constructor - starts with no cancellation requested and zeroed counters
  //----------------------------------------------------------------------------
  ConversionProgressHandler() :
    mCancel(false), mProgress(0), mBytesProcessed(0),
    mStartTimestamp(0) {}

  //----------------------------------------------------------------------------
  //! Notify when a new job is about to start. Only the start time is recorded;
  //! the arguments are not used.
  //!
  //! @param jobNum the job number of the copy job concerned
  //! @param jobTotal total number of jobs being processed
  //! @param source the source url of the current job
  //! @param destination the destination url of the current job
  //----------------------------------------------------------------------------
  void BeginJob(uint16_t jobNum, uint16_t jobTotal,
                const XrdCl::URL* source,
                const XrdCl::URL* destination) override
  {
    // Wall-clock time since the epoch, stored as a plain integer count
    mStartTimestamp = std::chrono::duration_cast<std::chrono::seconds>(
                        std::chrono::system_clock::now().time_since_epoch()).count();
  }

  //----------------------------------------------------------------------------
  //! Notify about the progress of the current job
  //!
  //! @param jobNum job number
  //! @param bytesProcessed bytes processed by the current job
  //! @param bytesTotal total number of bytes to be processed; when it is 0
  //!        the completion percentage is reported as 0
  //----------------------------------------------------------------------------
  void JobProgress(uint16_t jobNum, uint64_t bytesProcessed,
                   uint64_t bytesTotal) override
  {
    mBytesProcessed = bytesProcessed;

    // A zero total would turn the division into inf/NaN and converting such
    // a value to an integer is undefined behaviour, so guard against it.
    if (bytesTotal == 0) {
      mProgress = 0;
      return;
    }

    mProgress = static_cast<int>(100.0 * bytesProcessed / bytesTotal);
  }

  //----------------------------------------------------------------------------
  //! Determine whether the job should be cancelled
  //!
  //! @param jobNum job number (unused)
  //!
  //! @return true once Cancel() has been called, otherwise false
  //----------------------------------------------------------------------------
  bool ShouldCancel(uint16_t jobNum) override
  {
    return mCancel;
  }

  //----------------------------------------------------------------------------
  //! Trigger job cancellation by raising the flag returned by ShouldCancel()
  //----------------------------------------------------------------------------
  inline void Cancel()
  {
    mCancel = true;
  }

private:
  std::atomic<bool> mCancel; ///< Flag whether job should be cancelled
  std::atomic<int> mProgress; ///< Job completion percentage
  std::atomic<uint64_t> mBytesProcessed; ///< Bytes processed by job so far
  std::atomic<uint64_t> mStartTimestamp; ///< Timestamp of job start
};
//------------------------------------------------------------------------------
//! @brief Class executing a third-party conversion job
//------------------------------------------------------------------------------
class ConversionJob : public eos::common::LogId
{
public:
//! Possible status of a conversion job
using Status = ConversionJobStatus;
//----------------------------------------------------------------------------
//! Constructor
//!
//! @param fid the file id to convert
//----------------------------------------------------------------------------
ConversionJob(const eos::IFileMD::id_t fid,
const ConversionInfo& conversion_info);
//----------------------------------------------------------------------------
//! Destructor
//----------------------------------------------------------------------------
~ConversionJob();
//----------------------------------------------------------------------------
//! Execute a third-party copy
//----------------------------------------------------------------------------
void DoIt() noexcept;
//----------------------------------------------------------------------------
//! Cancel running third-party copy
//----------------------------------------------------------------------------
inline void Cancel()
{
mProgressHandler.Cancel();
}
//----------------------------------------------------------------------------
//! Get the conversion job status
//----------------------------------------------------------------------------
inline ConversionJob::Status GetStatus() const
{
return mStatus.load(std::memory_order_relaxed);
}
//----------------------------------------------------------------------------
//! Get the conversion string
//----------------------------------------------------------------------------
inline std::string GetConversionString() const
{
return mConversionInfo.ToString();
}
//----------------------------------------------------------------------------
//! Get the conversion job file id
//----------------------------------------------------------------------------
inline eos::IFileMD::id_t GetFid() const
{
assert(mFid == mConversionInfo.mFid);
return mFid;
}
//----------------------------------------------------------------------------
//! Get the conversion error message
//----------------------------------------------------------------------------
inline std::string GetErrorMsg() const
{
return mErrorString;
}
private:
//----------------------------------------------------------------------------
//! Merge original and the newly converted one so that the initial file
//! identifier and all the rest of the metadata information is preserved.
//!
//! @return true if successful, otherwise false
//----------------------------------------------------------------------------
bool Merge();
//----------------------------------------------------------------------------
//! Log the error message, store it and set the job as failed
//!
//! @param errmsg the error message
//! @param details additional details
//----------------------------------------------------------------------------
void HandleError(const std::string& emsg, const std::string& details = "");
//----------------------------------------------------------------------------
//! Construct CGI from conversion info
//!
//! @param info conversion info to process
//!
//! @return string containing conversion CGI
//----------------------------------------------------------------------------
std::string ConversionCGI(const ConversionInfo& info) const;
const eos::IFileMD::id_t mFid; ///< Conversion file id
const ConversionInfo mConversionInfo; ///< Conversion details
std::string mSourcePath; ///< Path of file to be converted
std::string mConversionPath; ///< Path of newly converted file
std::atomic mStatus; ///< Conversion job status
std::string mErrorString; ///< Error message
ConversionProgressHandler mProgressHandler; ///< Conversion progress handler
};
EOSMGMNAMESPACE_END