// ----------------------------------------------------------------------
// File: GeoBalancer.cc
// Author: Joaquim Rocha - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2013 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include "mgm/GeoBalancer.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/FsView.hh"
#include "mgm/convert/ConverterDriver.hh"
#include "namespace/interface/IFsView.hh"
#include "namespace/interface/IView.hh"
#include "namespace/Prefetcher.hh"
#include "common/StringConversion.hh"
#include "common/FileId.hh"
#include "common/LayoutId.hh"
#include "XrdSys/XrdSysError.hh"
#include "XrdOuc/XrdOucTrace.hh"
#include "Xrd/XrdScheduler.hh"
#include
#include
extern XrdSysError gMgmOfsEroute;
extern XrdOucTrace gMgmOfsTrace;
#define CACHE_LIFE_TIME 300 // seconds
/*----------------------------------------------------------------------------*/
EOSMGMNAMESPACE_BEGIN
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// Constructor by space name
//
// @param spacename name of the space this geo balancer works on
//
// The balancing thread is started as the very last step, once the fields it
// reads (mSpaceName, mLastCheck) are set.
//------------------------------------------------------------------------------
GeoBalancer::GeoBalancer(const char* spacename)
  : mThreshold(.5),
    mAvgUsedSize(0)
{
  // 0 makes the first cacheExpired() call report an expired cache
  mLastCheck = 0;
  mSpaceName = spacename;
  mThread.reset(&GeoBalancer::GeoBalance, this);
}
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// Stop the balancing thread by joining mThread.
//
// NOTE(review): this assumes join() also requests termination of the thread;
// otherwise it would block forever on the loop in GeoBalance - confirm
// against the thread wrapper's implementation.
//------------------------------------------------------------------------------
void
GeoBalancer::Stop()
{
  mThread.join();
}
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// Destructor
//
// The worker thread uses the geotag caches, so it is stopped before the
// caches (and the GeotagSize objects they own) are released.
//------------------------------------------------------------------------------
GeoBalancer::~GeoBalancer()
{
  Stop();               // no more concurrent users of the caches after this
  clearCachedSizes();   // frees the owned GeotagSize objects
}
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// GeotagSize constructor
//
// @param usedBytes bytes currently in use in the geotag
// @param capacity  total capacity of the geotag; must be non-zero (only
//                  enforced by assert, i.e. in builds without NDEBUG)
//------------------------------------------------------------------------------
GeotagSize::GeotagSize(uint64_t usedBytes, uint64_t capacity)
  : mSize(usedBytes), mCapacity(capacity)
{
  assert(capacity != 0);
}
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// Get a uniformly distributed random int in the closed range [0, max]
//
// @param max inclusive upper bound; a value <= 0 always yields 0
//
// @return random value in [0, max]
//------------------------------------------------------------------------------
int
GeoBalancer::getRandom(int max)
{
  if (max <= 0) {
    return 0;
  }

  // random() yields values in [0, RAND_MAX]. Rounding a scaled value would
  // give the two end points only half the weight of the inner values, so
  // reduce with modulo instead; its bias is negligible for the small ranges
  // (container indices) this is used for.
  return static_cast<int>(random() % (static_cast<long>(max) + 1));
}
/*----------------------------------------------------------------------------*/
//------------------------------------------------------------------------------
// Drop all cached geotag information
//
// The GeotagSize objects stored in mGeotagSizes are owned by the cache and
// are deleted here before the map itself is emptied.
//------------------------------------------------------------------------------
void
GeoBalancer::clearCachedSizes()
{
  mGeotagFs.clear();
  mFsGeotag.clear();

  for (auto& geotagEntry : mGeotagSizes) {
    delete geotagEntry.second;
  }

  mGeotagSizes.clear();
}
/*----------------------------------------------------------------------------*/
void
GeoBalancer::fillGeotagsByAvg()
/*----------------------------------------------------------------------------*/
/**
* @brief Fills mGeotagsOverAvg with the objects in mGeotagSizes, in case
* they're greater than the current mAvgUsedSize
*/
/*----------------------------------------------------------------------------*/
{
mGeotagsOverAvg.clear();
std::map::const_iterator it;
for (it = mGeotagSizes.cbegin(); it != mGeotagSizes.cend(); it++) {
double geotagAvg = (*it).second->filled();
if (geotagAvg - mAvgUsedSize > mThreshold) {
mGeotagsOverAvg.push_back((*it).first);
}
}
}
//------------------------------------------------------------------------------
// Log the fill percentage of every geotag in the given size map
//------------------------------------------------------------------------------
static void
printSizes(const std::map* sizes)
{
  for (const auto& entry : *sizes) {
    const double percent = static_cast<double>(entry.second->filled()) * 100.0;
    eos_static_info("geotag=%s average=%.02f", entry.first.c_str(), percent);
  }
}
/*----------------------------------------------------------------------------*/
void
GeoBalancer::populateGeotagsInfo()
/*----------------------------------------------------------------------------*/
/**
* @brief Fills mGeotagSizes, calculates the mAvgUsedSize and fills
* mGeotagsOverAvg
*/
/*----------------------------------------------------------------------------*/
{
clearCachedSizes();
const char* spaceName = mSpaceName.c_str();
eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex);
const FsSpace* spaceView = FsView::gFsView.mSpaceView[spaceName];
if (spaceView->size() == 0) {
eos_static_info("msg=\"no filesystems in space\" space=%s", spaceName);
return;
}
for (auto it = spaceView->cbegin(); it != spaceView->cend(); it++) {
FileSystem* fs = FsView::gFsView.mIdView.lookupByID(*it);
if (!fs || (fs->GetActiveStatus() != eos::common::ActiveStatus::kOnline)) {
continue;
}
eos::common::FileSystem::fs_snapshot_t snapshot;
fs->SnapShotFileSystem(snapshot, false);
if (snapshot.mStatus != eos::common::BootStatus::kBooted ||
snapshot.mConfigStatus < eos::common::ConfigStatus::kRO ||
snapshot.mGeoTag.empty()) {
continue;
}
mGeotagFs[snapshot.mGeoTag].push_back(*it);
mFsGeotag[*it] = snapshot.mGeoTag;
uint64_t capacity = snapshot.mDiskCapacity;
uint64_t usedBytes = (uint64_t)(capacity - snapshot.mDiskFreeBytes);
if (mGeotagSizes.count(snapshot.mGeoTag) == 0) {
mGeotagSizes[snapshot.mGeoTag] = new GeotagSize(usedBytes, capacity);
} else {
uint64_t currentUsedBytes = mGeotagSizes[snapshot.mGeoTag]->usedBytes();
uint64_t currentCapacity = mGeotagSizes[snapshot.mGeoTag]->capacity();
mGeotagSizes[snapshot.mGeoTag]->setUsedBytes(currentUsedBytes + usedBytes);
mGeotagSizes[snapshot.mGeoTag]->setCapacity(currentCapacity + capacity);
}
}
mAvgUsedSize = 0;
std::map>::const_iterator
git;
for (git = mGeotagFs.cbegin(); git != mGeotagFs.cend(); git++) {
const std::string geotag = (*git).first;
const std::vector fsVector = (*git).second;
mAvgUsedSize += mGeotagSizes[geotag]->filled();
}
mAvgUsedSize /= ((double) mGeotagSizes.size());
eos_static_info("msg=\"geo_balancer update average fill\" average=%.02f %%",
mAvgUsedSize * 100.0);
fillGeotagsByAvg();
}
/*----------------------------------------------------------------------------*/
/**
 * @brief Checks if a file's replicas are spread over more than one geotag
 *
 * Locations with fsid 0 (logged as an error) or EOS_TAPE_FSID are skipped.
 * A filesystem missing from mFsGeotag is treated as having an empty geotag.
 * The cache is only read: unknown fsids are not inserted into mFsGeotag.
 *
 * @param fmd the file metadata object
 * @return true if two of the considered locations have different geotags
 */
