//------------------------------------------------------------------------------ // File: ConverterDriver.hh // Author: Mihai Patrascoiu - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "common/Logging.hh" #include "common/ObserverMgr.hh" #include "mgm/Namespace.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/config/GlobalConfigStore.hh" #include "mgm/convert/ConversionJob.hh" #include "namespace/interface/IFileMD.hh" #include "namespace/ns_quarkdb/QdbContactDetails.hh" #include "namespace/ns_quarkdb/qclient/include/qclient/QClient.hh" #include "namespace/ns_quarkdb/qclient/include/qclient/structures/QHash.hh" EOSMGMNAMESPACE_BEGIN //! Forward declaration class ConversionJob; enum class ConversionJobStatus; static const std::string kConverterMaxThreads {"converter-max-threads"}; static const std::string kConverterMaxQueueSize {"converter-max-queuesize"}; //------------------------------------------------------------------------------ //! @brief Class running the conversion threadpool //------------------------------------------------------------------------------ class ConverterDriver : public eos::common::LogId { public: using JobInfoT = std::pair; using JobFailedT = std::pair; using JobStatusT = ConversionJobStatus; using ObserverT = eos::common::ObserverMgr; //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- ConverterDriver(const eos::QdbContactDetails& qdb_details) : mQdbHelper(qdb_details), mIsRunning(false), mThreadPool(std::thread::hardware_concurrency(), cDefaultMaxThreadPoolSize, 10, 5, 3, "converter"), mMaxThreadPoolSize(cDefaultMaxThreadPoolSize), mMaxQueueSize(cDefaultMaxQueueSize), mTimestamp(), mObserverMgr(std::make_unique(4)), mConfigStore(std::make_unique(&FsView::gFsView)) {} //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~ConverterDriver() { Stop(); } //---------------------------------------------------------------------------- //! Start converter thread //---------------------------------------------------------------------------- void Start(); //---------------------------------------------------------------------------- //! Stop converter thread and all running conversion jobs //---------------------------------------------------------------------------- void Stop(); //---------------------------------------------------------------------------- //! Schedule a conversion job with the given ID and conversion info. //! //! @param id the job id //! @param conversion_info the conversion info string //! @return true if scheduling succeeded, false otherwise //---------------------------------------------------------------------------- bool ScheduleJob(const eos::IFileMD::id_t& id, const std::string& conversion_info); //---------------------------------------------------------------------------- //! Get running state info //---------------------------------------------------------------------------- inline bool IsRunning() const { return mIsRunning; } //---------------------------------------------------------------------------- //! Get thread pool info //---------------------------------------------------------------------------- inline std::string GetThreadPoolInfo() const { return mThreadPool.GetInfo(); } //---------------------------------------------------------------------------- //! Get thread pool max size //---------------------------------------------------------------------------- inline uint32_t GetMaxThreadPoolSize() const { return mMaxThreadPoolSize.load(); } //---------------------------------------------------------------------------- //! Get number of running jobs //---------------------------------------------------------------------------- inline uint64_t NumRunningJobs() const { eos::common::RWMutexReadLock rlock(mJobsMutex); return mJobsRunning.size(); } //---------------------------------------------------------------------------- //! Get number of pending jobs stored in QuarkDB //---------------------------------------------------------------------------- inline uint64_t NumPendingJobs() { return mPendingJobs.size(); } //---------------------------------------------------------------------------- //! Get number of failed jobs stored in QuarkDB //---------------------------------------------------------------------------- inline uint64_t NumQdbFailedJobs() { return mQdbHelper.NumFailedJobs(); } //---------------------------------------------------------------------------- //! Get max queue size //---------------------------------------------------------------------------- inline uint32_t GetMaxQueueSize() const { return mMaxQueueSize.load(); } //---------------------------------------------------------------------------- //! Set maximum size of the converter thread pool //! //! @param max maximum threadpool value //---------------------------------------------------------------------------- inline void SetMaxThreadPoolSize(uint32_t max) { mThreadPool.SetMaxThreads(max); mMaxThreadPoolSize = max; mConfigStore->save(kConverterMaxThreads, std::to_string(max)); } //---------------------------------------------------------------------------- //! Set maximum queue size //! //! @param max maximum (submitted) queue size //---------------------------------------------------------------------------- inline void SetMaxQueueSize(uint32_t max) { mMaxQueueSize = max; mConfigStore->save(kConverterMaxQueueSize, std::to_string(max)); } //---------------------------------------------------------------------------- //! Get list of pending jobs //! //! @return list of pending jobs //---------------------------------------------------------------------------- inline std::vector GetPendingJobs() { return mQdbHelper.GetPendingJobs(); } //---------------------------------------------------------------------------- //! Get list of failed jobs //! //! @return list of failed jobs //---------------------------------------------------------------------------- inline std::vector GetFailedJobs() { return mQdbHelper.GetFailedJobs(); } //---------------------------------------------------------------------------- //! Clear list of pending jobs //---------------------------------------------------------------------------- void ClearPendingJobs() { return mQdbHelper.ClearPendingJobs(); } //---------------------------------------------------------------------------- //! Clear list of failed jobs //---------------------------------------------------------------------------- void ClearFailedJobs() { return mQdbHelper.ClearFailedJobs(); } //---------------------------------------------------------------------------- //! Get Observer Mgr, useful for other threads to register observers //---------------------------------------------------------------------------- auto getObserverMgr() { return mObserverMgr.get(); } private: struct QdbHelper { //-------------------------------------------------------------------------- //! Constructor //-------------------------------------------------------------------------- QdbHelper(const eos::QdbContactDetails& qdb_details) { mQcl = std::make_unique(qdb_details.members, qdb_details.constructOptions()); mQHashPending = qclient::QHash(*mQcl, kConversionPendingHashKey); mQHashFailed = qclient::QHash(*mQcl, kConversionFailedHashKey); } //-------------------------------------------------------------------------- //! Returns a QuarkDB iterator for the pending jobs hash. //! //! @return the pending jobs hash iterator //-------------------------------------------------------------------------- inline qclient::QHash::Iterator PendingJobsIterator() { return mQHashPending.getIterator(cBatchSize, "0"); } //-------------------------------------------------------------------------- //! Get list of pending jobs //! //! @return list of pending jobs //-------------------------------------------------------------------------- std::vector GetPendingJobs(); //-------------------------------------------------------------------------- //! Get list of failed jobs //! //! @return list of failed jobs //-------------------------------------------------------------------------- std::vector GetFailedJobs(); //-------------------------------------------------------------------------- //! Clear list of pending jobs //-------------------------------------------------------------------------- void ClearPendingJobs(); //-------------------------------------------------------------------------- //! Clear list of failed jobs //-------------------------------------------------------------------------- void ClearFailedJobs(); //-------------------------------------------------------------------------- //! Add conversion job to the queue of pending jobs in QuarkDB. //! //! @param jobinfo the pending conversion job details //! @return true if operation succeeded, false otherwise //-------------------------------------------------------------------------- bool AddPendingJob(const JobInfoT& jobinfo); //-------------------------------------------------------------------------- //! Add conversion job to the queue of failed jobs in QuarkDB. //! //! @param jobinfo the failed conversion job details //! @return true if operation succeeded, false otherwise //-------------------------------------------------------------------------- bool AddFailedJob(const std::shared_ptr& job); //-------------------------------------------------------------------------- //! Remove conversion job by id from the pending jobs queue in QuarkDB. //! //! @param id the conversion job id to remove //! @return true if operation succeeded, false otherwise //-------------------------------------------------------------------------- bool RemovePendingJob(const eos::IFileMD::id_t& id); //-------------------------------------------------------------------------- //! Returns the number of failed jobs or -1 in case of failed operation //-------------------------------------------------------------------------- int64_t NumFailedJobs(); //! QDB conversion hash keys const std::string kConversionPendingHashKey = "eos-conversion-jobs-pending"; const std::string kConversionFailedHashKey = "eos-conversion-jobs-failed"; static constexpr unsigned int cBatchSize{1000}; ///< Batch size constant private: std::unique_ptr mQcl; ///< Internal QClient object qclient::QHash mQHashPending; ///< QDB pending jobs hash object qclient::QHash mQHashFailed; ///< QDB failed jobs hash object }; //---------------------------------------------------------------------------- //! Initialize the saved value of config values like max threads/queue size //! from the config store //---------------------------------------------------------------------------- void InitConfig(); //---------------------------------------------------------------------------- //! Converter engine thread monitoring //! //! @param assistant converter thread //---------------------------------------------------------------------------- void Convert(ThreadAssistant& assistant) noexcept; //---------------------------------------------------------------------------- //! Handle jobs based on status //---------------------------------------------------------------------------- void HandleRunningJobs(); //---------------------------------------------------------------------------- //! Signal all conversion jobs to stop //---------------------------------------------------------------------------- void JoinAllConversionJobs(); //---------------------------------------------------------------------------- //! Submit pending jobs from QDB //---------------------------------------------------------------------------- void SubmitQdbPending(ThreadAssistant& assistant); //---------------------------------------------------------------------------- //! Observer job called when a conversion is done taking care of deleting //! the "proc" entry and updating the tracking information //! //! @param status conversion job status //! @para tag conversion tag info //---------------------------------------------------------------------------- static void CleanupObserver(ConverterDriver::JobStatusT status, std::string tag); //! Wait-time between jobs requests constant static constexpr unsigned int cDefaultRequestIntervalSec{60}; //! Default maximum thread pool size constant static constexpr unsigned int cDefaultMaxThreadPoolSize{100}; //! Max queue size from the thread pool when we delay new jobs static constexpr unsigned int cDefaultMaxQueueSize{1000}; AssistedThread mThread; ///< Thread controller object QdbHelper mQdbHelper; ///< QuarkDB helper object std::atomic mIsRunning; ///< Mark if converter is running eos::common::ThreadPool mThreadPool; ///< Thread pool for conversion jobs std::atomic mMaxThreadPoolSize; ///< Max threadpool size std::atomic mMaxQueueSize; ///< Max submitted queue size //! Timestamp of last jobs request std::chrono::steady_clock::time_point mTimestamp; //! Collection of running conversion jobs std::list> mJobsRunning; //! RWMutex protecting the jobs collections mutable eos::common::RWMutex mJobsMutex; ///! Pending jobs in memory eos::common::ConcurrentQueue mPendingJobs; std::unique_ptr mObserverMgr; std::unique_ptr mConfigStore; }; EOSMGMNAMESPACE_END