// ----------------------------------------------------------------------
// File: SchedulerTests.cc
// Author: Abhishek Lekshmanan - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2023 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.    *
************************************************************************/
#include "common/utils/ContainerUtils.hh"
#include "mgm/placement/FlatScheduler.hh"
#include "mgm/placement/PlacementStrategy.hh"
#include "mgm/placement/RoundRobinPlacementStrategy.hh"
#include "mgm/placement/WeightedRandomStrategy.hh"
#include "unit_tests/mgm/placement/ClusterMapFixture.hh"
#include "gtest/gtest.h"
using eos::mgm::placement::item_id_t;
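
// A minimal, illustrative sketch of the "higher level function to do recursive
// descent" mentioned in the TODOs below: starting from the ROOT bucket (id 0),
// ask the strategy for a single child bucket per level until the children are
// disks (positive ids), then place the requested number of replicas there.
// Templated so it does not assume the exact cluster-data type; this helper is
// an assumption for illustration only and is not used by the tests.
template <typename Strategy, typename ClusterDataT>
auto descend_and_place(Strategy& strategy, ClusterDataT&& data,
                       uint8_t n_replicas)
{
  item_id_t bucket_id = 0; // start at the ROOT bucket
  while (true) {
    // Pick a single child of the current bucket.
    auto res = strategy.placeFiles(data, {bucket_id, 1});
    if (!res || res.ids[0] > 0) {
      // Either the placement failed or the children are disks: redo the last
      // step asking for n_replicas items at this level and return the result.
      return strategy.placeFiles(data, {bucket_id, n_replicas});
    }
    bucket_id = res.ids[0]; // descend into the chosen sub-bucket
  }
}
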
TEST_F(SimpleClusterF, RoundRobinBasic)
{
eos::mgm::placement::RoundRobinPlacement rr_placement(eos::mgm::placement::PlacementStrategyT::kRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
// TODO: write a higher level function to do recursive descent
// Choose 1 site - from ROOT
auto res = rr_placement.placeFiles(cluster_data_ptr(), {0, 1});
ASSERT_TRUE(res);
EXPECT_EQ(res.n_replicas, 1);
EXPECT_EQ(res.ids[0], -1);
// Choose 1 group from SITE
auto site_id = res.ids[0];
auto group_res = rr_placement.placeFiles(cluster_data_ptr(), {site_id, 1});
ASSERT_TRUE(group_res);
EXPECT_EQ(group_res.n_replicas, 1);
// choose 2 disks from group!
auto disks_res =
rr_placement.placeFiles(cluster_data_ptr(), {group_res.ids[0], 2});
ASSERT_TRUE(disks_res);
EXPECT_EQ(disks_res.n_replicas, 2);
}
TEST_F(SimpleClusterF, RandomBasic)
{
eos::mgm::placement::RoundRobinPlacement rand_placement(eos::mgm::placement::PlacementStrategyT::kRandom,
256);
auto cluster_data_ptr = mgr.getClusterData();
auto res = rand_placement.placeFiles(cluster_data_ptr(), {0, 1});
ASSERT_TRUE(res);
EXPECT_EQ(res.n_replicas, 1);
auto site_id = res.ids[0];
auto group_res = rand_placement.placeFiles(cluster_data_ptr(), {site_id, 1});
ASSERT_TRUE(group_res);
EXPECT_EQ(group_res.n_replicas, 1);
auto disks_res =
rand_placement.placeFiles(cluster_data_ptr(), {group_res.ids[0], 2});
ASSERT_TRUE(disks_res);
std::cout << disks_res << "\n";
EXPECT_EQ(disks_res.n_replicas, 2);
}
TEST_F(SimpleClusterF, TLRoundRobinBasic)
{
eos::mgm::placement::RoundRobinPlacement rr_placement(eos::mgm::placement::PlacementStrategyT::kThreadLocalRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
// TODO: write a higher level function to do recursive descent
// Choose 1 site - from ROOT
auto res = rr_placement.placeFiles(cluster_data_ptr(), {0, 1});
ASSERT_TRUE(res);
EXPECT_EQ(res.n_replicas, 1);
// We cannot assert on the id here because the thread-local round robin has a
// random starting point; only the looping behaviour is easy to reason about.
// Choose 1 group from SITE
auto site_id = res.ids[0];
auto group_res = rr_placement.placeFiles(cluster_data_ptr(), {site_id, 1});
ASSERT_TRUE(group_res);
EXPECT_EQ(group_res.n_replicas, 1);
// choose 2 disks from group!
auto disks_res =
rr_placement.placeFiles(cluster_data_ptr(), {group_res.ids[0], 2});
ASSERT_TRUE(disks_res);
EXPECT_EQ(disks_res.n_replicas, 2);
}
TEST_F(SimpleClusterF, RoundRobinBasicLoop)
{
eos::mgm::placement::RoundRobinPlacement rr_placement(eos::mgm::placement::PlacementStrategyT::kRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
std::map<item_id_t, int> site_id_ctr;
std::map<item_id_t, int> group_id_ctr;
std::map<item_id_t, int> disk_id_ctr;
std::vector<item_id_t> disk_ids_vec;
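// Expected SimpleClusterF topology, inferred from the assertions below:
// ROOT holds sites -1 and -2; site -1 holds groups -100 and -101 with
// disks 1-20, site -2 holds group -102 with disks 21-30 (30 disks total).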
// TODO: write a higher level function to do recursive descent
// Choose 1 site - from ROOT
// Loop 30 times, the total number of disks, to ensure that every element
// gets chosen at least once.
for (int i = 0; i < 30; i++)
{
auto res = rr_placement.placeFiles(cluster_data_ptr(), {0, 1});
ASSERT_TRUE(res);
ASSERT_EQ(res.n_replicas, 1);
site_id_ctr[res.ids[0]]++;
// Choose 1 group from SITE
auto site_id = res.ids[0];
auto group_res = rr_placement.placeFiles(cluster_data_ptr(), {site_id, 1});
ASSERT_TRUE(group_res);
ASSERT_EQ(group_res.n_replicas, 1);
group_id_ctr[group_res.ids[0]]++;
// choose 2 disks from group!
auto disks_res =
rr_placement.placeFiles(cluster_data_ptr(), {group_res.ids[0], 2});
ASSERT_TRUE(disks_res);
ASSERT_EQ(disks_res.n_replicas, 2);
disk_id_ctr[disks_res.ids[0]]++;
disk_id_ctr[disks_res.ids[1]]++;
disk_ids_vec.push_back(disks_res.ids[0]);
disk_ids_vec.push_back(disks_res.ids[1]);
}
// SITE1 gets 15 requests, SITE2 gets 15 requests;
ASSERT_EQ(site_id_ctr[-1], 15);
ASSERT_EQ(site_id_ctr[-2], 15);
// 30 items chosen in site1 among 20 disks
// 30 items chosen in site2 among 10 disks
ASSERT_EQ(group_id_ctr[-102], 15);
// This distribution takes a bit more reasoning: it is a consequence of starting
// from a fresh cluster, where round robin begins at the first element, so
// group1 is chosen first and picks up one extra request. Over an LCM of full
// rounds group1 and group2 would be scheduled equally; group3 would still get
// 2x the requests when round-robining over the sites first.
EXPECT_EQ(group_id_ctr[-100], 8);
EXPECT_EQ(group_id_ctr[-101], 7);
// All the disks are chosen at least once; because the layout is non-uniform,
// site 2's disks are chosen twice as often as site 1's.
ASSERT_EQ(disk_ids_vec.size(), 60);
ASSERT_EQ(disk_id_ctr.size(), 30);
// Check SITE1 counters: at least 1 each; the initial disks are filled twice as often as the later ones
for (int i=1; i <=20; i++) {
ASSERT_GE(disk_id_ctr[i], 1);
}
// Check SITE2 counters: every disk is scheduled at least twice; the initial disks twice as often as the others
for (int i=21; i <=30; i++) {
ASSERT_GE(disk_id_ctr[i],2);
}
}
TEST_F(SimpleClusterF, TLRoundRobinBasicLoop)
{
eos::mgm::placement::RoundRobinPlacement rr_placement(eos::mgm::placement::PlacementStrategyT::kThreadLocalRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
std::map<item_id_t, int> site_id_ctr;
std::map<item_id_t, int> group_id_ctr;
std::map<item_id_t, int> disk_id_ctr;
std::vector<item_id_t> disk_ids_vec;
// TODO: write a higher level function to do recursive descent
// Choose 1 site - from ROOT
// Loop 30 times, the total number of disks, to ensure that every element
// gets chosen at least once.
for (int i = 0; i < 30; i++)
{
auto res = rr_placement.placeFiles(cluster_data_ptr(), {0, 1});
ASSERT_TRUE(res);
ASSERT_EQ(res.n_replicas, 1);
site_id_ctr[res.ids[0]]++;
// Choose 1 group from SITE
auto site_id = res.ids[0];
auto group_res = rr_placement.placeFiles(cluster_data_ptr(), {site_id, 1});
ASSERT_TRUE(group_res);
ASSERT_EQ(group_res.n_replicas, 1);
group_id_ctr[group_res.ids[0]]++;
// choose 2 disks from group!
auto disks_res =
rr_placement.placeFiles(cluster_data_ptr(), {group_res.ids[0], 2});
ASSERT_TRUE(disks_res);
ASSERT_EQ(disks_res.n_replicas, 2);
disk_id_ctr[disks_res.ids[0]]++;
disk_id_ctr[disks_res.ids[1]]++;
disk_ids_vec.push_back(disks_res.ids[0]);
disk_ids_vec.push_back(disks_res.ids[1]);
}
// SITE1 gets 15 requests, SITE2 gets 15 requests;
ASSERT_EQ(site_id_ctr[-1], 15);
ASSERT_EQ(site_id_ctr[-2], 15);
// 30 items chosen in site1 among 20 disks
// 30 items chosen in site2 among 10 disks
ASSERT_EQ(group_id_ctr[-102], 15);
// All the disks are chosen at least once; because the layout is non-uniform,
// site 2's disks are chosen twice as often as site 1's.
ASSERT_EQ(disk_ids_vec.size(), 60);
ASSERT_EQ(disk_id_ctr.size(), 30);
// Check SITE1 counters: at least 1 each; the initial disks are filled twice as often as the later ones
for (int i=1; i <=20; i++) {
ASSERT_GE(disk_id_ctr[i], 1);
}
// Check SITE2 counters: every disk is scheduled at least twice; the initial disks twice as often as the others
for (int i=21; i <=30; i++) {
ASSERT_GE(disk_id_ctr[i],2);
}
}
TEST_F(SimpleClusterF, FlatSchedulerBasic)
{
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(
eos::mgm::placement::PlacementStrategyT::kRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
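// With a freshly initialized round-robin state the very first 2-replica
// placement is deterministic and lands on disks 1 and 2.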
eos::mgm::placement::PlacementResult expected_result;
expected_result.ids = {1,2};
expected_result.ret_code = 0;
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
EXPECT_EQ(result, expected_result);
auto result2 = flat_scheduler.schedule(cluster_data_ptr(),
{2});
ASSERT_TRUE(result2);
ASSERT_TRUE(result2.is_valid_placement(2));
}
TEST_F(SimpleClusterF, FlatSchedulerBasicLoop)
{
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(
eos::mgm::placement::PlacementStrategyT::kRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
std::map<item_id_t, int> disk_id_ctr;
std::vector<item_id_t> disk_ids_vec;
for (int i=0; i <30; ++i) {
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
disk_id_ctr[result.ids[0]]++;
disk_id_ctr[result.ids[1]]++;
disk_ids_vec.push_back(result.ids[0]);
disk_ids_vec.push_back(result.ids[1]);
}
// All the disks are chosen at least once; because the layout is non-uniform,
// site 2's disks are chosen twice as often as site 1's.
ASSERT_EQ(disk_ids_vec.size(), 60);
ASSERT_EQ(disk_id_ctr.size(), 30);
// Check SITE1 counters: at least 1 each; the initial disks are filled twice as often as the later ones
for (int i=1; i <=20; i++) {
ASSERT_GE(disk_id_ctr[i], 1);
}
// Check SITE2 counters: every disk is scheduled at least twice;
// the initial disks twice as often as the others
for (int i=21; i <=30; i++) {
ASSERT_GE(disk_id_ctr[i],2);
}
}
TEST_F(SimpleClusterF, TLFlatSchedulerBasicLoop)
{
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(
eos::mgm::placement::PlacementStrategyT::kThreadLocalRoundRobin,
256);
auto cluster_data_ptr = mgr.getClusterData();
std::map<item_id_t, int> disk_id_ctr;
std::vector<item_id_t> disk_ids_vec;
for (int i=0; i <30; ++i) {
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
disk_id_ctr[result.ids[0]]++;
disk_id_ctr[result.ids[1]]++;
disk_ids_vec.push_back(result.ids[0]);
disk_ids_vec.push_back(result.ids[1]);
}
// All the disks are chosen at least once; because the layout is non-uniform,
// site 2's disks are chosen twice as often as site 1's.
ASSERT_EQ(disk_ids_vec.size(), 60);
ASSERT_EQ(disk_id_ctr.size(), 30);
// Check SITE1 counters: at least 1 each; the initial disks are filled twice as often as the later ones
for (int i=1; i <=20; i++) {
ASSERT_GE(disk_id_ctr[i], 1);
}
// Check SITE2 counters: every disk is scheduled at least twice;
// the initial disks twice as often as the others
for (int i=21; i <=30; i++) {
ASSERT_GE(disk_id_ctr[i],2);
}
}
TEST(FlatScheduler, SingleSite)
{
using namespace eos::mgm::placement;
ClusterMgr mgr;
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kRoundRobin,
2048);
{
auto sh = mgr.getStorageHandler(1024);
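// Build a minimal one-site hierarchy: ROOT has id 0, sub-buckets use
// negative ids (SITE=-1, GROUP=-100) and the trailing argument is the
// parent bucket; disks 1-5 are then attached to the group with weight 1.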
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::SITE), -1, 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100, -1));
ASSERT_TRUE(sh.addDisk(Disk(1, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(2, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(3, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(4, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(5, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
}
auto data = mgr.getClusterData();
std::vector<item_id_t> disk_ids_vec {-1};
std::vector<item_id_t> site_ids_vec {-100};
std::vector<item_id_t> group_ids_vec {1,2,3,4,5};
ASSERT_EQ(data->buckets[0].items, disk_ids_vec);
ASSERT_EQ(data->buckets[1].items, site_ids_vec);
ASSERT_EQ(data->buckets[100].items, group_ids_vec);
auto cluster_data_ptr = mgr.getClusterData();
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
std::cout << result.err_msg.value_or("") << ", " << result << std::endl;
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
TEST(FlatScheduler, TLSingleSite)
{
using namespace eos::mgm::placement;
ClusterMgr mgr;
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kThreadLocalRoundRobin,
2048);
{
auto sh = mgr.getStorageHandler(1024);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::SITE), -1, 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100, -1));
ASSERT_TRUE(sh.addDisk(Disk(1, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(2, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(3, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(4, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(5, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
}
auto data = mgr.getClusterData();
std::vector<item_id_t> disk_ids_vec {-1};
std::vector<item_id_t> site_ids_vec {-100};
std::vector<item_id_t> group_ids_vec {1,2,3,4,5};
ASSERT_EQ(data->buckets[0].items, disk_ids_vec);
ASSERT_EQ(data->buckets[1].items, site_ids_vec);
ASSERT_EQ(data->buckets[100].items, group_ids_vec);
auto cluster_data_ptr = mgr.getClusterData();
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
std::cout << result.err_msg.value_or("") << std::endl;
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
TEST(FlatScheduler, TLSingleSiteWeighted)
{
using namespace eos::mgm::placement;
ClusterMgr mgr;
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kWeightedRandom,
2048);
{
auto sh = mgr.getStorageHandler(1024);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::SITE), -1, 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100, -1));
ASSERT_TRUE(sh.addDisk(Disk(1, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(2, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(3, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(4, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(5, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
}
auto data = mgr.getClusterData();
std::vector<item_id_t> disk_ids_vec {-1};
std::vector<item_id_t> site_ids_vec {-100};
std::vector<item_id_t> group_ids_vec {1,2,3,4,5};
ASSERT_EQ(data->buckets[0].items, disk_ids_vec);
ASSERT_EQ(data->buckets[1].items, site_ids_vec);
ASSERT_EQ(data->buckets[100].items, group_ids_vec);
auto cluster_data_ptr = mgr.getClusterData();
auto result = flat_scheduler.schedule(cluster_data_ptr(),
{2});
std::cout << result.err_msg.value_or("") << std::endl;
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
TEST(FlatScheduler, TLNoSite)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 16;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kThreadLocalRoundRobin,
2048);
{
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100-i, 0));
}
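// Attach n_disks_per_group consecutive disks to each group:
// disk i+1 goes into group -100 - i/n_disks_per_group.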
for (int i=0; i < n_groups*n_disks_per_group; i++) {
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline, 1),
-100 - i/n_disks_per_group));
}
}
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*16);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
for (int i = 0; i < 1000; i++) {
auto result = flat_scheduler.schedule(cluster_data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
}
TEST(FlatScheduler, TLNoSiteExcludeFsids)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 16;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(2048);
{
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100-i, 0));
}
for (int i=0; i < n_groups*n_disks_per_group; i++) {
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline, 1),
-100 - i/n_disks_per_group));
}
}
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*16);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
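// For every placement strategy, exclude fsid 1 via excludefs and verify that
// it never appears in any returned placement.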
for (auto t: {PlacementStrategyT::kRoundRobin,
PlacementStrategyT::kThreadLocalRoundRobin,
PlacementStrategyT::kWeightedRandom,
PlacementStrategyT::kWeightedRoundRobin}) {
PlacementArguments args{2};
args.excludefs = {1};
args.strategy = t;
for (int i = 0; i < 1000; i++) {
auto result = flat_scheduler.schedule(cluster_data(), args);
if (!result) {
std::cerr << "Iteration " << i << " failed: " << result.err_msg.value_or("") << std::endl;
}
EXPECT_TRUE(result);
EXPECT_TRUE(result.is_valid_placement(2));
EXPECT_NE(result.ids[0], 1);
EXPECT_NE(result.ids[1], 1);
}
}
}
TEST(FlatScheduler, ForcedGroup)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 16;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(2048);
{
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP),
kBaseGroupOffset-i, 0));
}
for (int i=0; i < n_groups*n_disks_per_group; i++) {
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline, 1),
kBaseGroupOffset - i/n_disks_per_group));
}
}
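// kBaseGroupOffset above is assumed to be the same -100 group id base that is
// used literally in the other tests.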
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*16);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
for (int i = 0; i < 1000; i++) {
auto result = flat_scheduler.schedule(cluster_data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
}
TEST(FlatScheduler, TLNoSiteUniformWeightedRR)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 16;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kWeightedRoundRobin,
2048);
{
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100-i, 0));
}
for (int i=0; i < n_groups*n_disks_per_group; i++) {
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline, 1),
-100 - i/n_disks_per_group));
}
}
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*16);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
for (int i = 0; i < 1000; i++) {
auto result = flat_scheduler.schedule(cluster_data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
}
TEST(FlatScheduler, TLNoSiteWeighted)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 32;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kWeightedRandom,
2048);
std::map<item_id_t, int> disk_wt_map;
std::map<int, int> disk_wt_count;
{
std::vector<int> weights = {4, 8, 16, 32};
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100-i, 0));
}
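// Cycle the weights 4/8/16/32 across the disks (pickIndexRR presumably picks
// weights[i % weights.size()]), so every group holds an equal mix of weights.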
for (int i=0; i < n_groups*n_disks_per_group; i++) {
auto weight = eos::common::pickIndexRR(weights, i);
disk_wt_map[i+1] = weight;
disk_wt_count[weight]++;
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline,
weight),
-100 - i/n_disks_per_group));
}
}
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*32);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
std::map<int, int> weight_counter;
for (int i = 0; i < 1024; i++) {
auto result = flat_scheduler.schedule(cluster_data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
weight_counter[disk_wt_map[result.ids[0]]]++;
weight_counter[disk_wt_map[result.ids[1]]]++;
}
ASSERT_TRUE(weight_counter[4] < weight_counter[8]);
ASSERT_TRUE(weight_counter[8] < weight_counter[16]);
ASSERT_TRUE(weight_counter[16] < weight_counter[32]);
std::cout << "Cluster Disk weight count: " << std::endl;
for (const auto &kv: disk_wt_count) {
std::cout << kv.first << " : " << kv.second << std::endl;
}
std::cout << "Scheduling Disk Weight distribution: " << std::endl;
for (const auto &kv: weight_counter) {
std::cout << kv.first << " : " << kv.second << std::endl;
}
}
TEST(FlatScheduler, TLNoSiteWeightedRR)
{
using namespace eos::mgm::placement;
eos::mgm::placement::ClusterMgr mgr;
int n_elements = 1024;
int n_disks_per_group = 32;
int n_groups = 32;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kWeightedRoundRobin,
2048);
std::map<item_id_t, int> disk_wt_map;
std::map<int, int> disk_wt_count;
{
std::vector<int> weights = {4, 8, 16, 32};
auto sh = mgr.getStorageHandler(n_elements);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
for (int i=0; i< n_groups; ++i) {
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100-i, 0));
}
for (int i=0; i < n_groups*n_disks_per_group; i++) {
auto weight = eos::common::pickIndexRR(weights, i);
disk_wt_map[i+1] = weight;
disk_wt_count[weight]++;
ASSERT_TRUE(sh.addDisk(Disk(i+1, ConfigStatus::kRW, ActiveStatus::kOnline,
weight),
-100 - i/n_disks_per_group));
}
}
auto cluster_data = mgr.getClusterData();
EXPECT_EQ(cluster_data->disks.size(), 32*32);
EXPECT_EQ(cluster_data->buckets.size(), n_elements);
auto root_bucket = cluster_data->buckets[0];
EXPECT_EQ(root_bucket.items.size(), n_groups);
for (auto it: root_bucket.items) {
EXPECT_EQ(cluster_data->buckets.at(-it).items.size(), n_disks_per_group);
}
std::map<int, int> weight_counter;
// With interleaved weighted RR you need at least weight*n_items schedulings
// for the distribution to show; below the full weight of a category the result
// is still uniform, i.e. with only 1024 schedulings (as in the previous test)
// you would see an equal distribution because no full round over all the
// weights has completed yet.
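// Rough numbers for this setup: 256 disks of each weight give a total weight
// of 256*(4+8+16+32) = 60*256 = 15360, so 60*256 iterations with 2 replicas
// each amount to roughly two full interleaved rounds, presumably enough for
// the weight ordering asserted below to emerge.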
for (int i = 0; i < 60*256; i++) {
auto result = flat_scheduler.schedule(cluster_data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
weight_counter[disk_wt_map[result.ids[0]]]++;
weight_counter[disk_wt_map[result.ids[1]]]++;
}
ASSERT_TRUE(weight_counter[4] < weight_counter[8]);
ASSERT_TRUE(weight_counter[8] < weight_counter[16]);
ASSERT_TRUE(weight_counter[16] < weight_counter[32]);
std::cout << "Cluster Disk weight count: " << std::endl;
for (const auto &kv: disk_wt_count) {
std::cout << kv.first << " : " << kv.second << std::endl;
}
std::cout << "Scheduling Disk Weight distribution: " << std::endl;
for (const auto &kv: weight_counter) {
std::cout << kv.first << " : " << kv.second << std::endl;
}
// Schedule one more time to catch off-by-one errors
ASSERT_TRUE(flat_scheduler.schedule(cluster_data(), {2}));
}
TEST(ClusterMap, Concurrency)
{
using namespace eos::mgm::placement;
ClusterMgr mgr;
using eos::mgm::placement::PlacementStrategyT;
eos::mgm::placement::FlatScheduler flat_scheduler(PlacementStrategyT::kRoundRobin,
2048);
{
auto sh = mgr.getStorageHandler(1024);
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::ROOT), 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::SITE), -1, 0));
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP), -100, -1));
ASSERT_TRUE(sh.addDisk(Disk(1, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(2, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(3, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(4, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
ASSERT_TRUE(sh.addDisk(Disk(5, ConfigStatus::kRW, ActiveStatus::kOnline, 1), -100));
}
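// Writer threads below keep adding new groups and disks while many reader
// threads schedule against their own snapshots of the cluster data; this is
// meant to exercise ClusterMgr's snapshot semantics under concurrent
// modification rather than any particular placement outcome.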
auto mgr_ptr = &mgr;
auto add_fn = [mgr_ptr]() {
for (int i=0; i < 10; i++) {
std::cout << "Writer thread: " << std::this_thread::get_id() << " ctr"
<< i << std::endl;
{
auto sh = mgr_ptr->getStorageHandlerWithData();
auto group_id = -101 - i;
std::cout << "Adding group with id=" << group_id << std::endl;
ASSERT_TRUE(sh.addBucket(get_bucket_type(StdBucketType::GROUP),
group_id, -1));
for (int k = 0; k < 10; k++) {
ASSERT_TRUE(sh.addDisk(Disk((i+1)*10 + k + 1, ConfigStatus::kRW, ActiveStatus::kOnline, 1), group_id));
}
}
}
std::cout << "Done with writer at " << std::this_thread::get_id() << std::endl;
};
auto read_fn = [&flat_scheduler, mgr_ptr]() {
for (int i=0; i < 1000; i++) {
auto data = mgr_ptr->getClusterData();
ASSERT_TRUE(data->buckets.size());
ASSERT_TRUE(data->disks.size());
auto result = flat_scheduler.schedule(data(), {2});
ASSERT_TRUE(result);
ASSERT_TRUE(result.is_valid_placement(2));
}
std::cout << "Done with reader at " << std::this_thread::get_id() << std::endl;
};
std::vector<std::thread> reader_threads;
for (int i=0; i<100;i++) {
reader_threads.emplace_back(read_fn);
}
std::vector<std::thread> writer_threads;
for (int i=0; i < 5; i++) {
writer_threads.emplace_back(add_fn);
}
for (auto& t: writer_threads) {
t.join();
}
for (auto& t: reader_threads) {
t.join();
}
}