/*
* @project The CERN Tape Archive (CTA)
* @copyright Copyright(C) 2021 CERN
* @copyright Copyright(C) 2021 DESY
* @license This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include "FrontendGRpcSvc.h"
#include "catalogue/Catalogue.hpp"
#include "common/log/LogLevel.hpp"
#include
Status CtaRpcImpl::Version(::grpc::ServerContext *context, const ::google::protobuf::Empty *request,
::cta::admin::Version *response) {
response->set_cta_version(CTA_VERSION);
return Status::OK;
}
/*
* Validate the storage class and issue the archive ID which should be used for the Archive request
*/
Status CtaRpcImpl::Create(::grpc::ServerContext* context, const ::cta::frontend::rpc::SchedulerRequest* request, ::cta::frontend::rpc::CreateResponse* response) {
cta::log::LogContext lc(*m_log);
cta::log::ScopedParamContainer sp(lc);
lc.log(cta::log::INFO, "Create");
try {
auto& instance = request->md().wf().instance().name();
auto& storageClass = request->md().file().storage_class();
cta::common::dataStructures::RequesterIdentity requester;
requester.name = request->md().cli().user().username();
requester.group = request->md().cli().user().groupname();
uint64_t archiveFileId = m_scheduler->checkAndGetNextArchiveFileId(instance, storageClass, requester, lc);
response->set_archive_file_id(archiveFileId);
} catch (cta::exception::Exception &ex) {
lc.log(cta::log::ERR, ex.getMessageValue());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue());
}
return Status::OK;
}
Status CtaRpcImpl::Archive(::grpc::ServerContext* context, const ::cta::frontend::rpc::SchedulerRequest* request, ::cta::frontend::rpc::ArchiveResponse* response) {
cta::log::LogContext lc(*m_log);
cta::log::ScopedParamContainer sp(lc);
lc.log(cta::log::INFO, "Archive request");
sp.add("remoteHost", context->peer());
sp.add("request", "archive");
const std::string storageClass = request->md().file().storage_class();
if (storageClass.empty()) {
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Storage class is not set.");
}
lc.log(cta::log::DEBUG, "Archive request for storageClass: " + storageClass);
cta::common::dataStructures::RequesterIdentity requester;
requester.name = request->md().cli().user().username();
requester.group = request->md().cli().user().groupname();
// check validate request args
if (request->md().wf().instance().name().empty()) {
lc.log(cta::log::WARNING, "CTA instance is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA instance is not set.");
}
if (request->md().cli().user().username().empty()) {
lc.log(cta::log::WARNING, "CTA username is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA username is not set.");
}
if (request->md().cli().user().groupname().empty()) {
lc.log(cta::log::WARNING, "CTA groupname is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA groupname is not set.");
}
if (!request->md().file().owner().uid()) {
lc.log(cta::log::WARNING, "File's owner uid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner uid can't be zero");
}
if (!request->md().file().owner().gid()) {
lc.log(cta::log::WARNING, "File's owner gid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner gid can't be zero");
}
if (request->md().file().lpath().empty()) {
lc.log(cta::log::WARNING, "File's path can't be empty");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's path can't be empty");
}
auto instance = request->md().wf().instance().name();
sp.add("instance", instance);
sp.add("username", request->md().cli().user().username());
sp.add("groupname", request->md().cli().user().groupname());
sp.add("storageClass", storageClass);
sp.add("fileID", request->md().file().disk_file_id());
try {
auto archiveFileId = request->md().file().archive_file_id();
sp.add("archiveID", archiveFileId);
cta::common::dataStructures::ArchiveRequest archiveRequest;
cta::checksum::ProtobufToChecksumBlob(request->md().file().csb(), archiveRequest.checksumBlob);
archiveRequest.diskFileInfo.owner_uid = request->md().file().owner().uid();
archiveRequest.diskFileInfo.gid = request->md().file().owner().gid();
archiveRequest.diskFileInfo.path = request->md().file().lpath();
archiveRequest.diskFileID = request->md().file().disk_file_id();
archiveRequest.fileSize = request->md().file().size();
archiveRequest.requester.name = request->md().cli().user().username();
archiveRequest.requester.group = request->md().cli().user().groupname();
archiveRequest.storageClass = storageClass;
archiveRequest.srcURL = request->md().transport().dst_url();
archiveRequest.archiveReportURL = request->md().transport().report_url();
archiveRequest.archiveErrorReportURL = request->md().transport().error_report_url();
archiveRequest.creationLog.host = context->peer();
archiveRequest.creationLog.username = instance;
archiveRequest.creationLog.time = time(nullptr);
std::string reqId = m_scheduler->queueArchiveWithGivenId(archiveFileId, instance, archiveRequest, lc);
sp.add("reqId", reqId);
lc.log(cta::log::INFO, "Archive request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(archiveFileId)
+ " RequestID: " + reqId);
response->set_objectstore_id(reqId);
} catch (cta::exception::Exception &ex) {
lc.log(cta::log::ERR, ex.getMessageValue());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue());
}
return Status::OK;
}
Status CtaRpcImpl::Delete(::grpc::ServerContext* context, const ::cta::frontend::rpc::SchedulerRequest* request, ::google::protobuf::Empty* response) {
cta::log::LogContext lc(*m_log);
cta::log::ScopedParamContainer sp(lc);
sp.add("remoteHost", context->peer());
lc.log(cta::log::DEBUG, "Delete request");
sp.add("request", "delete");
// check validate request args
if (request->md().wf().instance().name().empty()) {
lc.log(cta::log::WARNING, "CTA instance is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA instance is not set.");
}
if (request->md().cli().user().username().empty()) {
lc.log(cta::log::WARNING, "CTA username is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA username is not set.");
}
if (request->md().cli().user().groupname().empty()) {
lc.log(cta::log::WARNING, "CTA groupname is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA groupname is not set.");
}
if (request->md().file().archive_file_id() == 0) {
lc.log(cta::log::WARNING, "Invalid archive file id");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid archive file id.");
}
if (!request->md().file().owner().uid()) {
lc.log(cta::log::WARNING, "File's owner uid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner uid can't be zero");
}
if (!request->md().file().owner().gid()) {
lc.log(cta::log::WARNING, "File's owner gid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner gid can't be zero");
}
if (request->md().file().lpath().empty()) {
lc.log(cta::log::WARNING, "File's path can't be empty");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's path can't be empty");
}
auto instance = request->md().wf().instance().name();
// Unpack message
cta::common::dataStructures::DeleteArchiveRequest deleteRequest;
deleteRequest.requester.name = request->md().cli().user().username();
deleteRequest.requester.group = request->md().cli().user().groupname();
sp.add("instance", instance);
sp.add("username", request->md().cli().user().username());
sp.add("groupname", request->md().cli().user().groupname());
sp.add("fileID", request->md().file().disk_file_id());
deleteRequest.diskFilePath = request->md().file().lpath();
deleteRequest.diskFileId = request->md().file().disk_file_id();
deleteRequest.diskInstance = instance;
// remove pending scheduler entry, if any
deleteRequest.archiveFileID = request->md().file().archive_file_id();
if (!request->objectstore_id().empty()) {
deleteRequest.address = request->objectstore_id();
}
// Delete the file from the catalogue or from the objectstore if archive request is created
cta::utils::Timer t;
try {
deleteRequest.archiveFile = m_catalogue->ArchiveFile()->getArchiveFileById(deleteRequest.archiveFileID);
} catch (cta::exception::Exception &ex){
lc.log(cta::log::WARNING, "Deleted file is not in catalog.");
}
m_scheduler->deleteArchive(instance, deleteRequest, lc);
lc.log(cta::log::INFO, "archive file deleted.");
return Status::OK;
}
Status CtaRpcImpl::Retrieve(::grpc::ServerContext* context, const ::cta::frontend::rpc::SchedulerRequest* request, ::cta::frontend::rpc::RetrieveResponse* response) {
cta::log::LogContext lc(*m_log);
cta::log::ScopedParamContainer sp(lc);
sp.add("remoteHost", context->peer());
sp.add("request", "retrieve");
const std::string storageClass = request->md().file().storage_class();
if (storageClass.empty()) {
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Storage class is not set.");
}
lc.log(cta::log::DEBUG, "Retrieve request for storageClass: " + storageClass);
// check validate request args
if (request->md().wf().instance().name().empty()) {
lc.log(cta::log::WARNING, "CTA instance is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA instance is not set.");
}
if (request->md().cli().user().username().empty()) {
lc.log(cta::log::WARNING, "CTA username is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA username is not set.");
}
if (request->md().cli().user().groupname().empty()) {
lc.log(cta::log::WARNING, "CTA groupname is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA groupname is not set.");
}
if (request->md().file().archive_file_id() == 0) {
lc.log(cta::log::WARNING, "Invalid archive file id");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid archive file id.");
}
if (!request->md().file().owner().uid()) {
lc.log(cta::log::WARNING, "File's owner uid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner uid can't be zero");
}
if (!request->md().file().owner().gid()) {
lc.log(cta::log::WARNING, "File's owner gid can't be zero");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's owner gid can't be zero");
}
if (request->md().file().lpath().empty()) {
lc.log(cta::log::WARNING, "File's path can't be empty");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "File's path can't be empty");
}
auto instance = request->md().wf().instance().name();
sp.add("instance", instance);
sp.add("username", request->md().cli().user().username());
sp.add("groupname", request->md().cli().user().groupname());
sp.add("storageClass", storageClass);
sp.add("archiveID", request->md().file().archive_file_id());
sp.add("fileID", request->md().file().disk_file_id());
// Unpack message
cta::common::dataStructures::RetrieveRequest retrieveRequest;
retrieveRequest.requester.name = request->md().cli().user().username();
retrieveRequest.requester.group = request->md().cli().user().groupname();
retrieveRequest.dstURL = request->md().transport().dst_url();
retrieveRequest.errorReportURL = request->md().transport().error_report_url();
retrieveRequest.diskFileInfo.owner_uid = request->md().file().owner().uid();
retrieveRequest.diskFileInfo.gid = request->md().file().owner().gid();
retrieveRequest.diskFileInfo.path = request->md().file().lpath();
retrieveRequest.creationLog.host = context->peer();
retrieveRequest.creationLog.username = instance;
retrieveRequest.creationLog.time = time(nullptr);
retrieveRequest.isVerifyOnly = false;
retrieveRequest.archiveFileID = request->md().file().archive_file_id();
sp.add("archiveID", request->md().file().archive_file_id());
sp.add("fileID", request->md().file().disk_file_id());
cta::utils::Timer t;
// Queue the request
try {
std::string reqId = m_scheduler->queueRetrieve(instance, retrieveRequest, lc);
sp.add("reqId", reqId);
lc.log(cta::log::INFO, "Retrieve request for storageClass: " + storageClass
+ " archiveFileId: " + std::to_string(retrieveRequest.archiveFileID)
+ " RequestID: " + reqId);
response->set_objectstore_id(reqId);
} catch (cta::exception::Exception &ex){
lc.log(cta::log::CRIT, ex.getMessageValue());
return ::grpc::Status(::grpc::StatusCode::INTERNAL, ex.getMessageValue());
}
return Status::OK;
}
Status CtaRpcImpl::CancelRetrieve(::grpc::ServerContext* context, const ::cta::frontend::rpc::SchedulerRequest* request, ::google::protobuf::Empty* response) {
cta::log::LogContext lc(*m_log);
cta::log::ScopedParamContainer sp(lc);
sp.add("remoteHost", context->peer());
lc.log(cta::log::DEBUG, "CancelRetrieve request");
sp.add("request", "cancel");
// check validate request args
if (request->md().wf().instance().name().empty()) {
lc.log(cta::log::WARNING, "CTA instance is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA instance is not set.");
}
if (request->md().cli().user().username().empty()) {
lc.log(cta::log::WARNING, "CTA username is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA username is not set.");
}
if (request->md().cli().user().groupname().empty()) {
lc.log(cta::log::WARNING, "CTA groupname is not set");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "CTA groupname is not set.");
}
if (!request->md().file().archive_file_id()) {
lc.log(cta::log::WARNING, "Invalid archive file id");
return ::grpc::Status(::grpc::StatusCode::INVALID_ARGUMENT, "Invalid archive file id.");
}
auto instance = request->md().wf().instance().name();
// Unpack message
cta::common::dataStructures::CancelRetrieveRequest cancelRequest;
cancelRequest.requester.name = request->md().cli().user().username();
cancelRequest.requester.group = request->md().cli().user().groupname();
cancelRequest.archiveFileID = request->md().file().archive_file_id();
cancelRequest.retrieveRequestId = request->objectstore_id();
sp.add("instance", instance);
sp.add("username", request->md().cli().user().username());
sp.add("groupname", request->md().cli().user().groupname());
sp.add("fileID", request->md().file().archive_file_id());
sp.add("schedulerJobID", request->objectstore_id());
m_scheduler->abortRetrieve(instance, cancelRequest, lc);
lc.log(cta::log::INFO, "retrieve request canceled.");
return Status::OK;
}
CtaRpcImpl::CtaRpcImpl(cta::log::Logger *logger, std::unique_ptr &catalogue, std::unique_ptr &scheduler):
m_catalogue(std::move(catalogue)), m_scheduler(std::move(scheduler)) {
m_log = logger;
}