// ----------------------------------------------------------------------
// File: GrpccLIENT.cc
// Author: Andreas-Joachim Peters - CERN
// ----------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2018 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
/*----------------------------------------------------------------------------*/
#include "GrpcClient.hh"
#include "proto/Rpc.grpc.pb.h"
#include "common/StringConversion.hh"
#include "common/Timing.hh"
#include "common/Path.hh"
/*----------------------------------------------------------------------------*/
#include
#include
#include
/*----------------------------------------------------------------------------*/
#include
/*----------------------------------------------------------------------------*/
EOSCLIENTNAMESPACE_BEGIN
//#ifdef EOS_GRPC
using grpc::Channel;
using grpc::ClientAsyncResponseReader;
using grpc::ClientAsyncReader;
using grpc::ClientContext;
using grpc::CompletionQueue;
using grpc::Status;
using eos::rpc::Eos;
using eos::rpc::PingRequest;
using eos::rpc::PingReply;
using eos::rpc::MDRequest;
using eos::rpc::MDResponse;
using eos::rpc::FindRequest;
using eos::rpc::NSRequest;
using eos::rpc::NSResponse;
using eos::rpc::NsStatRequest;
using eos::rpc::NsStatResponse;
using eos::rpc::FileInsertRequest;
using eos::rpc::ContainerInsertRequest;
using eos::rpc::InsertReply;
using eos::rpc::ContainerMdProto;
using eos::rpc::FileMdProto;
using eos::rpc::ManilaRequest;
using eos::rpc::ManilaResponse;
std::string GrpcClient::Ping(const std::string& payload)
{
PingRequest request;
request.set_message(payload);
request.set_authkey(token());
PingReply reply;
ClientContext context;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq;
Status status;
// stub_->AsyncPing() performs the RPC call, returning an instance we
// store in "rpc". Because we are using the asynchronous API, we need to
// hold on to the "rpc" instance in order to get updates on the ongoing RPC.
std::unique_ptr > rpc(
stub_->AsyncPing(&context, request, &cq));
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the integer 1.
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or the cq_ is shutting down.
GPR_ASSERT(cq.Next(&got_tag, &ok));
// Verify that the result from "cq" corresponds, by its tag, our previous
// request.
GPR_ASSERT(got_tag == (void*) 1);
// ... and that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
// Act upon the status of the actual RPC.
if (status.ok()) {
return reply.message();
} else {
return "";
}
}
int
GrpcClient::ManilaRequest(const eos::rpc::ManilaRequest& request,
eos::rpc::ManilaResponse& reply)
{
ClientContext context;
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncManilaServerRequest(&context, request, &cq));
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*) 1);
GPR_ASSERT(ok);
// Act upon the status of the actual RPC.
if (status.ok()) {
return reply.code();
} else {
return -1;
}
}
std::string
GrpcClient::Md(const std::string& path,
uint64_t id,
uint64_t ino,
bool list,
bool printonly)
{
MDRequest request;
if (list) {
request.set_type(eos::rpc::LISTING);
} else {
request.set_type(eos::rpc::STAT);
}
if (path.length()) {
request.mutable_id()->set_path(path);
} else if (id) {
request.mutable_id()->set_id(id);
} else if (ino) {
request.mutable_id()->set_ino(ino);
} else {
return "";
}
request.set_authkey(token());
MDResponse response;
ClientContext context;
std::string responsestring;
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncMD(&context, request, &cq, (void*) 1));
void* got_tag;
bool ok = false;
bool ret = cq.Next(&got_tag, &ok);
while (1) {
rpc->Read(&response, (void*) 1);
ok = false;
ret = cq.Next(&got_tag, &ok);
if (!ret || !ok || got_tag != (void*) 1) {
break;
}
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string jsonstring;
(void) google::protobuf::util::MessageToJsonString(response,
&jsonstring, options);
if (printonly) {
std::cout << jsonstring << std::endl;
} else {
responsestring += jsonstring;
}
}
if (!status.ok()) {
std::cerr << "error: " << status.error_message() << std::endl;
}
return responsestring;
}
std::string
GrpcClient::Find(const std::string& path,
const std::string& filter,
uint64_t id,
uint64_t ino,
bool files,
bool dirs,
uint64_t depth,
bool printonly,
const std::string& exportfs)
{
FindRequest request;
if (files && !dirs) {
// query files
request.set_type(eos::rpc::FILE);
} else if (dirs && !files) {
// query container
request.set_type(eos::rpc::CONTAINER);
} else {
// query files & container
request.set_type(eos::rpc::LISTING);
}
if (path.length()) {
request.mutable_id()->set_path(path);
} else if (id) {
request.mutable_id()->set_id(id);
} else if (ino) {
request.mutable_id()->set_ino(ino);
} else {
return "";
}
if (depth) {
request.set_maxdepth(depth);
}
request.set_authkey(token());
if (filter.length()) {
// enable filtering
request.mutable_selection()->set_select(true);
std::map filtermap;
eos::common::StringConversion::GetKeyValueMap(filter.c_str(),
filtermap);
for (auto const& x : filtermap) {
if (x.first == "owner-root") {
request.mutable_selection()->set_owner_root(strtoul(x.second.c_str(), 0,
10) ? true : false);
} else if (x.first == "group-root") {
request.mutable_selection()->set_group_root(strtoul(x.second.c_str(), 0,
10) ? true : false);
} else if (x.first == "owner") {
request.mutable_selection()->set_owner(strtoul(x.second.c_str(), 0, 10));
} else if (x.first == "group") {
request.mutable_selection()->set_group(strtoul(x.second.c_str(), 0, 10));
} else if (x.first == "regex-filename") {
request.mutable_selection()->set_regexp_filename(x.second);
} else if (x.first == "regex-dirname") {
request.mutable_selection()->set_regexp_dirname(x.second);
} else if (x.first == "zero-size") {
request.mutable_selection()->mutable_size()->set_zero(strtoul(x.second.c_str(),
0, 10) ? true : false);
} else if (x.first == "min-size") {
request.mutable_selection()->mutable_size()->set_min(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "max-size") {
request.mutable_selection()->mutable_size()->set_max(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "min-children") {
request.mutable_selection()->mutable_children()->set_min(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "max-children") {
request.mutable_selection()->mutable_children()->set_max(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "zero-children") {
request.mutable_selection()->mutable_children()->set_zero(strtoul(
x.second.c_str(), 0, 10) ? true : false);
} else if (x.first == "min-locations") {
request.mutable_selection()->mutable_locations()->set_min(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "max-locations") {
request.mutable_selection()->mutable_locations()->set_max(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "zero-locations") {
request.mutable_selection()->mutable_locations()->set_zero(strtoul(
x.second.c_str(), 0, 10) ? true : false);
} else if (x.first == "min-unlinked_locations") {
request.mutable_selection()->mutable_unlinked_locations()->set_min(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "max-unlinked_locations") {
request.mutable_selection()->mutable_unlinked_locations()->set_max(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "zero-unlinked_locations") {
request.mutable_selection()->mutable_unlinked_locations()->set_zero(strtoul(
x.second.c_str(), 0, 10) ? true : false);
} else if (x.first == "min-treesize") {
request.mutable_selection()->mutable_treesize()->set_min(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "max-treesize") {
request.mutable_selection()->mutable_treesize()->set_max(strtoul(
x.second.c_str(), 0, 10));
} else if (x.first == "zero-treesize") {
request.mutable_selection()->mutable_treesize()->set_zero(strtoul(
x.second.c_str(), 0, 10) ? true : false);
} else if (x.first == "min-ctime") {
request.mutable_selection()->mutable_ctime()->set_min(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "max-ctime") {
request.mutable_selection()->mutable_ctime()->set_max(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "zero-ctime") {
request.mutable_selection()->mutable_ctime()->set_zero(strtoul(x.second.c_str(),
0, 10) ? true : false);
} else if (x.first == "min-mtime") {
request.mutable_selection()->mutable_mtime()->set_min(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "max-mtime") {
request.mutable_selection()->mutable_mtime()->set_max(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "zero-mtime") {
request.mutable_selection()->mutable_mtime()->set_zero(strtoul(x.second.c_str(),
0, 10) ? true : false);
} else if (x.first == "min-stime") {
request.mutable_selection()->mutable_stime()->set_min(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "max-stime") {
request.mutable_selection()->mutable_stime()->set_max(strtoul(x.second.c_str(),
0, 10));
} else if (x.first == "zero-stime") {
request.mutable_selection()->mutable_stime()->set_zero(strtoul(x.second.c_str(),
0, 10) ? true : false);
} else if (x.first == "layoutid") {
request.mutable_selection()->set_layoutid(strtoull(x.second.c_str(), 0, 10));
} else if (x.first == "flags") {
request.mutable_selection()->set_flags(strtoull(x.second.c_str(), 0, 10));
} else if (x.first == "symlink") {
request.mutable_selection()->set_symlink(strtoul(x.second.c_str(), 0,
10) ? true : false);
} else if (x.first == "checksum-type") {
request.mutable_selection()->mutable_checksum()->set_type(x.second);
} else if (x.first == "checksum-value") {
request.mutable_selection()->mutable_checksum()->set_value(x.second);
} else if (x.first == "xattr") {
std::string key;
std::string val;
eos::common::StringConversion::SplitKeyValue(x.second, key, val, "=");
(*(request.mutable_selection()->mutable_xattr()))[key] = val;
} else {
std::cerr << "error: unknown filter '" << x.first << ":" << x.second << "'" <<
std::endl;
return "";
}
}
}
MDResponse response;
ClientContext context;
std::string responsestring;
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncFind(&context, request, &cq, (void*) 1));
void* got_tag;
bool ok = false;
bool ret = cq.Next(&got_tag, &ok);
while (1) {
rpc->Read(&response, (void*) 1);
ok = false;
ret = cq.Next(&got_tag, &ok);
if (!ret || !ok || got_tag != (void*) 1) {
break;
}
if (!exportfs.empty()) {
responsestring = ExportFs(response, exportfs);
} else {
google::protobuf::util::JsonPrintOptions options;
options.add_whitespace = true;
options.always_print_primitive_fields = true;
std::string jsonstring;
(void) google::protobuf::util::MessageToJsonString(response,
&jsonstring, options);
if (printonly) {
std::cout << jsonstring << std::endl;
} else {
responsestring += jsonstring;
}
}
}
if (!status.ok()) {
std::cerr << "error: " << status.error_message() << std::endl;
}
return responsestring;
}
int
GrpcClient::FileInsert(const std::vector& paths)
{
FileInsertRequest request;
size_t cnt = 0;
for (auto it : paths) {
std::string path = it;
struct timespec tsnow;
eos::common::Timing::GetTimeSpec(tsnow);
uint64_t inode = 0;
cnt++;
FileMdProto* file = request.add_files();
if (it.substr(0, 4) == "ino:") {
// the format is ino:xxxxxxxxxxxxxxxx: where xxxxxxxxxxxxxxxx is a 64bit hex string of the inode
path = it.substr(21);
inode = std::strtol(it.substr(4, 20).c_str() , 0, 16);
}
if (inode) {
file->set_id(inode);
}
file->set_path(path);
file->set_uid(2);
file->set_gid(2);
file->set_size(cnt);
file->set_layout_id(0x00100002);
file->mutable_checksum()->set_value("\0\0\0\1", 4);
file->set_flags(0);
file->mutable_ctime()->set_sec(tsnow.tv_sec);
file->mutable_ctime()->set_n_sec(tsnow.tv_nsec);
file->mutable_mtime()->set_sec(tsnow.tv_sec);
file->mutable_mtime()->set_n_sec(tsnow.tv_nsec);
file->mutable_locations()->Add(65535);
auto map = file->mutable_xattrs();
(*map)["sys.acl"] = "u:100:rwx";
(*map)["sys.cta.id"] = "fake";
}
request.set_authkey(token());
InsertReply reply;
ClientContext context;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncFileInsert(&context, request, &cq));
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the integer 1.
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or the cq_ is shutting down.
GPR_ASSERT(cq.Next(&got_tag, &ok));
// Verify that the result from "cq" corresponds, by its tag, our previous
// request.
GPR_ASSERT(got_tag == (void*) 1);
// ... and that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
// Act upon the status of the actual RPC.
int retc = 0;
if (status.ok()) {
for (auto it : reply.retc()) {
retc |= it;
}
return retc;
} else {
return -1;
}
}
int
GrpcClient::ContainerInsert(const std::vector& paths)
{
ContainerInsertRequest request;
for (auto it : paths) {
std::string path;
struct timespec tsnow;
eos::common::Timing::GetTimeSpec(tsnow);
uint64_t inode = 0 ;
if (it.substr(0, 4) == "ino:") {
// the format is ino:xxxxxxxxxxxxxxxx: where xxxxxxxxxxxxxxxx is a 64bit hex string of the inode
path = it.substr(21);
inode = std::strtol(it.substr(4, 20).c_str() , 0, 16);
}
ContainerMdProto* container = request.add_container();
if (inode) {
container->set_id(inode);
}
container->set_path(path);
container->set_uid(2);
container->set_gid(2);
container->set_mode(S_IFDIR | S_IRWXU);
container->mutable_ctime()->set_sec(tsnow.tv_sec);
container->mutable_ctime()->set_n_sec(tsnow.tv_nsec);
container->mutable_mtime()->set_sec(tsnow.tv_sec);
container->mutable_mtime()->set_n_sec(tsnow.tv_nsec);
auto map = container->mutable_xattrs();
(*map)["sys.acl"] = "u:100:rwx";
(*map)["sys.forced.checksum"] = "adler";
(*map)["sys.forced.space"] = "default";
(*map)["sys.forced.nstripes"] = "1";
(*map)["sys.forced.layout"] = "replica";
}
request.set_authkey(token());
InsertReply reply;
ClientContext context;
// The producer-consumer queue we use to communicate asynchronously with the
// gRPC runtime.
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncContainerInsert(&context, request, &cq));
// Request that, upon completion of the RPC, "reply" be updated with the
// server's response; "status" with the indication of whether the operation
// was successful. Tag the request with the integer 1.
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue "cq".
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or the cq_ is shutting down.
GPR_ASSERT(cq.Next(&got_tag, &ok));
// Verify that the result from "cq" corresponds, by its tag, our previous
// request.
GPR_ASSERT(got_tag == (void*) 1);
// ... and that the request was completed successfully. Note that "ok"
// corresponds solely to the request for updates introduced by Finish().
GPR_ASSERT(ok);
// Act upon the status of the actual RPC.
int retc = 0;
if (status.ok()) {
for (auto it : reply.retc()) {
retc |= it;
}
return retc;
} else {
return -1;
}
}
std::unique_ptr
GrpcClient::Create(std::string endpoint,
std::string token,
std::string keyfile,
std::string certfile,
std::string cafile
)
{
std::string key;
std::string cert;
std::string ca;
bool ssl = false;
if (keyfile.length() || certfile.length() || cafile.length()) {
if (!keyfile.length() || !certfile.length() || !cafile.length()) {
return 0;
}
ssl = true;
if (eos::common::StringConversion::LoadFileIntoString(certfile.c_str(),
cert) && !cert.length()) {
fprintf(stderr, "error: unable to load ssl certificate file '%s'\n",
certfile.c_str());
return 0;
}
if (eos::common::StringConversion::LoadFileIntoString(keyfile.c_str(),
key) && !key.length()) {
fprintf(stderr, "unable to load ssl key file '%s'\n", keyfile.c_str());
return 0;
}
if (eos::common::StringConversion::LoadFileIntoString(cafile.c_str(),
ca) && !ca.length()) {
fprintf(stderr, "unable to load ssl ca file '%s'\n", cafile.c_str());
return 0;
}
}
grpc::SslCredentialsOptions opts = {
ca,
key,
cert
};
std::unique_ptr p(new eos::client::GrpcClient(
grpc::CreateChannel(
endpoint,
ssl ? grpc::SslCredentials(opts)
: grpc::InsecureChannelCredentials())));
p->set_ssl(ssl);
p->set_token(token);
return p;
}
int
GrpcClient::NsStat(const eos::rpc::NsStatRequest& request,
eos::rpc::NsStatResponse& reply)
{
ClientContext context;
CompletionQueue cq;
Status status;
std::unique_ptr> rpc(
stub_->AsyncNsStat(&context, request, &cq));
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*) 1);
GPR_ASSERT(ok);
// Act upon the status of the actual RPC
if (status.ok()) {
return reply.code();
} else {
return -1;
}
}
int
GrpcClient::Exec(const eos::rpc::NSRequest& request,
eos::rpc::NSResponse& reply)
{
ClientContext context;
CompletionQueue cq;
Status status;
std::unique_ptr > rpc(
stub_->AsyncExec(&context, request, &cq));
rpc->Finish(&reply, &status, (void*) 1);
void* got_tag;
bool ok = false;
GPR_ASSERT(cq.Next(&got_tag, &ok));
GPR_ASSERT(got_tag == (void*) 1);
GPR_ASSERT(ok);
// Act upon the status of the actual RPC.
if (status.ok()) {
return reply.error().code();
} else {
return -1;
}
}
std::string
GrpcClient::ExportFs(const eos::rpc::MDResponse& response,
const std::string& exportfs)
{
bool first = false;
if (response.type() == eos::rpc::CONTAINER) {
if (!tree.size()) {
first = true;
tree[response.cmd().id()] = response.cmd().name() + "/";
} else {
first = false;
tree[response.cmd().id()] = tree[response.cmd().parent_id()] +
response.cmd().name() + "/";
}
fprintf(stderr, "%s\n", tree[response.cmd().id()].c_str());
if (!first) {
std::string target = exportfs + "/" + tree[response.cmd().id()];
eos::common::Path cPath(target.c_str());
if (!cPath.MakeParentPath(755)) {
fprintf(stderr, "error: failed to created '%s'\n", cPath.GetParentPath());
exit(errno);
}
int rc = mkdir(cPath.GetPath(), 755);
if (rc) {
fprintf(stderr, "error: failed to created '%s'\n", cPath.GetPath());
exit(errno);
}
}
}
if (response.type() == eos::rpc::FILE) {
fprintf(stderr, "%s\n", (tree[response.fmd().cont_id()] +
response.fmd().name()).c_str());
}
return "";
}
//#endif
EOSCLIENTNAMESPACE_END