//------------------------------------------------------------------------------
//! @file BulkRequestProcCleaner.cc
//! @author Cedric Caffy - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2017 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "BulkRequestProcCleaner.hh"
#include "mgm/IMaster.hh"
#include "mgm/XrdMgmOfs.hh"
#include
#include "mgm/bulk-request/dao/factories/AbstractDAOFactory.hh"
#include "mgm/bulk-request/dao/factories/ProcDirectoryDAOFactory.hh"
#include "mgm/bulk-request/dao/IBulkRequestDAO.hh"
#include "mgm/bulk-request/exception/PersistencyException.hh"
EOSBULKNAMESPACE_BEGIN
BulkRequestProcCleaner::BulkRequestProcCleaner(const
bulk::ProcDirectoryBulkRequestLocations& bulkReqDirectory,
std::unique_ptr config): mBulkRequestLocation(
bulkReqDirectory), mConfig(std::move(config))
{
}
void
BulkRequestProcCleaner::Start()
{
mThread.reset(&BulkRequestProcCleaner::backgroundThread, this);
}
void
BulkRequestProcCleaner::Stop()
{
mThread.join();
}
void
BulkRequestProcCleaner::backgroundThread(ThreadAssistant& assistant)
{
std::ostringstream oss;
oss << "msg=\"starting BulkRequestProcCleaner thread. Directory=" <<
mBulkRequestLocation.getBulkRequestDirectory() << "\"";
eos_static_notice(oss.str().c_str());
gOFS->WaitUntilNamespaceIsBooted(assistant);
// Wait that current MGM becomes a master
do {
eos_static_debug("%s", "msg=\"BulkRequestProcCleaner waiting for master MGM\"");
assistant.wait_for(std::chrono::seconds(10));
} while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());
while (!assistant.terminationRequested()) {
// every now and then we wake up
common::IntervalStopwatch stopwatch(mConfig->mInterval);
// Only a master needs to run the cleaner
if (gOFS->mMaster->IsMaster()) {
//Initialize the bulk-request DAO
std::unique_ptr daoFactory(new ProcDirectoryDAOFactory(gOFS,
mBulkRequestLocation));
std::unique_ptr bulkReqDao = daoFactory->getBulkRequestDAO();
try {
uint64_t nbBulkRequestDeleted = bulkReqDao->deleteBulkRequestNotQueriedFor(
BulkRequest::Type::PREPARE_STAGE,
mConfig->mBulkReqLastAccessTimeBeforeCleaning);
eos_static_info("msg=\"BulkRequestProcCleaner did one round of cleaning, nbDeletedBulkRequests=%ld\"",
nbBulkRequestDeleted);
} catch (const PersistencyException& ex) {
eos_static_err("msg=\"BulkRequestProcCleaner an exception occured during a round of cleaning\" exceptionMsg=\"%s\"",
ex.what());
}
}
while (stopwatch.timeRemainingInCycle() >= std::chrono::seconds(5)) {
assistant.wait_for(std::chrono::seconds(5));
if (assistant.terminationRequested()) {
break;
}
}
}
}
BulkRequestProcCleaner::~BulkRequestProcCleaner()
{
Stop();
}
EOSBULKNAMESPACE_END