//------------------------------------------------------------------------------
// File: ConverterDriver.cc
// Author: Mihai Patrascoiu - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2019 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "mgm/convert/ConverterDriver.hh"
#include "mgm/IMaster.hh"
EOSMGMNAMESPACE_BEGIN
// Out-of-class definitions of static constexpr data members declared in the
// header. Since C++17 static constexpr data members are implicitly inline, so
// these definitions are redundant (deprecated) there; they are only required
// for pre-C++17 builds in which the members are ODR-used.
constexpr unsigned int ConverterDriver::cDefaultRequestIntervalSec;
constexpr unsigned int ConverterDriver::QdbHelper::cBatchSize;
//------------------------------------------------------------------------------
// Start converter thread
//
// Launches ConverterDriver::Convert on mThread unless the converter is already
// running. The running flag is flipped with a compare-and-swap so that two
// concurrent Start() calls cannot both (re)launch the thread.
//------------------------------------------------------------------------------
void
ConverterDriver::Start()
{
  bool expected_running = false;

  if (mIsRunning.compare_exchange_strong(expected_running, true)) {
    mThread.reset(&ConverterDriver::Convert, this);
  }
}
//------------------------------------------------------------------------------
// Stop converter thread and all running conversion jobs
//------------------------------------------------------------------------------
void
ConverterDriver::Stop()
{
mThread.join();
}
//------------------------------------------------------------------------------
// Converter engine thread monitoring
//
// Body of the converter thread. It waits for the namespace to boot and for
// this MGM to become master, loads the configuration, re-submits the jobs
// still recorded as pending in QuarkDB and registers the cleanup observer.
// Afterwards it keeps moving jobs from the in-memory pending queue into the
// thread pool (throttled by mMaxQueueSize) until termination is requested,
// and finally cancels and joins all conversion jobs.
//
// @param assistant thread assistant used for waiting and termination checks
//------------------------------------------------------------------------------
void
ConverterDriver::Convert(ThreadAssistant& assistant) noexcept
{
  eos_notice("%s", "msg=\"starting converter engine\"");
  gOFS->WaitUntilNamespaceIsBooted(assistant);

  // Wait that current MGM becomes a master
  do {
    eos_debug("%s", "msg=\"converter waiting for master MGM\"");
    assistant.wait_for(std::chrono::seconds(10));
  } while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());

  InitConfig();
  SubmitQdbPending(assistant);
  // Register a clean-up observer for finished conversion jobs
  eos::common::observer_tag_t deleter_tag(0);

  do {
    deleter_tag = mObserverMgr->addObserver(CleanupObserver);

    if (!deleter_tag) {
      eos_crit("%s", "msg=\"failed cleanup observer registration, retry in 30s\"");
      assistant.wait_for(std::chrono::seconds(30));
    }
  } while (!deleter_tag && !assistant.terminationRequested());

  while (!assistant.terminationRequested()) {
    // Scoped per iteration so a previously submitted job can never be
    // processed a second time
    JobInfoT info;
    bool has_job = false;

    // Poll for the next pending job, handling finished jobs while idle
    while (!(has_job = mPendingJobs.try_pop(info)) &&
           !assistant.terminationRequested()) {
      HandleRunningJobs();
      assistant.wait_for(std::chrono::seconds(5));
    }

    // Termination was requested while the queue was empty: nothing was
    // popped, hence there is nothing to submit
    if (!has_job) {
      break;
    }

    while ((mThreadPool.GetQueueSize() > mMaxQueueSize) &&
           !assistant.terminationRequested()) {
      eos_static_notice("%s", "msg=\"convert thread pool queue full, delay "
                        "pending jobs\"");
      assistant.wait_for(std::chrono::seconds(5));
    }

    auto fid = info.first;
    auto conversion_info = ConversionInfo::parseConversionString(info.second);

    if (conversion_info != nullptr) {
      auto job = std::make_shared<ConversionJob>(fid, *conversion_info.get());
      mThreadPool.PushTask([job]() {
        return job->DoIt();
      });
      eos::common::RWMutexWriteLock wlock(mJobsMutex);
      mJobsRunning.push_back(job);
    } else {
      // Unusable entry: drop it from QDB and release the FID tracker entry
      eos_static_err("msg=\"invalid conversion scheduled\" fxid=%08llx "
                     "conversion_id=%s", fid, info.second.c_str());
      mQdbHelper.RemovePendingJob(fid);
      gOFS->mFidTracker.RemoveEntry(fid);
    }

    HandleRunningJobs();
  }

  JoinAllConversionJobs();
  mIsRunning = false;
  eos_static_notice("%s", "msg=\"stopped converter engine\"");
}
//------------------------------------------------------------------------------
// Observer job called when a conversion is done
//
// Acts only on jobs whose status is DONE or FAILED; anything else is logged
// and ignored. For a completed job it removes, as root, the entry
// MgmProcConversionPath/<conversion string> and drops the file id from the
// FID tracker (presumably allowing the file to be scheduled again).
//
// @param status status reported for the job
// @param tag    conversion string identifying the job
//------------------------------------------------------------------------------
void
ConverterDriver::CleanupObserver(ConverterDriver::JobStatusT status,
                                 std::string tag)
{
  const bool is_completed = (status == ConverterDriver::JobStatusT::DONE) ||
                            (status == ConverterDriver::JobStatusT::FAILED);

  if (!is_completed) {
    eos_static_warning("msg=\"skip cleanup for job not completed\" tag=\"%s\"",
                       tag.c_str());
    return;
  }

  const auto conversion = ConversionInfo::parseConversionString(tag);

  if (conversion == nullptr) {
    eos_static_crit("msg=\"failed conversion info parsing\" tag=\"%s\"",
                    tag.c_str());
    return;
  }

  auto root_vid = eos::common::VirtualIdentity::Root();
  const auto proc_path = SSTR(gOFS->MgmProcConversionPath << "/"
                              << conversion->ToString());
  XrdOucErrInfo err_info;
  // NOTE(review): the return code of _rem is not checked
  gOFS->_rem(proc_path.c_str(), err_info, root_vid,
             static_cast<const char*>(nullptr));
  gOFS->mFidTracker.RemoveEntry(conversion->mFid);
}
//------------------------------------------------------------------------------
// Submit pending jobs from QDB
//
// Re-submits to the thread pool every conversion job recorded as pending in
// QuarkDB. Files already present in the FID tracker are skipped. Entries whose
// conversion string cannot be parsed are removed from QuarkDB and from the FID
// tracker, the same way the main Convert loop treats them. Between
// submissions it waits while the thread pool queue exceeds mMaxQueueSize.
//
// @param assistant thread assistant used for waiting and termination checks
//------------------------------------------------------------------------------
void
ConverterDriver::SubmitQdbPending(ThreadAssistant& assistant)
{
  const auto lst_pending = mQdbHelper.GetPendingJobs();

  for (const auto& info : lst_pending) {
    auto id = info.first;

    if (!gOFS->mFidTracker.AddEntry(id, TrackerType::Convert)) {
      eos_static_debug("msg=\"skip recently scheduled file\" fxid=%08llx", id);
      continue;
    }

    // Parse only once the file is known to be scheduled by us
    auto conversion_info = ConversionInfo::parseConversionString(info.second);

    if (conversion_info != nullptr) {
      auto job = std::make_shared<ConversionJob>(id, *conversion_info.get());
      mThreadPool.PushTask([job]() {
        return job->DoIt();
      });
      eos::common::RWMutexWriteLock wlock(mJobsMutex);
      mJobsRunning.push_back(job);
    } else {
      // Otherwise the tracker entry and the QDB entry would never be released
      eos_static_err("msg=\"invalid conversion scheduled\" fxid=%08llx "
                     "conversion_id=%s", id, info.second.c_str());
      mQdbHelper.RemovePendingJob(id);
      gOFS->mFidTracker.RemoveEntry(id);
    }

    while ((mThreadPool.GetQueueSize() > mMaxQueueSize) &&
           !assistant.terminationRequested()) {
      assistant.wait_for(std::chrono::seconds(5));
    }

    if (assistant.terminationRequested()) {
      break;
    }
  }
}
//------------------------------------------------------------------------------
// Handle jobs based on status
//
// Under the jobs write lock, walks mJobsRunning and, for every job that is
// DONE or FAILED: removes its pending entry from QuarkDB (logging on failure),
// records FAILED jobs in the failed-jobs hash, notifies the observers with the
// final status and conversion string, and erases the job from the list.
// Unfinished jobs are left in place.
//------------------------------------------------------------------------------
void
ConverterDriver::HandleRunningJobs()
{
  eos::common::RWMutexWriteLock wlock(mJobsMutex);
  auto iter = mJobsRunning.begin();

  while (iter != mJobsRunning.end()) {
    const auto& job = *iter;
    auto status = job->GetStatus();
    const bool is_finished = (status == ConversionJob::Status::DONE) ||
                             (status == ConversionJob::Status::FAILED);

    if (!is_finished) {
      ++iter;
      continue;
    }

    const auto fid = job->GetFid();

    if (!mQdbHelper.RemovePendingJob(fid)) {
      eos_static_err("msg=\"Failed to remove conversion job from QuarkDB\" "
                     "fid=%llu", fid);
    }

    if (status == ConversionJob::Status::FAILED) {
      mQdbHelper.AddFailedJob(job);
    }

    mObserverMgr->notifyChange(status, job->GetConversionString());
    // `job` refers into the container and must not be used past this point
    iter = mJobsRunning.erase(iter);
  }
}
//------------------------------------------------------------------------------
// Signal all conversion jobs to stop
//
// Processes already finished jobs, then cancels every job that has not
// finished yet (RUNNING or PENDING) and busy-waits (10 ms polls) until none of
// them is RUNNING or PENDING any more. Finally the running-jobs list is
// cleared; jobs stopped this way are not processed by HandleRunningJobs, so
// their QuarkDB pending entries are kept.
//------------------------------------------------------------------------------
void
ConverterDriver::JoinAllConversionJobs()
{
  eos_notice("%s", "msg=\"stopping all running conversion jobs\"");
  HandleRunningJobs();
  {
    eos::common::RWMutexReadLock rlock(mJobsMutex);

    // Cancel PENDING jobs as well: they may be picked up by the thread pool at
    // any moment and would otherwise run to completion, delaying the stop
    for (auto& job : mJobsRunning) {
      const auto status = job->GetStatus();

      if ((status == ConversionJob::Status::RUNNING) ||
          (status == ConversionJob::Status::PENDING)) {
        job->Cancel();
      }
    }

    for (auto& job : mJobsRunning) {
      for (auto status = job->GetStatus();
           (status == ConversionJob::Status::RUNNING) ||
           (status == ConversionJob::Status::PENDING);
           status = job->GetStatus()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
    }
  }
  eos::common::RWMutexWriteLock wlock(mJobsMutex);
  mJobsRunning.clear();
}
//------------------------------------------------------------------------------
// Schedule a conversion job with the given ID and conversion string
//
// Rejects the request when the in-memory pending queue already holds more
// than one million jobs, when the conversion string is empty, or when the
// file is already present in the FID tracker. Otherwise, the job is queued in
// memory and recorded as pending in QuarkDB.
//
// @param id              file id to convert
// @param conversion_info conversion string describing the job
//
// @return false if the job was rejected, otherwise the result of recording
//         it in QuarkDB. NOTE(review): the job is queued in memory before the
//         QuarkDB write, so it may still run even when false is returned.
//------------------------------------------------------------------------------
bool
ConverterDriver::ScheduleJob(const eos::IFileMD::id_t& id,
                             const std::string& conversion_info)
{
  // Upper bound on the in-memory queue of jobs waiting to be submitted
  constexpr auto cMaxPendingJobs = 1000000;

  if (mPendingJobs.size() > cMaxPendingJobs) {
    eos_static_err("%s", "msg=\"forbid conversion as there are more than 1M "
                   "jobs pending\"");
    return false;
  }

  if (conversion_info.empty()) {
    eos_static_err("msg=\"Invalid conversion_info string for file\" fid=%08llx",
                   id);
    return false;
  }

  if (!gOFS->mFidTracker.AddEntry(id, TrackerType::Convert)) {
    eos_static_debug("msg=\"skip recently scheduled file\" fxid=%08llx", id);
    return false;
  }

  JobInfoT info = std::make_pair(id, conversion_info);
  mPendingJobs.push(info);
  return mQdbHelper.AddPendingJob(info);
}
//------------------------------------------------------------------------------
// Initialize converter configuration parameters
//
// Reads the maximum thread pool size and the maximum thread pool queue size
// from the config store (falling back to the compiled-in defaults) and stores
// them in mMaxThreadPoolSize / mMaxQueueSize. NOTE(review): the thread pool
// itself is not resized here.
//------------------------------------------------------------------------------
void
ConverterDriver::InitConfig()
{
  const unsigned int cfg_max_threads =
    mConfigStore->get(kConverterMaxThreads, cDefaultMaxThreadPoolSize);
  mMaxThreadPoolSize.store(cfg_max_threads, std::memory_order_relaxed);
  // Queue limit used to throttle submissions to the thread pool
  const unsigned int cfg_max_queue =
    mConfigStore->get(kConverterMaxQueueSize, cDefaultMaxQueueSize);
  mMaxQueueSize.store(cfg_max_queue, std::memory_order_relaxed);
}
//------------------------------------------------------------------------------
// QdbHelper class implementation
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// Add conversion job to the queue of pending jobs in QuarkDB
//------------------------------------------------------------------------------
bool
ConverterDriver::QdbHelper::AddPendingJob(const JobInfoT& jobinfo)
{
try {
return mQHashPending.hset(std::to_string(jobinfo.first), jobinfo.second);
} catch (const std::exception& e) {
eos_static_crit("msg=\"Error encountered while trying to add pending "
"conversion job\" emsg=\"%s\" conversion_id=%s",
e.what(), jobinfo.second.c_str());
}
return false;
}
//------------------------------------------------------------------------------
// Add conversion job to the queue of failed jobs in QuarkDB
//------------------------------------------------------------------------------
bool
ConverterDriver::QdbHelper::AddFailedJob(
const std::shared_ptr& job)
{
try {
return mQHashFailed.hset(job->GetConversionString(), job->GetErrorMsg());
} catch (const std::exception& e) {
eos_static_crit("msg=\"Error encountered while trying to add failed "
"conversion job\" emsg=\"%s\" conversion_id=%s",
e.what(), job->GetConversionString().c_str());
}
return false;
}
//------------------------------------------------------------------------------
// Get list of pending jobs
//------------------------------------------------------------------------------
std::vector
ConverterDriver::QdbHelper::GetPendingJobs()
{
std::vector pending;
for (auto it = mQHashPending.getIterator(cBatchSize, "0");
it.valid(); it.next()) {
try {
pending.emplace_back(std::stoull(it.getKey()), it.getValue());
} catch (...) {}
}
return pending;
}
//------------------------------------------------------------------------------
// Get list of failed jobs
//------------------------------------------------------------------------------
std::vector
ConverterDriver::QdbHelper::GetFailedJobs()
{
std::vector failed;
for (auto it = mQHashFailed.getIterator(cBatchSize, "0");
it.valid(); it.next()) {
failed.emplace_back(it.getKey(), it.getValue());
}
return failed;
}
//------------------------------------------------------------------------------
// Remove conversion job by id from the pending jobs queue in QuarkDB
//
// @param id file id whose pending entry (keyed by its decimal string) is
//        deleted
//
// @return result of the hash deletion, false if an exception was raised
//------------------------------------------------------------------------------
bool
ConverterDriver::QdbHelper::RemovePendingJob(const eos::IFileMD::id_t& id)
{
  bool removed = false;

  try {
    removed = mQHashPending.hdel(std::to_string(id));
  } catch (const std::exception& e) {
    eos_static_crit("msg=\"Error encountered while trying to delete "
                    "pending conversion job\" emsg=\"%s\"", e.what());
  }

  return removed;
}
//------------------------------------------------------------------------------
// Returns the number of failed jobs or -1 in case of failed operation
//
// The count is the length of the failed-jobs hash in QuarkDB. An exception
// raised by the query is logged and reported as -1.
//------------------------------------------------------------------------------
int64_t
ConverterDriver::QdbHelper::NumFailedJobs()
{
  int64_t num_failed = -1;

  try {
    num_failed = mQHashFailed.hlen();
  } catch (const std::exception& e) {
    eos_static_crit("msg=\"Error encountered while retrieving size of "
                    "failed conversion jobs set\" emsg=\"%s\"", e.what());
  }

  return num_failed;
}
//------------------------------------------------------------------------------
// Clear list of pending jobs
//
// Issues a delete of the whole pending-jobs hash key. The result of the call
// is discarded; only exceptions thrown by the call itself are caught and
// logged.
//------------------------------------------------------------------------------
void
ConverterDriver::QdbHelper::ClearPendingJobs()
{
  try {
    static_cast<void>(mQcl->del(kConversionPendingHashKey));
  } catch (const std::exception& e) {
    eos_static_crit("msg=\"Error encountered while clearing the list of "
                    "pending jobs\" emsg=\"%s\"", e.what());
  }
}
//------------------------------------------------------------------------------
// Clear list of failed jobs
//
// Issues a delete of the whole failed-jobs hash key. The result of the call
// is discarded; only exceptions thrown by the call itself are caught and
// logged.
//------------------------------------------------------------------------------
void
ConverterDriver::QdbHelper::ClearFailedJobs()
{
  try {
    static_cast<void>(mQcl->del(kConversionFailedHashKey));
  } catch (const std::exception& e) {
    eos_static_crit("msg=\"Error encountered while clearing the list of "
                    "failed jobs\" emsg=\"%s\"", e.what());
  }
}
EOSMGMNAMESPACE_END