/* * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2021-2022 CERN * @license This program is free software, distributed under the terms of the GNU General Public * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your * option) any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A * PARTICULAR PURPOSE. See the GNU General Public License for more details. * * In applying this licence, CERN does not waive the privileges and immunities * granted to it by virtue of its status as an Intergovernmental Organization or * submit itself to any jurisdiction. */ #include #include #include #include #include "Agent.hpp" #include "AgentReference.hpp" #include "common/exception/Errnum.hpp" #include "common/utils/utils.hpp" namespace cta::objectstore { std::atomic AgentReference::g_nextAgentId(0); AgentReference::AgentReference(const std::string & clientType, log::Logger &logger) : m_nextId(0), m_logger(logger) { std::stringstream aid; // Get time time_t now = time(nullptr); struct tm localNow; localtime_r(&now, &localNow); // Get hostname char host[200]; cta::exception::Errnum::throwOnMinusOne(::gethostname(host, sizeof(host)), "In AgentId::AgentId: failed to gethostname"); // gettid is a safe system call (never fails) uint64_t id=g_nextAgentId++; aid << clientType << "-" << host << "-" << syscall(SYS_gettid) << "-" << 1900 + localNow.tm_year << std::setfill('0') << std::setw(2) << 1 + localNow.tm_mon << std::setw(2) << localNow.tm_mday << "-" << std::setw(2) << localNow.tm_hour << ":" << std::setw(2) << localNow.tm_min << ":" << std::setw(2) << localNow.tm_sec << "-" << id; m_agentAddress = aid.str(); // Initialize the serialization token for queued actions (lock will make helgrind // happy, but not really needed threading::MutexLocker ml(m_currentQueueMutex); m_nextQueueExecutionPromise.reset(new std::promise); m_nextQueueExecutionFuture = m_nextQueueExecutionPromise->get_future(); m_nextQueueExecutionPromise->set_value(); } std::string AgentReference::getAgentAddress() { return m_agentAddress; } std::string AgentReference::nextId(const std::string& childType) { std::stringstream id; id << childType << "-" << m_agentAddress << "-" << m_nextId++; return id.str(); } void AgentReference::addToOwnership(const std::string& objectAddress, objectstore::Backend& backend) { std::shared_ptr a (new Action(AgentOperation::Add, objectAddress, std::list())); queueAndExecuteAction(a, backend); } void AgentReference::addBatchToOwnership(const std::list& objectAdresses, objectstore::Backend& backend) { std::shared_ptr a (new Action(AgentOperation::AddBatch, "", objectAdresses)); queueAndExecuteAction(a, backend); } void AgentReference::removeFromOwnership(const std::string& objectAddress, objectstore::Backend& backend) { std::shared_ptr a (new Action(AgentOperation::Remove, objectAddress, std::list())); queueAndExecuteAction(a, backend); } void AgentReference::removeBatchFromOwnership(const std::list& objectAdresses, objectstore::Backend& backend) { std::shared_ptr a (new Action(AgentOperation::RemoveBatch, "", objectAdresses)); queueAndExecuteAction(a, backend); } void AgentReference::bumpHeatbeat(objectstore::Backend& backend) { std::shared_ptr a (new Action(AgentOperation::Heartbeat, "", std::list())); queueAndExecuteAction(a, backend); } void AgentReference::queueAndExecuteAction(std::shared_ptr action, objectstore::Backend& backend) { // First, we need to determine if a queue exists or not. // If so, we just use it, and if not, we create and serve it. threading::MutexLocker ulGlobal(m_currentQueueMutex); if (m_currentQueue) { // There is already a queue threading::MutexLocker ulQueue(m_currentQueue->mutex); m_currentQueue->queue.push_back(action); // Get hold of the future before the promise gets a chance to be accessed auto actionFuture=action->promise.get_future(); // Release the locks and wait for action execution ulQueue.unlock(); ulGlobal.unlock(); actionFuture.get(); } else { // There is no queue, so we need to create and serve it ourselves. // To make sure there is no lifetime issues, we make it a shared_ptr std::shared_ptr q(new ActionQueue); // Lock the queue threading::MutexLocker ulq(q->mutex); // Get it referenced m_currentQueue = q; // Get our execution promise and future and leave one behind. std::shared_ptr> promiseForThisQueue = m_nextQueueExecutionPromise; auto futureForThisQueue = std::move(m_nextQueueExecutionFuture); // Leave a promise behind for the next queue, and set the future. m_nextQueueExecutionPromise.reset(new std::promise); m_nextQueueExecutionFuture=m_nextQueueExecutionPromise->get_future(); // Keep a pointer to it, so we will signal our own completion to our successor queue. std::shared_ptr> promiseForNextQueue = m_nextQueueExecutionPromise; // We can now unlock the queue and the general lock: queuing is open. ulq.unlock(); ulGlobal.unlock(); // Wait for previous queue to complete so we will not contend with other threads while // updating the object store. futureForThisQueue.get(); // Make sure we are not listed anymore as the queue taking jobs. // We should still be the listed queue ulGlobal.lock(); if (m_currentQueue != q) { throw cta::exception::Exception("In AgentReference::queueAndExecuteAction(): our queue is not the listed one as expected."); } m_currentQueue.reset(); ulGlobal.unlock(); // Make sure no leftover thread is still writing to the queue. ulq.lock(); // Off we go! Add the actions to the queue try { objectstore::Agent ag(m_agentAddress, backend); log::LogContext lc(m_logger); log::ScopedParamContainer params(lc); params.add("agentObject", m_agentAddress); utils::Timer t; objectstore::ScopedExclusiveLock agl(ag); ag.fetch(); if (ag.isBeingGarbageCollected()) { log::ScopedParamContainer params(lc); params.add("agentObject", ag.getAddressIfSet()); lc.log(log::CRIT, "In AgentReference::queueAndExecuteAction(): agent object being garbage collected. Exiting (segfault)."); cta::utils::segfault(); ::exit(EXIT_FAILURE); } bool ownershipModification = false; // First, determine if any action is an ownership modification if (m_ownerShipModifyingOperations.count(action->op)) ownershipModification = true; if (!ownershipModification) { for (auto &a: q->queue) { if (m_ownerShipModifyingOperations.count(a->op)) { ownershipModification = true; break; } } } std::set ownershipSet; // If necessary, we will dump the ownership list into a set, manipulate it in memory, // and then recreate it. if (ownershipModification) ownershipSet = ag.getOwnershipSet(); // First we apply our own modification appyAction(*action, ag, ownershipSet, lc); // Then those of other threads for (auto a: q->queue) { threading::MutexLocker ml(a->mutex); appyAction(*a, ag, ownershipSet, lc); } // Record the new ownership if needed, and commit if (ownershipModification) ag.resetOwnership(ownershipSet); ag.commit(); } catch (...) { // Something wend wrong: , we release the next batch of changes promiseForNextQueue->set_value(); // We now pass the exception to all threads for (auto a: q->queue) { threading::MutexLocker ml(a->mutex); a->promise.set_exception(std::current_exception()); } // And to our own caller throw; } // Things went well. We pass the token to the next queue promiseForNextQueue->set_value(); // and release the other threads for (auto a: q->queue) { threading::MutexLocker ml(a->mutex); a->promise.set_value(); } } } void AgentReference::appyAction(Action& action, objectstore::Agent& agent, std::set & ownershipSet, log::LogContext &lc) { switch (action.op) { case AgentOperation::Add: { ownershipSet.insert(action.objectAddress); log::ScopedParamContainer params(lc); params.add("ownedObject", action.objectAddress); lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership."); break; } case AgentOperation::AddBatch: { for (const auto & oa: action.objectAddressSet) { ownershipSet.insert(oa); log::ScopedParamContainer params(lc); params.add("ownedObject", oa); lc.log(log::DEBUG, "In AgentReference::appyAction(): added object to ownership (by batch)."); } break; } case AgentOperation::Remove: { ownershipSet.erase(action.objectAddress); log::ScopedParamContainer params(lc); params.add("ownedObject", action.objectAddress); lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership."); break; } case AgentOperation::RemoveBatch: { for (const auto & oa: action.objectAddressSet) { ownershipSet.erase(oa); log::ScopedParamContainer params(lc); params.add("ownedObject", oa); lc.log(log::DEBUG, "In AgentReference::appyAction(): removed object from ownership (by batch)."); } break; } case AgentOperation::Heartbeat: agent.bumpHeartbeat(); break; default: throw cta::exception::Exception("In AgentReference::appyAction(): unknown operation."); } } } // namespace cta::objectstore