//------------------------------------------------------------------------------ // @file DrainFs.cc //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2019 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #include "mgm/drain/DrainFs.hh" #include "mgm/drain/DrainTransferJob.hh" #include "mgm/XrdMgmOfs.hh" #include "mgm/FsView.hh" #include "common/table_formatter/TableFormatterBase.hh" #include "common/ThreadPool.hh" #include "namespace/interface/IView.hh" #include EOSMGMNAMESPACE_BEGIN using namespace std::chrono; constexpr std::chrono::seconds DrainFs::sRefreshTimeout; constexpr std::chrono::seconds DrainFs::sStallTimeout; //------------------------------------------------------------------------------ // Constructor //------------------------------------------------------------------------------ DrainFs::DrainFs(eos::common::ThreadPool& thread_pool, eos::IFsView* fs_view, eos::common::FileSystem::fsid_t src_fsid, eos::common::FileSystem::fsid_t dst_fsid): mNsFsView(fs_view), mFsId(src_fsid), mTargetFsId(dst_fsid), mStatus(eos::common::DrainStatus::kNoDrain), mDidRerun(false), mDrainStop(false), mMaxJobs(10), mDrainPeriod(0), mThreadPool(thread_pool), mTotalFiles(0ull), mPending(0ull), mLastPending(0ull), mLastProgressTime(steady_clock::now()), mLastUpdateTime(steady_clock::now()) {} //------------------------------------------------------------------------------ // Destructor //------------------------------------------------------------------------------ DrainFs::~DrainFs() { eos_static_debug("msg=\"fsid=%u destroying fs drain object", mFsId); ResetCounters(); } //------------------------------------------------------------------------------ // Get space defined drain variables //------------------------------------------------------------------------------ void DrainFs::GetSpaceConfiguration(const std::string& space_name) { if (!space_name.empty() && FsView::gFsView.mSpaceView.count(space_name)) { auto space = FsView::gFsView.mSpaceView[space_name]; if (space) { if (space->GetConfigMember("drainer.fs.ntx") != "") { mMaxJobs.store(std::stoul(space->GetConfigMember("drainer.fs.ntx"))); eos_static_debug("msg=\"per fs max parallel jobs=%u\"", mMaxJobs.load()); } } else { eos_warning("msg=\"space %s not yet initialized\"", space_name.c_str()); } } else { // Use some sensible default values for testing mMaxJobs = 2; } } //------------------------------------------------------------------------------ // Method draining the file system //------------------------------------------------------------------------------ DrainFs::State DrainFs::DoIt() { eos_notice("msg=\"start draining\" fsid=%d", mFsId); WaitUntilNamespaceIsBooted(); if (mDrainStop) { eos_err("msg=\"drain stopped while waiting for the namespace boot\" " "fsid=%lu", mFsId); return State::Failed; } mTotalFiles = mNsFsView->getNumFilesOnFs(mFsId); if (mTotalFiles == 0) { SuccessfulDrain(); return State::Done; } if (!PrepareFs()) { return State::Failed; } State state = State::Running; // Loop to drain the files while (!mDrainStop && (state != State::Done) && (state != State::Failed)) { mTotalFiles = mNsFsView->getNumFilesOnFs(mFsId); mPending = mTotalFiles; for (auto it_fid = mNsFsView->getStreamingFileList(mFsId); it_fid && it_fid->valid(); /* no progress */) { if (NumRunningJobs() <= mMaxJobs) { std::shared_ptr job { new DrainTransferJob(it_fid->getElement(), mFsId, mTargetFsId)}; if (!gOFS->mFidTracker.AddEntry(it_fid->getElement(), TrackerType::Drain)) { job->ReportError(SSTR("msg=\"skip currently scheduled drain\" " "fxid=" << std::hex << it_fid->getElement())); eos::common::RWMutexWriteLock wr_lock(mJobsMutex); mJobsFailed.insert(job); } else { mThreadPool.PushTask([ = ] { job->UpdateMgmStats(); job->DoIt(); job->UpdateMgmStats(); }); eos::common::RWMutexWriteLock wr_lock(mJobsMutex); mJobsRunning.push_back(job); } // Advance to the next file id to be drained it_fid->next(); --mPending; } else { std::this_thread::sleep_for(seconds(1)); } HandleRunningJobs(); state = UpdateProgress(); if (mDrainStop || (state != State::Running)) { break; } } if (mDrainStop) { break; } while (!mDrainStop && (state == State::Running)) { HandleRunningJobs(); state = UpdateProgress(); // Ensure we do a refresh of the files to drain if there are no more // running jobs so that we don't get stuck in stalling forever. if (NumRunningJobs() == 0) { break; } } } if (mDrainStop) { StopJobs(); ResetCounters(); state = State::Failed; } else { if (state == State::Rerun) { FailedDrain(); state = State::Failed; } } eos_notice("msg=\"finished draining\" fsid=%d state=%i", mFsId, state); return state; } //---------------------------------------------------------------------------- // Handle running jobs //---------------------------------------------------------------------------- void DrainFs::HandleRunningJobs() { eos::common::RWMutexWriteLock wr_lock(mJobsMutex); for (auto it = mJobsRunning.begin(); it != mJobsRunning.end(); /* no progress */) { std::string sfxid = (*it)->GetInfo({"fxid"}).front(); eos::IFileMD::id_t fxid = eos::common::FileId::Hex2Fid(sfxid.c_str()); if ((*it)->GetStatus() == DrainTransferJob::Status::OK) { gOFS->mFidTracker.RemoveEntry(fxid); it = mJobsRunning.erase(it); } else if ((*it)->GetStatus() == DrainTransferJob::Status::Failed) { gOFS->mFidTracker.RemoveEntry(fxid); mJobsFailed.insert(*it); it = mJobsRunning.erase(it); } else { ++it; } } } //---------------------------------------------------------------------------- // Mark file system drain as successful //---------------------------------------------------------------------------- void DrainFs::SuccessfulDrain() { eos_notice("msg=\"complete drain\" fsid=%d", mFsId); eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (fs) { mStatus = eos::common::DrainStatus::kDrained; eos::common::FileSystemUpdateBatch batch; batch.setDrainStatusLocal(mStatus); batch.setLongLongLocal("local.drain.bytesleft", 0); batch.setLongLongLocal("local.drain.timeleft", 0); batch.setLongLongLocal("local.drain.failed", 0); batch.setLongLongLocal("local.drain.files", 0); if (!gOFS->Shutdown) { // If drain done and the system is not shutting down then set the // file system to "empty" state batch.setLongLongLocal("local.drain.progress", 100); batch.setLongLongLocal("local.drain.failed", 0); batch.setStringDurable("configstatus", "empty"); FsView::gFsView.StoreFsConfig(fs); } fs->applyBatch(batch); } } //------------------------------------------------------------------------------ // Mark file system drain as failed //------------------------------------------------------------------------------ void DrainFs::FailedDrain() { eos_notice("msg=\"failed drain\" fsid=%d", mFsId); eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (fs) { mStatus = eos::common::DrainStatus::kDrainFailed; eos::common::FileSystemUpdateBatch batch; batch.setDrainStatusLocal(mStatus); batch.setLongLongLocal("local.drain.timeleft", 0); batch.setLongLongLocal("local.drain.progress", 100); batch.setLongLongLocal("local.drain.failed", NumFailedJobs()); fs->applyBatch(batch); } } //------------------------------------------------------------------------------ // Stop ongoing drain jobs //------------------------------------------------------------------------------ void DrainFs::StopJobs() { { eos::common::RWMutexReadLock rd_lock(mJobsMutex); // Signal all drain jobs to stop/cancel for (auto& job : mJobsRunning) { if (job->GetStatus() == DrainTransferJob::Status::Running) { job->Cancel(); } } // Wait for drain jobs to cancel for (auto& job : mJobsRunning) { while ((job->GetStatus() == DrainTransferJob::Status::Running) || (job->GetStatus() == DrainTransferJob::Status::Ready)) { std::this_thread::sleep_for(milliseconds(10)); } // Also clean them up form the tracker const std::string sfxid = job->GetInfo({"fxid"}).front(); eos::IFileMD::id_t fxid = eos::common::FileId::Hex2Fid(sfxid.c_str()); gOFS->mFidTracker.RemoveEntry(fxid); } } eos::common::RWMutexWriteLock wr_lock(mJobsMutex); mJobsRunning.clear(); } //------------------------------------------------------------------------------ // Prepare the file system for drain i.e. delay the start by the configured // amount of time, set the status etc. //------------------------------------------------------------------------------ bool DrainFs::PrepareFs() { std::string space_name; { eos_info("msg=\"setting the drain prepare status\" fsid=%i", mFsId); eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (!fs) { eos_notice("msg=\"removed during prepare\" fsid=%d", mFsId); return false; } mStatus = eos::common::DrainStatus::kDrainPrepare; eos::common::FileSystemUpdateBatch batch; batch.setLongLongLocal("local.drain.bytesleft", 0); batch.setLongLongLocal("local.drain.files", 0); batch.setLongLongLocal("local.drain.failed", 0); batch.setLongLongLocal("local.drain.timeleft", 0); batch.setLongLongLocal("local.drain.progress", 0); batch.setDrainStatusLocal(mStatus); fs->applyBatch(batch); mDrainPeriod = seconds(fs->GetLongLong("drainperiod")); eos::common::FileSystem::fs_snapshot_t drain_snapshot; fs->SnapShotFileSystem(drain_snapshot, false); space_name = drain_snapshot.mSpace; } mDrainStart = steady_clock::now(); mDrainEnd = mDrainStart + mDrainPeriod; // Wait 60 seconds or the service delay time indicated by Master size_t kLoop = gOFS->mMaster->GetServiceDelay(); if (!kLoop) { kLoop = 60; } for (size_t k = 0; k < kLoop; ++k) { std::this_thread::sleep_for(seconds(1)); { eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* entry = FsView::gFsView.mIdView.lookupByID(mFsId); if (!entry) { eos_err("msg=\"removed during drain prepare\" fsid=%d", mFsId); return false; } entry->SetLongLongLocal("local.drain.timeleft", kLoop - 1 - k); } if (mDrainStop) { ResetCounters(); return false; } } // Mark file system as draining eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (!fs) { eos_notice("msg=\"removed during drain\" fsid=%d", mFsId); return false; } GetSpaceConfiguration(space_name); mStatus = eos::common::DrainStatus::kDraining; eos::common::FileSystemUpdateBatch batch; batch.setDrainStatusLocal(mStatus); batch.setLongLongLocal("local.drain.files", mTotalFiles); batch.setLongLongLocal("local.drain.failed", 0); batch.setLongLongLocal("local.drain.bytesleft", fs->GetLongLong("stat.statfs.usedbytes")); fs->applyBatch(batch); return true; } //------------------------------------------------------------------------------ // Update progress of the drain //------------------------------------------------------------------------------ DrainFs::State DrainFs::UpdateProgress() { bool is_expired = false; auto now = steady_clock::now(); if (mLastPending != mPending) { mLastPending = mPending; mLastProgressTime = now; } else { std::this_thread::sleep_for(seconds(1)); } auto duration = now - mLastProgressTime; bool is_stalled = (duration_cast(duration).count() > sStallTimeout.count()); eos_static_debug("msg=\"fsid=%d, timestamp=%llu, last_progress=%llu, is_stalled=%i, " "total_files=%llu, last_pending=%llu, pending=%llu, running=%llu, " "failed=%llu\"", mFsId, duration_cast(now.time_since_epoch()).count(), duration_cast(mLastProgressTime.time_since_epoch()).count(), is_stalled, mTotalFiles, mLastPending, mPending, NumRunningJobs(), NumFailedJobs()); // Check if drain expired if (mDrainPeriod.count() && (mDrainEnd < now)) { eos_warning("msg=\"drain expired\" fsid=%d", mFsId); is_expired = true; } // Update drain display variables if (is_stalled || is_expired || (mLastProgressTime == now)) { eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (!fs) { eos_err("msg=\"removed during drain\" fsid=%d", mFsId); return State::Failed; } if (is_expired) { mStatus = eos::common::DrainStatus::kDrainExpired; common::FileSystemUpdateBatch batch; batch.setLongLongLocal("local.drain.timeleft", 0); batch.setLongLongLocal("local.drain.files", mPending); batch.setDrainStatusLocal(mStatus); fs->applyBatch(batch); return State::Failed; } common::FileSystemUpdateBatch batch; if (is_stalled) { if (mStatus != eos::common::DrainStatus::kDrainStalling) { mStatus = eos::common::DrainStatus::kDrainStalling; batch.setDrainStatusLocal(mStatus); } } else { if (mStatus != eos::common::DrainStatus::kDraining) { mStatus = eos::common::DrainStatus::kDraining; batch.setDrainStatusLocal(mStatus); } } uint64_t progress = 100ull; if (mTotalFiles) { progress = 100.0 * (mTotalFiles - mPending) / mTotalFiles; } uint64_t time_left = 99999999999ull; if (mDrainEnd > now) { time_left = duration_cast(mDrainEnd - now).count(); } batch.setLongLongLocal("local.drain.failed", NumFailedJobs()); batch.setLongLongLocal("local.drain.files", mPending); batch.setLongLongLocal("local.drain.progress", progress); batch.setLongLongLocal("local.drain.timeleft", time_left); batch.setLongLongLocal("local.drain.bytesleft", fs->GetLongLong("stat.statfs.usedbytes")); fs->applyBatch(batch); eos_static_debug("msg=\"fsid=%d, update progress", mFsId); } // Sleep for a longer period since nothing moved in the last 10 min if (is_stalled) { std::this_thread::sleep_for(seconds(30)); } // If we have only failed jobs check if the files still exist. It could also // be that there were new files written while draining was started. if ((mPending == 0) && (NumRunningJobs() == 0)) { uint64_t total_files = mNsFsView->getNumFilesOnFs(mFsId); if (total_files == 0) { SuccessfulDrain(); return State::Done; } else { if (total_files == NumFailedJobs()) { FailedDrain(); return State::Failed; } else { if (mDidRerun) { // If we already did a rerun then we just fail since there might be // ghost entries on the file system i.e. fids registered in the // FileSystem view but without any existing FileMD object. FailedDrain(); return State::Failed; } else { mDidRerun = true; eos_info("msg=\"still %llu files to drain before declaring the file " "system empty\" fsid=%lu", total_files, mFsId); mTotalFiles = total_files; mPending = mTotalFiles; eos::common::RWMutexWriteLock wr_lock(mJobsMutex); mJobsFailed.clear(); return State::Rerun; } } } } return State::Running; } //------------------------------------------------------------------------------ // Reset drain counters and status //------------------------------------------------------------------------------ void DrainFs::ResetCounters() { eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex); FileSystem* fs = FsView::gFsView.mIdView.lookupByID(mFsId); if (fs) { common::FileSystemUpdateBatch batch; batch.setLongLongLocal("local.drain.bytesleft", 0); batch.setLongLongLocal("local.drain.files", 0); batch.setLongLongLocal("local.drain.timeleft", 0); batch.setLongLongLocal("local.drain.progress", 0); batch.setDrainStatusLocal(eos::common::DrainStatus::kNoDrain); fs->applyBatch(batch); } mStatus = eos::common::DrainStatus::kNoDrain; } //------------------------------------------------------------------------------ // Populate table with drain jobs info corresponding to the current fs //------------------------------------------------------------------------------ void DrainFs::PrintJobsTable(TableFormatterBase& table, bool show_errors, const std::list& itags) const { TableData table_data; eos::common::RWMutexReadLock rd_lock(mJobsMutex); if (show_errors) { for (const auto& job : mJobsFailed) { table_data.emplace_back(); std::list data = job->GetInfo(itags); for (const auto& elem : data) { table_data.back().push_back(TableCell(elem, "s")); } } } else { for (const auto& job : mJobsRunning) { table_data.emplace_back(); for (const auto& elem : job->GetInfo(itags)) { table_data.back().push_back(TableCell(elem, "s")); } } } table.AddRows(table_data); } //------------------------------------------------------------------------------ // Wait until namespace is booted or drain stop is requested //------------------------------------------------------------------------------ void DrainFs::WaitUntilNamespaceIsBooted() const { while ((gOFS->mNamespaceState != NamespaceState::kBooted) && (!mDrainStop)) { std::this_thread::sleep_for(std::chrono::seconds(1)); eos_static_debug("msg=\"delay drain start until namespace is booted\" fsid=%u", mFsId); } } EOSMGMNAMESPACE_END