/************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2017 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "namespace/ns_quarkdb/accounting/ContainerAccounting.hh" #include #include EOSNSNAMESPACE_BEGIN //---------------------------------------------------------------------------- // Constructor //---------------------------------------------------------------------------- QuarkContainerAccounting::QuarkContainerAccounting(IContainerMDSvc* svc, eos::common::RWMutex* ns_mutex, int32_t update_interval) : mAccumulateIndx(0), mCommitIndx(1), mShutdown(false), mUpdateIntervalSec(update_interval), mContainerMDSvc(svc), gNsRwMutex(ns_mutex) { mBatch.resize(2); // If update interval is 0 then we disable async updates if (mUpdateIntervalSec) { mThread.reset(&QuarkContainerAccounting::AssistedPropagateUpdates, this); } } //---------------------------------------------------------------------------- // Constructor //---------------------------------------------------------------------------- QuarkContainerAccounting::~QuarkContainerAccounting() { mShutdown = true; if (mUpdateIntervalSec) { mThread.join(); } } //---------------------------------------------------------------------------- // Notifications about changes in the main view //---------------------------------------------------------------------------- void QuarkContainerAccounting::fileMDChanged(IFileMDChangeListener::Event* e) { switch (e->action) { // We are only interested in SizeChange events case IFileMDChangeListener::SizeChange: if (e->file->getContainerId() == 0) { // NOTE: This is an ugly hack. The file object has not reference to the // container id, therefore we hijack the "location" member of the Event // class to pass in the container id. QueueForUpdate(e->location, e->sizeChange); } else { QueueForUpdate(e->file->getContainerId(), e->sizeChange); } break; default: break; } } //------------------------------------------------------------------------------ // Add tree //------------------------------------------------------------------------------ void QuarkContainerAccounting::AddTree(IContainerMD* obj, int64_t dsize) { QueueForUpdate(obj->getId(), dsize); } //------------------------------------------------------------------------------- // Remove tree //------------------------------------------------------------------------------- void QuarkContainerAccounting::RemoveTree(IContainerMD* obj, int64_t dsize) { QueueForUpdate(obj->getId(), -dsize); } //------------------------------------------------------------------------------ // Queue file info for update //------------------------------------------------------------------------------ void QuarkContainerAccounting::QueueForUpdate(IContainerMD::id_t id, int64_t dsize) { uint16_t deepness = 0; std::shared_ptr cont; std::vector idsToUpdate; while ((id > 1) && (deepness < 255)) { try { cont = mContainerMDSvc->getContainerMD(id); } catch (const MDException& e) { // TODO (esindril): error message using default logging break; } idsToUpdate.push_back(id); id = cont->getParentId(); ++deepness; } std::lock_guard scope_lock(mMutexBatch); auto& batch = mBatch[mAccumulateIndx]; for(auto idToUpdate: idsToUpdate) { auto it_map = batch.mMap.find(idToUpdate); if (it_map != batch.mMap.end()) { it_map->second += dsize; } else { batch.mMap.emplace(idToUpdate, dsize); } } } //------------------------------------------------------------------------------ // Propagate updates in the hierarchical structure. Method ran by an // asynchronous thread. //------------------------------------------------------------------------------ void QuarkContainerAccounting::AssistedPropagateUpdates(ThreadAssistant& assistant) noexcept { PropagateUpdates(&assistant); } //------------------------------------------------------------------------------ // Propagate updates in the hierarchical structure //------------------------------------------------------------------------------ void QuarkContainerAccounting::PropagateUpdates(ThreadAssistant* assistant) { while ((assistant && !assistant->terminationRequested()) || (!assistant)) { if (mShutdown) { break; } { // Update the indexes to have the async thread working on the batch to // commit and the incoming updates to go to the batch to update std::lock_guard scope_lock(mMutexBatch); std::swap(mAccumulateIndx, mCommitIndx); } auto& batch = mBatch[mCommitIndx]; if(!batch.mMap.empty()){ std::shared_ptr cont; for (auto const& elem : batch.mMap) { try { cont = mContainerMDSvc->getContainerMD(elem.first); eos::IContainerMD::IContainerMDWriteLocker locker(cont); cont->updateTreeSize(elem.second); mContainerMDSvc->updateStore(cont.get()); } catch (const MDException& e) { // TODO: (esindril) error message using default logging continue; } } } batch.mMap.clear(); if (mUpdateIntervalSec) { if (assistant) { assistant->wait_for(std::chrono::seconds(mUpdateIntervalSec)); } else { std::this_thread::sleep_for(std::chrono::seconds(mUpdateIntervalSec)); } } else { break; } } } EOSNSNAMESPACE_END