// ----------------------------------------------------------------------
// File: WFE.hh
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#ifndef __EOSMGM_WFE__HH__
#define __EOSMGM_WFE__HH__
#include "mgm/Namespace.hh"
#include "common/Mapping.hh"
#include "common/Timing.hh"
#include "common/FileId.hh"
#include "common/ThreadPool.hh"
#include "common/AssistedThread.hh"
#include "common/xrootd-ssi-protobuf-interface/eos_cta/include/CtaFrontendApi.hpp"
#include "proto/ConsoleReply.pb.h"
#include "XrdOuc/XrdOucString.hh"
#include "XrdOuc/XrdOucErrInfo.hh"
#include "Xrd/XrdJob.hh"
#include
//! Forward declaration
class XrdScheduler;
EOSMGMNAMESPACE_BEGIN
/**
* @file WFE.hh
*
* @brief This class implements an WFE engine
*
*/
class WFE
{
private:
//............................................................................
// variables for the WFE thread
//............................................................................
AssistedThread mThread; //< thread id of the WFE thread
time_t mMs; //< forced sleep time used for find / scans
eos::common::VirtualIdentity mRootVid; //< we operate with the root vid
XrdOucErrInfo mError; //< XRootD error object
/// number of all jobs which are queued and didn't run yet
std::atomic_uint_least32_t mActiveJobs;
/// condition variable to get signalled for a done job
XrdSysCondVar mDoneSignal;
public:
/* Default Constructor - use it to run the WFE thread by calling Start
*/
WFE();
/**
* @brief get the millisecond sleep time for find
* @return configured sleep time
*/
time_t GetMs()
{
return mMs;
}
/**
* @brief set the millisecond sleep time for find
* @param ms sleep time in milliseconds to enforce
*/
void SetMs(time_t ms)
{
mMs = ms;
}
/* Start the WFE thread engine
*/
bool Start();
/* Stop the WFE thread engine
*/
void Stop();
/* WFE method doing the actual policy scrubbing
*/
void WFEr(ThreadAssistant& assistant) noexcept;
/**
* @brief Destructor
*
*/
~WFE()
{
Stop();
std::cerr << __FUNCTION__ << ":: end of destructor" << std::endl;
}
class Job : XrdJob
{
public:
struct Action {
Action(std::string a, std::string e, time_t when, std::string workflow,
std::string queue)
{
mAction = std::move(a);
mEvent = std::move(e);
mTime = when;
mWorkflow = std::move(workflow);
mQueue = std::move(queue);
XrdOucString tst;
mWhen = eos::common::StringConversion::GetSizeString(tst,
(unsigned long long) when);
mDay = eos::common::Timing::UnixTimestamp_to_Day(when);
}
Action(std::string a, std::string e, time_t when, std::string savedOnDay,
std::string workflow,
std::string queue) : Action(std::move(a), std::move(e), when,
std::move(workflow), std::move(queue))
{
mSavedOnDay = std::move(savedOnDay);
}
std::string mAction;
std::string mEvent;
time_t mTime; //! unix timestamp
std::string mWhen; //! string with unix timestamp
std::string mDay; //! string with yearmonthday
std::string mSavedOnDay; //! string with yearmonthday
std::string mWorkflow;
std::string mQueue;
};
Job()
{
mFid = 0;
mRetry = 0;
}
Job(eos::common::FileId::fileid_t fid,
const eos::common::VirtualIdentity& vid,
const std::string& errorMessage = "")
{
mFid = fid;
mRetry = 0;
mVid = vid;
mErrorMesssage = errorMessage;
}
~Job() override = default;
Job(const Job& other)
{
mActions = other.mActions;
mFid = other.mFid;
mDescription = other.mDescription;
mRetry = other.mRetry;
mErrorMesssage = other.mErrorMesssage;
}
// ---------------------------------------------------------------------------
// Job execution function
// ---------------------------------------------------------------------------
void DoIt() override
{
std::string errorMsg;
DoIt(false, errorMsg);
}
//! @ininfo original opaque information of the URL that triggered the event
int DoIt(bool issync, std::string& errorMsg, const char * const ininfo = nullptr);
//! @brief Handles "proto" method events
//! @param errorMsg out parameter giving the text of any error
//! @ininfo original opaque information of the URL that triggered the event
//! @return SFS_OK if success
int HandleProtoMethodEvents(std::string& errorMsg, const char * const ininfo);
//! @brief Handles a "proto" method "prepare" event
//! @param fullPath the full path of the file
//! @ininfo original opaque information of the URL that triggered the event
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodPrepareEvent(const std::string &fullPath, const char * const ininfo, std::string& errorMsg);
//! @brief Handles a "proto" method "abort_prepare" event
//! @param fullPath the full path of the file
//! @ininfo original opaque information of the URL that triggered the event
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodAbortPrepareEvent(const std::string &fullPath, const char * const ininfo, std::string &errorMsg);
//! @brief Handles a "proto" method "evict_prepare" event
//! @param fullPath the full path of the file
//! @ininfo original opaque information of the URL that triggered the event
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodEvictPrepareEvent(const std::string &fullPath, const char * const ininfo, std::string& errorMsg);
//! @brief Handles a "proto" method "create" event
//! @param fullPath the full path of the file
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodCreateEvent(const std::string &fullPath, std::string &errorMsg);
//! @brief Handles a "proto" method "delete" event
//! @param fullPath the full path of the file
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodDeleteEvent(const std::string &fullPath, std::string &errorMsg);
//! @brief Handles a "proto" method "close" event
//! @param event the precise name of the event
//! @param fullPath the full path of the file
//! @ininfo original opaque information of the URL that triggered the event
int HandleProtoMethodCloseEvent(const std::string &event, const std::string &fullPath,
const char * const ininfo);
//! @brief Handles a "proto" method "archived" event
//! @param event the precise name of the event
//! @param fullPath the full path of the file
//! @ininfo original opaque information of the URL that triggered the event
int HandleProtoMethodArchivedEvent(const std::string &event, const std::string &fullPath,
const char * const ininfo);
//! @brief Handles a "proto" method "retrieve_failed" event
//! @param fullPath the full path of the file
int HandleProtoMethodRetrieveFailedEvent(const std::string &fullPath);
//! @brief Handles a "proto" method "archive_failed" event
//! @param fullPath the full path of the file
int HandleProtoMethodArchiveFailedEvent(const std::string &fullPath);
//! @brief Handles a "proto" method "offline" event
//! @param fullPath the full path of the file
//! @param ininfo original opaque information of the URL that triggered the event
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodOfflineEvent(const std::string &fullPath, const char * const ininfo, std::string& errorMsg);
//! @brief Handles a "proto" method "update_fid" event
//! @param fullPath the full path of the file
//! @param errorMsg out parameter giving the text of any error response
int HandleProtoMethodUpdateFidEvent(const std::string &fullPath, std::string &errorMsg);
//! @brief Resets the extended attributes for tracking retrieve requests
//! @param fullPath the full path of the file
void resetRetrieveIdListAndErrorMsg(const std::string &fullPath);
//! @brief This method is used for communicating proto event requests
//! @param jobPtr pointer to the job of the event
//! @param fullPath path of the file which trigered the event
//! @param request the protobuf request object
//! @param retry should retry the request
//! @param errorMsg out parameter giving the text of any error response
//! @return whether it was successful or not
static int SendProtoWFRequest(Job* jobPtr, const std::string& fullPath,
const cta::xrd::Request& request, std::string& errorMsg,
bool retry = false);
//! @brief Execute evict as user root
//!
//! @param fid The file identifier
//! @return evict result
static console::ReplyProto EvictAsRoot(const eos::IFileMD::id_t fid);
//! @brief Returns true if the 'file archived' garbage collector should be
//! enabled
//! @param space name of the space in which to look for the value of the
//! corresponding configuration variable
//! @return true if the 'file archived' garbage collector should be enabled
bool GetFileArchivedGCEnabled(const std::string &space);
// -------------------------------------------------------------------------
// persistency related methods
// -------------------------------------------------------------------------
int Save(std::string queue, time_t& when, int action = 0, int retry = 0);
int Load(std::string path2entry);
int Move(std::string from_queue, std::string to_queue, time_t& when,
int retry = 0);
int Results(std::string queue, int retc, XrdOucString log, time_t when);
int Delete(std::string queue, std::string fromDay);
// -------------------------------------------------------------------------
void AddAction(const std::string& action,
const std::string& event,
time_t when,
const std::string& workflow,
const std::string& queue)
{
Action thisaction(action, event, when, workflow, queue);
mActions.push_back(thisaction);
mDescription += action;
mDescription += " ";
mDescription += "/";
mDescription += event;
mDescription += "/";
std::string tst;
mDescription += eos::common::StringConversion::GetSizeString(tst,
(unsigned long long) when);
mDescription += "/";
mDescription += workflow;
mDescription += "/";
mDescription += queue;
mDescription += "/";
mDescription += eos::common::StringConversion::GetSizeString(tst,
(unsigned long long) mFid);
}
void AddAction(const std::string& action,
const std::string& event,
time_t when,
const std::string& savedOnDay,
const std::string& workflow,
const std::string& queue)
{
AddAction(action, event, when, workflow, queue);
mActions[mActions.size() - 1].mSavedOnDay = savedOnDay;
}
bool IsSync(const std::string& event = "")
{
return (event.length() ? event.substr(0, 6) : mActions[0].mEvent.substr(0,
6)) == "sync::";
}
std::vector mActions;
eos::common::FileId::fileid_t mFid;
std::string mDescription;
eos::common::VirtualIdentity mVid;
std::string mWorkflowPath;
std::string mErrorMesssage;
int mRetry;///! number of retries
private:
//! @brief moving proto wf event jobs to retry queue
//! @param filePath the path of the file concerned
void MoveToRetry(const std::string& filePath);
//! @brief move to queues based on the results
//! @param rcode the return code of the job
void MoveWithResults(int rcode, std::string fromQueue = "r");
//! @brief get prepare request identifier from specified opaque information
//! @param ininfo opaque information
//! @return prepare request identifier
//! @throw MDException if the prepare request identifier does not exist in the
//! opaque information or if it has no value
std::string GetPrepareRequestIdFromOpaqueData(const char* const ininfo);
//! @brief get prepare request activity from specified opaque information
//! @param ininfo opaque information
//! @return prepare request activity, or empty string if no activity is found
std::string GetPrepareActivityFromOpaqueData(const char* const ininfo);
//! @brief Queues a prepare request if necessary
//! @param fullPath the full path of the file
//! @param prepareRequestId prepare request identifier
//! @param prepareActivity prepare request activity, can be empty string
//! @param errorMsg out parameter giving the text of any error response
int IdempotentPrepare(const std::string& fullPath, const std::string &prepareRequestId, const std::string& prepareActivity, std::string& errorMsg);
};
XrdSysCondVar* GetSignal()
{
return &mDoneSignal;
}
// ---------------------------------------------------------------------------
//! Decrement the number of active jobs in the workflow engine
// ---------------------------------------------------------------------------
void DecActiveJobs()
{
mActiveJobs--;
PublishActiveJobs();
}
// ---------------------------------------------------------------------------
//! Increment the number of active jobs in this converter
// ---------------------------------------------------------------------------
void IncActiveJobs()
{
mActiveJobs++;
PublishActiveJobs();
}
// ---------------------------------------------------------------------------
//! Publish the number of active jobs in the workflow engine
// ---------------------------------------------------------------------------
void PublishActiveJobs();
// ---------------------------------------------------------------------------
//! Return active jobs
// ---------------------------------------------------------------------------
inline auto GetActiveJobs() -> decltype(mActiveJobs.load()) const
{
return mActiveJobs.load();
}
static std::string GetUserName(uid_t uid);
static std::string GetGroupName(gid_t gid);
static IContainerMD::XAttrMap CollectAttributes(const std::string& fullPath);
static void MoveFromRBackToQ();
/// the scheduler class is providing a destructor-less object,
/// so we have to create once a singleton of this and keep/share it
static XrdSysMutex gSchedulerMutex;
/// singleton object of a scheduler
static XrdScheduler* gScheduler;
};
EOSMGMNAMESPACE_END
#endif