/*----------------------------------------------------------------------------*/
bool
GeoBalancer::fileIsInDifferentLocations(const eos::IFileMD* fmd)
{
  static const std::string sNoGeotag;
  // Geotag of the first considered location; nullptr until one is seen
  const std::string* firstGeotag = nullptr;

  for (const auto& fsid : fmd->getLocations()) {
    // ignore filesystem id 0
    if (!fsid) {
      eos_static_err("msg=\"fsid 0 found\" fxid=%08llx", fmd->getId());
      continue;
    }

    // Ignore EOS_TAPE_FSID
    if (EOS_TAPE_FSID == fsid) {
      eos_static_debug("msg=\"skip tape fsid\" fxid=%08llx", fmd->getId());
      continue;
    }

    // find() instead of operator[]: a query must not grow the cache
    const auto it_tag = mFsGeotag.find(fsid);
    const std::string& geotag = (it_tag != mFsGeotag.end()) ?
                                it_tag->second : sNoGeotag;

    if (firstGeotag == nullptr) {
      firstGeotag = &geotag;
    } else if (*firstGeotag != geotag) {
      return true;
    }
  }

  return false;
}
/*----------------------------------------------------------------------------*/
/**
 * @brief Builds the proc conversion path for a file and reports its size
 *
 * An empty string is returned when the file's container id is 0, it has no
 * locations, its size is 0, its replicas already span several geotags, its
 * path lies below MgmProcPath, or its metadata lookup throws MDException.
 *
 * @param fid  the file ID
 * @param size if not null, receives the file size (it is written before the
 *             proc directory check)
 * @return "<MgmProcConversionPath>/<fid %016llx>:<space>#<layout %08lx>"
 *         or "" if the file must not be geo balanced
 */
