//------------------------------------------------------------------------------
//! @file Drainer.cc
//------------------------------------------------------------------------------
/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2019 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the         *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
 ************************************************************************/
#include "mgm/drain/Drainer.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/drain/DrainFs.hh"
#include "mgm/drain/DrainTransferJob.hh"
#include "mgm/FsView.hh"
#include "mgm/IMaster.hh"
#include "common/StacktraceHere.hh"
#include "common/table_formatter/TableFormatterBase.hh"
EOSMGMNAMESPACE_BEGIN
//------------------------------------------------------------------------------
// Constructor
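// The embedded thread pool executes the individual drain transfer jobs; it
// is created with fixed limits (min=10, max=100 threads), while the
// remaining arguments configure the pool's dynamic scaling behavior (see
// common/ThreadPool.hh for their exact meaning).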
//------------------------------------------------------------------------------
Drainer::Drainer():
  mThreadPool(10, 100, 10, 6, 5, "drain")
{}
//------------------------------------------------------------------------------
// Start running thread
//------------------------------------------------------------------------------
void Drainer::Start()
{
  mThread.reset(&Drainer::Drain, this);
}
//------------------------------------------------------------------------------
// Stop running thread and implicitly all running drain jobs
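// Joining the drainer thread makes terminationRequested() return true in
// Drain(), which in turn calls WaitForAllDrainToStop() before returning.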
//------------------------------------------------------------------------------
void
Drainer::Stop()
{
  mThread.join();
  gOFS->mFidTracker.Clear(TrackerType::Drain);
}
//------------------------------------------------------------------------------
// Destructor
//------------------------------------------------------------------------------
Drainer::~Drainer()
{
  Stop();
}
//------------------------------------------------------------------------------
// Start draining of a given file system
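// If a destination file system is given, it must live in the same space and
// scheduling group as the source. When the node already drains the maximum
// number of file systems allowed by MaxDrainFsInParallel(), the request is
// parked in mPending and picked up later by HandleQueued().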
//------------------------------------------------------------------------------
bool
Drainer::StartFsDrain(eos::mgm::FileSystem* fs,
                      eos::common::FileSystem::fsid_t dst_fsid,
                      std::string& err)
{
  using eos::common::FileSystem;
  FileSystem::fsid_t src_fsid = fs->GetId();
  eos_info("msg=\"start draining\" fsid=%u", src_fsid);
  FileSystem::fs_snapshot_t src_snapshot;
  fs->SnapShotFileSystem(src_snapshot);

  // Check that the destination fs, if specified, is in the same space and
  // group as the source
  if (dst_fsid) {
    FileSystem* dst = FsView::gFsView.mIdView.lookupByID(dst_fsid);
    FileSystem::fs_snapshot_t dst_snapshot;

    if (!dst) {
      err = SSTR("error: destination file system " << dst_fsid
                 << " does not exist");
      return false;
    }

    dst->SnapShotFileSystem(dst_snapshot, false);

    if ((src_snapshot.mSpace != dst_snapshot.mSpace) ||
        (src_snapshot.mGroup != dst_snapshot.mGroup)) {
      err = SSTR("error: destination file system " << dst_fsid << " does not "
                 << "belong to the same space and scheduling group as the "
                 << "source");
      return false;
    }
  }

  eos::common::RWMutexWriteLock wr_lock(mDrainMutex);
  auto it_drainfs = mDrainFs.find(src_snapshot.mHostPort);

  if (it_drainfs != mDrainFs.end()) {
    // Check if the fs is already draining for this node
    auto it = std::find_if(it_drainfs->second.begin(), it_drainfs->second.end(),
    [src_fsid](const std::shared_ptr<DrainFs>& elem) -> bool {
      return (elem->GetFsId() == src_fsid);
    });

    if (it != it_drainfs->second.end()) {
      err = SSTR("error: drain has already started for the given fsid="
                 << src_fsid);
      return false;
    } else {
      // Check if drain request is not already pending
      auto it_pending = std::find_if(mPending.begin(), mPending.end(),
      [src_fsid](const std::pair<FileSystem::fsid_t,
      FileSystem::fsid_t>& elem) -> bool {
        return (elem.first == src_fsid);
      });

      if (it_pending != mPending.end()) {
        err = SSTR("error: a drain job is already pending for fsid="
                   << src_fsid);
        return false;
      }

      // Check if we have reached the max fs per node for this node
      if (it_drainfs->second.size() >=
          MaxDrainFsInParallel(src_snapshot.mSpace)) {
        fs->SetDrainStatus(eos::common::DrainStatus::kDrainWait);
        mPending.push_back(std::make_pair(src_fsid, dst_fsid));
        return true;
      }
    }
  }

  // Start the drain
  std::shared_ptr<DrainFs> dfs(new DrainFs(mThreadPool, gOFS->eosFsView,
                               src_fsid, dst_fsid));
  auto future = std::async(std::launch::async, &DrainFs::DoIt, dfs);
  dfs->SetFuture(std::move(future));
  mDrainFs[src_snapshot.mHostPort].emplace(dfs);
  return true;
}
//------------------------------------------------------------------------------
// Stop draining of a given file system
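// If the drain is still pending it is simply removed from the queue,
// otherwise the running DrainFs object is signalled to stop.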
//------------------------------------------------------------------------------
bool
Drainer::StopFsDrain(eos::mgm::FileSystem* fs, std::string& err)
{
  eos::common::FileSystem::fsid_t fsid = fs->GetId();
  eos_notice("msg=\"stop draining\" fsid=%u", fsid);

  if (fsid == 0) {
    std::ostringstream ss;
    ss << "Debug stacktrace: " << eos::common::getStacktrace();
    eos_static_crit("msg=\"%s\"", ss.str().c_str());
  }

  eos::common::FileSystem::fs_snapshot_t drain_snapshot;
  fs->SnapShotFileSystem(drain_snapshot);
  eos::common::RWMutexWriteLock wr_lock(mDrainMutex);
  auto it_drainfs = mDrainFs.find(drain_snapshot.mHostPort);

  if (it_drainfs == mDrainFs.end()) {
    err = SSTR("error: no drain started for fsid=" << fsid);
    return false;
  }

  // Check if the fs is draining
  auto it = std::find_if(it_drainfs->second.begin(), it_drainfs->second.end(),
  [fsid](const std::shared_ptr<DrainFs>& elem) -> bool {
    return (elem->GetFsId() == fsid);
  });

  if (it == it_drainfs->second.end()) {
    // Check if there is a request pending
    auto it_pending = std::find_if(mPending.begin(), mPending.end(),
    [fsid](const std::pair<eos::common::FileSystem::fsid_t,
    eos::common::FileSystem::fsid_t>& elem) -> bool {
      return (elem.first == fsid);
    });

    if (it_pending != mPending.end()) {
      (void) mPending.erase(it_pending);
    }

    fs->SetDrainStatus(eos::common::DrainStatus::kNoDrain);
  } else {
    (*it)->SignalStop();
  }

  return true;
}
//------------------------------------------------------------------------------
// Get drain jobs info
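// Hypothetical usage sketch - each DrainHdrInfo entry pairs a display
// header (first) with the internal tag (second) consumed by
// DrainFs::PrintJobsTable; the tag names below are illustrative only:
//   std::string out;
//   DrainHdrInfo hdr_info {{"fs id", "fsid"}, {"status", "status"}};
//   drainer.GetJobsInfo(out, hdr_info, 0, false, false);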
//------------------------------------------------------------------------------
bool
Drainer::GetJobsInfo(std::string& out, const DrainHdrInfo& hdr_info,
                     unsigned int fsid, bool only_failed, bool monitor_fmt) const
{
  if (hdr_info.empty()) {
    eos_err("msg=\"drain info header is empty\"");
    return false;
  }

  // Collect list of internal tags for display
  std::list<std::string> itags;

  for (const auto& elem : hdr_info) {
    itags.push_back(elem.second);
  }

  TableFormatterBase table;
  TableHeader table_header;

  for (const auto& elem : hdr_info) {
    if (monitor_fmt) {
      table_header.push_back(std::make_tuple(elem.first, 10, "s"));
    } else {
      table_header.push_back(std::make_tuple(elem.first, 0, "s"));
    }
  }

  table.SetHeader(table_header);
  std::vector<std::string> selections;
  bool found {false};
  {
    // Loop through all drain jobs and collect status information
    eos::common::RWMutexReadLock rd_lock(mDrainMutex);

    if (mDrainFs.empty()) {
      out += "info: there is no ongoing drain activity";
      return true;
    }

    for (const auto& pair : mDrainFs) {
      for (const auto& drain_fs : pair.second) {
        if (fsid == 0) {
          drain_fs->PrintJobsTable(table, only_failed, itags);
        } else {
          if (fsid == drain_fs->GetFsId()) {
            drain_fs->PrintJobsTable(table, only_failed, itags);
            found = true;
            break;
          }
        }
      }

      if (found) {
        break;
      }
    }
  }

  out = table.GenerateTable(HEADER, selections).c_str();
  return true;
}
//------------------------------------------------------------------------------
// Method doing the drain monitoring
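// The monitoring loop waits for the namespace to boot and for this MGM to
// become master, then every 5 seconds it refreshes the space configuration,
// starts queued drain requests and prunes finished DrainFs objects.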
//------------------------------------------------------------------------------
void
Drainer::Drain(ThreadAssistant& assistant) noexcept
{
  eos_static_notice("%s", "msg=\"starting central drainer\"");
  gOFS->WaitUntilNamespaceIsBooted(assistant);

  // Wait until the current MGM becomes a master
  do {
    eos_debug("%s", "msg=\"drain waiting for master MGM\"");
    assistant.wait_for(std::chrono::seconds(10));
  } while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());

  // Reapply the drain status for file systems in drain mode
  FsView::gFsView.ReapplyDrainStatus();

  while (!assistant.terminationRequested()) {
    UpdateFromSpaceConfig();
    HandleQueued();
    gOFS->mFidTracker.DoCleanup(TrackerType::Drain);
    assistant.wait_for(std::chrono::seconds(5));
    // Clean up finished or stopped file system drains
    eos::common::RWMutexWriteLock wr_lock(mDrainMutex);

    for (auto& pair : mDrainFs) {
      auto& set_fs = pair.second;

      for (auto it = set_fs.begin(); it != set_fs.end(); /*empty*/) {
        if (!(*it)->IsRunning()) {
          it = set_fs.erase(it);
        } else {
          ++it;
        }
      }
    }
  }

  WaitForAllDrainToStop();
  eos_static_notice("%s", "msg=\"stopped central drainer\"");
}
//------------------------------------------------------------------------------
// Signal all drain file systems to stop and wait for them
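// After signalling, the method polls IsRunning() every 100ms until all
// DrainFs objects have finished, then drops the tracked state.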
//------------------------------------------------------------------------------
void
Drainer::WaitForAllDrainToStop()
{
  eos_notice("%s", "msg=\"stop all ongoing drain\"");
  {
    eos::common::RWMutexReadLock rd_lock(mDrainMutex);

    for (auto& node_elem : mDrainFs) {
      for (const auto& fs_elem : node_elem.second) {
        fs_elem->SignalStop();
      }
    }
  }
  bool all_stopped {false};

  while (!all_stopped) {
    {
      all_stopped = true;
      eos::common::RWMutexReadLock rd_lock(mDrainMutex);

      for (auto& node_elem : mDrainFs) {
        for (const auto& fs_elem : node_elem.second) {
          if (fs_elem->IsRunning()) {
            all_stopped = false;
            break;
          }
        }

        if (!all_stopped) {
          break;
        }
      }
    }

    if (!all_stopped) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
  }

  eos::common::RWMutexWriteLock wr_lock(mDrainMutex);
  mDrainFs.clear();
  mPending.clear();
}
//------------------------------------------------------------------------------
// Update drain relevant configuration from the space view
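// Reads the per-space "drainer.node.nfs" setting at most once per minute
// and caches it in mCfgMap; if unset, the space is initialized with the
// default value of 5.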
//------------------------------------------------------------------------------
void
Drainer::UpdateFromSpaceConfig()
{
  using namespace std::chrono;
  static auto last_update = steady_clock::now();

  // Update every minute
  if (duration_cast<seconds>(steady_clock::now() - last_update).count() > 60) {
    last_update = steady_clock::now();
    eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);

    for (const auto& space : FsView::gFsView.mSpaceView) {
      int max_drain_fs = 5;

      if (space.second->GetConfigMember("drainer.node.nfs") != "") {
        max_drain_fs = atoi(space.second->GetConfigMember("drainer.node.nfs").c_str());
      } else {
        space.second->SetConfigMember("drainer.node.nfs", "5");
      }

      // Set the space configuration
      XrdSysMutexHelper scope_lock(mCfgMutex);
      mCfgMap[space.first] = max_drain_fs;
    }
  }
}
//------------------------------------------------------------------------------
// Get the maximum number of file systems that can be drained in parallel
// on the same node.
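// Returns 0 for spaces not yet present in the configuration map, i.e.
// before UpdateFromSpaceConfig() has processed them.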
//------------------------------------------------------------------------------
unsigned int
Drainer::MaxDrainFsInParallel(const std::string& space) const
{
  XrdSysMutexHelper scope_lock(mCfgMutex);
  const auto it = mCfgMap.find(space);

  if (it != mCfgMap.end()) {
    return it->second;
  }

  return 0u;
}
//------------------------------------------------------------------------------
// Handle queued draining requests
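// The pending list is swapped out while holding the write lock and processed
// afterwards, since StartFsDrain() itself acquires mDrainMutex.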
//------------------------------------------------------------------------------
void
Drainer::HandleQueued()
{
  std::string msg;
  ListPendingT lst;
  {
    eos::common::RWMutexWriteLock wr_lock(mDrainMutex);
    std::swap(lst, mPending);
  }

  while (!lst.empty()) {
    auto pair = lst.front();
    lst.pop_front();
    eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
    FileSystem* fs = FsView::gFsView.mIdView.lookupByID(pair.first);

    if (fs && !StartFsDrain(fs, pair.second, msg)) {
      eos_err("msg=\"failed to start pending drain\" src_fsid=%u err=\"%s\"",
              pair.first, msg.c_str());
    }
  }
}
EOSMGMNAMESPACE_END