//------------------------------------------------------------------------------ //! @file ResponseCollector.cc //! @author Elvin-Alin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2021 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "fst/io/xrd/ResponseCollector.hh" EOSFSTNAMESPACE_BEGIN //------------------------------------------------------------------------------ // Collect future object //------------------------------------------------------------------------------ void ResponseCollector::CollectFuture(std::future fut) { std::unique_lock lock(mMutex); mResponses.push_back(std::move(fut)); } //------------------------------------------------------------------------------ // Check the status of the responses //------------------------------------------------------------------------------ bool ResponseCollector::CheckResponses(bool wait_all, uint32_t max_pending) { bool ok = true; // If more then max_pending responses in-flight then wait for at least // half of them. bool wait_partial = false; std::unique_lock lock(mMutex); if (mResponses.size() > max_pending) { wait_partial = true; } while (!mResponses.empty()) { auto& fut = mResponses.front(); if (!fut.valid()) { ok = false; mResponses.pop_front(); continue; } if (wait_all) { fut.wait(); } else { if (wait_partial) { if (mResponses.size() > (max_pending >> 1)) { fut.wait(); } else { break; } } else { // If current response is available then retrieve it if (fut.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { break; } } } XrdCl::XRootDStatus status = fut.get(); if (!status.IsOK()) { ok = false; } mResponses.pop_front(); } return ok; } //------------------------------------------------------------------------------ // Get number of registered responses //------------------------------------------------------------------------------ uint32_t ResponseCollector::GetNumResponses() const { std::unique_lock lock(mMutex); return mResponses.size(); } EOSFSTNAMESPACE_END