//------------------------------------------------------------------------------
// File: GroupBalancer.cc
// Author: Joaquim Rocha - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>. *
************************************************************************/
#include "mgm/GroupBalancer.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/FsView.hh"
#include "mgm/convert/ConverterDriver.hh"
#include "namespace/interface/IFsView.hh"
#include "common/StringConversion.hh"
#include "common/FileId.hh"
#include "common/utils/RandUtils.hh"
#include "mgm/groupbalancer/BalancerEngineFactory.hh"
#include "mgm/groupbalancer/BalancerEngineUtils.hh"
#include "mgm/groupbalancer/GroupsInfoFetcher.hh"
#include "mgm/groupbalancer/ConverterUtils.hh"
#define CACHE_LIFE_TIME 60 // seconds
EOSMGMNAMESPACE_BEGIN
using group_balancer::BalancerEngineT;
using group_balancer::group_size_map;
using group_balancer::eosGroupsInfoFetcher;
//-------------------------------------------------------------------------------
// GroupBalancer constructor
//-------------------------------------------------------------------------------
GroupBalancer::GroupBalancer(const char* spacename)
: mSpaceName(spacename), mLastCheck(0)
{
mEngine.reset(group_balancer::make_balancer_engine(BalancerEngineT::stddev));
mThread.reset(&GroupBalancer::GroupBalance, this);
}
//------------------------------------------------------------------------------
// Stop group balancing thread
//------------------------------------------------------------------------------
void
GroupBalancer::Stop()
{
mThread.join();
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
GroupBalancer::~GroupBalancer()
{
Stop();
mEngine->clear();
}
//------------------------------------------------------------------------------
// Update the list of ongoing transfers
//------------------------------------------------------------------------------
void
GroupBalancer::UpdateTransferList()
{
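// Drop entries whose file IDs are no longer present in the conversion tracker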
for (auto it = mTransfers.begin(); it != mTransfers.end();) {
if (!gOFS->mFidTracker.HasEntry(it->first)) {
it = mTransfers.erase(it);
} else {
++it;
}
}
eos_static_info("msg=\"group_balancer update transfers\" "
"scheduledtransfers=%d", mTransfers.size());
}
//------------------------------------------------------------------------------
// Schedule a conversion job for the given file ID from the source group to
// the target group (and update the local transfer bookkeeping)
//------------------------------------------------------------------------------
void
GroupBalancer::scheduleTransfer(const FileInfo& file_info,
FsGroup* sourceGroup, FsGroup* targetGroup)
{
if (sourceGroup == nullptr || targetGroup == nullptr) {
return;
}
auto group_sizes = mEngine->get_group_sizes();
if ((group_sizes.count(sourceGroup->mName) == 0) ||
(group_sizes.count(targetGroup->mName) == 0)) {
eos_static_err("msg=\"no src/trg group in map\" src_group=%s trg_group=%s",
sourceGroup->mName.c_str(), targetGroup->mName.c_str());
return;
}
// The proc file name generated by getFileProcTransferNameAndSize does not
// contain a '+' character, so the tag can be appended without any checks
std::string conv_tag = file_info.filename;
conv_tag += "^groupbalancer^";
conv_tag.erase(0, gOFS->MgmProcConversionPath.length() + 1);
if (gOFS->mConverterDriver->ScheduleJob(file_info.fid, conv_tag)) {
eos_static_info("msg=\"group balancer scheduled job\" file=\"%s\" "
"src_grp=\"%s\" dst_grp=\"%s\"", conv_tag.c_str(),
sourceGroup->mName.c_str(), targetGroup->mName.c_str());
} else {
eos_static_err("msg=\"group balancer could not schedule job\" "
"file=\"%s\" src_grp=\"%s\" dst_grp=\"%s\"",
conv_tag.c_str(), sourceGroup->mName.c_str(),
targetGroup->mName.c_str());
}
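// Track the fid so that chooseFidFromGroup() does not pick it again;
// UpdateTransferList() prunes the entry once the tracker drops it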
mTransfers[file_info.fid] = file_info.filename;
}
//------------------------------------------------------------------------------
// Chooses a random file ID from a random filesystem in the given group
//------------------------------------------------------------------------------
eos::common::FileId::fileid_t
GroupBalancer::chooseFidFromGroup(FsGroup* group)
{
if (group == nullptr) {
return {};
}
int rndIndex;
bool found = false;
uint64_t fsid_size = 0ull;
eos::common::FileSystem::fsid_t fsid = 0;
eos::common::RWMutexReadLock vlock(FsView::gFsView.ViewMutex);
eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
// TODO(gbitzes): Add prefetching, make more efficient.
std::vector<int> validFsIndexes(group->size());
for (size_t i = 0; i < group->size(); i++) {
validFsIndexes[i] = (int) i;
}
eos::mgm::BaseView::const_iterator fs_it;
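// Randomly probe the group's filesystems until we find one that is online
// and holds at least one file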
while (!validFsIndexes.empty()) {
fs_it = group->begin();
rndIndex = common::getRandom((uint64_t)0, validFsIndexes.size() - 1);
std::advance(fs_it, validFsIndexes[rndIndex]);
fsid = *fs_it;
// Accept only active file systems
FileSystem* target = FsView::gFsView.mIdView.lookupByID(fsid);
if (target && target->GetActiveStatus() == eos::common::ActiveStatus::kOnline) {
fsid_size = gOFS->eosFsView->getNumFilesOnFs(fsid);
if (fsid_size) {
found = true;
break;
}
}
validFsIndexes.erase(validFsIndexes.begin() + rndIndex);
}
// Check if we have any files to transfer
if (!found) {
return {};
}
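// Try a bounded number of approximately random picks, skipping fids that
// are already scheduled for transfer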
int attempts = 10;
while (attempts-- > 0) {
eos::IFileMD::id_t randomPick;
if (gOFS->eosFsView->getApproximatelyRandomFileInFs(fsid, randomPick) &&
mTransfers.count(randomPick) == 0) {
return randomPick;
}
}
return {};
}
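//------------------------------------------------------------------------------
// Choose a file from the source group that fits the configured size limits
// and can be converted into the target group
//------------------------------------------------------------------------------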
GroupBalancer::FileInfo
GroupBalancer::chooseFileFromGroup(FsGroup* from_group, FsGroup* to_group,
int attempts)
{
if (from_group == nullptr || to_group == nullptr) {
return {};
}
if (from_group->size() == 0) {
return {};
}
uint64_t filesize;
while (attempts-- > 0) {
auto fid = chooseFidFromGroup(from_group);
if (!fid) {
continue;
}
auto filename = group_balancer::getFileProcTransferNameAndSize(fid,
to_group->mName,
&filesize);
if (filename.empty() ||
(mCfg.mMinFileSize > filesize) ||
(mCfg.mMaxFileSize < filesize)) {
continue;
}
// We have a hit!
return {fid, std::move(filename), filesize};
}
return {};
}
//------------------------------------------------------------------------------
// Print the filled percentage of each group (debug output)
//------------------------------------------------------------------------------
static void
printSizes(const group_size_map& group_sizes)
{
for (const auto& it : group_sizes)
eos_static_debug("group=%s average=%.02f", it.first.c_str(),
(double)it.second.filled() * 100.0);
}
//------------------------------------------------------------------------------
// Ask the balancer engine to pick a source and a target group and schedule
// a file ID to be transferred between them
//------------------------------------------------------------------------------
void
GroupBalancer::prepareTransfer()
{
FsGroup* fromGroup, *toGroup;
auto&& [over_it, under_it] = mEngine->pickGroupsforTransfer();
if (over_it.empty() || under_it.empty()) {
eos_static_info("msg=\"engine gave us empty groups skipping\" "
"engine_status=%s",
mEngine->get_status_str(false, true).c_str());
return;
}
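// Resolve the group names returned by the engine to FsGroup pointers while
// holding the FsView read lock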
{
eos::common::RWMutexReadLock rlock(FsView::gFsView.ViewMutex);
auto from_group_it = FsView::gFsView.mGroupView.find(over_it);
auto to_group_it = FsView::gFsView.mGroupView.find(under_it);
if (from_group_it == FsView::gFsView.mGroupView.end() ||
to_group_it == FsView::gFsView.mGroupView.end()) {
return;
}
fromGroup = from_group_it->second;
toGroup = to_group_it->second;
}
auto file_info = chooseFileFromGroup(fromGroup, toGroup, mCfg.file_attempts);
if (!file_info) {
eos_static_info("msg=\"failed to choose any fid to schedule\" "
"failedgroup=%s", fromGroup->mName.c_str());
return;
}
scheduleTransfer(file_info, fromGroup, toGroup);
}
//------------------------------------------------------------------------------
// Check if the sizes cache should be updated (based on the time passed since
// it was last updated)
//------------------------------------------------------------------------------
bool
GroupBalancer::cacheExpired()
{
time_t currentTime = time(NULL);
if (difftime(currentTime, mLastCheck) > CACHE_LIFE_TIME) {
mLastCheck = currentTime;
return true;
}
return false;
}
//------------------------------------------------------------------------------
// Schedule a pre-defined number of transfers
//------------------------------------------------------------------------------
void
GroupBalancer::prepareTransfers(int nrTransfers)
{
int allowedTransfers = nrTransfers - static_cast<int>(mTransfers.size());
for (int i = 0; i < allowedTransfers; i++) {
prepareTransfer();
}
if (allowedTransfers > 0) {
printSizes(mEngine->get_group_sizes());
}
}
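//------------------------------------------------------------------------------
// Get the current status of the balancer engine
//------------------------------------------------------------------------------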
std::string
GroupBalancer::Status(bool detail, bool monitoring) const
{
eos::common::RWMutexReadLock lock(mEngineMtx);
return mEngine->get_status_str(detail, monitoring);
}
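//------------------------------------------------------------------------------
// Check if the given name refers to one of the supported balancer engines
//------------------------------------------------------------------------------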
bool
GroupBalancer::is_valid_engine(std::string_view engine_name)
{
return engine_name == "std" || engine_name == "minmax" || engine_name == "freespace";
}
//------------------------------------------------------------------------------
// Apply configuration stored at the space level
//------------------------------------------------------------------------------
bool
GroupBalancer::Configure(FsSpace* const space, GroupBalancer::Config& cfg)
{
cfg.is_enabled = space->GetConfigMember("groupbalancer") == "on";
cfg.is_conv_enabled = space->GetConfigMember("converter") == "on";
if (!cfg.is_enabled || !cfg.is_conv_enabled) {
eos_static_info("msg=\"group balancer or converter not enabled\""
" space=%s balancer_status=%d converter_status=%d",
mSpaceName.c_str(), cfg.is_enabled, cfg.is_conv_enabled);
return false;
}
cfg.num_tx = atoi(space->GetConfigMember("groupbalancer.ntx").c_str());
cfg.mMinFileSize = common::StringConversion::GetSizeFromString(
space->GetConfigMember("groupbalancer.min_file_size"));
cfg.mMaxFileSize = common::StringConversion::GetSizeFromString(
space->GetConfigMember("groupbalancer.max_file_size"));
if (!cfg.mMaxFileSize) {
eos_static_debug("%s", "msg=\"invalid Max File Size, using default\"");
cfg.mMaxFileSize = GROUPBALANCER_MAX_FILE_SIZE;
}
cfg.engine_type = group_balancer::get_engine_type(
space->GetConfigMember("groupbalancer.engine"));
cfg.file_attempts = atoi(
space->GetConfigMember("groupbalancer.file_attempts").c_str());
if (!cfg.file_attempts) {
eos_static_debug("%s", "msg=\"invalid File Attempts Count, using default\"");
cfg.file_attempts = GROUPBALANCER_FILE_ATTEMPTS;
}
auto min_threshold_str = space->GetConfigMember("groupbalancer.min_threshold");
auto max_threshold_str = space->GetConfigMember("groupbalancer.max_threshold");
if (!group_balancer::is_valid_threshold(min_threshold_str, max_threshold_str)) {
if (cfg.engine_type == BalancerEngineT::minmax) {
eos_static_err("msg=\"invalid min/max balancer threshold configuration\""
" space=%s", mSpaceName.c_str());
return false;
}
// This is a temporary stopgap until we force the min/max thresholds to be
// set and remove this parameter. For the std balancer, if there is no
// explicit min/max, fall back to the configured legacy threshold.
auto threshold_str = space->GetConfigMember("groupbalancer.threshold");
if (!group_balancer::is_valid_threshold(threshold_str)) {
eos_static_err("msg=\"invalid std balancer threshold configuration\""
" space=%s", mSpaceName.c_str());
return false;
}
min_threshold_str = threshold_str;
max_threshold_str = threshold_str;
}
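// Forward the thresholds and the blocklisted groups to the engine configuration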
auto blocklisted_groups = space->GetConfigMember("groupbalancer.blocklist");
mEngineConf.insert_or_assign("min_threshold", std::move(min_threshold_str));
mEngineConf.insert_or_assign("max_threshold", std::move(max_threshold_str));
mEngineConf.insert_or_assign("blocklisted_groups", std::move(blocklisted_groups));
return true;
}
//------------------------------------------------------------------------------
// Eternal loop scheduling group balancing conversion jobs
//------------------------------------------------------------------------------
void
GroupBalancer::GroupBalance(ThreadAssistant& assistant) noexcept
{
gOFS->WaitUntilNamespaceIsBooted();
eos_static_info("%s", "msg=\"starting group balancer thread\"");
eosGroupsInfoFetcher fetcher(mSpaceName);
group_balancer::BalancerEngineT prev_engine_type {BalancerEngineT::stddev};
bool engine_reconfigured = false;
bool config_status = true;
// Loop forever until cancelled
while (!assistant.terminationRequested()) {
bool expected_reconfiguration = true;
assistant.wait_for(std::chrono::seconds(10));
if (!gOFS->mMaster->IsMaster()) {
assistant.wait_for(std::chrono::seconds(10));
eos_static_debug("%s", "msg=\"group balancer disabled for slave\"");
continue;
}
if (assistant.terminationRequested()) {
return;
}
// Take a read lock on the FsView before inspecting the space configuration
FsView::gFsView.ViewMutex.LockRead();
if (!FsView::gFsView.mSpaceGroupView.count(mSpaceName.c_str())) {
FsView::gFsView.ViewMutex.UnLockRead();
eos_static_warning("msg=\"no groups to balance\" space=\"%s\"",
mSpaceName.c_str());
break;
}
FsSpace* space = FsView::gFsView.mSpaceView[mSpaceName.c_str()];
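// Re-read the space configuration only if an update was requested since the
// last iteration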
if (mDoConfigUpdate.compare_exchange_strong(expected_reconfiguration, false,
std::memory_order_acq_rel)) {
config_status = Configure(space, mCfg);
}
FsView::gFsView.ViewMutex.UnLockRead();
// Update tracker for scheduled jobs
gOFS->mFidTracker.DoCleanup(TrackerType::Convert);
if (!gOFS->mConverterDriver || !config_status) {
continue;
}
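// Recreate the balancer engine if the configured engine type has changed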
if (prev_engine_type != mCfg.engine_type) {
mEngine.reset(group_balancer::make_balancer_engine(mCfg.engine_type));
engine_reconfigured = true;
prev_engine_type = mCfg.engine_type;
fetcher.should_average(engine_should_average(mCfg.engine_type));
}
mEngine->configure(mEngineConf);
UpdateTransferList();
if ((int) mTransfers.size() >= mCfg.num_tx) {
continue;
}
eos_static_debug("msg=\"group balancer enabled\" ntx=%d ", mCfg.num_tx);
if (cacheExpired() || engine_reconfigured) {
{
eos::common::RWMutexWriteLock lock(mEngineMtx);
mEngine->populateGroupsInfo(fetcher.fetch());
}
printSizes(mEngine->get_group_sizes());
if (engine_reconfigured) {
eos_static_info("msg=\"group balancer engine reconfigured\"");
engine_reconfigured = false;
}
}
if (!mEngine->canPick()) {
eos_static_debug("msg=\"Empty source or target groups, cannot pick\""
" engine_status=%s",
mEngine->get_status_str(false, true).c_str());
continue;
}
prepareTransfers(mCfg.num_tx);
}
}
EOSMGMNAMESPACE_END