//------------------------------------------------------------------------------ //! @file Iostat.hh //! @authors Andreas-Joachim Peters/Jaroslav Guenther - CERN //------------------------------------------------------------------------------ /************************************************************************ * EOS - the CERN Disk Storage System * * Copyright (C) 2021 CERN/Switzerland * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation, either version 3 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program. If not, see .* ************************************************************************/ #pragma once #include "common/AssistedThread.hh" #include "common/StringConversion.hh" #include "mgm/FsView.hh" #include "mgm/Namespace.hh" #include "namespace/ns_quarkdb/qclient/include/qclient/QClient.hh" #include "namespace/ns_quarkdb/qclient/include/qclient/structures/QHash.hh" #include #include #include #include #include #include #include #include namespace eos { class MetadataFlusher; namespace common { class Report; } } // namespace eos EOSMGMNAMESPACE_BEGIN //! Define the history in days we want to do popularity tracking #define IOSTAT_POPULARITY_HISTORY_DAYS 7 #define IOSTAT_POPULARITY_DAY 86400 //! Enumeration class for the 4 periods for which stats are collected enum class Period {DAY, HOUR, FIVEMIN, ONEMIN}; enum class PercentComplete {p90, p95, p99, p100}; //------------------------------------------------------------------------------ //! Class IostatPeriods holds read/write stats for the past 24h //------------------------------------------------------------------------------ class IostatPeriods { public: //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- IostatPeriods() { memset(mDataBuffer, 0, sizeof(mDataBuffer)); memset(mIntegralBuffer, 0, sizeof(mIntegralBuffer)); } //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~IostatPeriods() = default; //---------------------------------------------------------------------------- //! Add measurement to the various periods it overlaps with //! //! @param val measured value //! @param start start time of the measurement //! @param stop stop time of the measurement //! @param now current timestamp //---------------------------------------------------------------------------- void Add(unsigned long long val, time_t start, time_t stop, time_t now); //------------------------------------------------------------------------------ //! Reset bin content of the buffer w.r.t. given timstamp //------------------------------------------------------------------------------ void StampBufferZero(time_t& now); //------------------------------------------------------------------------------ //! Get the sum of values for the given buffer period //------------------------------------------------------------------------------ unsigned long long GetDataInPeriod(size_t period, unsigned long long time_offset, time_t now) const; //------------------------------------------------------------------------------ //! Get longest transfer time in past 24h //------------------------------------------------------------------------------ inline unsigned long long GetLongestTransferTime() const { return mLongestTransferTimeInSample; } //------------------------------------------------------------------------------ //! Get longest transfer report time (time it took to FST report to arrive at //! MGM) in past 24h //------------------------------------------------------------------------------ unsigned long long GetLongestReportTime() const { return mLongestReportTimeInSample; } //---------------------------------------------------------------------------- //! Return time to completion of transfer of 90/95/99/100% of data for //! transfers seen during sample time [mLastTfSampleUpdateInterval] //------------------------------------------------------------------------------ inline unsigned long long GetTimeToPercComplete(PercentComplete perc) const { return (unsigned long long)mDurationToPercComplete[(int)perc]; } //------------------------------------------------------------------------------ //! Return average transfer size seen during sample time //! [mLastTfSampleUpdateInterval] //------------------------------------------------------------------------------ inline unsigned long long GetAvgTransferSize() const { return mAvgTfSize; } //------------------------------------------------------------------------------ //! Return number of transfers seen during sample time //! [mLastTfSampleUpdateInterval] //------------------------------------------------------------------------------ inline unsigned long long GetTfCountInSample() const { return mTfCountInSample; } //------------------------------------------------------------------------------ //! Return total IostatPeriod sum //------------------------------------------------------------------------------ inline unsigned long long GetTotalSum() const { return mTotal; } //------------------------------------------------------------------------------ //! Getting the timestamp of the last time the transfer sample was taken //------------------------------------------------------------------------------ std::string GetLastSampleUpdateTimestamp(bool date_format = false) const; private: #ifdef IN_TEST_HARNESS public: #endif unsigned long long mTotal = 0ull; // If sBinWidth !=1 please beware of the trannsfer start and stop bins getting // the right transfer volume and add code block currently commented out starting // from line 199 static constexpr size_t sBinWidth = 1; static constexpr int sBins = 86400; //! Number of seconds the sBins correspond to static constexpr int sPeriod = sBins * sBinWidth; time_t mLastAddTime = 0; time_t mLastStampZeroTime = 0; // even if you wait for longest transfer time - you still do not know if the longest // How much data was transferred during ibin = mDataBuffer[ibin] double mDataBuffer[sBins]; // what we can measure is choosing a period of time [sLastTfMaxLenUpdateRate] ` // for collecting newly finished transfers, distribute these tf into bins, // --> calculate bin/sumall per bin --> integrate bins until reaching // e.g. 99% of the data transferred --> the number of bins give us duration it too to get all data // through the network in the last e.g. 5 min [sLastTfMaxLenUpdateRate] ` const double mPercComplete[4] {0.90, 0.95, 0.99, 1.0}; double mIntegralBuffer[sBins]; // Udate rate every 5 minutes const time_t mLastTfSampleUpdateInterval = 300; time_t mLastTfMaxLenUpdateTime = 0; // Average transfer size in last 5 min [sLastTfMaxLenUpdateRate] unsigned long long mAvgTfSize = 0; unsigned long long mDurationToPercComplete[4] {0, 0, 0, 0}; // Transfer count unsigned long long mTfCount = 0; // Transfer length is not longer because there is longer transfer in the pipe ! unsigned long long mLongestTransferTime = 0; // Monitor how long it took to the transfer report to get to the MGM unsigned long long mLongestReportTime = 0; // The next 3 variables mean the same as the last 3 above, but these are // to be exposed to the user, evaluated every [mLastTfSampleUpdateInterval] unsigned long long mTfCountInSample = 0; unsigned long long mLongestTransferTimeInSample = 0; unsigned long long mLongestReportTimeInSample = 0; //------------------------------------------------------------------------------ //! Update Transfer Buffer to iterate over and calculate how long does it take //! to transfer [mPercComplete] % of the data //! //! @param now current timestamp //------------------------------------------------------------------------------ void UpdateTransferSampleInfo(time_t now); }; //------------------------------------------------------------------------------ //! Iostat subscribes to MQ, collects and digests report messages //------------------------------------------------------------------------------ class Iostat: public eos::common::LogId { public: //! Configuration keys used in config key-val store static const char* gIostatCollect; static const char* gIostatReport; static const char* gIostatReportNamespace; static const char* gIostatPopularity; static const char* gIostatUdpTargetList; static FILE* gOpenReportFD; //---------------------------------------------------------------------------- //! Constructor //---------------------------------------------------------------------------- Iostat(); //---------------------------------------------------------------------------- //! Destructor //---------------------------------------------------------------------------- ~Iostat(); //---------------------------------------------------------------------------- //! Perform object initialization //! //! @param instance_name used to build the hash map key to be stored in QDB //! @param port instance port //! @param legacy_file path legacy iostat file path location //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool Init(const std::string& instance_name, int port, const std::string& legacy_file); //---------------------------------------------------------------------------- //! Apply instance level configuration concerning IoStats //! //! @param fsview pointer to FsView object //---------------------------------------------------------------------------- void ApplyIostatConfig(FsView* fsview); //---------------------------------------------------------------------------- //! Store IoStat config in the instance level configuration //! //! @param fsview pointer to FsView object //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StoreIostatConfig(FsView* fsview) const; //---------------------------------------------------------------------------- //! Method executed by the thread receiving reports //! //! @param assistant reference to thread object //---------------------------------------------------------------------------- void Receive(ThreadAssistant& assistant) noexcept; //---------------------------------------------------------------------------- //! Method executed by the thread ciruclating the entires //! //! @param assistant reference to thread object //---------------------------------------------------------------------------- void Circulate(ThreadAssistant& assistant) noexcept; //---------------------------------------------------------------------------- //! Start collection thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StartCollection(); //---------------------------------------------------------------------------- //! Stop collection thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StopCollection(); //---------------------------------------------------------------------------- //! Start popularity thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StartPopularity(); //---------------------------------------------------------------------------- //! Stop popularity thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StopPopularity(); //---------------------------------------------------------------------------- //! Start daily report thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StartReport(); //---------------------------------------------------------------------------- //! Stop daily report thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StopReport(); //---------------------------------------------------------------------------- //! Start namespace report thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StartReportNamespace(); //---------------------------------------------------------------------------- //! Stop namespace report thread //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool StopReportNamespace(); //---------------------------------------------------------------------------- //! Add UDP target //! //! @param target new UDP target //! @param store_and_lock if true store new target in config //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool AddUdpTarget(const std::string& target, bool store_and_lock = true); //---------------------------------------------------------------------------- //! Remove UDP target //! //! @param target UDP target to be removed //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool RemoveUdpTarget(const std::string& target); //---------------------------------------------------------------------------- //! Write record to the stream - used by the MGM to push entries //! //! @param report report entry //---------------------------------------------------------------------------- void WriteRecord(const std::string& record); //------------------------------------------------------------------------------ //! Print IO statistics //------------------------------------------------------------------------------ void PrintOut(XrdOucString& out, bool summary, bool details, bool monitoring, bool numerical = false, bool top = false, bool domain = false, bool apps = false, bool sample_stat = false, time_t time_ago = 0, time_t time_interval = 0, XrdOucString option = ""); //---------------------------------------------------------------------------- //! Compute and print out the namespace popularity ranking //! //! @param out output string //! @param option fileter options //---------------------------------------------------------------------------- void PrintNsPopularity(XrdOucString& out, XrdOucString option = "") const; //---------------------------------------------------------------------------- //! Print namespace activity report for given path //! //! @param path namespace path //! @param out output string //---------------------------------------------------------------------------- void PrintNsReport(const char* path, XrdOucString& out) const; //---------------------------------------------------------------------------- //! Record measurement to the various periods it overlaps with //! //! @param tag measurement info tag //! @param uid user id //! @param gid group id //! @param val measurement value //! @param start start timestamp of measurement //! @param stop stop timestamp of measurement //! @param now current timestamp //---------------------------------------------------------------------------- void Add(const std::string& tag, uid_t uid, gid_t gid, unsigned long long val, time_t start, time_t stop, time_t now); //---------------------------------------------------------------------------- //! Get sum of measurements for the given tag (looping all uids per tag) //! @note: needs a lock on the mDataMutex //! //! @param tag measurement info tag //! //! @return total value //---------------------------------------------------------------------------- unsigned long long GetTotalStatForTag(const char* tag) const; //---------------------------------------------------------------------------- //! Get sum of measurements for the given tag (looping all uids per tag) and period //! @note: needs a lock on the mDataMutex //! //! @param tag measurement info tag //! @parma period time interval of interest //! //! @return total value //---------------------------------------------------------------------------- unsigned long long GetPeriodStatForTag(const char* tag, size_t period, time_t secago = 0) const; private: #ifdef IN_TEST_HARNESS public: #endif inline static const std::string USER_ID_TYPE = "u"; inline static const std::string GROUP_ID_TYPE = "g"; ///< Max delay for cache in front of QDB static constexpr std::chrono::seconds mCacheFlushDelay {30}; //! Max cache size before flush - 30 entries per uid/gid pair times 100 users static constexpr unsigned int mMapMaxSize {3000}; google::sparse_hash_map IostatTag; google::sparse_hash_map IostatPeriodsTag; google::sparse_hash_map> IostatUid; google::sparse_hash_map> IostatGid; google::sparse_hash_map> IostatPeriodsUid; google::sparse_hash_map> IostatPeriodsGid; google::sparse_hash_map IostatPeriodsDomainIOrb; google::sparse_hash_map IostatPeriodsDomainIOwb; google::sparse_hash_map IostatPeriodsAppIOrb; google::sparse_hash_map IostatPeriodsAppIOwb; std::atomic mDoneInit; //! Flusher to QDB backend std::unique_ptr mFlusher; std::string mFlusherPath; //! Mutex protecting the above data structures std::mutex mDataMutex; //! If true then use the file based approach otherwise store info in QDB std::atomic mLegacyMode; //! File path where statistics are stored on disk std::string mLegacyFilePath; std::atomic mRunning; //! Internal QClient object std::unique_ptr mQcl; //! Flag to store reports in the local report store std::atomic mReport; //! Flag if we should fill the report namespace std::atomic mReportNamespace; //! Flag if we fill the popularity maps (protected by this::Mutex) std::atomic mReportPopularity; //! QuarkDB hash map key name where info is saved std::string mHashKeyBase; //! Map of cached IoStat updates std::map mMapCacheUpdates; std::mutex mThreadSyncMutex; ///< Mutex serializing thread(s) start/stop AssistedThread mReceivingThread; ///< Looping thread receiving reports AssistedThread mCirculateThread; ///< Looping thread circulating report //! Mutex protecting the UDP broadcast data structures that follow mutable std::mutex mBcastMutex; //! Destinations for udp popularity packets std::set mUdpPopularityTarget; //! Socket to the udp destination(s) std::map mUdpSocket; //! Socket address structure to be reused for messages std::map mUdpSockAddr; //! Mutex protecting the popularity data structures mutable std::mutex mPopularityMutex; //! Popularity data structure struct Popularity { unsigned int nread; unsigned long long rb; }; //! Points to the bin which was last used in IostatPopularity std::atomic mLastPopularityBin; google::sparse_hash_map IostatPopularity[IOSTAT_POPULARITY_HISTORY_DAYS]; typedef std::pair popularity_t; //---------------------------------------------------------------------------- //! Value comparator for number of reads //---------------------------------------------------------------------------- struct PopularityCmp_nread { bool operator()(popularity_t const& l, popularity_t const& r) { if (l.second.nread == r.second.nread) { return (l.first < r.first); } return l.second.nread > r.second.nread; } }; //--------------------------------------------------------------------------- //! Value comparator for read bytes //---------------------------------------------------------------------------- struct PopularityCmp_rb { bool operator()(popularity_t const& l, popularity_t const& r) { if (l.second.rb == r.second.rb) { return (l.first < r.first); } return l.second.rb > r.second.rb; } }; //---------------------------------------------------------------------------- //! Record measurements directly in QDB //! //! @param tag measurement info tag //! @param uid user id //! @param gid group id //! @param val measurement value //---------------------------------------------------------------------------- void AddToQdb(const std::string& tag, uid_t uid, gid_t gid, unsigned long long val); //---------------------------------------------------------------------------- //! Do the UDP broadcast //! //! @param report pointer to report object //---------------------------------------------------------------------------- void UdpBroadCast(eos::common::Report* report) const; //---------------------------------------------------------------------------- //! Encode the UDP popularity targets to a string using the provided separator //! //! @param separator separator for the encoding //! //! @return encoded list of UDP popularity targets //---------------------------------------------------------------------------- std::string EncodeUdpPopularityTargets() const; //---------------------------------------------------------------------------- //! Add entry to popularity statistics //! //! @param path entry path //! @param rb read bytes //! @param start start timestamp of the operation //! @param stop stop timstamp of the operation //---------------------------------------------------------------------------- void AddToPopularity(const std::string& path, unsigned long long rb, time_t start, time_t stop); //---------------------------------------------------------------------------- //! One off migration from file based to QDB of IoStat information //! //! @param legacy_file file path for IoStat information //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool OneOffQdbMigration(const std::string& legacy_file); //---------------------------------------------------------------------------- //! Create/encode hash map key string from the given information //! //! @param id_type type of id, can be either user USER_ID_TYPE //! or group GROUP_ID_TYPE //! @param id_val numeric value of the id //! @param tag type of tag eg. bytes_read, bytes_write etc. //! //! @param return string representing the key to be used for storing this //! info in the hash map //---------------------------------------------------------------------------- static std::string EncodeKey(const std::string& id_type, const std::string& id_val, const std::string& tag); //---------------------------------------------------------------------------- //! Decode/parse hash map key to extract entry information //! //! @param key hash map key obtained by calling EncodeKey //! @param id_type type of id, can be either user USER_ID_TYPE //! or group GROUP_ID_TYPE //! @param id_val numeric value of the id //! @param tag type of tag eg. bytes_read, bytes_write etc. //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- static bool DecodeKey(const std::string& key, std::string& id_type, std::string& id_val, std::string& tag); //---------------------------------------------------------------------------- //! Load info from Qdb backend clearing up any memory data structures //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool LoadFromQdb(); //---------------------------------------------------------------------------- //! Get hash key under which info is stored in QDB. This also included the //! current year and it's cached for 5 minutes. //---------------------------------------------------------------------------- std::string GetHashKey() const; //---------------------------------------------------------------------------- //! Store statistics in legacy file format //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool LegacyStoreInFile(); //---------------------------------------------------------------------------- //! Restore statistics from legacy file format //! //! @return true if successful, otherwise false //---------------------------------------------------------------------------- bool LegacyRestoreFromFile(); //---------------------------------------------------------------------------- //! Save given update in the in-memory cache //! //! @param uid_key uid encoded key //! @param gid_key gid encoded key //! @param val value update //---------------------------------------------------------------------------- void CacheUpdate(const std::string& uid_key, const std::string& gid_key, unsigned long long val); //---------------------------------------------------------------------------- //! Check if the cache needs to be flushed //! //! @return true if cache must be flushed, otherwise false //---------------------------------------------------------------------------- bool ShouldFlushCache(); //---------------------------------------------------------------------------- //! Flush all cached entries to the QDB backed //---------------------------------------------------------------------------- void FlushCache(); }; EOSMGMNAMESPACE_END