/************************************************************************
 * EOS - the CERN Disk Storage System                                   *
 * Copyright (C) 2018 CERN/Switzerland                                  *
 *                                                                      *
 * This program is free software: you can redistribute it and/or modify *
 * it under the terms of the GNU General Public License as published by *
 * the Free Software Foundation, either version 3 of the License, or    *
 * (at your option) any later version.                                  *
 *                                                                      *
 * This program is distributed in the hope that it will be useful,      *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of       *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the        *
 * GNU General Public License for more details.                         *
 *                                                                      *
 * You should have received a copy of the GNU General Public License    *
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.*
 ************************************************************************/

#include "namespace/ns_quarkdb/persistency/RequestBuilder.hh"
#include "namespace/ns_quarkdb/accounting/FileSystemHandler.hh"
#include "namespace/ns_quarkdb/flusher/MetadataFlusher.hh"
#include "namespace/utils/FileListRandomPicker.hh"
#include "common/Assert.hh"

EOSNSNAMESPACE_BEGIN

//------------------------------------------------------------------------------
// Constructor.
//
// @param loc        location (filesystem) whose file list is tracked; used to
//                   build the QDB key in getRedisKey()
// @param executor   executor on which the asynchronous cache load runs
// @param qcl        QClient used for direct reads (SCARD, streaming scan)
// @param flusher    flusher receiving all write-backs (sadd / srem / del)
// @param unlinked   if true, track the "unlinked" list of this location
//                   instead of the regular file list
// @param fake_clock forwarded to mClock (presumably selects a controllable
//                   clock for tests - confirm against SteadyClock)
//------------------------------------------------------------------------------
FileSystemHandler::FileSystemHandler(IFileMD::location_t loc,
                                     folly::Executor* executor,
                                     qclient::QClient* qcl,
                                     MetadataFlusher* flusher,
                                     bool unlinked, bool fake_clock)
  : location(loc), pExecutor(executor), pQcl(qcl), pFlusher(flusher),
    mLastCacheLoadTS(0ull), mClock(fake_clock)
{
  target = unlinked ? Target::kUnlinked : Target::kRegular;
  // Sentinel keys of the hash set: these two ids can never be stored.
  // The empty key is configured first, as the sparsehash contract requires
  // set_empty_key() to precede every other dense_hash_set call.
  mContents.set_empty_key(0xffffffffffffffffull);
  mContents.set_deleted_key(0);
}

