//------------------------------------------------------------------------------
// File: ConverterDriver.hh
// Author: Mihai Patrascoiu - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#pragma once
#include "common/Logging.hh"
#include "common/ObserverMgr.hh"
#include "mgm/Namespace.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/config/GlobalConfigStore.hh"
#include "mgm/convert/ConversionJob.hh"
#include "namespace/interface/IFileMD.hh"
#include "namespace/ns_quarkdb/QdbContactDetails.hh"
#include "namespace/ns_quarkdb/qclient/include/qclient/QClient.hh"
#include "namespace/ns_quarkdb/qclient/include/qclient/structures/QHash.hh"
EOSMGMNAMESPACE_BEGIN
//! Forward declarations of the conversion job types referenced below
class ConversionJob;
enum class ConversionJobStatus;
//! Config store key under which the max converter thread pool size is saved
static const std::string kConverterMaxThreads = "converter-max-threads";
//! Config store key under which the max converter queue size is saved
static const std::string kConverterMaxQueueSize = "converter-max-queuesize";
//------------------------------------------------------------------------------
//! @brief Class running the conversion threadpool
//!
//! Holds the thread pool used for conversion jobs, the helper tracking pending
//! and failed jobs in QuarkDB, the in-memory queue of pending jobs, the
//! observer manager and the config store in which the converter limits
//! (max threads / max queue size) are saved.
//------------------------------------------------------------------------------
class ConverterDriver : public eos::common::LogId
{
public:
//! Pending job description, as accepted by QdbHelper::AddPendingJob
using JobInfoT = std::pair;
//! Failed job description — NOTE(review): confirm the element layout
using JobFailedT = std::pair;
//! Conversion job status type, as passed to CleanupObserver
using JobStatusT = ConversionJobStatus;
//! Type of the observer manager exposed through getObserverMgr()
using ObserverT = eos::common::ObserverMgr;
//----------------------------------------------------------------------------
//! Constructor
//!
//! Builds the QuarkDB helper from the given contact details, creates the
//! thread pool, the observer manager and the config store, and applies the
//! default thread pool / queue size limits. The driver starts out marked as
//! not running.
//!
//! NOTE(review): the meaning of the thread pool arguments 10, 5, 3 and of the
//! observer manager argument 4 is defined by the respective classes and is
//! not visible here — confirm before changing them.
//!
//! @param qdb_details QuarkDB contact details handed to the QdbHelper
//----------------------------------------------------------------------------
ConverterDriver(const eos::QdbContactDetails& qdb_details) :
mQdbHelper(qdb_details), mIsRunning(false),
mThreadPool(std::thread::hardware_concurrency(), cDefaultMaxThreadPoolSize,
10, 5, 3, "converter"),
mMaxThreadPoolSize(cDefaultMaxThreadPoolSize),
mMaxQueueSize(cDefaultMaxQueueSize), mTimestamp(),
mObserverMgr(std::make_unique(4)),
mConfigStore(std::make_unique(&FsView::gFsView))
{}
//----------------------------------------------------------------------------
//! Destructor - calls Stop()
//----------------------------------------------------------------------------
~ConverterDriver()
{
Stop();
}
//----------------------------------------------------------------------------
//! Start converter thread
//----------------------------------------------------------------------------
void Start();
//----------------------------------------------------------------------------
//! Stop converter thread and all running conversion jobs
//----------------------------------------------------------------------------
void Stop();
//----------------------------------------------------------------------------
//! Schedule a conversion job with the given ID and conversion info.
//!
//! @param id the job id
//! @param conversion_info the conversion info string
//! @return true if scheduling succeeded, false otherwise
//----------------------------------------------------------------------------
bool ScheduleJob(const eos::IFileMD::id_t& id,
const std::string& conversion_info);
//----------------------------------------------------------------------------
//! Get running state info
//!
//! @return current value of the running flag
//----------------------------------------------------------------------------
inline bool IsRunning() const
{
return mIsRunning;
}
//----------------------------------------------------------------------------
//! Get thread pool info
//!
//! @return info string as produced by the thread pool's GetInfo()
//----------------------------------------------------------------------------
inline std::string GetThreadPoolInfo() const
{
return mThreadPool.GetInfo();
}
//----------------------------------------------------------------------------
//! Get thread pool max size
//!
//! @return the currently stored maximum thread pool size
//----------------------------------------------------------------------------
inline uint32_t GetMaxThreadPoolSize() const
{
return mMaxThreadPoolSize.load();
}
//----------------------------------------------------------------------------
//! Get number of running jobs
//!
//! Takes a read lock on the jobs mutex while reading the collection size.
//!
//! @return number of entries in the running jobs collection
//----------------------------------------------------------------------------
inline uint64_t NumRunningJobs() const
{
eos::common::RWMutexReadLock rlock(mJobsMutex);
return mJobsRunning.size();
}
//----------------------------------------------------------------------------
//! Get number of pending jobs held in the in-memory queue
//!
//! This reports the size of mPendingJobs; it does not query QuarkDB. Use
//! GetPendingJobs() to retrieve the list kept by the QuarkDB helper.
//!
//! @return size of the in-memory pending jobs queue
//----------------------------------------------------------------------------
inline uint64_t NumPendingJobs()
{
return mPendingJobs.size();
}
//----------------------------------------------------------------------------
//! Get number of failed jobs stored in QuarkDB
//!
//! NOTE(review): QdbHelper::NumFailedJobs() is declared to return int64_t and
//! documents -1 for a failed operation; that value is converted to uint64_t
//! here, so a failure would surface as a very large count — confirm that
//! callers cope with this.
//!
//! @return value reported by the QuarkDB helper, converted to uint64_t
//----------------------------------------------------------------------------
inline uint64_t NumQdbFailedJobs()
{
return mQdbHelper.NumFailedJobs();
}
//----------------------------------------------------------------------------
//! Get max queue size
//!
//! @return the currently stored maximum queue size
//----------------------------------------------------------------------------
inline uint32_t GetMaxQueueSize() const
{
return mMaxQueueSize.load();
}
//----------------------------------------------------------------------------
//! Set maximum size of the converter thread pool
//!
//! Applies the value to the thread pool, stores it locally and saves it in
//! the config store under the kConverterMaxThreads key.
//!
//! @param max maximum threadpool value
//----------------------------------------------------------------------------
inline void SetMaxThreadPoolSize(uint32_t max)
{
mThreadPool.SetMaxThreads(max);
mMaxThreadPoolSize = max;
mConfigStore->save(kConverterMaxThreads, std::to_string(max));
}
//----------------------------------------------------------------------------
//! Set maximum queue size
//!
//! Stores the value locally and saves it in the config store under the
//! kConverterMaxQueueSize key.
//!
//! @param max maximum (submitted) queue size
//----------------------------------------------------------------------------
inline void SetMaxQueueSize(uint32_t max)
{
mMaxQueueSize = max;
mConfigStore->save(kConverterMaxQueueSize, std::to_string(max));
}
//----------------------------------------------------------------------------
//! Get list of pending jobs (delegates to the QuarkDB helper)
//!
//! @return list of pending jobs
//----------------------------------------------------------------------------
inline std::vector GetPendingJobs()
{
return mQdbHelper.GetPendingJobs();
}
//----------------------------------------------------------------------------
//! Get list of failed jobs (delegates to the QuarkDB helper)
//!
//! @return list of failed jobs
//----------------------------------------------------------------------------
inline std::vector GetFailedJobs()
{
return mQdbHelper.GetFailedJobs();
}
//----------------------------------------------------------------------------
//! Clear list of pending jobs (delegates to the QuarkDB helper)
//----------------------------------------------------------------------------
void ClearPendingJobs()
{
return mQdbHelper.ClearPendingJobs();
}
//----------------------------------------------------------------------------
//! Clear list of failed jobs (delegates to the QuarkDB helper)
//----------------------------------------------------------------------------
void ClearFailedJobs()
{
return mQdbHelper.ClearFailedJobs();
}
//----------------------------------------------------------------------------
//! Get Observer Mgr, useful for other threads to register observers
//!
//! @return pointer obtained from mObserverMgr.get(); the driver keeps
//!         ownership of the object
//----------------------------------------------------------------------------
auto getObserverMgr()
{
return mObserverMgr.get();
}
private:
//----------------------------------------------------------------------------
//! Helper grouping the QClient object and the two QuarkDB hashes used to
//! track pending and failed conversion jobs
//----------------------------------------------------------------------------
struct QdbHelper {
//--------------------------------------------------------------------------
//! Constructor
//!
//! Creates the QClient from the contact details and binds the pending and
//! failed hash objects to it using the hash keys declared below.
//!
//! @param qdb_details QuarkDB contact details (members and options)
//--------------------------------------------------------------------------
QdbHelper(const eos::QdbContactDetails& qdb_details)
{
mQcl = std::make_unique(qdb_details.members,
qdb_details.constructOptions());
mQHashPending = qclient::QHash(*mQcl, kConversionPendingHashKey);
mQHashFailed = qclient::QHash(*mQcl, kConversionFailedHashKey);
}
//--------------------------------------------------------------------------
//! Returns a QuarkDB iterator for the pending jobs hash.
//!
//! The iterator is requested with cBatchSize and "0" as arguments —
//! presumably the batch size and the start cursor; confirm against the
//! qclient::QHash::getIterator documentation.
//!
//! @return the pending jobs hash iterator
//--------------------------------------------------------------------------
inline qclient::QHash::Iterator PendingJobsIterator()
{
return mQHashPending.getIterator(cBatchSize, "0");
}
//--------------------------------------------------------------------------
//! Get list of pending jobs
//!
//! @return list of pending jobs
//--------------------------------------------------------------------------
std::vector GetPendingJobs();
//--------------------------------------------------------------------------
//! Get list of failed jobs
//!
//! @return list of failed jobs
//--------------------------------------------------------------------------
std::vector GetFailedJobs();
//--------------------------------------------------------------------------
//! Clear list of pending jobs
//--------------------------------------------------------------------------
void ClearPendingJobs();
//--------------------------------------------------------------------------
//! Clear list of failed jobs
//--------------------------------------------------------------------------
void ClearFailedJobs();
//--------------------------------------------------------------------------
//! Add conversion job to the queue of pending jobs in QuarkDB.
//!
//! @param jobinfo the pending conversion job details
//! @return true if operation succeeded, false otherwise
//--------------------------------------------------------------------------
bool AddPendingJob(const JobInfoT& jobinfo);
//--------------------------------------------------------------------------
//! Add conversion job to the queue of failed jobs in QuarkDB.
//!
//! @param job the failed conversion job
//! @return true if operation succeeded, false otherwise
//--------------------------------------------------------------------------
bool AddFailedJob(const std::shared_ptr& job);
//--------------------------------------------------------------------------
//! Remove conversion job by id from the pending jobs queue in QuarkDB.
//!
//! @param id the conversion job id to remove
//! @return true if operation succeeded, false otherwise
//--------------------------------------------------------------------------
bool RemovePendingJob(const eos::IFileMD::id_t& id);
//--------------------------------------------------------------------------
//! Returns the number of failed jobs or -1 in case of failed operation
//--------------------------------------------------------------------------
int64_t NumFailedJobs();
//! QDB conversion hash keys (declared as per-instance members; they are
//! initialized before the constructor body runs and uses them)
const std::string kConversionPendingHashKey = "eos-conversion-jobs-pending";
const std::string kConversionFailedHashKey = "eos-conversion-jobs-failed";
static constexpr unsigned int cBatchSize{1000}; ///< Batch size constant
private:
std::unique_ptr mQcl; ///< Internal QClient object
qclient::QHash mQHashPending; ///< QDB pending jobs hash object
qclient::QHash mQHashFailed; ///< QDB failed jobs hash object
};
//----------------------------------------------------------------------------
//! Initialize the saved value of config values like max threads/queue size
//! from the config store
//----------------------------------------------------------------------------
void InitConfig();
//----------------------------------------------------------------------------
//! Converter engine thread monitoring
//!
//! @param assistant converter thread
//----------------------------------------------------------------------------
void Convert(ThreadAssistant& assistant) noexcept;
//----------------------------------------------------------------------------
//! Handle jobs based on status
//----------------------------------------------------------------------------
void HandleRunningJobs();
//----------------------------------------------------------------------------
//! Signal all conversion jobs to stop
//----------------------------------------------------------------------------
void JoinAllConversionJobs();
//----------------------------------------------------------------------------
//! Submit pending jobs from QDB
//!
//! @param assistant converter thread
//----------------------------------------------------------------------------
void SubmitQdbPending(ThreadAssistant& assistant);
//----------------------------------------------------------------------------
//! Observer job called when a conversion is done taking care of deleting
//! the "proc" entry and updating the tracking information
//!
//! @param status conversion job status
//! @param tag conversion tag info
//----------------------------------------------------------------------------
static void CleanupObserver(ConverterDriver::JobStatusT status,
std::string tag);
//! Wait-time between jobs requests constant
static constexpr unsigned int cDefaultRequestIntervalSec{60};
//! Default maximum thread pool size constant
static constexpr unsigned int cDefaultMaxThreadPoolSize{100};
//! Max queue size from the thread pool when we delay new jobs
static constexpr unsigned int cDefaultMaxQueueSize{1000};
AssistedThread mThread; ///< Thread controller object
QdbHelper mQdbHelper; ///< QuarkDB helper object
std::atomic mIsRunning; ///< Mark if converter is running
eos::common::ThreadPool mThreadPool; ///< Thread pool for conversion jobs
std::atomic mMaxThreadPoolSize; ///< Max threadpool size
std::atomic mMaxQueueSize; ///< Max submitted queue size
//! Timestamp of last jobs request
std::chrono::steady_clock::time_point mTimestamp;
//! Collection of running conversion jobs
std::list> mJobsRunning;
//! RWMutex protecting the jobs collections
mutable eos::common::RWMutex mJobsMutex;
//! Pending jobs in memory
eos::common::ConcurrentQueue mPendingJobs;
//! Observer manager handed out through getObserverMgr()
std::unique_ptr mObserverMgr;
//! Config store in which the max threads / max queue size values are saved
std::unique_ptr mConfigStore;
};
EOSMGMNAMESPACE_END