/** * @project The CERN Tape Archive (CTA) * @copyright Copyright © 2023 CERN * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #pragma once #include "common/log/LogContext.hpp" #include "scheduler/PostgresSchedDB/sql/Transaction.hpp" #include "scheduler/PostgresSchedDB/sql/Enums.hpp" #include "common/dataStructures/EntryLog.hpp" namespace cta::postgresscheddb::sql { struct RepackJobQueueRow { uint64_t repackReqId; std::string vid; std::string bufferUrl; RepackJobStatus status; bool isAddCopies; bool isMove; uint64_t totalFilesOnTapeAtStart; uint64_t totalBytesOnTapeAtStart; bool allFilesSelectedAtStart; uint64_t totalFilesToRetrieve; uint64_t totalBytesToRetrieve; uint64_t totalFilesToArchive; uint64_t totalBytesToArchive; uint64_t userProvidedFiles; uint64_t userProvidedBytes; uint64_t retrievedFiles; uint64_t retrievedBytes; uint64_t archivedFiles; uint64_t archivedBytes; uint64_t failedToRetrieveFiles; uint64_t failedToRetrieveBytes; uint64_t failedToCreateArchiveReq; uint64_t failedToArchiveFiles; uint64_t failedToArchiveBytes; uint64_t lastExpandedFseq; bool isExpandFinished; bool isExpandStarted; std::string mountPolicyName; bool isComplete; bool isNoRecall; std::string subReqProtoBuf; std::string destInfoProtoBuf; common::dataStructures::EntryLog createLog; time_t repackFinishedTime; RepackJobQueueRow() : repackReqId(0), status(RepackJobStatus::RRS_Pending), isAddCopies(true), isMove(true), totalFilesOnTapeAtStart(0), totalBytesOnTapeAtStart(0), allFilesSelectedAtStart(true), totalFilesToRetrieve(0), totalBytesToRetrieve(0), totalFilesToArchive(0), totalBytesToArchive(0), userProvidedFiles(0), userProvidedBytes(0), retrievedFiles(0), retrievedBytes(0), archivedFiles(0), archivedBytes(0), failedToRetrieveFiles(0), failedToRetrieveBytes(0), failedToCreateArchiveReq(0), failedToArchiveFiles(0), failedToArchiveBytes(0), lastExpandedFseq(0), isExpandFinished(false), isExpandStarted(false), isComplete(false), isNoRecall(0), repackFinishedTime(0) { } /** * Constructor from row * * @param row A single row from the current row of the rset */ RepackJobQueueRow(const rdbms::Rset &rset) { *this = rset; } RepackJobQueueRow& operator=(const rdbms::Rset &rset) { repackReqId = rset.columnUint64("REPACK_REQID"); vid = rset.columnString("VID"); bufferUrl = rset.columnString("BUFFER_URL"); status = from_string( rset.columnString("STATUS") ); isAddCopies = rset.columnBool("IS_ADD_COPIES"); isMove = rset.columnBool("IS_MOVE"); totalFilesOnTapeAtStart = rset.columnUint64("TOTAL_FILES_ON_TAPE_AT_START"); totalBytesOnTapeAtStart = rset.columnUint64("TOTAL_BYTES_ON_TAPE_AT_START"); allFilesSelectedAtStart = rset.columnBool("ALL_FILES_SELECTED_AT_START"); totalFilesToRetrieve = rset.columnUint64("TOTAL_FILES_TO_RETRIEVE"); totalBytesToRetrieve = rset.columnUint64("TOTAL_BYTES_TO_RETRIEVE"); totalFilesToArchive = rset.columnUint64("TOTAL_FILES_TO_ARCHIVE"); totalBytesToArchive = rset.columnUint64("TOTAL_BYTES_TO_ARCHIVE"); userProvidedFiles = rset.columnUint64("USER_PROVIDED_FILES"); userProvidedBytes = rset.columnUint64("USER_PROVIDED_BYTES"); retrievedFiles = rset.columnUint64("RETRIEVED_FILES"); retrievedBytes = rset.columnUint64("RETRIEVED_BYTES"); archivedFiles = rset.columnUint64("ARCHIVED_FILES"); archivedBytes = rset.columnUint64("ARCHIVED_BYTES"); failedToRetrieveFiles = rset.columnUint64("FAILED_TO_RETRIEVE_FILES"); failedToRetrieveBytes = rset.columnUint64("FAILED_TO_RETRIEVE_BYTES"); failedToCreateArchiveReq = rset.columnUint64("FAILED_TO_CREATE_ARCHIVE_REQ"); failedToArchiveFiles = rset.columnUint64("FAILED_TO_ARCHIVE_FILES"); failedToArchiveBytes = rset.columnUint64("FAILED_TO_ARCHIVE_BYTES"); lastExpandedFseq = rset.columnUint64("LAST_EXPANDED_FSEQ"); isExpandFinished = rset.columnBool("IS_EXPAND_FINISHED"); isExpandStarted = rset.columnBool("IS_EXPAND_STARTED"); mountPolicyName = rset.columnString("MOUNT_POLICY"); isComplete = rset.columnBool("IS_COMPLETE"); isNoRecall = rset.columnBool("IS_NO_RECALL"); subReqProtoBuf = rset.columnBlob("SUBREQ_PB"); destInfoProtoBuf = rset.columnBlob("DESTINFO_PB"); createLog.username = rset.columnString("CREATE_USERNAME"); createLog.host = rset.columnString("CREATE_HOST"); createLog.time = rset.columnUint64("CREATE_TIME"); repackFinishedTime = rset.columnUint64("REPACK_FINIHSED_TIME"); return *this; } void insert(Transaction &txn) const { // setting repackReqId; todo const char *const sql = "INSERT INTO REPACK_JOB_QUEUE(" "VID," "BUFFER_URL," "STATUS," "IS_ADD_COPIES," "IS_MOVE," "TOTAL_FILES_ON_TAPE_AT_START," "TOTAL_BYTES_ON_TAPE_AT_START," "ALL_FILES_SELECTED_AT_START," "TOTAL_FILES_TO_RETRIEVE," "TOTAL_BYTES_TO_RETRIEVE," "TOTAL_FILES_TO_ARCHIVE," "TOTAL_BYTES_TO_ARCHIVE," "USER_PROVIDED_FILES," "USER_PROVIDED_BYTES," "RETRIEVED_FILES," "RETRIEVED_BYTES," "ARCHIVED_FILES," "ARCHIVED_BYTES," "FAILED_TO_RETRIEVE_FILES," "FAILED_TO_RETRIEVE_BYTES," "FAILED_TO_CREATE_ARCHIVE_REQ," "FAILED_TO_ARCHIVE_FILES," "FAILED_TO_ARCHIVE_BYTES," "LAST_EXPANDED_FSEQ," "IS_EXPAND_FINISHED," "IS_EXPAND_STARTED," "MOUNT_POLICY," "IS_COMPLETE," "IS_NO_RECALL," "SUBREQ_PB," "DESTINFO_PB," "CREATE_USERNAME," "CREATE_HOST," "CREATE_TIME," "REPACK_FINISHED_TIME) VALUES (" ":VID," ":BUFFER_URL," ":STATUS," ":IS_ADD_COPIES," ":IS_MOVE," ":TOTAL_FILES_ON_TAPE_AT_START," ":TOTAL_BYTES_ON_TAPE_AT_START," ":ALL_FILES_SELECTED_AT_START," ":TOTAL_FILES_TO_RETRIEVE," ":TOTAL_BYTES_TO_RETRIEVE," ":TOTAL_FILES_TO_ARCHIVE," ":TOTAL_BYTES_ARCHIVE," ":USER_PROVIDED_FILES," ":USER_PROVIDED_BYTES," ":RETRIEVED_FILES," ":RETRIEVED_BYTES," ":ARCHIVED_FILES," ":ARCHIVED_BYTES," ":FAILED_TO_RETRIEVE_FILES," ":FAILED_TO_RETRIEVE_BYTES," ":FAILED_TO_CREATE_ARCHIVE_REQ," ":FAILED_TO_ARCHIVE_FILES," ":FAILED_TO_ARCHIVE_BYTES," ":LAST_EXPANDED_FSEQ," ":IS_EXPAND_FINISHED," ":IS_EXPAND_STARTED," ":MOUNT_POLICY," ":IS_COMPLETE," ":IS_NO_RECALL," ":SUBREQ_PB," ":DESTINFO_PB," ":CREATE_USERNAME," ":CREATE_HOST," ":CREATE_TIME," ":REPACK_FINISHED_TIME)"; auto stmt = txn.conn().createStmt(sql); stmt.bindString(":VID", vid); stmt.bindString(":BUFFER_URL", bufferUrl); stmt.bindString(":STATUS", to_string(status)); stmt.bindBool(":IS_ADD_COPIES", isAddCopies); stmt.bindBool(":IS_MOVE", isMove); stmt.bindUint64(":TOTAL_FILES_ON_TAPE_AT_START", totalFilesOnTapeAtStart); stmt.bindUint64(":TOTAL_BYTES_ON_TAPE_AT_START", totalBytesOnTapeAtStart); stmt.bindBool(":ALL_FILES_SELECTED_AT_START", allFilesSelectedAtStart); stmt.bindUint64(":TOTAL_FILES_TO_RETRIEVE", totalFilesToRetrieve); stmt.bindUint64(":TOTAL_BYTES_TO_RETRIEVE", totalBytesToRetrieve); stmt.bindUint64(":TOTAL_FILES_TO_ARCHIVE", totalFilesToArchive); stmt.bindUint64(":TOTAL_BYTES_TO_ARCHIVE", totalBytesToArchive); stmt.bindUint64(":USER_PROVIDED_FILES", userProvidedFiles); stmt.bindUint64(":USER_PROVIDED_BYTES", userProvidedBytes); stmt.bindUint64(":RETRIEVED_FILES", retrievedFiles); stmt.bindUint64(":RETRIEVED_BYTES", retrievedBytes); stmt.bindUint64(":ARCHIVED_FILES", archivedFiles); stmt.bindUint64(":ARCHIVED_BYTES", archivedBytes); stmt.bindUint64(":FAILED_TO_RETRIEVE_FILES", failedToRetrieveFiles); stmt.bindUint64(":FAILED_TO_RETRIEVE_BYTES", failedToRetrieveBytes); stmt.bindUint64(":FAILED_TO_CREATE_ARCHIVE_REQ", failedToCreateArchiveReq); stmt.bindUint64(":FAILED_TO_ARCHIVE_FILES", failedToArchiveFiles); stmt.bindUint64(":FAILED_TO_ARCHIVE_BYTES", failedToArchiveBytes); stmt.bindUint64(":LAST_EXPANDED_FSEQ", lastExpandedFseq); stmt.bindBool(":IS_EXPAND_FINISHED", isExpandFinished); stmt.bindBool(":IS_EXPAND_STARTED", isExpandStarted); stmt.bindString(":MOUNT_POLICY", mountPolicyName); stmt.bindBool(":IS_COMPLETE", isComplete); stmt.bindBool(":IS_NO_RECALL", isNoRecall); stmt.bindBlob(":SUBREQ_PB", subReqProtoBuf); stmt.bindBlob(":DESTINFO_PB", destInfoProtoBuf); stmt.bindString(":CREATE_USERNAME", createLog.username); stmt.bindString(":CREATE_HOST", createLog.host); stmt.bindUint64(":CREATE_TIME", createLog.time); stmt.bindUint64(":REPACK_FINISHED_TIME", repackFinishedTime); stmt.executeNonQuery(); } void addParamsToLogContext(log::ScopedParamContainer& params) const { // does not set repackReqId params.add("vid", vid); params.add("bufferUrl", bufferUrl); params.add("status", to_string(status)); params.add("isAddCopies", isAddCopies); params.add("isMove", isMove); params.add("totalFilesOnTapeAtStart", totalFilesOnTapeAtStart); params.add("totalBytesOnTapeAtStart", totalBytesOnTapeAtStart); params.add("allFilesSelectedAtStart", allFilesSelectedAtStart); params.add("totalFilesToRetrieve", totalFilesToRetrieve); params.add("totalBytesToRetrieve", totalBytesToRetrieve); params.add("totalFilesToArchive", totalFilesToArchive); params.add("totalBytesToArchive", totalBytesToArchive); params.add("userProvidedFiles", userProvidedFiles); params.add("userProvidedBytes", userProvidedBytes); params.add("retrievedFiles", retrievedFiles); params.add("retrievedBytes", retrievedBytes); params.add("archivedFiles", archivedFiles); params.add("archivedBytes", archivedBytes); params.add("failedToRetrieveFiles", failedToRetrieveFiles); params.add("failedToRetrieveBytes", failedToRetrieveBytes); params.add("failedToCreateArchiveReq", failedToCreateArchiveReq); params.add("failedToArchiveFiles", failedToArchiveFiles); params.add("failedToArchiveBytes", failedToArchiveBytes); params.add("lastExpandedFseq", lastExpandedFseq); params.add("isExpandFinished", isExpandFinished); params.add("isExpandStarted", isExpandStarted); params.add("mountPolicyName", mountPolicyName); params.add("isComplete", isComplete); params.add("isNoRecall", isNoRecall); // params.add("subReqProtoBuf", subReqProtoBuf); // params.add("destInfoProtoBuf", destInfoProtoBuf); params.add("createUsername", createLog.username); params.add("createHost", createLog.host); params.add("createTime", createLog.time); params.add("repackFinishedTime", repackFinishedTime); } /** * Select unowned jobs from the queue * * @param txn Transaction to use for this query * @param status Archive Job Status to select on * @param tapepool Tapepool to select on * @param limit Maximum number of rows to return * * @return result set */ static rdbms::Rset select(Transaction &txn, RepackJobStatus status, uint32_t limit) { const char *const sql = "SELECT " "REPACK_REQID AS REPACK_REQID," "VID AS VID," "BUFFER_URL AS BUFFER_URL," "STATUS AS STATUS," "IS_ADD_COPIES AS IS_ADD_COPIES," "IS_MOVE AS IS_MOVE," "TOTAL_FILES_ON_TAPE_AT_START AS TOTAL_FILES_ON_TAPE_AT_START," "TOTAL_BYTES_ON_TAPE_AT_START AS TOTAL_BYTES_ON_TAPE_AT_START," "ALL_FILES_SELECTED_AT_START AS ALL_FILES_SELECTED_AT_START," "TOTAL_FILES_TO_RETRIEVE AS TOTAL_FILES_TO_RETRIEVE," "TOTAL_BYTES_TO_RETRIEVE AS TOTAL_BYTES_TO_RETRIEVE," "TOTAL_FILES_TO_ARCHIVE AS TOTAL_FILES_TO_ARCHIVE," "TOTAL_BYTES_TO_ARCHIVE AS TOTAL_BYTES_TO_ARCHIVE," "USER_PROVIDED_FILES AS USER_PROVIDED_FILES," "USER_PROVIDED_BYTES AS USER_PROVIDED_BYTES," "RETRIEVED_FILES AS RETRIEVED_FILES," "RETRIEVED_BYTES AS RETRIEVED_BYTES," "ARCHIVED_FILES AS ARCHIVED_FILES," "ARCHIVED_BYTES AS ARCHIVED_BYTES," "FAILED_TO_RETRIEVE_FILES AS FAILED_TO_RETRIEVE_FILES," "FAILED_TO_RETRIEVE_BYTES AS FAILED_TO_RETRIEVE_BYTES," "FAILED_TO_CREATE_ARCHIVE_REQ AS FAILED_TO_CREATE_ARCHIVE_REQ," "FAILED_TO_ARCHIVE_FILES AS FAILED_TO_ARCHIVE_FILES," "FAILED_TO_ARCHIVE_BYTES AS FAILED_TO_ARCHIVE_BYTES," "LAST_EXPANDED_FSEQ AS LAST_EXPANDED_FSEQ," "IS_EXPAND_FINISHED AS IS_EXPAND_FINISHED," "IS_EXPAND_STARTED AS IS_EXPAND_STARTED," "MOUNT_POLICY AS MOUNT_POLICY," "IS_COMPLETE AS IS_COMPLETE," "IS_NO_RECALL AS IS_NO_RECALL," "SUBREQ_PB AS SUBREQ_PB," "CREATE_USERNAME AS CREATE_USERNAME," "CREATE_HOST AS CREATE_HOST," "CREATE_TIME AS CREATE_TIME," "REPACK_FINISHED_TIME AS REPACK_FINISHED_TIME " "FROM REPACK_JOB_QUEUE " "WHERE " "STATUS = :STATUS " "ORDER BY REPACK_REQID " "LIMIT :LIMIT"; auto stmt = txn.conn().createStmt(sql); stmt.bindString(":STATUS", to_string(status)); stmt.bindUint32(":LIMIT", limit); return stmt.executeQuery(); } }; } // namespace cta::postgresscheddb::sql