/*----------------------------------------------------------------------------*/
std::string
GeoBalancer::getFileProcTransferNameAndSize(eos::common::FileId::fileid_t fid,
    uint64_t* size)
{
  std::string uri;
  eos::common::LayoutId::layoutid_t lid = 0;
  // Prefetch the metadata (and its parents) before taking the namespace lock
  eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, fid);
  {
    eos::common::RWMutexReadLock nsLock(gOFS->eosViewRWMutex);

    try {
      auto fmd = gOFS->eosFileService->getFileMD(fid);
      lid = fmd->getLayoutId();
      const bool notEligible = (fmd->getContainerId() == 0) ||
                               (fmd->getNumLocation() == 0) ||
                               (fmd->getSize() == 0);

      if (notEligible) {
        return std::string("");
      }

      if (fileIsInDifferentLocations(fmd.get())) {
        eos_static_debug("msg=\"file is already in more than one location\" "
                         "name=%s fxid=%08llx", fmd->getName().c_str(), fid);
        return std::string("");
      }

      if (size != nullptr) {
        *size = fmd->getSize();
      }

      uri = gOFS->eosView->getUri(fmd.get()).c_str();
      // Don't touch files in any ../proc/ directory
      const bool underProc = (uri.rfind(gOFS->MgmProcPath.c_str(), 0) == 0);

      if (underProc) {
        return std::string("");
      }
    } catch (eos::MDException& e) {
      eos_static_debug("msg=\"exception\" ec=%d emsg=\"%s\"", e.getErrno(),
                       e.getMessage().str().c_str());
      return std::string("");
    }
  }
  eos_static_debug("msg=\"found file to geobalance\" path=%s",
                   uri.c_str());
  char procPath[1024];
  snprintf(procPath, sizeof(procPath), "%s/%016llx:%s#%08lx",
           gOFS->MgmProcConversionPath.c_str(), fid, mSpaceName.c_str(),
           (unsigned long) lid);
  return std::string(procPath);
}
//------------------------------------------------------------------------------
// Update the list of ongoing transfers: every tracked file id that is no
// longer present in the fid tracker is removed from mTransfers.
//------------------------------------------------------------------------------
void
GeoBalancer::updateTransferList()
{
  // Let the tracker clean up its Convert entries before it is consulted
  gOFS->mFidTracker.DoCleanup(TrackerType::Convert);

  for (auto it = mTransfers.begin(); it != mTransfers.end();) {
    if (gOFS->mFidTracker.HasEntry(it->first)) {
      ++it;
    } else {
      mTransfers.erase(it++);
    }
  }

  // size() returns size_t, which must be printed with %zu (not %d)
  eos_static_info("msg=\"geo_balancer update transfers\" scheduled_transfers=%zu",
                  mTransfers.size());
}
//------------------------------------------------------------------------------
// Creates the conversion job for the file ID, moving it away from the given
// fromGeotag, and updates the cache structures.
//
// @param fid        file id to transfer
// @param fromGeotag geotag the file is taken from
//
// @return true if the conversion job was scheduled, false otherwise
//
// @note: All this works based on the assumption that kScattered is the default
// placement policy.
//------------------------------------------------------------------------------
bool
GeoBalancer::scheduleTransfer(eos::common::FileId::fileid_t fid,
                              const std::string& fromGeotag)
{
  uint64_t size = 0;
  const std::string file_path = getFileProcTransferNameAndSize(fid, &size);

  if (file_path.empty()) {
    return false;
  }

  // Conversion tag: proc path without the conversion directory prefix
  std::string conv_tag = file_path;
  conv_tag += "^geobalancer^";
  conv_tag.erase(0, gOFS->MgmProcConversionPath.length() + 1);

  if (!gOFS->mConverterDriver->ScheduleJob(fid, conv_tag)) {
    eos_static_err("msg=\"geo_balancer failed to schedule job\" "
                   "file=\"%s\" from_geotag=\"%s\"", conv_tag.c_str(),
                   fromGeotag.c_str());
    return false;
  }

  eos_static_info("msg=\"geo_balancer scheduled job\" file=\"%s\" "
                  "from_geotag=\"%s\"", conv_tag.c_str(),
                  fromGeotag.c_str());
  mTransfers[fid] = file_path.c_str();
  // Account the scheduled bytes as already moved out of the source geotag.
  // find() instead of operator[]: a geotag missing from the cache must not
  // be inserted as a null entry and dereferenced.
  auto it_size = mGeotagSizes.find(fromGeotag);

  if ((it_size != mGeotagSizes.end()) && it_size->second) {
    const uint64_t usedBytes = it_size->second->usedBytes();
    // Clamp at zero instead of wrapping around on stale statistics
    it_size->second->setUsedBytes(usedBytes > size ? usedBytes - size : 0);
  }

  fillGeotagsByAvg();
  return true;
}
/*----------------------------------------------------------------------------*/
/**
 * @brief Chooses a random file ID from a random filesystem in the given geotag
 *
 * Filesystems holding no files are removed from mGeotagFs while searching.
 * When no filesystem is left, the geotag is removed from mGeotagFs and
 * mGeotagSizes (deleting its owned GeotagSize), and mGeotagsOverAvg is rebuilt.
 *
 * @param geotag the location's name from which the file id will be chosen;
 *        it may refer to an element of mGeotagsOverAvg, so it is copied
 *        before the caches are modified
 * @return the chosen file ID, or (fileid_t) -1 if none could be picked
 */
