/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2017 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "namespace/ns_quarkdb/accounting/SyncTimeAccounting.hh"
#include
#include
EOSNSNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
QuarkSyncTimeAccounting::QuarkSyncTimeAccounting(IContainerMDSvc* svc,
uint32_t update_interval):
mAccumulateIndx(0), mCommitIndx(1), mShutdown(false),
mUpdateIntervalSec(update_interval), mContainerMDSvc(svc),
mNamespaceStats(nullptr)
{
mBatch.resize(2);
// Enable updates if update interval is not 0
if (mUpdateIntervalSec) {
mThread.reset(&QuarkSyncTimeAccounting::AssistedPropagateUpdates, this);
}
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
QuarkSyncTimeAccounting::~QuarkSyncTimeAccounting()
{
mShutdown = true;
if (mUpdateIntervalSec) {
mThread.join();
}
}
//------------------------------------------------------------------------------
// Notify the me about the changes in the main view
//------------------------------------------------------------------------------
void
QuarkSyncTimeAccounting::containerMDChanged(IContainerMD* obj, Action type)
{
switch (type) {
case IContainerMDChangeListener::MTimeChange:
QueueForUpdate(obj->getId());
break;
default:
break;
}
}
//------------------------------------------------------------------------------
// Queue container object for update
//------------------------------------------------------------------------------
void
QuarkSyncTimeAccounting::QueueForUpdate(IContainerMD::id_t id)
{
std::lock_guard scope_lock(mMutexBatch);
auto& batch = mBatch[mAccumulateIndx];
auto it_map = batch.mMap.find(id);
if (it_map != batch.mMap.end()) {
auto& it_lst = it_map->second;
// Move it from current location to the end of the list (most recent)
batch.mLstUpd.splice(batch.mLstUpd.end(), batch.mLstUpd, it_lst);
} else {
auto it_new = batch.mLstUpd.emplace(batch.mLstUpd.end(), id);
batch.mMap[id] = it_new;
}
}
void QuarkSyncTimeAccounting::setNamespaceStats(INamespaceStats* namespaceStats) {
mNamespaceStats = namespaceStats;
}
//------------------------------------------------------------------------------
// Propagate updates in the hierarchical structure. Method ran by an
// asynchronous thread.
//------------------------------------------------------------------------------
void
QuarkSyncTimeAccounting::AssistedPropagateUpdates(ThreadAssistant& assistant)
noexcept
{
PropagateUpdates(&assistant);
}
//------------------------------------------------------------------------------
// Propagate the sync time
//------------------------------------------------------------------------------
void
QuarkSyncTimeAccounting::PropagateUpdates(ThreadAssistant* assistant)
{
while ((assistant && !assistant->terminationRequested()) || (!assistant)) {
if (mShutdown) {
break;
}
{
// Update the indexes to have the async thread working on the batch to
// commit and the incoming updates to go to the batch to update
std::lock_guard scope_lock(mMutexBatch);
std::swap(mAccumulateIndx, mCommitIndx);
}
uint16_t deepness = 0;
IContainerMD::id_t id = 0;
std::set upd_nodes;
auto& lst = mBatch[mCommitIndx].mLstUpd;
// Start updating form the last node (most recent) and also collect the
// nodes that we've updated so that older updates don't propagate further
// up than strictly necessary.
struct timeval start;
struct timeval stop;
struct timezone tz;
gettimeofday(&start, &tz);
for (auto it_id = lst.rbegin(); it_id != lst.rend(); ++it_id) {
deepness = 0;
id = *it_id;
if (id == 0u) {
continue;
}
eos_debug("Container_id=%lu sync time", id);
IContainerMD::ctime_t mtime {0};
while ((id > 1) && (deepness < 255)) {
std::shared_ptr cont;
// If node is already in the set of updates then don't bother
// propagating this update
if (upd_nodes.count(id)) {
break;
}
try {
cont = mContainerMDSvc->getContainerMD(id);
eos::IContainerMD::IContainerMDWriteLocker locker(cont);
// Only traverse if there there is an attribute saying so
if (!cont->hasAttribute("sys.mtime.propagation")) {
break;
}
// If there was a temporary ETAG this has not to be removed
if (cont->hasAttribute("sys.tmp.etag")) {
cont->removeAttribute("sys.tmp.etag");
}
if (deepness == 0u) {
cont->getMTime(mtime);
}
if (!cont->setTMTime(mtime) && deepness) {
break;
}
(void) upd_nodes.insert(id);
mContainerMDSvc->updateStore(cont.get());
} catch (MDException& e) {
cont = nullptr;
break;
}
id = cont->getParentId();
++deepness;
}
}
gettimeofday(&stop, &tz); \
double execTime = ((stop.tv_sec-start.tv_sec)*1000.0) + ((stop.tv_usec-start.tv_usec)/1000.0);
if(mNamespaceStats != nullptr){
mNamespaceStats->Add("QuarkSyncTimeAccounting",0,0,lst.size());
mNamespaceStats->AddExec("QuarkSyncTimeAccounting",execTime);
}
// Clean up the batch
mBatch[mCommitIndx].Clean();
if (mUpdateIntervalSec) {
if (assistant) {
assistant->wait_for(std::chrono::seconds(mUpdateIntervalSec));
} else {
std::this_thread::sleep_for(std::chrono::seconds(mUpdateIntervalSec));
}
} else {
break;
}
}
}
EOSNSNAMESPACE_END