/**
* @project The CERN Tape Archive (CTA)
* @copyright Copyright © 2021-2023 CERN
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#pragma once
#include "common/log/LogContext.hpp"
#include "common/dataStructures/ArchiveFile.hpp"
#include "common/dataStructures/RetrieveRequest.hpp"
#include "common/checksum/ChecksumBlob.hpp"
#include "scheduler/PostgresSchedDB/sql/Transaction.hpp"
#include "scheduler/PostgresSchedDB/sql/Enums.hpp"
#include
#include
namespace cta::postgresscheddb::sql {
struct RetrieveJobQueueRow {
uint64_t jobId = 0;
uint64_t retrieveReqId = 0;
uint64_t mountId = 0;
RetrieveJobStatus status = RetrieveJobStatus::RJS_ToTransfer;
std::string vid;
std::optional activity;
uint16_t actCopyNb = 0;
uint16_t priority = 0;
time_t retMinReqAge = 0;
time_t startTime = 0;
std::string mountPolicyName;
std::string failureReportUrl;
std::string failureReportLog;
bool isRepack = false;
uint64_t repackReqId = 0;
bool isFailed = false;
std::string retrieveJobsProtoBuf;
std::string repackInfoProtoBuf;
cta::common::dataStructures::ArchiveFile archiveFile;
cta::common::dataStructures::RetrieveRequest retrieveRequest;
std::optional diskSystemName;
RetrieveJobQueueRow() { }
/**
* Constructor from row
*
* @param row A single row from the current row of the rset
*/
RetrieveJobQueueRow(const rdbms::Rset &rset) {
*this = rset;
}
RetrieveJobQueueRow& operator=(const rdbms::Rset &rset) {
jobId = rset.columnUint64("JOB_ID");
retrieveReqId = rset.columnUint64("RETRIEVE_REQID");
mountId = rset.columnUint64("MOUNT_ID");
status = from_string(
rset.columnString("STATUS") );
vid = rset.columnString("VID");
activity = rset.columnOptionalString("ACTIVITY");
actCopyNb = rset.columnUint16("ACTIVE_COPY_NB");
priority = rset.columnUint16("PRIORITY");
retMinReqAge = rset.columnUint64("RETRIEVE_MIN_REQ_AGE");
startTime = rset.columnUint64("STARTTIME");
retrieveJobsProtoBuf = rset.columnBlob("RETRIEVEJOB_PB");
retrieveRequest.requester.name = rset.columnString("REQUESTER_NAME");
retrieveRequest.requester.group = rset.columnString("REQUESTER_GROUP");
archiveFile.archiveFileID = rset.columnUint64("ARCHIVE_FILE_ID");
retrieveRequest.archiveFileID = archiveFile.archiveFileID;
retrieveRequest.dstURL = rset.columnString("DST_URL");
archiveFile.diskFileInfo.owner_uid = rset.columnUint32("DISK_FILE_OWNER_UID");
archiveFile.diskFileInfo.gid = rset.columnUint32("DISK_FILE_GID");
archiveFile.diskFileInfo.path = rset.columnString("DISK_FILE_PATH");
retrieveRequest.diskFileInfo = archiveFile.diskFileInfo;
retrieveRequest.creationLog.username = rset.columnString("SRR_USERNAME");
retrieveRequest.creationLog.host = rset.columnString("SRR_HOST");
retrieveRequest.creationLog.time = rset.columnUint64("SRR_TIME");
retrieveRequest.errorReportURL = rset.columnString("RETRIEVE_ERROR_REPORT_URL");
retrieveRequest.isVerifyOnly = rset.columnBool("IS_VERIFY");
mountPolicyName = rset.columnString("MOUNT_POLICY");
retrieveRequest.mountPolicy = rset.columnString("SRR_MOUNT_POLICY");
retrieveRequest.activity = rset.columnOptionalString("SRR_ACTIVITY");
archiveFile.fileSize = rset.columnUint64("SIZE_IN_BYTES");
archiveFile.diskFileId = rset.columnString("DISK_FILE_ID");
archiveFile.diskInstance = rset.columnString("DISK_INSTANCE");
archiveFile.checksumBlob.deserialize(
rset.columnBlob("CHECKSUMBLOB") );
archiveFile.creationTime = rset.columnUint64("CREATION_TIME");
archiveFile.reconciliationTime = 0;
archiveFile.storageClass = rset.columnString("STORAGE_CLASS");
failureReportUrl = rset.columnString("FAILURE_REPORT_URL");
failureReportLog = rset.columnString("FAILURE_REPORT_LOG");
isRepack = rset.columnBool("IS_REPACK");
repackReqId = rset.columnUint64("REPACK_REQID");
repackInfoProtoBuf = rset.columnBlob("RR_REPACKINFO_PB");
retrieveRequest.lifecycleTimings.creation_time = rset.columnUint64("LT_CREATE");
retrieveRequest.lifecycleTimings.first_selected_time = rset.columnUint64("LT_FIRST_SELECTED");
retrieveRequest.lifecycleTimings.completed_time = rset.columnUint64("LT_COMPLETED");
diskSystemName = rset.columnString("DISK_SYSTEM_NAME");
isFailed = rset.columnBool("IS_FAILED");
return *this;
}
void insert(Transaction &txn) const {
// does not set mountId or jobId
const char *const sql =
"INSERT INTO RETRIEVE_JOB_QUEUE("
"RETRIEVE_REQID,"
"STATUS,"
"VID,"
"ACTIVITY,"
"ACTIVE_COPY_NB,"
"PRIORITY,"
"RETRIEVE_MIN_REQ_AGE,"
"STARTTIME,"
"RETRIEVEJOB_PB,"
"REQUESTER_NAME,"
"REQUESTER_GROUP,"
"ARCHIVE_FILE_ID,"
"DST_URL,"
"DISK_FILE_OWNER_UID,"
"DISK_FILE_GID,"
"DISK_FILE_PATH,"
"SRR_USERNAME,"
"SRR_HOST,"
"SRR_TIME,"
"RETRIEVE_ERROR_REPORT_URL,"
"IS_VERIFY,"
"MOUNT_POLICY,"
"SRR_MOUNT_POLICY,"
"SRR_ACTIVITY,"
"SIZE_IN_BYTES,"
"DISK_FILE_ID,"
"DISK_INSTANCE,"
"CHECKSUMBLOB,"
"CREATION_TIME,"
"STORAGE_CLASS,"
"FAILURE_REPORT_URL,"
"FAILURE_REPORT_LOG,"
"IS_REPACK,"
"REPACK_REQID,"
"RR_REPACKINFO_PB,"
"LT_CREATE,"
"LT_FIRST_SELECTED,"
"LT_COMPLETED,"
"DISK_SYSTEM_NAME,"
"IS_FAILED) VALUES ("
":RETRIEVE_REQID,"
":STATUS,"
":VID,"
":ACTIVITY,"
":ACTIVE_COPY_NB,"
":PRIORITY,"
":RETRIEVE_MIN_REQ_AGE,"
":STARTTIME,"
":RETRIEVEJOB_PB,"
":REQUESTER_NAME,"
":REQUESTER_GROUP,"
":ARCHIVE_FILE_ID,"
":DST_URL,"
":DISK_FILE_OWNER_UID,"
":DISK_FILE_GID,"
":DISK_FILE_PATH,"
":SRR_USERNAME,"
":SRR_HOST,"
":SRR_TIME,"
":RETRIEVE_ERROR_REPORT_URL,"
":IS_VERIFY,"
":MOUNT_POLICY,"
":SRR_MOUNT_POLICY,"
":SRR_ACTIVITY,"
":SIZE_IN_BYTES,"
":DISK_FILE_ID,"
":DISK_INSTANCE,"
":CHECKSUMBLOB,"
":CREATION_TIME,"
":STORAGE_CLASS,"
":FAILURE_REPORT_URL,"
":FAILURE_REPORT_LOG,"
":IS_REPACK,"
":REPACK_REQID,"
":RR_REPACKINFO_PB,"
":LT_CREATE,"
":LT_FIRST_SELECTED,"
":LT_COMPLETED,"
":DISK_SYSTEM_NAME,"
":IS_FAILED)";
auto stmt = txn.conn().createStmt(sql);
stmt.bindUint64(":RETRIEVE_REQID", retrieveReqId);
stmt.bindString(":STATUS", to_string(status));
stmt.bindString(":VID", vid);
stmt.bindString(":ACTVITIY", activity);
stmt.bindUint16(":ACTIVE_COPY_NB", actCopyNb);
stmt.bindUint16(":PRIORITY", priority);
stmt.bindUint64(":RETRIEVE_MIN_REQ_AGE", retMinReqAge);
stmt.bindUint64(":STARTTIME", startTime);
stmt.bindBlob(":RETRIEVEJOB_PB", retrieveJobsProtoBuf);
stmt.bindString(":REQUESTER_NAME", retrieveRequest.requester.name);
stmt.bindString(":REQUESTER_GROUP", retrieveRequest.requester.group);
stmt.bindUint64(":ARCHIVE_FILE_ID", archiveFile.archiveFileID);
stmt.bindString(":DST_URL", retrieveRequest.dstURL);
stmt.bindUint32(":DISK_FILE_OWNER_UID", archiveFile.diskFileInfo.owner_uid);
stmt.bindUint32(":DISK_FILE_GID", archiveFile.diskFileInfo.gid);
stmt.bindString(":DISK_FILE_PATH", archiveFile.diskFileInfo.path);
stmt.bindString(":SRR_USERNAME", retrieveRequest.creationLog.username);
stmt.bindString(":SRR_HOST", retrieveRequest.creationLog.host);
stmt.bindUint64(":SRR_TIME", retrieveRequest.creationLog.time);
stmt.bindString(":RETRIEVE_ERROR_REPORT_URL", retrieveRequest.errorReportURL);
stmt.bindBool(":IS_VERIFY", retrieveRequest.isVerifyOnly);
stmt.bindString(":MOUNT_POLICY", mountPolicyName);
stmt.bindString(":SRR_MOUNT_POLICY", retrieveRequest.mountPolicy);
stmt.bindString(":SRR_ACTIVITY", retrieveRequest.activity);
stmt.bindUint64(":SIZE_IN_BYTES", archiveFile.fileSize);
stmt.bindString(":DISK_FILE_ID", archiveFile.diskFileId);
stmt.bindString(":DISK_INSTANCE", archiveFile.diskInstance);
stmt.bindBlob(":CHECKSUMBLOB", archiveFile.checksumBlob.serialize());
stmt.bindUint64(":CREATION_TIME", archiveFile.creationTime);
stmt.bindString(":STORAGE_CLASS", archiveFile.storageClass);
stmt.bindString(":FAILURE_REPORT_URL", failureReportUrl);
stmt.bindString(":FAILURE_REPORT_LOG", failureReportLog);
stmt.bindBool(":IS_REPACK", isRepack);
stmt.bindUint64(":REPACK_REQID", repackReqId);
stmt.bindBlob(":RR_REPACKINFO_PB", repackInfoProtoBuf);
stmt.bindUint64(":LT_CREATE", retrieveRequest.lifecycleTimings.creation_time);
stmt.bindUint64(":LT_FIRST_SELECTED", retrieveRequest.lifecycleTimings.first_selected_time);
stmt.bindUint64(":LT_COMPLETED", retrieveRequest.lifecycleTimings.completed_time);
stmt.bindString(":DISK_SYSTEM_NAME", diskSystemName);
stmt.bindBool(":IS_FAILED", isFailed);
stmt.executeNonQuery();
}
void addParamsToLogContext(log::ScopedParamContainer& params) const {
params.add("retrieveReqId", retrieveReqId);
params.add("mountId", mountId);
params.add("status", to_string(status));
params.add("vid", vid);
params.add("actCopyNb", actCopyNb);
params.add("activity", activity.value());
params.add("priority", priority);
params.add("retMinReqAge", retMinReqAge);
params.add("startTime", startTime);
// params.add("retrieveJobsProtoBuf", retrieveJobsProtoBuf);
params.add("requester.name", retrieveRequest.requester.name);
params.add("requester.group", retrieveRequest.requester.group);
params.add("archiveFileID", archiveFile.archiveFileID);
params.add("dstURL", retrieveRequest.dstURL);
params.add("diskFileInfo.owner_uid", archiveFile.diskFileInfo.owner_uid);
params.add("diskFileInfo.gid", archiveFile.diskFileInfo.gid);
params.add("diskFileInfo.path", archiveFile.diskFileInfo.path);
params.add("creationlog.ssername", retrieveRequest.creationLog.username);
params.add("creationlog.host", retrieveRequest.creationLog.host);
params.add("creationlog.time", retrieveRequest.creationLog.time);
params.add("errorReportURL", retrieveRequest.errorReportURL);
params.add("isVerifyOnly", retrieveRequest.isVerifyOnly);
params.add("mountPolicyName", mountPolicyName);
params.add("retrieveRequest.mountPolicy", retrieveRequest.mountPolicy.value());
params.add("retrieveRequest.activity", retrieveRequest.activity.value());
params.add("fileSize", archiveFile.fileSize);
params.add("diskFileId", archiveFile.diskFileId);
params.add("diskInstance", archiveFile.diskInstance);
params.add("checksumBlob", archiveFile.checksumBlob);
params.add("creationTime", archiveFile.creationTime);
params.add("storageClass", archiveFile.storageClass);
params.add("failureReportUrl", failureReportUrl);
params.add("failureReportLog", failureReportLog);
params.add("isRepack", isRepack);
params.add("repackReqId", repackReqId);
// params.add("repackInfoProtoBuf", repackInfoProtoBuf);
params.add("lifecycleTimings.creation_time", retrieveRequest.lifecycleTimings.creation_time);
params.add("lifecycleTimings.first_selected_time", retrieveRequest.lifecycleTimings.first_selected_time);
params.add("lifecycleTimings.completed_time", retrieveRequest.lifecycleTimings.completed_time);
params.add("diskSystemName", diskSystemName.value());
params.add("isFailed", isFailed);
}
static rdbms::Rset select(Transaction &txn, RetrieveJobStatus status, const std::string &vid, uint32_t limit) {
const char *const sql =
"SELECT "
"JOB_ID AS JOB_ID,"
"RETRIEVE_REQID AS RETRIEVE_REQID,"
"MOUNT_ID AS MOUNT_ID,"
"STATUS AS STATUS,"
"VID AS VID,"
"ACTIVE_COPY_NB AS ACTIVE_COPY_NB,"
"ACTIVITY AS ACTIVITY,"
"PRIORITY AS PRIORITY,"
"RETRIEVE_MIN_REQ_AGE AS RETRIEVE_MIN_REQ_AGE,"
"STARTTIME AS STARTTIME,"
"RETRIEVEJOB_PB AS RETRIEVEJOB_PB,"
"REQUESTER_NAME AS REQUESTER_NAME,"
"REQUESTER_GROUP AS REQUESTER_GROUP,"
"ARCHIVE_FILE_ID AS ARCHIVE_FILE_ID,"
"DST_URL AS DST_URL,"
"DISK_FILE_OWNER_UID AS DISK_FILE_OWNER_UID,"
"DISK_FILE_GID AS DISK_FILE_GID,"
"DISK_FILE_PATH AS DISK_FILE_PATH,"
"SRR_USERNAME AS SRR_USERNAME,"
"SRR_HOST AS SRR_HOST,"
"SRR_TIME AS SRR_TIME,"
"RETRIEVE_ERROR_REPORT_URL AS RETRIEVE_ERROR_REPORT_URL,"
"IS_VERIFY AS IS_VERIFY,"
"MOUNT_POLICY AS MOUNT_POLICY,"
"SRR_MOUNT_POLICY AS SRR_MOUNT_POLICY,"
"SRR_ACTIVITY AS SRR_ACTIVITY,"
"SIZE_IN_BYTES AS SIZE_IN_BYTES,"
"DISK_FILE_ID AS DISK_FILE_ID,"
"DISK_INSTANCE AS DISK_INSTANCE,"
"CHECKSUMBLOB AS CHECKSUMBLOB,"
"CREATION_TIME AS CREATION_TIME,"
"STORAGE_CLASS AS STORAGE_CLASS,"
"FAILURE_REPORT_URL AS FAILURE_REPORT_URL,"
"FAILURE_REPORT_LOG AS FAILURE_REPORT_LOG,"
"IS_REPACK AS IS_REPACK,"
"REPACK_REQID AS REPACK_REQID,"
"RR_REPACKINFO_PB AS RR_REPACKINFO_PB,"
"LT_CREATE AS LT_CREATE,"
"LT_FIRST_SELECTED AS LT_FIRST_SELECTED,"
"LT_COMPLETED AS LT_COMPLETED,"
"DISK_SYSTEM_NAME AS DISK_SYSTEM_NAME,"
"IS_FAILED AS IS_FAILED "
"FROM RETRIEVE_JOB_QUEUE "
"WHERE "
"VID = :VID "
"AND STATUS = :STATUS "
"AND MOUNT_ID IS NULL "
"ORDER BY ACTIVE_PRIORITY DESC, JOB_ID "
"LIMIT :LIMIT";
auto stmt = txn.conn().createStmt(sql);
stmt.bindString(":VID", vid);
stmt.bindString(":STATUS", to_string(status));
stmt.bindUint32(":LIMIT", limit);
return stmt.executeQuery();
}
/**
* Assign a mount ID to the specified rows
*
* @param txn Transaction to use for this query
* @param rowList List of table rows to claim for the new owner
* @param mountId Mount ID to assign
*/
static void updateMountId(Transaction &txn, const std::list& rowList, uint64_t mountId);
};
} // namespace cta::postgresscheddb::sql