/*----------------------------------------------------------------------------*/
eos::common::FileId::fileid_t
GeoBalancer::chooseFidFromGeotag(const std::string& geotag)
{
  // Local copy: fillGeotagsByAvg() below rebuilds mGeotagsOverAvg, into which
  // the caller's reference may point
  const std::string tag = geotag;
  bool found = false;
  eos::common::FileSystem::fsid_t fsid = 0;
  eos::common::RWMutexReadLock vlock(FsView::gFsView.ViewMutex);
  eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);
  auto& validFs = mGeotagFs[tag];

  // TODO(gbitzes): Add prefetching here.
  while (!validFs.empty()) {
    const int rndIndex = getRandom(static_cast<int>(validFs.size()) - 1);
    fsid = validFs[rndIndex];

    if (gOFS->eosFsView->getNumFilesOnFs(fsid)) {
      found = true;
      break;
    }

    // Filesystem without files: never a candidate, drop it from the cache
    validFs.erase(validFs.begin() + rndIndex);
  }

  if (validFs.empty()) {
    // No usable filesystem left in this geotag: forget it completely.
    // validFs dangles after the erase below and must not be used again.
    mGeotagFs.erase(tag);
    auto it_size = mGeotagSizes.find(tag);

    if (it_size != mGeotagSizes.end()) {
      // mGeotagSizes owns its GeotagSize objects: free before erasing,
      // otherwise the object leaks
      delete it_size->second;
      mGeotagSizes.erase(it_size);
    }

    fillGeotagsByAvg();
  }

  if (!found) {
    return -1;
  }

  for (int attempts = 10; attempts > 0; --attempts) {
    eos::IFileMD::id_t randomPick;

    if (gOFS->eosFsView->getApproximatelyRandomFileInFs(fsid, randomPick) &&
        (mTransfers.count(randomPick) == 0)) {
      return randomPick;
    }
  }

  return -1;
}
/*----------------------------------------------------------------------------*/
void
GeoBalancer::prepareTransfer()
/*----------------------------------------------------------------------------*/
/**
* @brief Picks a geotag randomly and schedule a file ID to be transferred
*/
/*----------------------------------------------------------------------------*/
{
if (mGeotagsOverAvg.size() == 0) {
eos_static_debug("%s", "msg=\"no geotags above average\"");
return;
}
int attempts = 10;
while (attempts-- > 0) {
int rndIndex = getRandom(mGeotagsOverAvg.size() - 1);
std::vector::const_iterator over_it = mGeotagsOverAvg.cbegin();
std::advance(over_it, rndIndex);
// TODO: this loop should be improved not to request the file list too
// many times in a tight loop
eos::common::FileId::fileid_t fid = chooseFidFromGeotag(*over_it);
if ((int) fid == -1) {
eos_static_debug("msg=\"no fid found to schedule\" failed_geotag=%s",
(*over_it).c_str());
continue;
}
if (scheduleTransfer(fid, *over_it)) {
break;
}
}
}
/*----------------------------------------------------------------------------*/
/**
 * @brief Tells whether the sizes cache is older than CACHE_LIFE_TIME seconds
 *
 * When it is, the check timestamp is reset to "now" as a side effect, so the
 * next call reports expiry only after another CACHE_LIFE_TIME seconds.
 *
 * @return true if the cache expired (and the timestamp was refreshed)
 */
