// ----------------------------------------------------------------------
// File: GrpcServer.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "GrpcServer.hh"
#include "GrpcNsInterface.hh"
#include
#include "common/Logging.hh"
#include "common/StringConversion.hh"
#include "mgm/Macros.hh"
#include "XrdSec/XrdSecEntity.hh"
#ifdef EOS_GRPC
#include "proto/Rpc.grpc.pb.h"
#include
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerWriter;
using grpc::Status;
using eos::rpc::Eos;
using eos::rpc::PingRequest;
using eos::rpc::PingReply;
using eos::rpc::FileInsertRequest;
using eos::rpc::ContainerInsertRequest;
using eos::rpc::InsertReply;
#endif
EOSMGMNAMESPACE_BEGIN
#ifdef EOS_GRPC
class RequestServiceImpl final : public Eos::Service
{
Status Ping(ServerContext* context, const eos::rpc::PingRequest* request,
eos::rpc::PingReply* reply) override
{
eos_static_info("grpc::ping from client peer=%s ip=%s DN=%s token=%s len=%lu",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str(),
request->message().length());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
reply->set_message(request->message());
return Status::OK;
}
Status FileInsert(ServerContext* context,
const eos::rpc::FileInsertRequest* request,
eos::rpc::InsertReply* reply) override
{
eos_static_info("grpc::fileinsert from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
return GrpcNsInterface::FileInsert(vid, reply, request);
}
Status ContainerInsert(ServerContext* context,
const eos::rpc::ContainerInsertRequest* request,
eos::rpc::InsertReply* reply) override
{
eos_static_info("grpc::containerinsert from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
return GrpcNsInterface::ContainerInsert(vid, reply, request);
}
Status MD(ServerContext* context, const eos::rpc::MDRequest* request,
ServerWriter* writer) override
{
eos_static_info("grpc::md from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
switch (request->type()) {
case eos::rpc::FILE:
case eos::rpc::CONTAINER:
case eos::rpc::STAT:
return GrpcNsInterface::Stat(vid, writer, request);
break;
case eos::rpc::LISTING:
return GrpcNsInterface::StreamMD(vid, writer, request);
break;
default:
;
}
return Status(grpc::StatusCode::INVALID_ARGUMENT, "request is not supported");
}
Status Find(ServerContext* context, const eos::rpc::FindRequest* request,
ServerWriter* writer) override
{
eos_static_info("grpc::find from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
return GrpcNsInterface::Find(vid, writer, request);
}
Status NsStat(ServerContext* context,
const eos::rpc::NsStatRequest* request,
eos::rpc::NsStatResponse* reply) override
{
eos_static_info("grpc::nsstat::request from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
return GrpcNsInterface::NsStat(vid, reply, request);
}
Status Exec(ServerContext* context,
const eos::rpc::NSRequest* request,
eos::rpc::NSResponse* reply) override
{
eos_static_info("grpc::exec::request from client peer=%s ip=%s DN=%s token=%s",
context->peer().c_str(), GrpcServer::IP(context).c_str(),
GrpcServer::DN(context).c_str(), request->authkey().c_str());
eos::common::VirtualIdentity vid;
GrpcServer::Vid(context, vid, request->authkey());
WAIT_BOOT;
return GrpcNsInterface::Exec(vid, reply, request);
}
};
/* return client DN*/
std::string
GrpcServer::DN(grpc::ServerContext* context)
{
/*
The methods GetPeerIdentityPropertyName() and GetPeerIdentity() from grpc::ServerContext.auth_context
will prioritize SAN fields (x509_subject_alternative_name) in favor of x509_common_name
*/
std::string tag = "x509_common_name";
auto resp = context->auth_context()->FindPropertyValues(tag);
if (resp.empty()) {
tag = "x509_subject_alternative_name";
auto resp = context->auth_context()->FindPropertyValues(tag);
if (resp.empty()) {
return "";
}
}
return resp[0].data();
}
/* return client IP */
std::string GrpcServer::IP(grpc::ServerContext* context, std::string* id,
std::string* port)
{
// format is ipv4::<..> or ipv6::<..> - we just return the IP address
// butq net and id are populated as well with the prefix and suffix, respectively
// The context peer information is curl encoded
const std::string decoded_peer =
eos::common::StringConversion::curl_default_unescaped(context->peer().c_str());
std::vector tokens;
eos::common::StringConversion::Tokenize(decoded_peer, tokens, "[]");
if (tokens.size() == 3) {
if (id) {
*id = tokens[0].substr(0, tokens[0].size() - 1);
}
if (port) {
*port = tokens[2].substr(1, tokens[2].size() - 1);
}
return "[" + tokens[1] + "]";
} else {
tokens.clear();
eos::common::StringConversion::Tokenize(decoded_peer, tokens, ":");
if (tokens.size() == 3) {
if (id) {
*id = tokens[0].substr(0, tokens[0].size());
}
if (port) {
*port = tokens[2].substr(0, tokens[2].size());
}
return tokens[1];
}
return "";
}
}
/* return VID for a given call */
void
GrpcServer::Vid(grpc::ServerContext* context,
eos::common::VirtualIdentity& vid,
const std::string& authkey)
{
XrdSecEntity client("grpc");
std::string dn = DN(context);
client.name = const_cast(dn.c_str());
bool isEosToken = (authkey.substr(0, 8) == "zteos64:");
std::string tident = dn.length() ? dn.c_str() : (isEosToken ? "eostoken" :
authkey.c_str());
std::string id;
std::string ip = GrpcServer::IP(context, &id).c_str();
tident += ".1:";
tident += id;
tident += "@";
tident += ip;
client.tident = tident.c_str();
if (authkey.length()) {
client.endorsements = const_cast(authkey.c_str());
}
eos::common::Mapping::IdMap(&client, "eos.app=grpc", client.tident, vid);
}
#endif
void
GrpcServer::Run(ThreadAssistant& assistant) noexcept
{
#ifdef EOS_GRPC
if (getenv("EOS_MGM_GRPC_SSL_CERT") &&
getenv("EOS_MGM_GRPC_SSL_KEY") &&
getenv("EOS_MGM_GRPC_SSL_CA")) {
mSSL = true;
mSSLCertFile = getenv("EOS_MGM_GRPC_SSL_CERT");
mSSLKeyFile = getenv("EOS_MGM_GRPC_SSL_KEY");
mSSLCaFile = getenv("EOS_MGM_GRPC_SSL_CA");
if (eos::common::StringConversion::LoadFileIntoString(mSSLCertFile.c_str(),
mSSLCert) && !mSSLCert.length()) {
eos_static_crit("unable to load ssl certificate file '%s'",
mSSLCertFile.c_str());
mSSL = false;
}
if (eos::common::StringConversion::LoadFileIntoString(mSSLKeyFile.c_str(),
mSSLKey) && !mSSLKey.length()) {
eos_static_crit("unable to load ssl key file '%s'", mSSLKeyFile.c_str());
mSSL = false;
}
if (eos::common::StringConversion::LoadFileIntoString(mSSLCaFile.c_str(),
mSSLCa) && !mSSLCa.length()) {
eos_static_crit("unable to load ssl ca file '%s'", mSSLCaFile.c_str());
mSSL = false;
}
}
RequestServiceImpl service;
std::string bind_address = "0.0.0.0:";
bind_address += std::to_string(mPort);
grpc::ServerBuilder builder;
if (mSSL) {
grpc::SslServerCredentialsOptions::PemKeyCertPair keycert = {
mSSLKey,
mSSLCert
};
grpc::SslServerCredentialsOptions sslOps(
GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
sslOps.pem_root_certs = mSSLCa;
sslOps.pem_key_cert_pairs.push_back(keycert);
builder.AddListeningPort(bind_address, grpc::SslServerCredentials(sslOps));
} else {
builder.AddListeningPort(bind_address, grpc::InsecureServerCredentials());
}
builder.RegisterService(&service);
mServer = builder.BuildAndStart();
if (mServer) {
mServer->Wait();
}
#else
// Make the compiler happy
(void) mPort;
(void) mSSL;
#endif
}
EOSMGMNAMESPACE_END