//------------------------------------------------------------------------------
// File: Fsck.cc
// Author: Andreas-Joachim Peters/Elvin Sindrilaru - CERN
//------------------------------------------------------------------------------

/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2011 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the         *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program. If not, see <http://www.gnu.org/licenses/>. *
 ************************************************************************/

#include "mgm/fsck/Fsck.hh"
#include "common/Timing.hh"
#include "common/LayoutId.hh"
#include "common/Path.hh"
#include "common/Mapping.hh"
#include "common/StringConversion.hh"
#include "common/StringTokenizer.hh"
#include "common/StringUtils.hh"
#include "mgm/XrdMgmOfs.hh"
#include "mgm/Messaging.hh"
#include "mgm/FsView.hh"
#include "namespace/interface/IView.hh"
#include "namespace/interface/IFsView.hh"
#include "namespace/Prefetcher.hh"
#include "qclient/structures/QSet.hh"
#include "json/json.h"

EOSMGMNAMESPACE_BEGIN

const std::string Fsck::sFsckKey {"fsck"};
const std::string Fsck::sCollectKey {"toggle-collect"};
const std::string Fsck::sCollectIntervalKey {"collect-interval-min"};
const std::string Fsck::sRepairKey {"toggle-repair"};
const std::string Fsck::sRepairCategory {"repair-category"};

using eos::common::FsckErr;
using eos::common::ConvertToFsckErr;

//------------------------------------------------------------------------------
// Constructor
//------------------------------------------------------------------------------
Fsck::Fsck():
  mShowOffline(false), mShowNoReplica(false), mShowDarkFiles(false),
  mStartProcessing(false), mCollectEnabled(false), mRepairEnabled(false),
  mCollectRunning(false), mRepairRunning(false),
  mRepairCategory(FsckErr::None),
  mCollectInterval(std::chrono::seconds(30 * 60)), eTimeStamp(0),
  mThreadPool(2, mMaxThreadPoolSize, 10, 6, 5, "fsck")
{}

//------------------------------------------------------------------------------
// Stop all fsck related threads and activities
//------------------------------------------------------------------------------
void
Fsck::Stop()
{
  mRepairThread.join();
  mCollectorThread.join();
}

//------------------------------------------------------------------------------
// Apply the FSCK configuration stored in the configuration engine
//------------------------------------------------------------------------------
void
Fsck::ApplyFsckConfig()
{
  using eos::common::StringTokenizer;
  // Parse config of the form: key1=val1 key2=val2 etc.
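  // Illustrative example of such a stored configuration string (the keys and
  // layout mirror what StoreFsckConfig() serializes, the values are made up):
  //   toggle-collect=1 collect-interval-min=30 toggle-repair=0 repair-category=<category>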
  std::string config = FsView::gFsView.GetGlobalConfig(sFsckKey);
  eos_info("msg=\"apply fsck configuration\" data=\"%s\"", config.c_str());
  std::map<std::string, std::string> kv_map;
  auto pairs = StringTokenizer::split<std::vector<std::string>>(config, ' ');

  for (const auto& pair : pairs) {
    auto kv = StringTokenizer::split<std::vector<std::string>>(pair, '=');

    if (kv.empty()) {
      eos_err("msg=\"unknown fsck config data\" data=\"%s\"", config.c_str());
      continue;
    }

    if (kv.size() == 1) {
      kv.emplace_back("");
    }

    kv_map.emplace(kv[0], kv[1]);
  }

  // Apply the configuration to the fsck engine
  std::string msg;

  if (kv_map.count(sCollectKey) && kv_map.count(sCollectIntervalKey)) {
    mCollectEnabled = (kv_map[sCollectKey] == "1");
    Config(sCollectKey,
           (mCollectEnabled ? kv_map[sCollectIntervalKey] : "off"), msg);

    if (kv_map.count(sRepairKey)) {
      mRepairEnabled = (kv_map[sRepairKey] == "1");
      Config(sRepairKey, kv_map[sRepairKey], msg);
    }

    if (kv_map.count(sRepairCategory)) {
      Config(sRepairCategory, kv_map[sRepairCategory], msg);
    }
  }
}

//------------------------------------------------------------------------------
// Store the current running FSCK configuration in the config engine
//------------------------------------------------------------------------------
bool
Fsck::StoreFsckConfig()
{
  using namespace std::chrono;
  uint32_t collect_interval_min =
    duration_cast<minutes>(mCollectInterval).count();

  // Make sure the collection interval is at least one minute
  if (collect_interval_min == 0) {
    collect_interval_min = 1;
  }

  std::ostringstream oss;
  oss << sCollectKey << "=" << mCollectEnabled << " "
      << sCollectIntervalKey << "=" << collect_interval_min << " "
      << sRepairKey << "=" << mRepairEnabled << " "
      << sRepairCategory << "="
      << eos::common::FsckErrToString(mRepairCategory);
  return FsView::gFsView.SetGlobalConfig(sFsckKey, oss.str());
}

//------------------------------------------------------------------------------
// Apply configuration options to the fsck mechanism
//------------------------------------------------------------------------------
bool
Fsck::Config(const std::string& key, const std::string& value,
             std::string& msg)
{
  // Make sure only one config runs at a time
  static std::mutex mutex;
  std::unique_lock<std::mutex> serialize_lock(mutex);

  if (mQcl == nullptr) {
    if (!gOFS->mQdbCluster.empty()) {
      mQcl = std::make_shared<qclient::QClient>
             (gOFS->mQdbContactDetails.members,
              gOFS->mQdbContactDetails.constructOptions());
    } else {
      msg = "error: no qclient configuration for fsck";
      eos_err("%s", msg.c_str());
      return false;
    }
  }

  if (key == sCollectKey) {
    if (value == "off") {
      mCollectEnabled = false;
    } else {
      mCollectEnabled = !mCollectRunning;
    }

    if (mCollectEnabled == false) {
      mRepairEnabled = false;

      // Stop the collection and repair
      if (mRepairRunning) {
        mRepairThread.join();
      }

      if (mCollectRunning) {
        mCollectorThread.join();
      }
    } else {
      // If value is present then it represents the collection interval
      if (!value.empty()) {
        try {
          float fval = std::stof(value);

          if (fval == 0) {
            mCollectInterval = std::chrono::seconds(60);
          } else if (fval < 1) {
            mCollectInterval = std::chrono::seconds((long)std::ceil(fval * 60));
          } else {
            mCollectInterval = std::chrono::seconds((long)fval * 60);
          }
        } catch (...) {
          mCollectInterval = std::chrono::seconds(30 * 60);
        }
      }

      if (!mCollectRunning) {
        mCollectorThread.reset(&Fsck::CollectErrs, this);
      }
    }

    if (!StoreFsckConfig()) {
      msg = "error: failed to store fsck configuration changes";
      return false;
    }
  } else if (key == sRepairKey) {
    if (value.empty()) {
      // User triggered repair toggle
      mRepairEnabled = !mRepairRunning;
    } else {
      // Mandatory config coming from the stored configuration
      mRepairEnabled = (value == "1");
    }

    if (mRepairEnabled == false) {
      if (mRepairRunning) {
        mRepairThread.join();
      }
    } else {
      if (!mCollectEnabled) {
        msg = "error: repair cannot be enabled without error collection";
        return false;
      }

      mRepairThread.reset(&Fsck::RepairErrs, this);
    }

    if (!StoreFsckConfig()) {
      msg = "error: failed to store fsck configuration changes";
      return false;
    }
  } else if (key == sRepairCategory) {
    if (value == "all") {
      mRepairCategory = FsckErr::None;
    } else {
      const auto tmp_cat = ConvertToFsckErr(value);

      if (tmp_cat == FsckErr::None) {
        msg = "error: unknown repair category";
        return false;
      } else {
        mRepairCategory = tmp_cat;
      }
    }

    if (!StoreFsckConfig()) {
      msg = "error: failed to store fsck configuration changes";
      return false;
    }
  } else if (key == "show-dark-files") {
    mShowDarkFiles = (value == "yes");
  } else if (key == "show-offline") {
    mShowOffline = (value == "yes");
  } else if (key == "show-no-replica") {
    mShowNoReplica = (value == "yes");
  } else if (key == "max-queued-jobs") {
    try {
      mMaxQueuedJobs = std::stoull(value);
    } catch (...) {
      eos_err("msg=\"failed to convert max-queued-jobs\" value=%s",
              value.c_str());
    }
  } else if (key == "max-thread-pool-size") {
    try {
      mMaxThreadPoolSize = std::stoul(value);
      mThreadPool.SetMaxThreads(mMaxThreadPoolSize);
    } catch (...) {
      eos_err("msg=\"failed to convert max-thread-pool-size\" value=%s",
              value.c_str());
    }
  } else {
    return false;
  }

  return true;
}

//------------------------------------------------------------------------------
// Looping thread function collecting FSCK results
//------------------------------------------------------------------------------
void
Fsck::CollectErrs(ThreadAssistant& assistant) noexcept
{
  mCollectRunning = true;
  eos_info("%s", "msg=\"started fsck collector thread\"");

  if (mQcl == nullptr) {
    eos_err("%s", "msg=\"failed to start fsck collector thread without "
            "a qclient\"");
    Log("Fsck error collection disabled, missing QuarkDB configuration");
    mCollectRunning = false;
    return;
  }

  gOFS->WaitUntilNamespaceIsBooted();

  // Wait that current MGM becomes a master
  do {
    eos_debug("%s", "msg=\"fsck waiting for master MGM\"");
    assistant.wait_for(std::chrono::seconds(10));
  } while (!assistant.terminationRequested() && !gOFS->mMaster->IsMaster());

  while (!assistant.terminationRequested()) {
    Log("Start error collection");
    Log("Filesystems to check: %lu", FsView::gFsView.GetNumFileSystems());
    decltype(eFsMap) tmp_err_map;
    QueryQdb(tmp_err_map);
    {
      // Swap in the new list of errors and clear the rest
      eos::common::RWMutexWriteLock wr_lock(mErrMutex);
      std::swap(tmp_err_map, eFsMap);
      eFsUnavail.clear();
      eFsDark.clear();
      eTimeStamp = time(NULL);
    }

    // @note accounting the offline replicas/files is a heavy ns op.
    if (mShowOffline) {
      AccountOfflineReplicas();
      PrintOfflineReplicas();
      AccountOfflineFiles();
    }

    // @note no replicas can be a really long list (e.g. PPS)
    if (mShowNoReplica) {
      AccountNoReplicaFiles();
    }

    PrintErrorsSummary();

    // @note the following operation is a heavy ns op.
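    // "Dark" files are entries whose metadata references a filesystem id
    // that is not configured in the FsView; collecting them in
    // AccountDarkFiles() walks every filesystem known to the namespace view.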
    if (mShowDarkFiles) {
      AccountDarkFiles();
    }

    Log("Finished error collection");
    Log("Next run in %d minutes",
        std::chrono::duration_cast<std::chrono::minutes>
        (mCollectInterval).count());
    // Notify the repair thread that it can run now
    mStartProcessing = true;
    PublishLogs();
    // Wait for next FSCK round ...
    assistant.wait_for(mCollectInterval);
  }

  ResetErrorMaps();
  Log("Stop error collection");
  PublishLogs();
  eos_info("%s", "msg=\"stopped fsck collector thread\"");
  mCollectRunning = false;
}

//------------------------------------------------------------------------------
// Method submitting fsck repair jobs to the thread pool
//------------------------------------------------------------------------------
void
Fsck::RepairErrs(ThreadAssistant& assistant) noexcept
{
  mRepairRunning = true;
  eos_info("%s", "msg=\"started fsck repair thread\"");

  if (mQcl == nullptr) {
    eos_err("%s", "msg=\"failed to start fsck repair thread without "
            "a qclient\"");
    Log("Fsck error repair disabled, missing QuarkDB configuration");
    mRepairRunning = false;
    return;
  }

  gOFS->WaitUntilNamespaceIsBooted();

  while (!assistant.terminationRequested()) {
    // Don't run if we are not a master
    while (!gOFS->mMaster->IsMaster()) {
      assistant.wait_for(std::chrono::seconds(1));

      if (assistant.terminationRequested()) {
        eos_info("%s", "msg=\"stopped fsck repair thread\"");
        mRepairRunning = false;
        return;
      }
    }

    // Wait for the collector thread to signal us
    while (!mStartProcessing) {
      assistant.wait_for(std::chrono::seconds(1));

      if (assistant.terminationRequested()) {
        eos_info("%s", "msg=\"stopped fsck repair thread\"");
        mRepairRunning = false;
        return;
      }
    }

    // Create local struct for errors so that we avoid the iterator
    // invalidation and the long locks
    std::map<std::string,
        std::map<eos::IFileMD::id_t,
        std::set<eos::common::FileSystem::fsid_t> > > local_emap;
    {
      eos::common::RWMutexReadLock rd_lock(mErrMutex);
      local_emap.insert(eFsMap.begin(), eFsMap.end());
    }
    uint64_t count = 0ull;
    uint64_t msg_delay = 0;
    std::list<std::string> err_priority {
      "stripe_err", "blockxs_err", "unreg_n", "rep_diff_n", "rep_missing_n",
      "m_mem_sz_diff", "m_cx_diff", "d_mem_sz_diff", "d_cx_diff"};

    for (const auto& err_type : err_priority) {
      // Repair only targeted categories if this option is set
      if ((mRepairCategory != FsckErr::None) &&
          (mRepairCategory != ConvertToFsckErr(err_type))) {
        continue;
      }

      for (const auto& elem : local_emap[err_type]) {
        if (!gOFS->mFidTracker.AddEntry(elem.first, TrackerType::Fsck)) {
          eos_debug("msg=\"skip already scheduled transfer\" fxid=%08llx",
                    elem.first);
          continue;
        }

        std::shared_ptr<FsckEntry> job {
          new FsckEntry(elem.first, elem.second, err_type, mQcl)};
        mThreadPool.PushTask([job]() {
          return job->Repair();
        });

        // Check regularly if we should exit and sleep if queue is full
        while ((mThreadPool.GetQueueSize() > mMaxQueuedJobs) ||
               (++count % 100 == 0)) {
          if (assistant.terminationRequested()) {
            // Wait that there are no more jobs in the queue - this can
            // take a while depending on the queue size
            while (mThreadPool.GetQueueSize()) {
              std::this_thread::sleep_for(std::chrono::seconds(1));

              if (++msg_delay % 5 == 0) {
                eos_info("%s", "msg=\"stopping fsck repair waiting for thread "
                         "pool queue to be consumed\"");
                msg_delay = 0;
              }
            }

            eos_info("%s", "msg=\"stopped fsck repair thread\"");
            mRepairRunning = false;
            return;
          } else {
            if (mThreadPool.GetQueueSize() > mMaxQueuedJobs) {
              assistant.wait_for(std::chrono::seconds(1));
            } else {
              break;
            }
          }
        }
      }
    }

    // Remove orphans from unavailable filesystems
    for (const auto& [fid, fsids] : local_emap[eos::common::FSCK_ORPHANS_N]) {
      for (const auto& fsid : fsids) {
        eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
        FileSystem* fs = FsView::gFsView.mIdView.lookupByID(fsid);

        if (!fs) {
          eos_info("msg=\"dropping orphans for missing filesystem\" "
                   "fxid=%08llx fsid=%d", fid, fsid);
          NotifyFixedErr(fid, fsid, eos::common::FSCK_ORPHANS_N);
        }
      }
    }

    // Force flush any collected notifications
    NotifyFixedErr(0ull, 0ul, "", true);
    gOFS->mFidTracker.Clear(TrackerType::Fsck);
    mStartProcessing = false;
    eos_info("%s", "msg=\"loop in fsck repair thread\"");
  }

  // Wait that there are no more jobs in the queue
  while (mThreadPool.GetQueueSize()) {
    assistant.wait_for(std::chrono::seconds(1));
  }

  eos_info("%s", "msg=\"stopped fsck repair thread\"");
  mRepairRunning = false;
}

//------------------------------------------------------------------------------
// Try to repair a given entry
//------------------------------------------------------------------------------
bool
Fsck::RepairEntry(eos::IFileMD::id_t fid,
                  const std::set<eos::common::FileSystem::fsid_t>& fsid_err,
                  const std::string& err_type, bool async,
                  std::string& out_msg)
{
  if (fid == 0ull) {
    eos_err("%s", "msg=\"no such file id 0\"");
    return false;
  }

  std::shared_ptr<FsckEntry> job {
    new FsckEntry(fid, fsid_err, err_type, mQcl)};

  if (async) {
    out_msg = "msg=\"repair job submitted\"";
    mThreadPool.PushTask([job]() {
      return job->Repair();
    });
  } else {
    if (!job->Repair()) {
      out_msg = "msg=\"repair job failed\"";
      return false;
    } else {
      out_msg = "msg=\"repair successful\"";
    }
  }

  return true;
}

//------------------------------------------------------------------------------
// Print the current log output
//------------------------------------------------------------------------------
void
Fsck::PrintOut(std::string& out, bool monitor_fmt) const
{
  std::ostringstream oss;

  if (monitor_fmt) {
    static time_t current_time;
    time(&current_time);
    oss << "timestamp=" << current_time << std::endl
        << "collection_thread="
        << (mCollectEnabled ? "enabled" : "disabled") << std::endl
        << "repair_thread="
        << (mRepairEnabled ? "enabled" : "disabled") << std::endl
        << "repair_category="
        << ((mRepairCategory == FsckErr::None) ? "all" :
            eos::common::FsckErrToString(mRepairCategory))
        << std::endl;
  } else {
    oss << "Info: collection thread status -> "
        << (mCollectEnabled ? "enabled" : "disabled") << std::endl
        << "Info: repair thread status -> "
        << (mRepairEnabled ? "enabled" : "disabled") << std::endl
        << "Info: repair category -> "
        << ((mRepairCategory == FsckErr::None) ? "all" :
            eos::common::FsckErrToString(mRepairCategory))
        << std::endl;
  }

  {
    XrdSysMutexHelper lock(mLogMutex);
    oss << (monitor_fmt ? mLogMonitor : mLog);
  }

  out = oss.str();
}

//------------------------------------------------------------------------------
// Get the required format for the given file identifier. Empty if no format
// requested.
//------------------------------------------------------------------------------
std::string
Fsck::GetFidFormat(eos::IFileMD::id_t fid, bool display_fxid,
                   bool display_lfn) const
{
  if (display_fxid) {
    return eos::common::FileId::Fid2Hex(fid);
  } else if (display_lfn) {
    eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView, fid);
    eos::common::RWMutexReadLock lock(gOFS->eosViewRWMutex);

    try {
      auto fmd = gOFS->eosFileService->getFileMD(fid);
      return gOFS->eosView->getUri(fmd.get());
    } catch (eos::MDException& e) {
      return "undefined";
    }
  }

  return "";
}

//------------------------------------------------------------------------------
// Return the current FSCK report
//------------------------------------------------------------------------------
bool
Fsck::Report(std::string& out, const std::set<std::string> tags,
             bool display_per_fs, bool display_fxid, bool display_lfn,
             bool display_json)
{
  std::ostringstream oss;
  eos::common::RWMutexReadLock rd_lock(mErrMutex);

  if (display_json) {
    ReportJsonFormat(oss, tags, display_per_fs, display_fxid, display_lfn);
  } else {
    ReportMonitorFormat(oss, tags, display_per_fs, display_fxid, display_lfn);
  }

  out = oss.str();
  return true;
}

//------------------------------------------------------------------------------
// Create report in JSON format
//------------------------------------------------------------------------------
void
Fsck::ReportJsonFormat(std::ostringstream& oss,
                       const std::set<std::string> tags,
                       bool display_per_fs, bool display_fxid,
                       bool display_lfn) const
{
  Json::Value json;

  if (display_per_fs) {
    // Display per file system
    std::map<eos::common::FileSystem::fsid_t,
        std::map<std::string, std::set<eos::IFileMD::id_t>>> fs_fxid;

    for (const auto& elem_map : eFsMap) {
      if (!tags.empty() && (tags.find(elem_map.first) == tags.end())) {
        continue; // skip unselected
      }

      for (const auto& elem : elem_map.second) {
        for (const auto& fsid : elem.second) {
          fs_fxid[fsid][elem_map.first].insert(elem.first);
        }
      }
    }

    for (const auto& elem : fs_fxid) {
      for (auto it = elem.second.begin(); it != elem.second.end(); ++it) {
        Json::Value json_entry;
        json_entry["timestamp"] = (Json::Value::UInt64)eTimeStamp;
        json_entry["fsid"] = (int) elem.first;
        json_entry["tag"] = it->first.c_str();
        json_entry["count"] = (int) it->second.size();

        if (!display_fxid && !display_lfn) {
          json.append(json_entry);
          continue;
        }

        Json::Value json_ids;

        for (auto it_fid = it->second.begin();
             it_fid != it->second.end(); ++it_fid) {
          json_ids.append(GetFidFormat(*it_fid, display_fxid, display_lfn));
        }

        if (display_fxid) {
          json_entry["fxid"] = json_ids;
        } else if (display_lfn) {
          json_entry["lfn"] = json_ids;
        }

        json.append(json_entry);
      }
    }
  } else {
    for (const auto& elem_map : eFsMap) {
      if (!tags.empty() && (tags.find(elem_map.first) == tags.end())) {
        continue; // skip unselected
      }

      Json::Value json_entry;
      json_entry["timestamp"] = (Json::Value::UInt64) eTimeStamp;
      json_entry["tag"] = elem_map.first;
      Json::Value json_ids;
      std::set<eos::IFileMD::id_t> fids;

      for (const auto& elem : elem_map.second) {
        fids.insert(elem.first);
      }

      for (auto it = fids.begin(); it != fids.end(); ++it) {
        json_ids.append(GetFidFormat(*it, display_fxid, display_lfn));
      }

      json_entry["count"] = (Json::Value::UInt64)fids.size();

      if (display_fxid) {
        json_entry["fxid"] = json_ids;
      } else if (display_lfn) {
        json_entry["lfn"] = json_ids;
      }

      json.append(json_entry);
    }
  }

  // List shadow filesystems
  if (!eFsDark.empty()) {
    for (auto fsit = eFsDark.begin(); fsit != eFsDark.end(); fsit++) {
      Json::Value json_entry;
      json_entry["timestamp"] = (Json::Value::UInt64) eTimeStamp;
      json_entry["tag"] = "shadow_fsid";
      json_entry["fsid"] = (Json::Value::UInt64)fsit->first;
json_entry["count"] = (Json::Value::UInt64)fsit->second; json.append(json_entry); } } oss << json; } //------------------------------------------------------------------------------ // Create report in monitor format //------------------------------------------------------------------------------ void Fsck::ReportMonitorFormat(std::ostringstream& oss, const std::set tags, bool display_per_fs, bool display_fxid, bool display_lfn) const { if (display_per_fs) { // Display per file system std::map>> fs_fxid; for (const auto& elem_map : eFsMap) { if (!tags.empty() && (tags.find(elem_map.first) == tags.end())) { continue; // skip unselected } for (const auto& elem : elem_map.second) { for (const auto& fsid : elem.second) { fs_fxid[fsid][elem_map.first].insert(elem.first); } } } for (const auto& elem : fs_fxid) { for (auto it = elem.second.begin(); it != elem.second.end(); ++it) { oss << "timestamp=" << eTimeStamp << " fsid=" << elem.first << " tag=\"" << it->first << "\" count=" << it->second.size(); if (display_fxid) { oss << " fxid="; } else if (display_lfn) { oss << " lfn="; } else { oss << std::endl; continue; } for (auto it_fid = it->second.begin(); it_fid != it->second.end(); ++it_fid) { oss << GetFidFormat(*it_fid, display_fxid, display_lfn); if (it_fid != std::prev(it->second.end())) { oss << ", "; } } oss << std::endl; } } } else { for (const auto& elem_map : eFsMap) { if (!tags.empty() && (tags.find(elem_map.first) == tags.end())) { continue; // skip unselected } oss << "timestamp=" << eTimeStamp << " tag=\"" << elem_map.first << "\""; uint64_t count = 0ull; for (const auto& elem : elem_map.second) { count += elem.second.size(); } oss << " count=" << count; if (display_fxid) { oss << " fxid="; } else if (display_lfn) { oss << " lfn="; } else { oss << std::endl; continue; } std::set fids; for (const auto& elem : elem_map.second) { fids.insert(elem.first); } for (auto it = fids.begin(); it != fids.end(); ++it) { oss << GetFidFormat(*it, display_fxid, display_lfn); if (it != std::prev(fids.end())) { oss << ", "; } } oss << std::endl; } } // List shadow filesystems if (!eFsDark.empty()) { for (auto fsit = eFsDark.begin(); fsit != eFsDark.end(); ++fsit) { oss << "timestamp=" << eTimeStamp << " tag=\"shadow_fsid\"" << " fsid=" << fsit->first << " count=" << fsit->second; } } } //------------------------------------------------------------------------------ // Clear the current FSCK log //------------------------------------------------------------------------------ void Fsck::PublishLogs() { XrdSysMutexHelper lock(mLogMutex); mLog = mTmpLog; mTmpLog.clear(); mLogMonitor = mTmpLogMonitor; mTmpLogMonitor.clear(); } //------------------------------------------------------------------------------ // Write log message to the current in-memory log //------------------------------------------------------------------------------ void Fsck::Log(const char* msg, ...) 
void
Fsck::Log(const char* msg, ...) const
{
  static time_t current_time;
  static struct timeval tv;
  static struct timezone tz;
  static struct tm* tm;
  va_list args;
  va_start(args, msg);
  char buffer[16384];
  char* ptr;
  time(&current_time);
  gettimeofday(&tv, &tz);
  tm = localtime(&current_time);
  sprintf(buffer, "%02d%02d%02d %02d:%02d:%02d %lu.%06lu ",
          tm->tm_year - 100, tm->tm_mon + 1, tm->tm_mday, tm->tm_hour,
          tm->tm_min, tm->tm_sec, current_time,
          (unsigned long) tv.tv_usec);
  ptr = buffer + strlen(buffer);
  vsprintf(ptr, msg, args);
  XrdSysMutexHelper lock(mLogMutex);
  mTmpLog += buffer;
  mTmpLog += "\n";
  va_end(args);
}

//------------------------------------------------------------------------------
// Write log message to the current in-memory log in monitoring format
//------------------------------------------------------------------------------
void
Fsck::LogMonitor(const char* msg, ...) const
{
  va_list args;
  va_start(args, msg);
  char buffer[16384];
  vsprintf(buffer, msg, args);
  XrdSysMutexHelper lock(mLogMutex);
  mTmpLogMonitor += buffer;
  mTmpLogMonitor += "\n";
  va_end(args);
}

//------------------------------------------------------------------------------
// Reset all collected errors in the error map
//------------------------------------------------------------------------------
void
Fsck::ResetErrorMaps()
{
  eos::common::RWMutexWriteLock wr_lock(mErrMutex);
  eFsMap.clear();
  eFsUnavail.clear();
  eFsDark.clear();
  eTimeStamp = time(NULL);
}

//------------------------------------------------------------------------------
// Account for offline replicas due to unavailable file systems
//------------------------------------------------------------------------------
void
Fsck::AccountOfflineReplicas()
{
  // Grab all files which are damaged because filesystems are down
  eos::common::RWMutexWriteLock wr_lock(mErrMutex);
  eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);

  for (auto it = FsView::gFsView.mIdView.cbegin();
       it != FsView::gFsView.mIdView.cend(); ++it) {
    // Protect against illegal 0 filesystem pointer
    if (!it->second) {
      eos_crit("msg=\"found illegal pointer in filesystem view\" fsid=%lu",
               it->first);
      continue;
    }

    eos::common::FileSystem::fsid_t fsid = it->first;
    eos::common::ActiveStatus fsactive = it->second->GetActiveStatus();
    eos::common::ConfigStatus fsconfig = it->second->GetConfigStatus();
    eos::common::BootStatus fsstatus = it->second->GetStatus();

    if ((fsstatus == eos::common::BootStatus::kBooted) &&
        (fsconfig >= eos::common::ConfigStatus::kDrain) &&
        (fsactive == eos::common::ActiveStatus::kOnline)) {
      // Healthy, don't need to do anything
      continue;
    } else {
      // Not ok and contributes to replica offline errors
      try {
        eos::Prefetcher::prefetchFilesystemFileListAndWait(gOFS->eosView,
            gOFS->eosFsView, fsid);
        // Only need the view lock if we're in-memory
        eos::common::RWMutexReadLock nslock;

        if (gOFS->eosView->inMemory()) {
          nslock.Grab(gOFS->eosViewRWMutex);
        }

        for (auto it_fid = gOFS->eosFsView->getFileList(fsid);
             (it_fid && it_fid->valid()); it_fid->next()) {
          eFsUnavail[fsid]++;
          eFsMap["rep_offline"][it_fid->getElement()].insert(fsid);
        }
      } catch (eos::MDException& e) {
        errno = e.getErrno();
        eos_debug("caught exception %d %s\n", e.getErrno(),
                  e.getMessage().str().c_str());
      }
    }
  }
}

//------------------------------------------------------------------------------
// Account for files with no replicas
//------------------------------------------------------------------------------
void
Fsck::AccountNoReplicaFiles()
{
  // Grab all files which have no replicas at all
  try {
    eos::common::RWMutexWriteLock wr_lock(mErrMutex);
    eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);

    // it_fid not invalidated when items are added or removed for QDB
    for (auto it_fid = gOFS->eosFsView->getStreamingNoReplicasFileList();
         (it_fid && it_fid->valid()); it_fid->next()) {
      // Drop the namespace lock while prefetching (this may do I/O against
      // the backend) and re-acquire it before touching the metadata
      ns_rd_lock.Release();
      eos::Prefetcher::prefetchFileMDWithParentsAndWait(gOFS->eosView,
          it_fid->getElement());
      ns_rd_lock.Grab(gOFS->eosViewRWMutex);
      auto fmd = gOFS->eosFileService->getFileMD(it_fid->getElement());
      std::string path = gOFS->eosView->getUri(fmd.get());
      XrdOucString fullpath = path.c_str();

      if (fullpath.beginswith(gOFS->MgmProcPath)) {
        // Don't report eos /proc files
        continue;
      }

      if (fmd && (!fmd->isLink())) {
        eFsMap["zero_replica"][it_fid->getElement()].insert(0);
      }
    }
  } catch (eos::MDException& e) {
    errno = e.getErrno();
    eos_debug("msg=\"caught exception\" errno=%d msg=\"%s\"",
              e.getErrno(), e.getMessage().str().c_str());
  }
}

//------------------------------------------------------------------------------
// Print offline replicas summary
//------------------------------------------------------------------------------
void
Fsck::PrintOfflineReplicas() const
{
  eos::common::RWMutexReadLock rd_lock(mErrMutex);

  // Loop over unavailable filesystems
  for (auto ua_it = eFsUnavail.cbegin(); ua_it != eFsUnavail.cend();
       ++ua_it) {
    std::string host = "not configured";
    eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
    FileSystem* fs = FsView::gFsView.mIdView.lookupByID(ua_it->first);

    if (fs) {
      host = fs->GetString("hostport");
    }

    Log("host=%s fsid=%lu replica_offline=%llu", host.c_str(),
        ua_it->first, ua_it->second);
  }
}

//------------------------------------------------------------------------------
// Account for offline files or files that require replica adjustments
// i.e. file_offline and adjust_replica
//------------------------------------------------------------------------------
void
Fsck::AccountOfflineFiles()
{
  using eos::common::LayoutId;
  // Loop over all replica_offline and layout error files to assemble a
  // file offline list
  std::set<eos::IFileMD::id_t> fid2check;
  {
    eos::common::RWMutexReadLock rd_lock(mErrMutex);
    auto it_offline = eFsMap.find("rep_offline");

    if (it_offline != eFsMap.end()) {
      for (const auto& elem : it_offline->second) {
        fid2check.insert(elem.first);
      }
    }

    auto it_diff_n = eFsMap.find("rep_diff_n");

    if (it_diff_n != eFsMap.end()) {
      for (const auto& elem : it_diff_n->second) {
        fid2check.insert(elem.first);
      }
    }
  }

  for (auto it = fid2check.begin(); it != fid2check.end(); ++it) {
    std::shared_ptr<eos::IFileMD> fmd;
    eos::IFileMD::LocationVector loc_vect;
    eos::IFileMD::layoutId_t lid {0ul};
    size_t nlocations {0};

    try {
      // Check if locations are online
      eos::Prefetcher::prefetchFileMDAndWait(gOFS->eosView, *it);
      eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);
      fmd = gOFS->eosFileService->getFileMD(*it);
      lid = fmd->getLayoutId();
      nlocations = fmd->getNumLocation();
      loc_vect = fmd->getLocations();
    } catch (eos::MDException& e) {
      continue;
    }

    size_t offlinelocations = 0;
    eos::common::RWMutexWriteLock wr_lock(mErrMutex);
    eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);

    for (const auto& loc : loc_vect) {
      if (loc) {
        FileSystem* fs = FsView::gFsView.mIdView.lookupByID(loc);

        if (fs) {
          eos::common::BootStatus bootstatus = fs->GetStatus(true);
          eos::common::ConfigStatus configstatus = fs->GetConfigStatus();
          bool conda = (fs->GetActiveStatus() ==
                        eos::common::ActiveStatus::kOffline);
          bool condb = (bootstatus != eos::common::BootStatus::kBooted);
          bool condc = (configstatus == eos::common::ConfigStatus::kDrainDead);
          if (conda || condb || condc) {
            ++offlinelocations;
          }
        }
      }
    }

    unsigned long layout_type = LayoutId::GetLayoutType(lid);

    if (layout_type == LayoutId::kReplica) {
      if (offlinelocations == nlocations) {
        eFsMap["file_offline"][*it].insert(0);
      }
    } else if (layout_type >= LayoutId::kArchive) {
      // Proper condition for RAIN layout
      if (offlinelocations > LayoutId::GetRedundancyStripeNumber(lid)) {
        eFsMap["file_offline"][*it].insert(0);
      }
    }

    if (offlinelocations && (offlinelocations != nlocations)) {
      eFsMap["adjust_replica"][*it].insert(0);
    }
  }
}

//------------------------------------------------------------------------------
// Print summary of the different type of errors collected so far and their
// corresponding counters
//------------------------------------------------------------------------------
void
Fsck::PrintErrorsSummary() const
{
  eos::common::RWMutexReadLock rd_lock(mErrMutex);

  for (const auto& elem_type : eFsMap) {
    uint64_t count {0ull};

    for (const auto& elem_errs : elem_type.second) {
      count += elem_errs.second.size();
    }

    Log("%-30s : %llu", elem_type.first.c_str(), count);
    LogMonitor("%s=%llu", elem_type.first.c_str(), count);
  }
}

//------------------------------------------------------------------------------
// Account for "dark" file entries i.e. file system ids which have file
// entries in the namespace view but have no configured file system in the
// FsView.
//------------------------------------------------------------------------------
void
Fsck::AccountDarkFiles()
{
  eos::common::RWMutexWriteLock wr_lock(mErrMutex);
  eos::common::RWMutexReadLock fs_rd_lock(FsView::gFsView.ViewMutex);
  eos::common::RWMutexReadLock ns_rd_lock(gOFS->eosViewRWMutex);

  for (auto it = gOFS->eosFsView->getFileSystemIterator();
       it->valid(); it->next()) {
    IFileMD::location_t nfsid = it->getElement();

    try {
      // @todo(gbitzes): Urgent fix for QDB namespace needed.. This loop
      // will need to load all filesystems in memory, just to get a couple
      // of silly counters.
      uint64_t num_files = gOFS->eosFsView->getNumFilesOnFs(nfsid);

      if (num_files) {
        // Check if this exists in the gFsView
        if (!FsView::gFsView.mIdView.exists(nfsid)) {
          eFsDark[nfsid] += num_files;
          Log("shadow fsid=%lu shadow_entries=%llu ", nfsid, num_files);
        }
      }
    } catch (const eos::MDException& e) {
      // ignore
    }
  }
}

//------------------------------------------------------------------------------
// Query QDB for fsck errors
//------------------------------------------------------------------------------
void
Fsck::QueryQdb(ErrMapT& err_map)
{
  static std::set<std::string> known_errs = eos::common::GetKnownFsckErrs();
  qclient::QSet set_errs(*mQcl.get(), "");
  // Helper function to parse fsck info stored in QDB
  auto parse_fsck = [](const std::string & data) ->
  std::pair<eos::IFileMD::id_t, eos::common::FileSystem::fsid_t> {
    const size_t pos = data.find(':');

    if ((pos == std::string::npos) || (pos == data.length())) {
      eos_static_err("msg=\"failed to parse fsck element\" data=\"%s\"",
                     data.c_str());
      return {0ull, 0ul};
    }

    eos::IFileMD::id_t fid;
    eos::common::FileSystem::fsid_t fsid;

    if (!eos::common::StringToNumeric(data.substr(0, pos), fid) ||
        !eos::common::StringToNumeric(data.substr(pos + 1), fsid)) {
      eos_static_err("msg=\"failed to convert fsck info\" data=\"%s\"",
                     data.c_str());
      return {0ull, 0ul};
    }

    return {fid, fsid};
  };
  eos_static_info("%s", "msg=\"check for fsck errors\"");

  for (const auto& err_type : known_errs) {
    set_errs.setKey(SSTR("fsck:" << err_type));

    for (auto it = set_errs.getIterator(); it.valid(); it.next()) {
      // Set elements are in the form: fid:fsid
      auto pair_info = parse_fsck(it.getElement());
      err_map[err_type][pair_info.first].insert(pair_info.second);
    }
  }

  return;
}

//------------------------------------------------------------------------------
// Update the backend given the successful outcome of the repair
//------------------------------------------------------------------------------
void
Fsck::NotifyFixedErr(eos::IFileMD::id_t fid,
                     eos::common::FileSystem::fsid_t fsid_err,
                     const std::string& err_type, bool force,
                     uint32_t count_flush)
{
  eos_static_debug("msg=\"fsck notification\" fxid=%08llx fsid=%lu "
                   "err=%s", fid, fsid_err, err_type.c_str());
  static std::mutex mutex;
  static uint64_t num_updates = 0ull;
  static std::map<std::string, std::set<std::string>> updates;
  std::unique_lock<std::mutex> scope_lock(mutex);

  if ((fid || fsid_err) && !err_type.empty() && (err_type != "none")) {
    auto it = updates.find(err_type);

    if (it == updates.end()) {
      auto resp = updates.emplace(err_type, std::set<std::string>());
      it = resp.first;
    }

    const std::string value = SSTR(fid << ':' << fsid_err);
    auto resp = it->second.insert(value);

    if (resp.second == true) {
      ++num_updates;
    }
  }

  // Decide if time based flushing is needed
  using namespace std::chrono;
  static uint32_t s_flush_timeout_sec {60};
  static std::atomic<uint64_t> s_timestamp = duration_cast<seconds>
      (system_clock::now().time_since_epoch()).count();
  bool do_flush = false;
  uint64_t current_timestamp = duration_cast<seconds>
                               (system_clock::now().time_since_epoch()).count();

  if (current_timestamp - s_timestamp > s_flush_timeout_sec) {
    s_timestamp = current_timestamp;
    do_flush = true;
  }

  // Eventually flush the contents to the QDB backend if sentinel fid present
  // or if enough updates accumulated
  if (force || do_flush || (num_updates >= count_flush)) {
    qclient::QSet qset(*mQcl.get(), "");

    for (const auto& elem : updates) {
      qset.setKey(SSTR("fsck:" << elem.first));
      std::list<std::string> values(elem.second.begin(), elem.second.end());

      if (qset.srem(values) != (long long int)values.size()) {
        eos_static_err("msg=\"failed to delete some fsck errors\" err_type=%s",
                       elem.first.c_str());
      }
    }

    num_updates = 0ull;
    updates.clear();
  }
}

EOSMGMNAMESPACE_END