//------------------------------------------------------------------------------
// Constructor for the special case of "no replica list".
//------------------------------------------------------------------------------ FileSystemHandler::FileSystemHandler(folly::Executor* executor, qclient::QClient* qcl, MetadataFlusher* flusher, IsNoReplicaListTag tag) : location(0), pExecutor(executor), pQcl(qcl), pFlusher(flusher) { target = Target::kNoReplicaList; mContents.set_deleted_key(0); mContents.set_empty_key(0xffffffffffffffffll); } //------------------------------------------------------------------------------ // Ensure contents have been loaded into the cache. If so, returns // immediatelly. Otherwise, does requests to QDB to retrieve its contents. // Return value: "this" pointer. //------------------------------------------------------------------------------ FileSystemHandler* FileSystemHandler::ensureContentsLoaded() { using eos::common::SteadyClock; mLastCacheLoadTS = SteadyClock::secondsSinceEpoch(mClock.getTime()).count(); return ensureContentsLoadedAsync().get(); } //------------------------------------------------------------------------------ // Ensure contents have been loaded into the cache. If so, returns // immediatelly. Otherwise, does requests to QDB to retrieve its contents. // Return value: "this" pointer. //------------------------------------------------------------------------------ folly::Future FileSystemHandler::ensureContentsLoadedAsync() { std::unique_lock lock(mMutex); if (mCacheStatus == CacheStatus::kNotLoaded) { mChangeList.clear(); mCacheStatus = CacheStatus::kInFlight; mSplitter = folly::FutureSplitter( folly::via(pExecutor).then(&FileSystemHandler::triggerCacheLoad, this)); lock.unlock(); return mSplitter.getFuture(); } return mSplitter.getFuture(); } //------------------------------------------------------------------------------ // Return redis key holding our target filesystem list. 
//------------------------------------------------------------------------------ std::string FileSystemHandler::getRedisKey() const { if (target == Target::kRegular) { return eos::RequestBuilder::keyFilesystemFiles(location); } else if (target == Target::kUnlinked) { return eos::RequestBuilder::keyFilesystemUnlinked(location); } eos_assert(target == Target::kNoReplicaList); return fsview::sNoReplicaPrefix; } //------------------------------------------------------------------------------ // Trigger load. Must only be called once. //------------------------------------------------------------------------------ FileSystemHandler* FileSystemHandler::triggerCacheLoad() { pFlusher->synchronize(); IFsView::FileList temporaryContents; temporaryContents.set_deleted_key(0); temporaryContents.set_empty_key(0xffffffffffffffffll); for (auto it = getStreamingFileList(); it->valid(); it->next()) { temporaryContents.insert(it->getElement()); } // Now merge under lock, and additionally apply all entries we might have // missed between triggering the cache load, and receiving the contents. std::unique_lock lock(mMutex); eos_assert(mCacheStatus == CacheStatus::kInFlight); mContents.swap(temporaryContents); mChangeList.apply(mContents); mChangeList.clear(); mCacheStatus = CacheStatus::kLoaded; mContents.resize(0); return this; } //------------------------------------------------------------------------------ // Insert item. //------------------------------------------------------------------------------ void FileSystemHandler::insert(FileIdentifier identifier) { std::unique_lock lock(mMutex); if (mCacheStatus == CacheStatus::kNotLoaded) { // discard, we're not storing the results in-memory at all } else if (mCacheStatus == CacheStatus::kInFlight) { // record into our ChangeList to apply later, once we've received the // contents. This write is racing against cache loading, and may or may // not be reflected in the contents. 
mChangeList.push_back(identifier.getUnderlyingUInt64()); } else { eos_assert(mCacheStatus == CacheStatus::kLoaded); // Write directly into mContents mContents.insert(identifier.getUnderlyingUInt64()); } lock.unlock(); pFlusher->sadd(getRedisKey(), std::to_string(identifier.getUnderlyingUInt64())); } //------------------------------------------------------------------------------ // Erase item. //------------------------------------------------------------------------------ void FileSystemHandler::erase(FileIdentifier identifier) { std::unique_lock lock(mMutex); if (mCacheStatus == CacheStatus::kNotLoaded) { // discard, we're not storing the results in-memory at all } else if (mCacheStatus == CacheStatus::kInFlight) { // record into our ChangeList to apply later, once we've received the // contents. This write is racing against cache loading, and may or may // not be reflected in the contents. mChangeList.erase(identifier.getUnderlyingUInt64()); } else { eos_assert(mCacheStatus == CacheStatus::kLoaded); // Write directly into mContents mContents.erase(identifier.getUnderlyingUInt64()); mContents.resize(0); } lock.unlock(); pFlusher->srem(getRedisKey(), std::to_string(identifier.getUnderlyingUInt64())); } //------------------------------------------------------------------------------ // Get size. Careful when calling this function, it'll load all contents if // not already there. //------------------------------------------------------------------------------ uint64_t FileSystemHandler::size() { { std::shared_lock lock(mMutex); if (mCacheStatus == CacheStatus::kLoaded) { return mContents.size(); } } // Do direct call to the backend qclient::redisReplyPtr reply = pQcl->exec("SCARD", getRedisKey()).get(); if ((reply == nullptr) || (reply->type != REDIS_REPLY_INTEGER)) { // Unexpected reply just return 0 return 0ull; } return reply->integer; } //------------------------------------------------------------------------------ // Return iterator for this file system. 
//------------------------------------------------------------------------------ std::shared_ptr> FileSystemHandler::getFileList() { ensureContentsLoaded(); return std::shared_ptr> (new eos::FileListIterator(mContents, mMutex)); } //------------------------------------------------------------------------------ // Return streaming iterator for this file system. //------------------------------------------------------------------------------ std::shared_ptr> FileSystemHandler::getStreamingFileList() { return std::shared_ptr> (new eos::StreamingFileListIterator(*pQcl, getRedisKey())); } //------------------------------------------------------------------------------ // Delete the entire filelist. //------------------------------------------------------------------------------ void FileSystemHandler::nuke() { std::unique_lock lock(mMutex); mContents.clear(); mContents.resize(0); pFlusher->del(getRedisKey()); } //------------------------------------------------------------------------------ // Get an approximately random file in the filelist. 
//------------------------------------------------------------------------------ bool FileSystemHandler::getApproximatelyRandomFile(IFileMD::id_t& res) { ensureContentsLoaded(); std::shared_lock lock(mMutex); return pickRandomFile(mContents, res); } //------------------------------------------------------------------------------ // Check whether a given id_t is contained in this filelist //------------------------------------------------------------------------------ bool FileSystemHandler::hasFileId(IFileMD::id_t file) { ensureContentsLoaded(); std::shared_lock lock(mMutex); return mContents.find(file) != mContents.end(); } //------------------------------------------------------------------------------ // Clear cache if it has been inactive during the given period //------------------------------------------------------------------------------ void FileSystemHandler::clearCache(std::chrono::seconds inactive_timeout) { using eos::common::SteadyClock; using namespace std::chrono_literals; if (inactive_timeout.count()) { int64_t inactive_interval = SteadyClock::secondsSinceEpoch(mClock.getTime()).count() - mLastCacheLoadTS; if (inactive_timeout.count() > inactive_interval) { return; } } // Skip if mutex held by a long running operation if (mMutex.try_lock_for(100ms)) { if (mCacheStatus == CacheStatus::kLoaded) { mContents.clear(); mContents.resize(0); mCacheStatus = CacheStatus::kNotLoaded; } mMutex.unlock(); } } EOSNSNAMESPACE_END