/* * @project The CERN Tape Archive (CTA) * @copyright Copyright(C) 2021 CERN * @copyright Copyright(C) 2021 DESY * @license This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include "FrontendGRpcSvc.h" #include "version.h" #include "catalogue/Catalogue.hpp" #include "catalogue/CatalogueFactory.hpp" #include "catalogue/CatalogueFactoryFactory.hpp" #include "catalogue/SchemaVersion.hpp" #include "common/Configuration.hpp" #include "common/log/Logger.hpp" #include "common/log/LogLevel.hpp" #include "common/log/StdoutLogger.hpp" #include "rdbms/Login.hpp" #ifdef CTA_PGSCHED #include "scheduler/PostgresSchedDB/PostgresSchedDBInit.hpp" #else #include "scheduler/OStoreDB/OStoreDBInit.hpp" #endif #include #include using namespace cta; using namespace cta::common; std::string help = "Usage: cta-frontend-grpc [options]\n" "\n" "where options can be:\n" "\n" "\t--threads , -c \tnumber of threads to process concurrent requests, defaults to 8x#CPUs\n" "\t--port , -p \tTCP port number to use, defaults to 17017\n" "\t--log-header, -n \tadd hostname and timestamp to log outputs, default\n" "\t--no-log-header, -s \tdon't add hostname and timestamp to log outputs\n" "\t--tls, -t \tenable Transport Layer Security (TLS)\n" "\t--version, -v \tprint version and exit\n" "\t--help, -h \tprint this help and exit\n"; static struct option long_options[] = { { "threads", required_argument, nullptr, 'c' }, { "port", required_argument, nullptr, 'p' }, { "log-header", no_argument, nullptr, 'n' }, { "no-log-header", no_argument, nullptr, 's' }, { "help", no_argument, nullptr, 'h' }, { "version", no_argument, nullptr, 'v' }, { "tls", no_argument, nullptr, 't' }, { nullptr, 0, nullptr, 0 } }; void printHelpAndExit(int rc) { std::cout << help << std::endl; exit(rc); } void printVersionAndExit() { std::cout << "cta-frontend-grpc version: " << CTA_VERSION << std::endl; exit(0); } std::string file2string(std::string filename){ std::ifstream as_stream(filename); std::ostringstream as_string; as_string << as_stream.rdbuf(); return as_string.str(); } int main(const int argc, char *const *const argv) { std::string port = "17017"; char c; bool shortHeader = false; int option_index = 0; const std::string shortHostName = utils::getShortHostname(); bool useTLS = false; int threads = 8 * std::thread::hardware_concurrency(); while( (c = getopt_long(argc, argv, "c:p:nshv", long_options, &option_index)) != EOF) { switch(c) { case 'p': port = std::string(optarg); break; case 'n': shortHeader = false; break; case 's': shortHeader = true; break; case 'h': printHelpAndExit(0); break; case 'v': printVersionAndExit(); break; case 't': useTLS = true; break; case 'c': threads = std::atoi(optarg); break; default: printHelpAndExit(1); } } log::StdoutLogger logger(shortHostName, "cta-frontend-grpc", shortHeader); log::LogContext lc(logger); lc.log(log::INFO, "Starting cta-frontend-grpc- " + std::string(CTA_VERSION)); // use castor config to avoid dependency on xroot-ssi Configuration config("/etc/cta/cta.conf"); std::string server_address("0.0.0.0:" + port); // Initialise the Catalogue std::string catalogueConfigFile = "/etc/cta/cta-catalogue.conf"; const rdbms::Login catalogueLogin = rdbms::Login::parseFile(catalogueConfigFile); const uint64_t nbArchiveFileListingConns = 2; auto catalogueFactory = catalogue::CatalogueFactoryFactory::create(logger, catalogueLogin, 10, nbArchiveFileListingConns); auto catalogue = catalogueFactory->create(); try { catalogue->Schema()->ping(); lc.log(log::INFO, "Connected to catalog " + catalogue->Schema()->getSchemaVersion(). getSchemaVersion()); } catch (cta::exception::Exception &ex) { lc.log(cta::log::CRIT, ex.getMessageValue()); exit(1); } // Initialise the Scheduler auto backed = config.getConfEntString("ObjectStore", "BackendPath"); lc.log(log::INFO, "Using scheduler backend: " + backed); auto sInit = std::make_unique("Frontend", backed, logger); auto scheddb = sInit->getSchedDB(*catalogue, logger); scheddb->setBottomHalfQueueSize(25000); auto scheduler = std::make_unique(*catalogue, *scheddb, 5, 2*1000*1000); CtaRpcImpl svc(&logger, catalogue, scheduler); // start gRPC service ServerBuilder builder; std::shared_ptr creds; if (useTLS) { lc.log(log::INFO, "Using gRPC over TLS"); grpc::SslServerCredentialsOptions tls_options; grpc::SslServerCredentialsOptions::PemKeyCertPair cert; auto key_file = config.getConfEntString("gRPC", "TlsKey"); lc.log(log::INFO, "TLS service key file: " + key_file); cert.private_key = file2string(key_file); auto cert_file = config.getConfEntString("gRPC", "TlsCert"); lc.log(log::INFO, "TLS service certificate file: " + cert_file); cert.cert_chain = file2string(cert_file); auto ca_chain = config.getConfEntString("gRPC", "TlsChain", ""); if (!ca_chain.empty()) { lc.log(log::INFO, "TLS CA chain file: " + ca_chain); tls_options.pem_root_certs = file2string(ca_chain); } else { lc.log(log::INFO, "TLS CA chain file not defined ..."); tls_options.pem_root_certs = ""; } tls_options.pem_key_cert_pairs.emplace_back(std::move(cert)); creds = grpc::SslServerCredentials(tls_options); } else { lc.log(log::INFO, "Using gRPC over plaintext socket"); creds = grpc::InsecureServerCredentials(); } // Listen on the given address without any authentication mechanism. builder.AddListeningPort(server_address, creds); // fixed the number of request threads ResourceQuota quota; quota.SetMaxThreads(threads); lc.log(log::INFO, "Using " + std::to_string(threads) + " request processing threads"); builder.SetResourceQuota(quota); // Register "service" as the instance through which we'll communicate with // clients. In this case it corresponds to an *synchronous* service. builder.RegisterService(&svc); std::unique_ptr server(builder.BuildAndStart()); lc.log(cta::log::INFO, "Listening on socket address: " + server_address); server->Wait(); }