//------------------------------------------------------------------------------ // File: ResponseCollectorTests.hh // Author: Elvin Sindrilaru - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2021 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "gtest/gtest.h" #include "fst/io/xrd/ResponseCollector.hh" #include //------------------------------------------------------------------------------ // Test successful run //------------------------------------------------------------------------------ TEST(ResponseCollector, SuccessfulRun) { eos::fst::ResponseCollector collector; std::list> lst_promises; for (int i = 0; i < 100; ++i) { auto& promise = lst_promises.emplace_back(); collector.CollectFuture(promise.get_future()); } for (auto& promise : lst_promises) { promise.set_value(XrdCl::XRootDStatus()); } ASSERT_TRUE(collector.CheckResponses(true)); } //------------------------------------------------------------------------------ // Test run with failures //------------------------------------------------------------------------------ TEST(ResponseCollector, FailedRun) { int count = 0; eos::fst::ResponseCollector collector; std::list> lst_promises; for (int i = 0; i < 100; ++i) { auto& promise = lst_promises.emplace_back(); collector.CollectFuture(promise.get_future()); } for (auto& promise : lst_promises) { ++count; if (count % 10 == 0) { promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errUnknown, EINVAL)); } else { promise.set_value(XrdCl::XRootDStatus()); } } ASSERT_FALSE(collector.CheckResponses(true)); } //------------------------------------------------------------------------------ // Test collection of partial successful responses //------------------------------------------------------------------------------ TEST(ResponseCollector, PartialSuccessfulRun) { eos::fst::ResponseCollector collector; uint32_t num_requests = 100; std::list> lst_promises; for (unsigned int i = 0; i < num_requests; ++i) { auto& promise = lst_promises.emplace_back(); collector.CollectFuture(promise.get_future()); } // Set reponses for the first half unsigned int count = 0; for (auto& promise : lst_promises) { ++count; if (count < lst_promises.size() / 2) { promise.set_value(XrdCl::XRootDStatus()); } } std::thread t([&]() { unsigned int c = 0; std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (auto& promise : lst_promises) { ++c; if (c >= lst_promises.size() / 2) { promise.set_value(XrdCl::XRootDStatus()); } } }); // First half should all be successful, no waiting ASSERT_TRUE(collector.CheckResponses(false)); // Second half successful, wait for all ASSERT_TRUE(collector.CheckResponses(true)); t.join(); } //------------------------------------------------------------------------------ // Test collection of partial failed responses //----------------------------------------=-------------------------------------- TEST(ResponseCollector, PartialFailedRun) { eos::fst::ResponseCollector collector; uint32_t num_requests = 100; std::list> lst_promises; for (unsigned int i = 0; i < num_requests; ++i) { auto& promise = lst_promises.emplace_back(); collector.CollectFuture(promise.get_future()); } // Set reponses for the first half unsigned int count = 0; for (auto& promise : lst_promises) { ++count; if (count < lst_promises.size() / 2) { promise.set_value(XrdCl::XRootDStatus()); } } std::thread t([&]() { unsigned int c = 0; std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (auto& promise : lst_promises) { ++c; if (c >= lst_promises.size() / 2) { if (c % 2 == 0) { promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errUnknown, EINVAL)); } else { promise.set_value(XrdCl::XRootDStatus()); } } } }); // First half should all be successful, no waiting ASSERT_TRUE(collector.CheckResponses(false, num_requests)); // Second half has errors, wait for all ASSERT_FALSE(collector.CheckResponses(true, num_requests)); t.join(); } //------------------------------------------------------------------------------ // Test collection when max_pending applies //----------------------------------------=-------------------------------------- TEST(ResponseCollector, MaxPending) { eos::fst::ResponseCollector collector; uint32_t num_requests = 50; uint32_t max_pending = 10; std::list> lst_promises; for (unsigned int i = 0; i < num_requests; ++i) { auto& promise = lst_promises.emplace_back(); collector.CollectFuture(promise.get_future()); } std::thread t([&]() { unsigned int c = 0; std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (auto& promise : lst_promises) { ++c; if (c <= (num_requests - (max_pending / 2))) { promise.set_value(XrdCl::XRootDStatus()); } else { static bool one_off = true; if (one_off) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); one_off = false; } if (c != num_requests) { promise.set_value(XrdCl::XRootDStatus()); } else { promise.set_value(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errUnknown, EINVAL)); } } } }); // First batch should all be successful - 45 replies ASSERT_TRUE(collector.CheckResponses(false, max_pending)); ASSERT_EQ(max_pending / 2, collector.GetNumResponses()); // Second batch has an error at the last reply ASSERT_FALSE(collector.CheckResponses(true, max_pending)); t.join(); }