/*----------------------------------------------------------------------------*/
bool
GeoBalancer::cacheExpired()
{
  const time_t now = time(NULL);
  const bool expired = difftime(now, mLastCheck) > CACHE_LIFE_TIME;

  if (expired) {
    mLastCheck = now;
  }

  return expired;
}
//------------------------------------------------------------------------------
// Schedule transfers until nrTransfers are ongoing (one attempt per free slot)
//
// @param nrTransfers maximum number of concurrently scheduled transfers
//------------------------------------------------------------------------------
void
GeoBalancer::prepareTransfers(int nrTransfers)
{
  // Signed arithmetic: more ongoing transfers than allowed must give a
  // negative budget, not an unsigned wrap-around narrowed back to int
  const int allowedTransfers =
    nrTransfers - static_cast<int>(mTransfers.size());

  for (int i = 0; i < allowedTransfers; ++i) {
    prepareTransfer();
  }

  if (allowedTransfers > 0) {
    printSizes(&mGeotagSizes);
  }
}
//------------------------------------------------------------------------------
//! @brief Eternal loop trying to run geo balancing conversion jobs
//!
//! Roughly every 10 seconds, and only on the master, it reads the space
//! settings "converter", "geobalancer", "geobalancer.ntx" and
//! "geobalancer.threshold" and, when enabled, tops up the scheduled conversion
//! jobs to "geobalancer.ntx". The thread returns when termination is requested
//! or when the space no longer exists.
//------------------------------------------------------------------------------
void
GeoBalancer::GeoBalance(ThreadAssistant& assistant) noexcept
{
  gOFS->WaitUntilNamespaceIsBooted(assistant);
  assistant.wait_for(std::chrono::seconds(10));

  // Loop forever until cancelled
  while (!assistant.terminationRequested()) {
    bool is_enabled = true;
    int nrTransfers = 0;

    if (!gOFS->mMaster->IsMaster()) {
      eos_static_debug("%s", "msg=\"geo balancer is disabled for slave\"");
      goto wait;
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    {
      // Scoped read lock: released on every path leaving this block,
      // including the break/goto statements below (a manual
      // LockRead/UnLockRead pairing previously leaked it on one of them)
      eos::common::RWMutexReadLock lock(FsView::gFsView.ViewMutex);

      if (!FsView::gFsView.mSpaceGroupView.count(mSpaceName.c_str())) {
        eos_static_warning("msg=\"no space to geo balance\" space=\"%s\"",
                           mSpaceName.c_str());
        break;
      }

      auto it_space = FsView::gFsView.mSpaceView.find(mSpaceName.c_str());

      if (it_space == FsView::gFsView.mSpaceView.end()) {
        eos_static_err("msg=\"geo_balancer terminating, no such space\" space=%s",
                       mSpaceName.c_str());
        break;
      }

      FsSpace* space = it_space->second;

      if (space->GetConfigMember("converter") != "on") {
        eos_static_debug("msg=\"geo balancer disabled since it needs the "
                         "converter enabled to work and it's not\" space=%s",
                         mSpaceName.c_str());
        goto wait;
      }

      // Extract the current settings if conversion enabled and how many
      // conversion jobs should run
      is_enabled = space->GetConfigMember("geobalancer") == "on";
      nrTransfers = atoi(space->GetConfigMember("geobalancer.ntx").c_str());
      mThreshold = atof(space->GetConfigMember("geobalancer.threshold").c_str());
      mThreshold /= 100.0;
    }

    if (is_enabled) {
      eos_static_info("msg=\"geo balancer is enabled\" ntx=%d ", nrTransfers);
      updateTransferList();

      if ((int) mTransfers.size() >= nrTransfers) {
        goto wait;
      }

      if (cacheExpired()) {
        populateGeotagsInfo();
        printSizes(&mGeotagSizes);
      }

      prepareTransfers(nrTransfers);
    } else {
      eos_static_debug("%s", "msg=\"geo balancer is disabled\"");
    }

wait:
    // Let some time pass or wait for a notification
    assistant.wait_for(std::chrono::seconds(10));

    if (assistant.terminationRequested()) {
      return;
    }
  }
}
EOSMGMNAMESPACE_END