/* * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2022 CERN * @license This program is free software, distributed under the terms of the GNU General Public * Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". You can * redistribute it and/or modify it under the terms of the GPL Version 3, or (at your * option) any later version. * * This program is distributed in the hope that it will be useful, but WITHOUT ANY * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A * PARTICULAR PURPOSE. See the GNU General Public License for more details. * * In applying this licence, CERN does not waive the privileges and immunities * granted to it by virtue of its status as an Intergovernmental Organization or * submit itself to any jurisdiction. */ #pragma once #include #include #include #include #include #include #include #include "common/dataStructures/ArchiveFileQueueCriteriaAndFileId.hpp" #include "common/dataStructures/ArchiveJob.hpp" #include "common/dataStructures/ArchiveRequest.hpp" #include "common/dataStructures/CancelRetrieveRequest.hpp" #include "common/dataStructures/DeleteArchiveRequest.hpp" #include "common/dataStructures/JobQueueType.hpp" #include "common/dataStructures/LabelFormat.hpp" #include "common/dataStructures/RepackInfo.hpp" #include "common/dataStructures/RetrieveFileQueueCriteria.hpp" #include "common/dataStructures/RetrieveJob.hpp" #include "common/dataStructures/RetrieveRequest.hpp" #include "common/dataStructures/SecurityIdentity.hpp" #include "common/log/Logger.hpp" #include "rdbms/ConnPool.hpp" #include "rdbms/Login.hpp" #include "scheduler/RetrieveJob.hpp" #include "scheduler/SchedulerDatabase.hpp" #include "common/utils/utils.hpp" #include "catalogue/TapeDrivesCatalogueState.hpp" namespace cta { namespace catalogue { class Catalogue; } class PostgresSchedDB: public SchedulerDatabase { public: PostgresSchedDB( const std::string &ownerId, log::Logger &logger, catalogue::Catalogue &catalogue, const rdbms::Login &login, const uint64_t nbConns); virtual ~PostgresSchedDB() noexcept; void waitSubthreadsComplete() override; void ping() override; std::string queueArchive(const std::string &instanceName, const cta::common::dataStructures::ArchiveRequest &request, const cta::common::dataStructures::ArchiveFileQueueCriteriaAndFileId &criteria, log::LogContext &logContext) override; std::map, std::less<>> getArchiveJobs() const override; std::list getArchiveJobs(const std::string& tapePoolName) const override; std::unique_ptr getArchiveJobQueueItor(const std::string &tapePoolName, common::dataStructures::JobQueueType queueType) const override; std::list > getNextArchiveJobsToReportBatch(uint64_t filesRequested, log::LogContext & logContext) override; JobsFailedSummary getArchiveJobsFailedSummary(log::LogContext &logContext) override; std::list> getNextRetrieveJobsToTransferBatch(const std::string & vid, uint64_t filesRequested, log::LogContext &lc) override; void requeueRetrieveRequestJobs(std::list &jobs, log::LogContext &lc) override; void reserveRetrieveQueueForCleanup(const std::string & vid, std::optional cleanupHeartBeatValue) override; void tickRetrieveQueueCleanupHeartbeat(const std::string & vid) override; void setArchiveJobBatchReported(std::list & jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override; std::list getRetrieveQueuesCleanupInfo(log::LogContext& logContext) override; void setRetrieveQueueCleanupFlag(const std::string&vid, bool val, log::LogContext& logContext) override; std::list getRetrieveQueueStatistics( const cta::common::dataStructures::RetrieveFileQueueCriteria& criteria, const std::set& vidsToConsider) override; SchedulerDatabase::RetrieveRequestInfo queueRetrieve(cta::common::dataStructures::RetrieveRequest& rqst, const cta::common::dataStructures::RetrieveFileQueueCriteria &criteria, const std::optional diskSystemName, log::LogContext &logContext) override; void clearRetrieveQueueStatisticsCache(const std::string & vid) override; void cancelRetrieve(const std::string& instanceName, const cta::common::dataStructures::CancelRetrieveRequest& rqst, log::LogContext& lc) override; std::map > getRetrieveRequests() const override; std::list getRetrieveRequestsByVid(const std::string& vid) const override; std::list getRetrieveRequestsByRequester(const std::string& vid) const override; void deleteRetrieveRequest(const common::dataStructures::SecurityIdentity& requester, const std::string& remoteFile) override; void cancelArchive(const common::dataStructures::DeleteArchiveRequest& request, log::LogContext & lc) override; void deleteFailed(const std::string &objectId, log::LogContext &lc) override; std::map, std::less<>> getRetrieveJobs() const override; std::list getRetrieveJobs(const std::string &vid) const override; std::unique_ptr getRetrieveJobQueueItor(const std::string &vid, common::dataStructures::JobQueueType queueType) const override; std::string queueRepack(const SchedulerDatabase::QueueRepackRequest & repackRequest, log::LogContext &logContext) override; bool repackExists() override; std::list getRepackInfo() override; common::dataStructures::RepackInfo getRepackInfo(const std::string& vid) override; void cancelRepack(const std::string& vid, log::LogContext & lc) override; std::unique_ptr getRepackStatistics() override; std::unique_ptr getRepackStatisticsNoLock() override; std::unique_ptr getNextRepackJobToExpand() override; std::list> getNextRetrieveJobsToReportBatch( uint64_t filesRequested, log::LogContext &logContext) override; std::list> getNextRetrieveJobsFailedBatch( uint64_t filesRequested, log::LogContext &logContext) override; std::unique_ptr getNextRepackReportBatch(log::LogContext& lc) override; std::unique_ptr getNextSuccessfulRetrieveRepackReportBatch(log::LogContext& lc) override; std::unique_ptr getNextSuccessfulArchiveRepackReportBatch(log::LogContext& lc) override; std::unique_ptr getNextFailedRetrieveRepackReportBatch(log::LogContext& lc) override; std::unique_ptr getNextFailedArchiveRepackReportBatch(log::LogContext &lc) override; std::list> getRepackReportBatches(log::LogContext &lc) override; void setRetrieveJobBatchReportedToUser(std::list & jobsBatch, log::TimingList & timingList, utils::Timer & t, log::LogContext & lc) override; JobsFailedSummary getRetrieveJobsFailedSummary(log::LogContext &logContext) override; std::unique_ptr getMountInfo(log::LogContext& logContext) override; std::unique_ptr getMountInfo(log::LogContext& logContext, uint64_t timeout_us) override; void trimEmptyQueues(log::LogContext& lc) override; std::unique_ptr getMountInfoNoLock(PurposeGetMountInfo purpose, log::LogContext& logContext) override; // these are not in the baseclass but are beeded by XrdSsiCtaServiceProvider void setThreadNumber(uint64_t threadNumber, const std::optional &stackSize = std::nullopt); void setBottomHalfQueueSize(uint64_t tasksNumber); private: void fetchMountInfo(SchedulerDatabase::TapeMountDecisionInfo& tmdi, SchedulerDatabase::PurposeGetMountInfo purpose, log::LogContext& lc); std::string m_ownerId; rdbms::ConnPool m_connPool; catalogue::Catalogue& m_catalogue; log::Logger& m_logger; std::unique_ptr m_tapeDrivesState; }; } // namespace cta