/** * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2021-2022 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #pragma once #include "common/log/LogContext.hpp" #include "common/dataStructures/ArchiveFile.hpp" #include "common/checksum/ChecksumBlob.hpp" #include "scheduler/PostgresSchedDB/sql/Transaction.hpp" #include "scheduler/PostgresSchedDB/sql/Enums.hpp" #include namespace cta::postgresscheddb::sql { struct ArchiveJobQueueRow { uint64_t jobId = 0; uint64_t mountId = 0; ArchiveJobStatus status = ArchiveJobStatus::AJS_ToTransferForUser; std::string tapePool; std::string mountPolicy; uint16_t priority = 0; uint32_t minArchiveRequestAge = 0; uint16_t copyNb = 0; time_t startTime = 0; //!< Time the job was inserted into the queue std::string archiveReportUrl; std::string archiveErrorReportUrl; std::string requesterName; std::string requesterGroup; std::string srcUrl; uint32_t retriesWithinMount = 0; uint32_t totalRetries = 0; uint64_t lastMountWithFailure = 0; uint32_t maxTotalRetries = 0; uint32_t maxRetriesWithinMount = 0; uint32_t maxReportRetries = 0; uint32_t totalReportRetries = 0; std::vector failureLogs; std::vector reportFailureLogs; bool is_repack = false; uint64_t repackId = 0; std::string repackFilebufUrl; uint64_t repackFseq = 0; std::string repackDestVid; common::dataStructures::ArchiveFile archiveFile; ArchiveJobQueueRow() { archiveFile.reconciliationTime = 0; archiveFile.archiveFileID = 0; archiveFile.fileSize = 0; archiveFile.diskFileInfo.owner_uid = 0; archiveFile.diskFileInfo.gid = 0; archiveFile.creationTime = 0; } /** * Constructor from row * * @param row A single row from the current row of the rset */ ArchiveJobQueueRow(const rdbms::Rset &rset) { *this = rset; } ArchiveJobQueueRow& operator=(const rdbms::Rset &rset) { jobId = rset.columnUint64("JOB_ID"); mountId = rset.columnUint64("MOUNT_ID"); status = from_string( rset.columnString("STATUS") ); tapePool = rset.columnString("TAPE_POOL"); mountPolicy = rset.columnString("MOUNT_POLICY"); priority = rset.columnUint16("PRIORITY"); minArchiveRequestAge = rset.columnUint32("MIN_ARCHIVE_REQUEST_AGE"); archiveFile.archiveFileID = rset.columnUint64("ARCHIVE_FILE_ID"); archiveFile.fileSize = rset.columnUint64("SIZE_IN_BYTES"); copyNb = rset.columnUint16("COPY_NB"); startTime = rset.columnUint64("START_TIME"); archiveFile.checksumBlob.deserialize( rset.columnBlob("CHECKSUMBLOB") ); archiveFile.creationTime = rset.columnUint64("CREATION_TIME"); archiveFile.diskInstance = rset.columnString("DISK_INSTANCE"); archiveFile.diskFileId = rset.columnString("DISK_FILE_ID"); archiveFile.diskFileInfo.owner_uid = rset.columnUint32("DISK_FILE_OWNER_UID"); archiveFile.diskFileInfo.gid = rset.columnUint32("DISK_FILE_GID"); archiveFile.diskFileInfo.path = rset.columnString("DISK_FILE_PATH"); archiveReportUrl = rset.columnString("ARCHIVE_REPORT_URL"); archiveErrorReportUrl = rset.columnString("ARCHIVE_ERROR_REPORT_URL"); requesterName = rset.columnString("REQUESTER_NAME"); requesterGroup = rset.columnString("REQUESTER_GROUP"); srcUrl = rset.columnString("SRC_URL"); archiveFile.storageClass = rset.columnString("STORAGE_CLASS"); retriesWithinMount = rset.columnUint16("RETRIES_WITHIN_MOUNT"); totalRetries = rset.columnUint16("TOTAL_RETRIES"); lastMountWithFailure = rset.columnUint32("LAST_MOUNT_WITH_FAILURE"); maxTotalRetries = rset.columnUint16("MAX_TOTAL_RETRIES"); return *this; } void insert(Transaction &txn) const { // does not set mountId or jobId const char *const sql = "INSERT INTO ARCHIVE_JOB_QUEUE(" "STATUS," "TAPE_POOL," "MOUNT_POLICY," "PRIORITY," "MIN_ARCHIVE_REQUEST_AGE," "ARCHIVE_FILE_ID," "SIZE_IN_BYTES," "COPY_NB," "START_TIME," "CHECKSUMBLOB," "CREATION_TIME," "DISK_INSTANCE," "DISK_FILE_ID," "DISK_FILE_OWNER_UID," "DISK_FILE_GID," "DISK_FILE_PATH," "ARCHIVE_REPORT_URL," "ARCHIVE_ERROR_REPORT_URL," "REQUESTER_NAME," "REQUESTER_GROUP," "SRC_URL," "STORAGE_CLASS," "RETRIES_WITHIN_MOUNT," "TOTAL_RETRIES," "LAST_MOUNT_WITH_FAILURE," "MAX_TOTAL_RETRIES) VALUES (" ":STATUS," ":TAPE_POOL," ":MOUNT_POLICY," ":PRIORITY," ":MIN_ARCHIVE_REQUEST_AGE," ":ARCHIVE_FILE_ID," ":SIZE_IN_BYTES," ":COPY_NB," ":START_TIME," ":CHECKSUMBLOB," ":CREATION_TIME," ":DISK_INSTANCE," ":DISK_FILE_ID," ":DISK_FILE_OWNER_UID," ":DISK_FILE_GID," ":DISK_FILE_PATH," ":ARCHIVE_REPORT_URL," ":ARCHIVE_ERROR_REPORT_URL," ":REQUESTER_NAME," ":REQUESTER_GROUP," ":SRC_URL," ":STORAGE_CLASS," ":RETRIES_WITHIN_MOUNT," ":TOTAL_RETRIES," ":LAST_MOUNT_WITH_FAILURE," ":MAX_TOTAL_RETRIES)"; auto stmt = txn.conn().createStmt(sql); stmt.bindString(":STATUS", to_string(status)); stmt.bindString(":TAPE_POOL", tapePool); stmt.bindString(":MOUNT_POLICY", mountPolicy); stmt.bindUint16(":PRIORITY", priority); stmt.bindUint32(":MIN_ARCHIVE_REQUEST_AGE", minArchiveRequestAge); stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFile.archiveFileID); stmt.bindUint64(":SIZE_IN_BYTES", archiveFile.fileSize); stmt.bindUint16(":COPY_NB", copyNb); stmt.bindUint64(":START_TIME", startTime); stmt.bindBlob(":CHECKSUMBLOB", archiveFile.checksumBlob.serialize()); stmt.bindUint64(":CREATION_TIME", archiveFile.creationTime); stmt.bindString(":DISK_INSTANCE", archiveFile.diskInstance); stmt.bindString(":DISK_FILE_ID", archiveFile.diskFileId); stmt.bindUint32(":DISK_FILE_OWNER_UID", archiveFile.diskFileInfo.owner_uid); stmt.bindUint32(":DISK_FILE_GID", archiveFile.diskFileInfo.gid); stmt.bindString(":DISK_FILE_PATH", archiveFile.diskFileInfo.path); stmt.bindString(":ARCHIVE_REPORT_URL", archiveReportUrl); stmt.bindString(":ARCHIVE_ERROR_REPORT_URL", archiveErrorReportUrl); stmt.bindString(":REQUESTER_NAME", requesterName); stmt.bindString(":REQUESTER_GROUP", requesterGroup); stmt.bindString(":SRC_URL", srcUrl); stmt.bindString(":STORAGE_CLASS", archiveFile.storageClass); stmt.bindUint16(":RETRIES_WITHIN_MOUNT", retriesWithinMount); stmt.bindUint16(":TOTAL_RETRIES", totalRetries); stmt.bindUint32(":LAST_MOUNT_WITH_FAILURE", lastMountWithFailure); stmt.bindUint16(":MAX_TOTAL_RETRIES", maxTotalRetries); stmt.executeNonQuery(); } void addParamsToLogContext(log::ScopedParamContainer& params) const { // does not set jobId params.add("mountId", mountId); params.add("status", to_string(status)); params.add("tapePool", tapePool); params.add("mountPolicy", mountPolicy); params.add("priority", priority); params.add("minArchiveRequestAge", minArchiveRequestAge); params.add("archiveFileId", archiveFile.archiveFileID); params.add("sizeInBytes", archiveFile.fileSize); params.add("copyNb", copyNb); params.add("startTime", startTime); params.add("checksumBlob", archiveFile.checksumBlob); params.add("creationTime", archiveFile.creationTime); params.add("diskInstance", archiveFile.diskInstance); params.add("diskFileId", archiveFile.diskFileId); params.add("diskFileOwnerUid", archiveFile.diskFileInfo.owner_uid); params.add("diskFileGid", archiveFile.diskFileInfo.gid); params.add("diskFilePath", archiveFile.diskFileInfo.path); params.add("archiveReportUrl", archiveReportUrl); params.add("archiveErrorReportUrl", archiveErrorReportUrl); params.add("requesterName", requesterName); params.add("requesterGroup", requesterGroup); params.add("srcUrl", srcUrl); params.add("storageClass", archiveFile.storageClass); params.add("retriesWithinMount", retriesWithinMount); params.add("totalRetries", totalRetries); params.add("lastMountWithFailure", lastMountWithFailure); params.add("maxTotalRetries", maxTotalRetries); } /** * Select unowned jobs from the queue * * @param txn Transaction to use for this query * @param status Archive Job Status to select on * @param tapepool Tapepool to select on * @param limit Maximum number of rows to return * * @return result set */ static rdbms::Rset select(Transaction &txn, ArchiveJobStatus status, const std::string& tapepool, uint32_t limit) { const char *const sql = "SELECT " "JOB_ID AS JOB_ID," "MOUNT_ID AS MOUNT_ID," "STATUS AS STATUS," "TAPE_POOL AS TAPE_POOL," "MOUNT_POLICY AS MOUNT_POLICY," "PRIORITY AS PRIORITY," "MIN_ARCHIVE_REQUEST_AGE AS MIN_ARCHIVE_REQUEST_AGE," "ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID," "SIZE_IN_BYTES AS SIZE_IN_BYTES," "COPY_NB AS COPY_NB," "START_TIME AS START_TIME," "CHECKSUMBLOB AS CHECKSUMBLOB," "CREATION_TIME AS CREATION_TIME," "DISK_INSTANCE AS DISK_INSTANCE," "DISK_FILE_ID AS DISK_FILE_ID," "DISK_FILE_OWNER_UID AS DISK_FILE_OWNER_UID," "DISK_FILE_GID AS DISK_FILE_GID," "DISK_FILE_PATH AS DISK_FILE_PATH," "ARCHIVE_REPORT_URL AS ARCHIVE_REPORT_URL," "ARCHIVE_ERROR_REPORT_URL AS ARCHIVE_ERROR_REPORT_URL," "REQUESTER_NAME AS REQUESTER_NAME," "REQUESTER_GROUP AS REQUESTER_GROUP," "SRC_URL AS SRC_URL," "STORAGE_CLASS AS STORAGE_CLASS," "RETRIES_WITHIN_MOUNT AS RETRIES_WITHIN_MOUNT," "TOTAL_RETRIES AS TOTAL_RETRIES," "LAST_MOUNT_WITH_FAILURE AS LAST_MOUNT_WITH_FAILURE," "MAX_TOTAL_RETRIES AS MAX_TOTAL_RETRIES " "FROM ARCHIVE_JOB_QUEUE " "WHERE " "TAPE_POOL = :TAPE_POOL " "AND STATUS = :STATUS " "AND MOUNT_ID IS NULL " "ORDER BY PRIORITY DESC, JOB_ID " "LIMIT :LIMIT"; auto stmt = txn.conn().createStmt(sql); stmt.bindString(":TAPE_POOL", tapepool); stmt.bindString(":STATUS", to_string(status)); stmt.bindUint32(":LIMIT", limit); return stmt.executeQuery(); } /** * Select owned jobs from the queue * * @param txn Transaction to use for this query * @param status Archive Job Status to select on * @param tapepool Tapepool to select on * @param mount_id Mount id which owns this job * @param limit Maximum number of rows to return * * @return result set */ static rdbms::Rset select(Transaction &txn, ArchiveJobStatus status, const std::string& tapepool, uint32_t limit, uint64_t mount_id) { const char *const sql = "SELECT " "JOB_ID AS JOB_ID," "MOUNT_ID AS MOUNT_ID," "STATUS AS STATUS," "TAPE_POOL AS TAPE_POOL," "MOUNT_POLICY AS MOUNT_POLICY," "PRIORITY AS PRIORITY," "MIN_ARCHIVE_REQUEST_AGE AS MIN_ARCHIVE_REQUEST_AGE," "ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID," "SIZE_IN_BYTES AS SIZE_IN_BYTES," "COPY_NB AS COPY_NB," "START_TIME AS START_TIME," "CHECKSUMBLOB AS CHECKSUMBLOB," "CREATION_TIME AS CREATION_TIME," "DISK_INSTANCE AS DISK_INSTANCE," "DISK_FILE_ID AS DISK_FILE_ID," "DISK_FILE_OWNER_UID AS DISK_FILE_OWNER_UID," "DISK_FILE_GID AS DISK_FILE_GID," "DISK_FILE_PATH AS DISK_FILE_PATH," "ARCHIVE_REPORT_URL AS ARCHIVE_REPORT_URL," "ARCHIVE_ERROR_REPORT_URL AS ARCHIVE_ERROR_REPORT_URL," "REQUESTER_NAME AS REQUESTER_NAME," "REQUESTER_GROUP AS REQUESTER_GROUP," "SRC_URL AS SRC_URL," "STORAGE_CLASS AS STORAGE_CLASS," "RETRIES_WITHIN_MOUNT AS RETRIES_WITHIN_MOUNT," "TOTAL_RETRIES AS TOTAL_RETRIES," "LAST_MOUNT_WITH_FAILURE AS LAST_MOUNT_WITH_FAILURE," "MAX_TOTAL_RETRIES AS MAX_TOTAL_RETRIES " "FROM ARCHIVE_JOB_QUEUE " "WHERE " "TAPE_POOL = :TAPE_POOL " "AND STATUS = :STATUS " "AND MOUNT_ID = :MOUNT_ID " "ORDER BY PRIORITY DESC, JOB_ID " "LIMIT :LIMIT"; auto stmt = txn.conn().createStmt(sql); stmt.bindString(":TAPE_POOL", tapepool); stmt.bindString(":STATUS", to_string(status)); stmt.bindUint64(":MOUNT_ID", mount_id); stmt.bindUint32(":LIMIT", limit); return stmt.executeQuery(); } /** * Assign a mount ID to the specified rows * * @param txn Transaction to use for this query * @param rowList List of table rows to claim for the new owner * @param mountId Mount ID to assign */ static void updateMountId(Transaction &txn, const std::list& rowList, uint64_t mountId); }; } // namespace cta::postgresscheddb::sql