//------------------------------------------------------------------------------
// File: eoscp.cc
// Author: Elvin-Alin Sindrilaru / Andreas-Joachim Peters - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2011 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>.*
************************************************************************/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "XrdCl/XrdClFile.hh"
#include "XrdCl/XrdClDefaultEnv.hh"
#include "XrdOuc/XrdOucString.hh"
#include "common/XrdErrorMap.hh"
#include "common/Timing.hh"
#include "common/SymKeys.hh"
#include "common/StringSplit.hh"
#include "common/json/Jsonifiable.hh"
#include "common/json/JsonCppJsonifier.hh"
#include "fst/layout/RaidDpLayout.hh"
#include "fst/layout/ReedSLayout.hh"
#include "fst/io/AsyncMetaHandler.hh"
#include "fst/io/ChunkHandler.hh"
#include "fst/io/xrd/XrdIo.hh"
#include "fst/io/FileIo.hh"
#include "fst/io/FileIoPluginCommon.hh"
#include "fst/checksum/ChecksumPlugins.hh"
// Program name printed in the usage message
#define PROGRAM "eoscp"
// Default value of the copy buffer size in bytes (4 MiB). The expression is
// parenthesized so that the macro expands safely inside larger expressions
// such as `x / DEFAULTBUFFERSIZE`.
#define DEFAULTBUFFERSIZE (4*1024*1024)
// Upper limit for the number of sources and for the number of destinations
#define MAXSRCDST 32
// Bring eos::common::LayoutId into scope so it can be used unqualified below
using eos::common::LayoutId;
// Container type of src_location / dst_location; each element carries a host
// address (.first) and a file path (.second)
typedef std::vector > VectLocationType;
//! Kind of access used for one source or one destination; values of this
//! type are stored in the src_type / dst_type vectors
enum AccessType {
LOCAL_ACCESS, ///< local access
RAID_ACCESS, ///< xroot protocol but with raid layout
XRD_ACCESS, ///< xroot protocol
RIO_ACCESS, ///< any File IO plug-in remote protocol
CONSOLE_ACCESS ///< input/output to console
};
//------------------------------------------------------------------------------
//! Data holder describing one transfer. It is filled by createXferSummary()
//! and read by print_summary_header(), print_summary() and
//! XferSummaryJson::jsonify(). The members have no default initializers, so
//! an instance is only meaningful once createXferSummary() has populated it.
//------------------------------------------------------------------------------
class XferSummary : public eos::common::Jsonifiable
{
public:
std::vector sources; ///< source address + path, cut at the last '?'
std::vector destinations; ///< destination address + path, cut at the last '?'
time_t rawtime; ///< time() value taken when the summary was built
std::string astime; ///< asctime() rendering of rawtime with the last character removed
std::optional xrdsecprotocol; ///< value of $XrdSecPROTOCOL, unset if not defined
std::optional krb5ccname; ///< value of $KRB5CCNAME, unset if not defined
std::optional x509userproxy; ///< value of $X509_USER_PROXY, unset if not defined
std::string src_clientinfo; ///< decoded "eos.clientinfo" CGI value of src_lasturl
std::string dst_clientinfo; ///< decoded "eos.clientinfo" CGI value of dst_lasturl
uint64_t bytescopied; ///< number of bytes read
uint64_t totalbytescopied; ///< bytescopied multiplied by the number of destinations
float abs_time; ///< elapsed time between abs_start_time and abs_stop_time in milliseconds
float realtime; ///< abs_time expressed in seconds
float copyrate; ///< bytescopied / abs_time / 1000, 0 if abs_time is not positive
double ingress_rate; ///< bytes read per ingress microsecond, 0 if none recorded
double egress_rate; ///< bytes read per egress microsecond, 0 if none recorded
double ingress_microseconds; ///< copy of the global ingress_microseconds counter
double egress_microseconds; ///< copy of the global egress_microseconds counter
float bandwidth; ///< copy of the global bandwidth setting (0 means not set)
std::optional checksum_type; ///< set only when a checksum was computed
std::optional checksum_value; ///< set together with checksum_type
off_t write_start; ///< copy of startwritebyte
off_t write_stop; ///< copy of stopwritebyte
long long read_start; ///< copy of startbyte; negative values are treated as "no range" by the printers
long long read_stop; ///< copy of stopbyte
int ndst; ///< number of destinations
virtual ~XferSummary() = default;
};
class XferSummaryJson : public eos::common::JsonCppJsonifier
{
public:
virtual ~XferSummaryJson() = default;
void jsonify(const XferSummary* obj, std::stringstream& oss) override
{
Json::Value root;
root["unixtime"] = Json::UInt64(obj->rawtime);
root["date"] = obj->astime;
root["auth"] = obj->xrdsecprotocol ? Json::Value(*obj->xrdsecprotocol) :
Json::nullValue;
root["krb5"] = obj->krb5ccname ? Json::Value(*obj->krb5ccname) :
Json::nullValue;
root["x509userproxy"] = obj->x509userproxy ? Json::Value(
*obj->x509userproxy) : Json::nullValue;
initializeArray(root["sources"]);
for (size_t i = 0; i < obj->sources.size(); ++i) {
root["sources"].append(obj->sources[i]);
}
for (size_t i = 0; i < obj->destinations.size(); ++i) {
root["destinations"].append(obj->destinations[i]);
}
root["bytes_copied"] = Json::UInt64(obj->bytescopied);
if (obj->ndst > 1) {
root["totalbytes_copied"] = Json::UInt64(obj->totalbytescopied);
}
root["realtime"] = obj->realtime;
root["copy_rate"] = obj->copyrate;
root["ingress_rate"] = obj->ingress_rate;
root["egress_rate"] = obj->egress_rate;
root["ingress_server_info"] = !obj->src_clientinfo.empty() ? Json::Value(
obj->src_clientinfo) : Json::nullValue;
root["egress_server_info"] = !obj->dst_clientinfo.empty() ? Json::Value(
obj->dst_clientinfo) : Json::nullValue;
root["bandwidth"] = obj->bandwidth ? Json::Value(obj->bandwidth) :
Json::nullValue;
root["checksum_type"] = obj->checksum_type ? Json::Value(
*obj->checksum_type) : Json::nullValue;
root["checksum_value"] = obj->checksum_value ? Json::Value(
*obj->checksum_value) : Json::nullValue;
root["write_start"] = Json::UInt64(obj->write_start);
root["write_stop"] = Json::UInt64(obj->write_stop);
root["read_start"] = obj->read_start >= 0 ? Json::UInt64(
obj->read_start) : Json::nullValue;
root["read_stop"] = obj->read_start >= 0 ? Json::UInt64(
obj->read_stop) : Json::nullValue;
oss << root;
}
};
///! NULL-terminated list of protocol names
const char* protocols[] = {"file", "raid", "xroot", "rio", NULL};
///! checksum type names accepted by the -X option
const char* xs[] = {"adler", "md5", "sha1", "crc32", "crc32c"};
///! set built from the five entries of xs, used to validate the -X argument
std::set xsTypeSet(xs, xs + 5);
///! vector of source file descriptors or IO objects
std::vector > src_handler;
///! vector of destination file descriptors or IO objects
std::vector > dst_handler;
///! vector of source host address and path file
VectLocationType src_location;
///! vector of destination host address and path file
VectLocationType dst_location;
std::vector src_type; ///< vector of source type access
std::vector dst_type; ///< vector of destination type access
int verbose = 0; ///< set by -v
int debug = 0; ///< set by -d
int monitoring = 0; ///< set by -V: print the summary as key=value pairs
int jsonoutput = 0; ///< set by -j: print the summary as JSON
int trylocal = 0; ///< set by -l
int progbar = 1; ///< cleared by -n (and by -j)
int summary = 1; ///< cleared by -s, forced back to 1 by -j
int nopio = 0; ///< set by -0: do not use parallel IO
unsigned long long targetsize = 0; ///< set by -T
int euid = -1; ///< set by -u
int egid = -1; ///< set by -g
int nsrc = 1; ///< number of sources, set by -S
int ndst = 1; ///< number of destinations, set by -D
int createdir = 0; ///< set by -p
int transparentstaging = 0; ///< set by -i
int appendmode = 0; ///< set by -a and -A
long long startbyte = -1; ///< start of the read range given with -r, -1 if none
long long stopbyte = -1; ///< end of the read range given with -r, -1 if none
off_t startwritebyte = 0; ///< write offset given with -A
off_t stopwritebyte = 0;
char symlinkname[4096]; ///< link name given with -L
int dosymlink = 0; ///< set by -L
int replicamode = 0; ///< set by -R
float bandwidth = 0; ///< bandwidth limit given with -t, 0 if none
XrdOucString cpname = ""; ///< name for the progress printout, set by -N
XrdCl::XRootDStatus status;
int retc = 0;
uint32_t buffersize = DEFAULTBUFFERSIZE; ///< set by -b; main allocates 2 * buffersize bytes
double read_wait = 0; ///< statistics about total read time
double write_wait = 0; ///< statistics about total write time
char* buffer = NULL; ///< used for doing the reading
bool first_time = true; ///< first time prefetch two blocks
bool nooverwrite = false; ///< by default we overwrite the target files; set by -x
int gtimeout = 0; ///< copy process timeout in seconds
int cksumcomparison =
0; ///< set by -C: compare the checksum of the source with the one of the destination and report an error to the user if they differ
int cksummismatchdelete =
0; ///< set by -E: delete the destination file if the checksums of the source and the destination mismatch
//..............................................................................
// RAID related variables
//..............................................................................
off_t stripeWidth = 1024 * 1024;
uint64_t offsetXrd = 0;
int nparitystripes = 0; ///< number of parity stripes, set by -P
bool isRaidTransfer = false; ///< true if we currently handle a RAID transfer
bool isSrcRaid = false; ///< meaningful only for RAID transfers
bool isStreamFile = false; ///< the file is streamed
bool doStoreRecovery = false; ///< store recoveries if the file is corrupted
std::string opaqueInfo; ///< opaque info containing the capabilities
///< necessary to do a parallel IO open
std::string replicationType = ""; ///< RAID layout name given with -e ("raiddp" or "reeds")
//TODO: deal with the case when both the source and the destination are RAIN files
eos::fst::RainMetaLayout* redundancyObj = NULL;
std::string dst_lasturl; ///< parsed by createXferSummary for the "eos.clientinfo" CGI value
std::string src_lasturl; ///< parsed by createXferSummary for the "eos.clientinfo" CGI value
//..............................................................................
// Checksum variables
//..............................................................................
off_t offsetXS = 0;
bool computeXS = false; ///< true once a checksum object was created for -X
std::string xsString = ""; ///< checksum type name given with -X
std::string xsValue = "";
std::unique_ptr xsObj; ///< checksum object created for -X
//..............................................................................
// To compute throughput etc
//..............................................................................
struct timeval abs_start_time;
struct timeval abs_stop_time;
struct timezone tz;
double ingress_microseconds = 0;
double egress_microseconds = 0;
std::string progressFile = ""; ///< progress file path given with -O
char* source[MAXSRCDST];
char* destination[MAXSRCDST];
//------------------------------------------------------------------------------
// Usage command
//------------------------------------------------------------------------------
//! Print the command line help to stderr and terminate the process with
//! exit(-1). This function does not return.
void
usage()
{
fprintf(stderr,
"Usage: %s [-5] [-0] [-X ] [-t ] [-h] [-x] [-v] [-V] [-d] [-l] [-j] [-b ] [-T ] [-Y] [-n] [-s] [-u ] [-g ] [-S <#>] [-D <#>] [-O ] [-N ] [src2...] [dst2...]\n",
PROGRAM);
fprintf(stderr, " -h : help\n");
fprintf(stderr, " -d : debug mode\n");
fprintf(stderr, " -v : verbose mode\n");
fprintf(stderr, " -V : write summary as key value pairs\n");
fprintf(stderr, " -l : try to force the destination to the "
"local disk server [not supported]\n");
fprintf(stderr, " -a : append to the file rather than truncate"
" an existing file\n");
fprintf(stderr, " -A : append/overwrite at offset\n");
fprintf(stderr,
" -b : use as buffer size for copy operations\n");
fprintf(stderr,
" -T : use as target size for copies from STDIN\n");
fprintf(stderr,
" -m : set the mode for the destination file\n");
fprintf(stderr, " -n : hide progress bar\n");
fprintf(stderr, " -N : set name for progress printout\n");
fprintf(stderr, " -s : hide summary\n");
fprintf(stderr,
" -j : JSON output (flags -V -d -v -s are ignored)\n");
fprintf(stderr,
" -u : use as UID to execute the operation - (user) is mapped to unix UID if possible\n");
fprintf(stderr,
" -g : use as GID to execute the operation - (group) is mapped to unix GID if possible\n");
fprintf(stderr,
" -t : reduce the traffic to an average of mb/s\n");
fprintf(stderr, " -S <#> : read from <#> sources in 'parallel'\n");
// NOTE(review): main() stores the -D argument in ndst (number of
// destinations), so "sources" in the message below presumably should read
// "destinations" - confirm before changing the user-visible text.
fprintf(stderr, " -D <#> : write to <#> sources in 'parallel'\n");
fprintf(stderr, " -q : quit copy after seconds\n");
fprintf(stderr,
" -O : write progress file to (0.00 - 100.00%%)\n");
fprintf(stderr, " -i : enable transparent staging\n");
fprintf(stderr,
" -p : create all needed subdirectories for destination paths\n");
fprintf(stderr, " : path/url or - for STDIN\n");
fprintf(stderr, " : path/url or - for STDOUT\n");
fprintf(stderr, " -5 : compute md5\n");
fprintf(stderr,
" -r : : read only the range from bytes to bytes\n");
fprintf(stderr,
" -L : create a symbolic link to the 1st target file with name \n");
fprintf(stderr,
" -R : replication mode - avoid dir creation and stat's\n");
fprintf(stderr,
" -X : checksum type: adler, crc32, crc32c, sha1, md5\n");
fprintf(stderr,
" -e : RAID layouts - error correction layout: raiddp/reeds\n");
fprintf(stderr,
" -P : RAID layouts - number of parity stripes\n");
fprintf(stderr,
" -f : RAID layouts - store the modifications in case of errors\n");
fprintf(stderr,
" -c : RAID layouts - force check and recover any corruptions in any stripe\n");
fprintf(stderr, " -Y : RAID layouts - streaming file\n");
fprintf(stderr,
" -0 : RAID layouts - don't use parallel IO mode\n");
fprintf(stderr, " -x : don't overwrite an existing file\n");
fprintf(stderr,
" -C : fail if checksum comparison between source and destination fails (XRootD destination only)\n");
fprintf(stderr,
" -E : automatically delete the destination file if checksum comparison between source and destination fails (XRootD destination only) \n");
exit(-1);
}
//------------------------------------------------------------------------------
//! Format a printf-style message and write it to a stream.
//!
//! The message is first formatted into a fixed stack buffer; if it does not
//! fit, a heap buffer of exactly the required size is used instead, so the
//! output is never truncated and the buffer can never be overrun.
//!
//! @param os stream receiving the formatted text
//! @param format printf-style format string
//! @param args arguments matching the format string
//------------------------------------------------------------------------------
static void
stream_vprint(std::ostream& os, const char* format, va_list args)
{
  char stack_buff[4096];
  // The argument list may have to be walked twice, keep a copy for the
  // first pass
  va_list probe_args;
  va_copy(probe_args, args);
  const int needed = vsnprintf(stack_buff, sizeof(stack_buff), format,
                               probe_args);
  va_end(probe_args);

  if (needed < 0) {
    // Formatting failed, there is nothing meaningful to print
    return;
  }

  if (static_cast<size_t>(needed) < sizeof(stack_buff)) {
    os << stack_buff;
    return;
  }

  std::vector<char> heap_buff(static_cast<size_t>(needed) + 1);
  vsnprintf(heap_buff.data(), heap_buff.size(), format, args);
  os << heap_buff.data();
}

extern "C"
{
  //----------------------------------------------------------------------------
  // Function + macros to allow formatted print via cout,cerr
  //----------------------------------------------------------------------------

  //! printf-style output to std::cout
  void
  cout_print(const char* format, ...)
  {
    va_list args;
    va_start(args, format);
    stream_vprint(std::cout, format, args);
    va_end(args);
  }

  //! printf-style output to std::cerr
  void
  cerr_print(const char* format, ...)
  {
    va_list args;
    va_start(args, format);
    stream_vprint(std::cerr, format, args);
    va_end(args);
  }

// Both macros take the complete, parenthesized printf argument list, e.g.
// COUT(("value=%d\n", value));
#define COUT(s) do {                            \
    cout_print s;                               \
  } while (0)

#define CERR(s) do {                            \
    cerr_print s;                               \
  } while (0)
}
//------------------------------------------------------------------------------
//! Build the transfer summary from the given locations and the global state.
//!
//! Besides filling the summary this function also updates the global
//! abs_stop_time and sets the XrdCl "RedirectLimit" to 1 when a location
//! contains "//replicate:".
//!
//! @param src source locations (address, path)
//! @param dst destination locations (address, path)
//! @param bytesread number of bytes read during the transfer
//!
//! @return the populated summary
//------------------------------------------------------------------------------
XferSummary createXferSummary(const VectLocationType& src,
const VectLocationType& dst, unsigned long long bytesread)
{
XferSummary xferSummary;
xferSummary.setJsonifier(std::make_shared());
std::string src_clientinfo;
std::string dst_clientinfo;
// Extract and decode the "eos.clientinfo" CGI value of the last source URL
if (src_lasturl.length()) {
XrdCl::URL url(src_lasturl);
XrdCl::URL::ParamsMap cgi = url.GetParams();
std::string zclientinfo = cgi["eos.clientinfo"];
eos::common::SymKey::ZDeBase64(zclientinfo, src_clientinfo);
xferSummary.src_clientinfo = src_clientinfo;
}
// Same for the last destination URL
if (dst_lasturl.length()) {
XrdCl::URL url(dst_lasturl);
XrdCl::URL::ParamsMap cgi = url.GetParams();
std::string zclientinfo = cgi["eos.clientinfo"];
eos::common::SymKey::ZDeBase64(zclientinfo, dst_clientinfo);
xferSummary.dst_clientinfo = dst_clientinfo;
}
// Elapsed time in milliseconds; the arithmetic is done on integers before
// the conversion to float
gettimeofday(&abs_stop_time, &tz);
float abs_time = ((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) * 1000
+
(abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
xferSummary.abs_time = abs_time;
for (unsigned int i = 0; i < src.size(); i++) {
xferSummary.sources.push_back("");
auto& srcStr = xferSummary.sources.back();
srcStr += src[i].first.c_str();
srcStr += src[i].second.c_str();
// Drop everything starting at the last '?'
size_t pos = srcStr.rfind('?');
if (pos != std::string::npos) {
srcStr.erase(pos);
}
// NOTE(review): this changes global client configuration from inside the
// summary builder - confirm that callers rely on this side effect
if (srcStr.find("//replicate:") != std::string::npos) {
// disable client redirection eoscp
XrdCl::DefaultEnv::GetEnv()->PutInt("RedirectLimit", 1);
}
}
for (unsigned int i = 0; i < dst.size(); i++) {
xferSummary.destinations.push_back("");
auto& dstStr = xferSummary.destinations.back();
dstStr += dst[i].first.c_str();
dstStr += dst[i].second.c_str();
// Drop everything starting at the last '?'
size_t pos = dstStr.rfind('?');
if (pos != std::string::npos) {
dstStr.erase(pos);
}
if (dstStr.find("//replicate:") != std::string::npos) {
// disable client redirection eoscp
XrdCl::DefaultEnv::GetEnv()->PutInt("RedirectLimit", 1);
}
}
time_t rawtime;
struct tm* timeinfo;
time(&rawtime);
timeinfo = localtime(&rawtime);
XrdOucString astime = asctime(timeinfo);
// Remove the last character of the asctime() string
astime.erase(astime.length() - 1);
xferSummary.rawtime = rawtime;
xferSummary.astime = astime.c_str();
// Authentication related environment variables, left unset when not defined
xferSummary.xrdsecprotocol = getenv("XrdSecPROTOCOL") ?
std::optional(getenv("XrdSecPROTOCOL")) : std::nullopt;
xferSummary.krb5ccname = getenv("KRB5CCNAME") ? std::optional
(getenv("KRB5CCNAME")) : std::nullopt;
xferSummary.x509userproxy = getenv("X509_USER_PROXY") ?
std::optional(getenv("X509_USER_PROXY")) : std::nullopt;
xferSummary.bytescopied = bytesread;
xferSummary.totalbytescopied = bytesread * ndst;
xferSummary.realtime = xferSummary.abs_time / 1000.0;
// Rates are only computed for a non-zero denominator, otherwise they are 0
xferSummary.copyrate = xferSummary.abs_time > 0 ? xferSummary.bytescopied /
xferSummary.abs_time / 1000.0 : 0;
xferSummary.ingress_microseconds = ingress_microseconds;
xferSummary.egress_microseconds = egress_microseconds;
xferSummary.ingress_rate = xferSummary.ingress_microseconds ? bytesread /
xferSummary.ingress_microseconds : 0;
xferSummary.egress_rate = xferSummary. egress_microseconds ? bytesread /
xferSummary.egress_microseconds : 0;
xferSummary.bandwidth = bandwidth;
// Checksum information is only reported when a checksum was computed
if (computeXS) {
xferSummary.checksum_type = xsString;
xferSummary.checksum_value = xsValue;
}
xferSummary.write_start = startwritebyte;
xferSummary.write_stop = stopwritebyte;
xferSummary.read_start = startbyte;
xferSummary.read_stop = stopbyte;
xferSummary.ndst = ndst;
return xferSummary;
}
//------------------------------------------------------------------------------
// Printing summary header
//------------------------------------------------------------------------------
//! Print the first part of the summary: date, authentication information and
//! the list of sources and destinations. The layout depends on the global
//! `monitoring` flag: key=value pairs on one line when it is set, a
//! human-readable header otherwise.
//!
//! @param xferSummary summary to print
void
print_summary_header(const XferSummary& xferSummary)
{
  const auto& srcNames = xferSummary.sources;
  const auto& dstNames = xferSummary.destinations;
  const unsigned long unixTime = (unsigned long) xferSummary.rawtime;
  const char* dateText = xferSummary.astime.c_str();
  const char* forcedAuth = xferSummary.xrdsecprotocol ?
                           xferSummary.xrdsecprotocol->c_str() : nullptr;

  if (monitoring) {
    // Machine readable flavour, everything on the same line
    COUT(("unixtime=%lu date=\"%s\" auth=\"%s\" ",
          unixTime,
          dateText,
          forcedAuth ? forcedAuth : "(null)"));

    for (unsigned int idx = 0; idx < srcNames.size(); ++idx) {
      COUT(("src_%d=%s ", idx, srcNames[idx].c_str()));
    }

    for (unsigned int idx = 0; idx < dstNames.size(); ++idx) {
      COUT(("dst_%d=%s ", idx, dstNames[idx].c_str()));
    }

    return;
  }

  // Human readable flavour
  const char* krb5Cache = xferSummary.krb5ccname ?
                          xferSummary.krb5ccname->c_str() : "";
  const char* gsiProxy = xferSummary.x509userproxy ?
                         xferSummary.x509userproxy->c_str() : "";
  COUT(("[eoscp] #################################################################\n"));
  COUT(("[eoscp] # Date : ( %lu ) %s\n", unixTime, dateText));
  COUT(("[eoscp] # auth forced=%s krb5=%s gsi=%s\n",
        forcedAuth ? forcedAuth : "", krb5Cache, gsiProxy));

  for (unsigned int idx = 0; idx < srcNames.size(); ++idx) {
    COUT(("[eoscp] # Source Name [%02d] : %s\n", idx, srcNames[idx].c_str()));
  }

  for (unsigned int idx = 0; idx < dstNames.size(); ++idx) {
    COUT(("[eoscp] # Destination Name [%02d] : %s\n", idx,
          dstNames[idx].c_str()));
  }
}
//------------------------------------------------------------------------------
// Print summary
//------------------------------------------------------------------------------
void
print_summary(const XferSummary& xferSummary)
{
print_summary_header(xferSummary);
if (!monitoring) {
// This is a quick-and-dirty trick to keep the ':' after the checksum type label aligned with the rest
// of the output (part 1)
std::string key = "[eoscp] # Data Copied [bytes] ";
const size_t keyLen = key.size();
key += ": %lld\n";
COUT((key.c_str(), xferSummary.totalbytescopied));
if (xferSummary.ndst > 1) {
COUT(("[eoscp] # Tot. Data Copied [bytes] : %lld\n",
xferSummary.totalbytescopied));
}
COUT(("[eoscp] # Realtime [s] : %.03f\n", xferSummary.realtime));
if (xferSummary.abs_time > 0) {
COUT(("[eoscp] # Eff.Copy. Rate[MB/s] : %.02f\n",
xferSummary.copyrate));
}
if (xferSummary.ingress_microseconds) {
COUT(("[eoscp] # INGRESS [MB/s] : %.02f\n",
xferSummary.ingress_rate));
}
if (xferSummary.egress_microseconds) {
COUT(("[eoscp] # EGRESS [MB/s] : %.02f\n",
xferSummary.egress_rate));
}
if (xferSummary.bandwidth) {
COUT(("[eoscp] # Bandwidth[MB/s] : %d\n",
(int) xferSummary.bandwidth));
}
if (xferSummary.checksum_type) {
// This is a quick-and-dirty trick to keep the ':' after the checksum type label aligned with the rest
// of the output (part 2)
std::string cksumTypeTitle = "[eoscp] # Checksum Type " +
*xferSummary.checksum_type;
size_t paddingSize = int(keyLen - cksumTypeTitle.length()) > 0 ? keyLen -
cksumTypeTitle.length() : 0;
if (paddingSize) {
cksumTypeTitle += std::string(paddingSize, ' ');
}
COUT(((cksumTypeTitle + std::string(": ")).c_str()));
COUT(("%s", xferSummary.checksum_value->c_str()));
COUT(("\n"));
}
COUT(("[eoscp] # Write Start Position : %lld\n", xferSummary.write_start));
COUT(("[eoscp] # Write Stop Position : %lld\n", xferSummary.write_stop));
if (xferSummary.read_start >= 0) {
COUT(("[eoscp] # Read Start Position : %lld\n", xferSummary.read_start));
COUT(("[eoscp] # Read Stop Position : %lld\n", xferSummary.read_stop));
}
if (!xferSummary.src_clientinfo.empty()) {
COUT(("[eoscp] # INGRESS Server Info : %s\n",
xferSummary.src_clientinfo.c_str()));
}
if (!xferSummary.dst_clientinfo.empty()) {
COUT(("[eoscp] # EGRESS Server info : %s\n",
xferSummary.dst_clientinfo.c_str()));
}
} else {
COUT(("bytes_copied=%lld ", xferSummary.bytescopied));
if (ndst > 1) {
COUT(("totalbytes_copied=%lld ", xferSummary.totalbytescopied));
}
COUT(("realtime=%.02f ", xferSummary.abs_time / 1000.0));
if (xferSummary.abs_time > 0) {
COUT(("copy_rate=%f ", xferSummary.bytescopied / xferSummary.abs_time /
1000.0));
}
if (xferSummary.ingress_microseconds) {
COUT(("ingress_rate=%f ",
xferSummary.ingress_rate));
}
if (xferSummary.egress_microseconds) {
COUT(("egress_rate=%f ",
xferSummary.egress_rate));
}
if (xferSummary.bandwidth) {
COUT(("bandwidth=%d ", (int) xferSummary.bandwidth));
}
if (xferSummary.checksum_type) {
COUT(("checksum_type=%s ", xferSummary.checksum_type->c_str()));
COUT(("checksum=%s ", xferSummary.checksum_value->c_str()));
}
COUT(("write_start=%lld ", xferSummary.write_start));
COUT(("write_stop=%lld ", xferSummary.write_stop));
if (xferSummary.read_start >= 0) {
COUT(("read_start=%lld ", xferSummary.read_start));
COUT(("read_stop=%lld ", xferSummary.read_stop));
}
}
}
//! Print the transfer summary as JSON to standard output.
//!
//! The JSON text is written to the stream directly: it must not be used as a
//! printf format string because it can contain '%' characters (e.g. in URLs)
//! and can be longer than any fixed formatting buffer.
//!
//! @param xferSummary summary to print
void print_json_summary(const XferSummary& xferSummary)
{
  std::stringstream ss;
  xferSummary.jsonify(ss);
  std::cout << ss.str();
}
//------------------------------------------------------------------------------
// Printing progress bar
//------------------------------------------------------------------------------
//! Print a 20 character wide progress bar followed by the percentage done and
//! the average rate to standard error. The line ends with '\r' so that the
//! next call overwrites it.
//!
//! The rate is derived from the global abs_start_time / abs_stop_time values.
//!
//! @param bytesread number of bytes transferred so far
//! @param size total number of bytes to transfer; 0 is displayed as 100%
void
print_progbar(unsigned long long bytesread, unsigned long long size)
{
  static const int kBarWidth = 20;

  if (!size) {
    bytesread = size = 1; // fake 100% in that case
  }

  CERR(("[eoscp] %-24s Total %.02f MB\t|", cpname.c_str(),
        (float) size / 1024 / 1024));
  // Position of the '>' marker, computed once. The value is clamped before
  // the conversion to int so that an oversized bytesread can not overflow it.
  const double scaled = 20.0 * bytesread / size;
  const int marker = (scaled >= kBarWidth) ? kBarWidth : (int) scaled;
  std::string bar;
  bar.reserve(kBarWidth);

  for (int l = 0; l < kBarWidth; l++) {
    if (l < marker) {
      bar += '=';
    } else if (l == marker) {
      bar += '>';
    } else {
      bar += '.';
    }
  }

  CERR(("%s", bar.c_str()));
  // Elapsed time in milliseconds
  float abs_time = ((float)((abs_stop_time.tv_sec - abs_start_time.tv_sec) * 1000
                            +
                            (abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000));
  // No meaningful rate can be given without a positive elapsed time
  const double rate = (abs_time > 0) ? bytesread / abs_time / 1000.0 : 0.0;
  CERR(("| %.02f %% [%.01f MB/s]\r", 100.0 * bytesread / size, rate));
}
//------------------------------------------------------------------------------
// Write progress
//------------------------------------------------------------------------------
//! Write the current progress to the file named by the global progressFile.
//!
//! The file contains one line "<percent> <bytesread> <size>". It is written
//! to "<progressFile>.tmp" first and then renamed, so a reader never sees a
//! partially written file. Updates are throttled: a new file is only written
//! when the progress moved by more than one percent since the last successful
//! update, or when 100% is reached. Failing to open the temporary file is
//! silently ignored (best effort).
//!
//! @param bytesread number of bytes transferred so far
//! @param size total number of bytes to transfer (0 is treated as 1)
void
write_progress(unsigned long long bytesread, unsigned long long size)
{
  static double lastprogress = 0;
  // Computed in floating point so that the multiplication can not overflow
  double progress = 100.0 * bytesread / (double)(size ? size : 1);

  if (progress > 100) {
    progress = 100;
  }

  if ((fabs(progress - lastprogress) <= 1.0) && (progress != 100.)) {
    // skip this update
    return;
  }

  std::string pf = progressFile;
  pf += ".tmp";
  FILE* fd = fopen(pf.c_str(), "w+");

  if (!fd) {
    return;
  }

  fprintf(fd, "%.02f %llu %llu\n", progress, bytesread, size);
  fclose(fd);

  if (rename(pf.c_str(), progressFile.c_str())) {
    fprintf(stderr, "error: renaming of progress file failed (%s=>%s)\n",
            pf.c_str(), progressFile.c_str());
    return;
  }

  // Remember what was published, otherwise the throttling above compares
  // against 0 forever and every call rewrites the file
  lastprogress = progress;
}
//! Outcome of a source/destination checksum comparison. A default constructed
//! object reports a mismatch without error details.
struct CompareCksumResult {
  bool cksumMismatch{true};  ///< false only when both checksums are equal
  uint32_t xrdErrno{0};      ///< error number describing the failure, 0 if none
  std::string errMsg{};      ///< error message, empty if none
};
//------------------------------------------------------------------------------
//! Query the checksum of the destination file and compare it with the one
//! computed on the source.
//!
//! @param fs file system object used to send the checksum query
//! @param destFilePath path of the destination file
//! @param srcCksumType checksum type computed on the source; "adler" is sent
//!        as "adler32"
//! @param srcCksumValue checksum value computed on the source
//!
//! @return result whose cksumMismatch member is false only if the destination
//!         returned the same checksum type and value; otherwise errMsg
//!         describes the problem and xrdErrno is set, except for a type
//!         mismatch where it is EINVAL and a value mismatch where it is EIO
//------------------------------------------------------------------------------
CompareCksumResult compareChecksum(XrdCl::FileSystem& fs,
                                   const std::string& destFilePath, std::string srcCksumType,
                                   const std::string& srcCksumValue)
{
  CompareCksumResult result;

  if (srcCksumType == "adler") {
    // xrootd adler32 checksum is called "adler32"
    srcCksumType = "adler32";
  }

  // Get the checksum of the file that got uploaded to the destination
  XrdCl::Buffer arg;
  arg.FromString(destFilePath + "?cks.type=" + srcCksumType);
  XrdCl::Buffer* rawResponse = nullptr;
  XrdCl::XRootDStatus status = fs.Query(XrdCl::QueryCode::Checksum, arg,
                                        rawResponse);
  // The response buffer is allocated by Query(); take ownership right after
  // the call so that it is released on every return path
  std::unique_ptr<XrdCl::Buffer> response(rawResponse);

  if (!status.IsOK()) {
    // Problem while querying the destination checksum
    result.errMsg = "error while getting the destination checksum: " +
                    status.ToStr();
    result.xrdErrno = status.errNo;
    return result;
  }

  if (!response) {
    // Successful status without any payload, nothing to compare with
    result.errMsg =
      "error while getting the destination checksum: empty response";
    result.xrdErrno = EINVAL;
    return result;
  }

  // We got the checksum of the destination file, compare the checksums
  // between source and destination
  const std::string queryCksumResp = response->ToString();
  auto splittedResp = eos::common::StringSplit(queryCksumResp, " ");

  if (splittedResp.size() != 2) {
    // Wrong response format received
    result.errMsg =
      "error while extracting the destination checksum: expected 'destCksumType destCksumValue', received:"
      + queryCksumResp;
    result.xrdErrno = EINVAL;
    return result;
  }

  // The checksum response we received has a proper format
  const std::string destCksumType(splittedResp[0]);
  const std::string destCksumValue(splittedResp[1]);

  if (destCksumType != srcCksumType) {
    // Different checksum type between source and destination
    result.errMsg =
      "error while extracting destination checksum: received a different checksum type from the destination ("
      + destCksumType + ") compared to the one computed on the source (" +
      srcCksumType + ")";
    result.xrdErrno = EINVAL;
    return result;
  }

  if (destCksumValue != srcCksumValue) {
    // Checksum mismatch
    result.xrdErrno = EIO;
    result.errMsg = "error: checksum mismatch between source (" + srcCksumValue +
                    ") and destination (" + destCksumValue + ")";
    return result;
  }

  // Checksum match!
  result.cksumMismatch = false;
  return result;
}
//------------------------------------------------------------------------------
// Abort handler
//------------------------------------------------------------------------------
//! Signal handler reporting that the copy was aborted; writes the error
//! message to standard output and terminates the process with EINTR.
void
abort_handler(int)
{
  fputs("error: [eoscp] has been aborted\n", stdout);
  exit(EINTR);
}
//------------------------------------------------------------------------------
// Alarm handler
//------------------------------------------------------------------------------
void
alarm_handler(int)
{
// print_summary_header(src_location, dst_location);
fprintf(stdout, "error: [eoscp] has timedout after %d seconds\n", gtimeout);
exit(ETIMEDOUT);
}
//------------------------------------------------------------------------------
// Main function
//------------------------------------------------------------------------------
int
main(int argc, char* argv[])
{
int c;
mode_t dest_mode[MAXSRCDST];
int set_mode = 0;
extern char* optarg;
extern int optind;
XrdCl::DefaultEnv::GetEnv()->PutInt("MetalinkProcessing", 0);
XrdCl::DefaultEnv::GetEnv()->PutInt("ParallelEvtLoop",
8); // needed for high performance on 100GE
while ((c = getopt(argc, argv,
"CEnshxdvlipfcje:P:X:b:m:u:g:t:S:D:5aA:r:N:L:RT:O:V0q:")) != -1) {
switch (c) {
case 'v':
verbose = 1;
break;
case 'V':
monitoring = 1;
break;
case 'j':
jsonoutput = 1;
break;
case 'd':
debug = 1;
break;
case 'l':
trylocal = 1;
break;
case 'n':
progbar = 0;
break;
case 'p':
createdir = 1;
break;
case 's':
summary = 0;
break;
case 'i':
transparentstaging = 1;
break;
case 'a':
appendmode = 1;
break;
case 'A':
appendmode = 1;
startwritebyte = strtoull(optarg, 0, 10);
break;
case 'c':
doStoreRecovery = true;
offsetXrd = -1;
break;
case 'f':
break;
case 'x':
nooverwrite = true;
break;
case 'e':
replicationType = optarg;
if ((replicationType != "raiddp") && (replicationType != "reeds")) {
fprintf(stderr, "error: no such RAID layout\n");
exit(-1);
}
isRaidTransfer = true;
break;
case 'X': {
xsString = optarg;
if (find(xsTypeSet.begin(), xsTypeSet.end(), xsString) == xsTypeSet.end()) {
fprintf(stderr, "error: no such checksum type: %s\n", optarg);
exit(-1);
}
int layout = 0;
unsigned long layoutId = 0;
if (xsString == "adler") {
layoutId = LayoutId::GetId(layout, LayoutId::kAdler);
} else if (xsString == "crc32") {
layoutId = LayoutId::GetId(layout, LayoutId::kCRC32);
} else if (xsString == "md5") {
layoutId = LayoutId::GetId(layout, LayoutId::kMD5);
} else if (xsString == "sha1") {
layoutId = LayoutId::GetId(layout, LayoutId::kSHA1);
} else if (xsString == "crc32c") {
layoutId = LayoutId::GetId(layout, LayoutId::kCRC32C);
}
xsObj = eos::fst::ChecksumPlugins::GetChecksumObject(layoutId);
if (xsObj) {
xsObj->Reset();
computeXS = true;
}
break;
}
case 'P':
nparitystripes = atoi(optarg);
if (nparitystripes < 2) {
fprintf(stderr, "error: number of parity stripes >= 2\n");
exit(-1);
}
break;
case '0':
nopio = true;
break;
case 'O':
progressFile = optarg;
break;
case 'u':
euid = atoi(optarg);
char tuid[128];
sprintf(tuid, "%d", euid);
if (strcmp(tuid, optarg)) {
// this is not a number, try to map it with getpwnam
struct passwd* pwinfo = getpwnam(optarg);
if (pwinfo) {
euid = pwinfo->pw_uid;
if (debug) {
fprintf(stdout, "[eoscp]: mapping user %s=>UID:%d\n", optarg, euid);
}
} else {
fprintf(stderr, "error: cannot map user %s to any unix id!\n", optarg);
exit(-ENOENT);
}
}
break;
case 'g':
egid = atoi(optarg);
char tgid[128];
sprintf(tgid, "%d", egid);
if (strcmp(tgid, optarg)) {
// this is not a number, try to map it with getgrnam
struct group* grinfo = getgrnam(optarg);
if (grinfo) {
egid = grinfo->gr_gid;
if (debug) {
fprintf(stdout, "[eoscp]: mapping group %s=>GID:%d\n", optarg, egid);
}
} else {
fprintf(stderr, "error: cannot map group %s to any unix id!\n", optarg);
exit(-ENOENT);
}
}
break;
case 't':
bandwidth = atoi(optarg);
if ((bandwidth < 1) || (bandwidth > 2000)) {
fprintf(stderr, "error: bandwidth can only be 1 <= bandwidth <= 2000 Mb/s\n");
exit(-1);
}
break;
case 'q':
gtimeout = atoi(optarg);
break;
case 'S':
nsrc = atoi(optarg);
if ((nsrc < 1) || (nsrc > MAXSRCDST)) {
fprintf(stderr, "error: # of sources must be 1 <= # <= %d\n", MAXSRCDST);
exit(-1);
}
break;
case 'D':
ndst = atoi(optarg);
if ((ndst < 1) || (ndst > MAXSRCDST)) {
fprintf(stderr, "error: # of sources must be 1 <= # <= %d\n", MAXSRCDST);
exit(-1);
}
break;
case 'N':
cpname = optarg;
break;
case 'b':
buffersize = atoi(optarg);
if ((buffersize < 4096) || (buffersize > 100 * 1024 * 1024)) {
fprintf(stderr, "error: buffer size can only 4k <= size <= 100 M\n");
exit(-1);
}
break;
case 'T':
targetsize = strtoull(optarg, 0, 10);
break;
case 'm':
for (int i = 0; i < MAXSRCDST; i++) {
dest_mode[i] = strtol(optarg, 0, 8);
}
set_mode = 1;
break;
case 'r':
char* colon;
colon = strchr(optarg, ':');
if (colon == NULL) {
fprintf(stderr, "error: range has to be given in the format "
": e.g. 0:100000\n");
exit(-1);
}
*colon = 0;
startbyte = strtoll(optarg, 0, 0);
stopbyte = strtoll(colon + 1, 0, 0);
if (debug) {
fprintf(stdout, "[eoscp]: reading range start=%lld stop=%lld\n", startbyte,
stopbyte);
}
break;
case 'L':
sprintf(symlinkname, "%s", optarg);
dosymlink = 1;
break;
case 'R':
replicamode = 1;
break;
case 'Y':
isStreamFile = true;
break;
case 'C':
cksumcomparison = 1;
break;
case 'E':
cksummismatchdelete = 1;
break;
case 'h':
default:
usage();
;
}
}
if (jsonoutput) {
summary = 1;
monitoring = 0;
debug = 0;
verbose = 0;
progbar = 0;
}
if (debug) {
eos::common::Logging& g_logging = eos::common::Logging::GetInstance();
g_logging.SetLogPriority(LOG_DEBUG);
}
if (optind - 1 + nsrc + ndst >= argc) {
usage();
}
if (gtimeout) {
signal(SIGALRM, alarm_handler);
alarm(gtimeout);
}
//............................................................................
// Allocate the buffer used for copy
//............................................................................
buffer = new char[2 * buffersize];
if ((!buffer)) {
fprintf(stderr, "error: cannot allocate buffer of size %u\n", 2 * buffersize);
exit(-ENOMEM);
}
if (debug) {
fprintf(stderr, "[eoscp]: allocate copy buffer with %u bytes\n",
2 * buffersize);
}
//.............................................................................
// Get the address and the file path from the input
//.............................................................................
std::string location;
std::string address;
std::string file_path;
for (int i = 0; i < nsrc; i++) {
location = argv[optind + i];
size_t pos = location.find("://");
pos = location.find("//", pos + 3);
if (pos == std::string::npos) {
address = "";
file_path = location;
} else {
address = std::string(location, 0, pos + 1);
file_path = std::string(location, pos + 1);
}
src_location.push_back(std::make_pair(address, file_path));
if (verbose || debug) {
if (i == 0) {
fprintf(stdout, "[eoscp] ");
}
fprintf(stdout, "src<%d>=%s ", i, location.c_str());
}
}
for (int i = 0; i < ndst; i++) {
location = argv[optind + nsrc + i];
size_t pos = location.find("://");
pos = location.find("//", pos + 3);
if (pos == std::string::npos) {
address = "";
file_path = location;
} else {
address = std::string(location, 0, pos + 1);
file_path = std::string(location, pos + 1);
}
dst_location.push_back(std::make_pair(address, file_path));
if (verbose || debug) {
fprintf(stdout, "dst<%d>=%s ", i, location.c_str());
}
}
if (verbose || debug) {
fprintf(stdout, "\n");
}
if (cksumcomparison || cksummismatchdelete) {
if (src_location.size() != 1 && dst_location.size() != 1) {
fprintf(stderr,
"error: only one source and one destination can be provided if the destination checksum check option is enabled (-C or -E)\n");
exit(-EINVAL);
}
if (cksummismatchdelete && !cksumcomparison) {
fprintf(stderr,
"error: source and destination checksum comparison (-C) not enabled, automatic deletion option (-E) cannot be enabled\n");
exit(-EINVAL);
}
}
//.............................................................................
// Get the type of access we will be doing
//.............................................................................
if (isRaidTransfer) {
  if (!nparitystripes) {
    fprintf(stderr, "error: number of parity stripes undefined\n");
    exit(-EINVAL);
  }

  // The side with more endpoints is taken to be the striped (RAID) one
  isSrcRaid = (nsrc > ndst);
}

int stat_failed = 0;
// Zero-initialised: entries of sources that are never stat'ed are still read
// further down (size comparison, S_ISLNK test, destination open mode).
struct stat st[MAXSRCDST] = {};
//.............................................................................
// Get sources access type
//.............................................................................
// For every source decide which AccessType is pushed to src_type:
//  - address contains "root://": RAID_ACCESS for an explicit RAID source,
//    otherwise either the result of the parallel IO (PIO) probe below or
//    plain XRD_ACCESS
//  - path is "-": CONSOLE_ACCESS (only allowed as the first source)
//  - address contains ":/": RIO_ACCESS
//  - anything else: LOCAL_ACCESS
for (int i = 0; i < nsrc; i++) {
if (src_location[i].first.find("root://") != std::string::npos) {
if (isRaidTransfer && isSrcRaid) {
src_type.push_back(RAID_ACCESS);
} else {
// If we don't need to recover the source and we were not told explicitly
// that this is a RAIN transfer
if (!isRaidTransfer && !doStoreRecovery) {
//.......................................................................
// Test if we can do parallel IO access
//.......................................................................
bool doPIO = false;
XrdCl::Buffer arg;
XrdCl::Buffer* response = 0;
XrdCl::XRootDStatus status;
file_path = src_location[i].first + src_location[i].second;
if (file_path.find("//eos/") != std::string::npos) {
// for any other URL it does not make sense to do the PIO access
if (!nopio) {
doPIO = true;
}
}
// Split at the last "//": everything up to and including its first '/' is
// the endpoint address. If no "//" is found, spos + 1 wraps to 0, address
// is empty and the validity check below decides what happens.
size_t spos = file_path.rfind("//");
std::string address = file_path.substr(0, spos + 1);
XrdCl::URL url(address);
if (!url.IsValid()) {
fprintf(stderr, "URL is invalid: %s", address.c_str());
exit(-1);
}
XrdCl::FileSystem fs(url);
if (spos != std::string::npos) {
file_path.erase(0, spos + 1);
}
// Append the open command to the path, respecting existing opaque info
std::string request = file_path;
if ((file_path.find("?") == std::string::npos)) {
request += "?mgm.pcmd=open";
} else {
request += "&mgm.pcmd=open";
}
arg.FromString(request);
// Note: index 0 is reset here regardless of the current source index i
st[0].st_size = 0;
st[0].st_mode = 0;
if (doPIO) {
status = fs.Query(XrdCl::QueryCode::OpaqueFile, arg, response);
}
if (doPIO && status.IsOK()) {
// Provide defaults for the XrdIo tuning variables unless already set
if (!getenv("EOS_FST_XRDIO_READAHEAD")) {
setenv("EOS_FST_XRDIO_READAHEAD", "1", 1);
}
if (!getenv("EOS_FST_XRDIO_BLOCK_SIZE")) {
setenv("EOS_FST_XRDIO_BLOCK_SIZE", "4194304 ", 1);
}
XrdCl::StatInfo* statresponse = 0;
status = fs.Stat(file_path.c_str(), statresponse);
if (status.IsOK()) {
st[0].st_size = statresponse->GetSize();
st[0].st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
if (statresponse->TestFlags(XrdCl::StatInfo::IsWritable)) {
st[0].st_mode |= S_IWGRP;
}
}
delete statresponse;
//..................................................................
// Parse output
//..................................................................
if (verbose || debug) {
fprintf(stderr, "[eoscp] having PIO_ACCESS for source location=%i size=%llu \n",
i, (unsigned long long) st[0].st_size);
}
XrdOucString tag;
XrdOucString stripe_path;
// NOTE(review): response is dereferenced without a null check - this
// assumes a successful Query always hands back a buffer; confirm.
std::string origResponse(response->GetBuffer(), response->GetSize());
XrdOucString stringOpaque = origResponse.c_str();
// Normalise the reply into a single '&' separated list of key=value pairs
while (stringOpaque.replace("?", "&")) {
}
while (stringOpaque.replace("&&", "&")) {
}
XrdOucEnv* openOpaque = new XrdOucEnv(stringOpaque.c_str());
char* opaque_info = (char*) strstr(origResponse.c_str(), "&mgm.logid");
if (opaque_info == nullptr) {
fprintf(stderr, "error: failed to parse opaque information from "
"PIO request.\n");
exit(-EINVAL);
}
opaqueInfo = opaque_info;
//..................................................................
// Now that parallel IO is possible, we add the new stripes to the
// src_location vector, we update the number of source files and then
// we can use the RAID-like access mode where the stripe files are
// given as input to the command line
//...................................................................
// opaque_info cannot be null here because of the exit above, so the
// else branch of this test is never taken.
if (opaque_info) {
LayoutId::layoutid_t layout = openOpaque->GetInt("mgm.lid");
std::string orig_file = file_path;
if (eos::common::LayoutId::GetLayoutType(layout) ==
eos::common::LayoutId::kRaidDP) {
nsrc = eos::common::LayoutId::GetStripeNumber(layout) + 1;
nparitystripes = 2;
isRaidTransfer = true;
isSrcRaid = true;
src_location.clear();
replicationType = "raiddp";
} else if (eos::common::LayoutId::IsRain(layout)) {
nsrc = eos::common::LayoutId::GetStripeNumber(layout) + 1;
nparitystripes = eos::common::LayoutId::GetRedundancyStripeNumber(layout);
isRaidTransfer = true;
isSrcRaid = true;
src_location.clear();
replicationType = "reeds";
} else {
nsrc = 1;
src_type.push_back(XRD_ACCESS);
replicationType = "replica";
}
if (replicationType != "replica") {
// Forward the opaque part of the original request to the stripe opens
int qpos = orig_file.rfind("?");
if (qpos != STR_NPOS) {
opaqueInfo += "&";
opaqueInfo += orig_file.substr(qpos + 1);
file_path.erase(qpos);
}
// One source location per stripe, the endpoint comes from key "pio.<j>"
for (int j = 0; j < nsrc; j++) {
tag = "pio.";
tag += j;
stripe_path = "root://";
stripe_path += openOpaque->Get(tag.c_str());
stripe_path += "/";
stripe_path += orig_file.c_str();
int pos = stripe_path.rfind("//");
if (pos == STR_NPOS) {
address = "";
file_path = stripe_path.c_str();
} else {
address = std::string(stripe_path.c_str(), 0, pos + 1);
file_path = std::string(stripe_path.c_str(), pos + 1,
stripe_path.length() - pos - 1);
}
// remove the ?xyz from the individual source URL
int qpos = file_path.rfind("?");
if (qpos != STR_NPOS) {
file_path.erase(qpos);
}
src_location.push_back(std::make_pair(address, file_path));
src_type.push_back(RAID_ACCESS);
if (verbose || debug) {
fprintf(stdout, "[eoscp] src<%d>=%s [%s]\n", j,
src_location.back().second.c_str(), src_location.back().first.c_str());
}
}
} else {
//.....................................................................
// The file is not suitable for PIO access, do normal XRD access
//.....................................................................
// NOTE(review): XRD_ACCESS was already pushed in the "replica" branch
// above, so src_type receives a second entry for this source here.
src_type.push_back(XRD_ACCESS);
if (verbose || debug) {
fprintf(stdout, "[eoscp] doing standard access...\n");
}
}
} else {
fprintf(stderr,
"Error while parsing the opaque information from PIO request.\n");
exit(-1);
}
delete openOpaque;
delete response;
// A successful PIO probe ends the loop over the sources
break;
} else {
//.....................................................................
// The file is not suitable for PIO access, do normal XRD access
//.....................................................................
src_type.push_back(XRD_ACCESS);
}
delete response;
} else {
//.....................................................................
// Recovering a file in place or forcing recovery can not be done in
// PIO mode, do normal XRD access (RAIN in gateway mode)
//.....................................................................
src_type.push_back(XRD_ACCESS);
}
}
} else if (src_location[i].second == "-") {
src_type.push_back(CONSOLE_ACCESS);
if (i > 0) {
fprintf(stderr, "error: you cannot read with several sources from stdin\n");
exit(-EPERM);
}
} else if (src_location[i].first.find(":/") != std::string::npos) {
src_type.push_back(RIO_ACCESS);
} else {
src_type.push_back(LOCAL_ACCESS);
}
}
//............................................................................
// Get destinations access type
//............................................................................
for (int i = 0; i < ndst; i++) {
if (dst_location[i].first.find("root://") != std::string::npos) {
if (isRaidTransfer && !isSrcRaid) {
dst_type.push_back(RAID_ACCESS);
} else {
//.......................................................................
// Here we rely on the fact that all destinations must be of the same type
//.......................................................................
dst_type.push_back(XRD_ACCESS);
}
} else if (dst_location[i].second == "-") {
dst_type.push_back(CONSOLE_ACCESS);
} else if (dst_location[i].first.find(":/") != std::string::npos) {
dst_type.push_back(RIO_ACCESS);
} else {
dst_type.push_back(LOCAL_ACCESS);
}
//..........................................................................
// Print the types of protocols involved
//..........................................................................
if (verbose || debug) {
fprintf(stdout, "[eoscp]: copy protocol ");
for (int j = 0; j < nsrc; j++) {
fprintf(stdout, "%s:", protocols[src_type[j]]);
}
fprintf(stdout, "=>");
for (int j = 0; j < ndst; j++) {
fprintf(stdout, "%s:", protocols[dst_type[j]]);
}
fprintf(stdout, "\n");
}
}
// Checksum comparison needs at most one destination, reached via root protocol
if (cksumcomparison) {
  if (dst_type.size() > 1) {
    fprintf(stderr,
            "error: too many destination provided. Checksum comparison between source and destination cannot be enabled.\n");
    exit(-EINVAL);
  }

  if (!dst_type.empty() && (dst_type[0] != XRD_ACCESS)) {
    fprintf(stderr,
            "error: source and checksum comparison (-C) only allowed for destination using root protocol.\n");
    exit(-EINVAL);
  }
}

if (debug || verbose) {
  fprintf(stderr, "\n");
}
// Switch to the requested identity. The group is changed first, while the
// process still holds the privileges of the original user. Failures are
// reported on stderr like every other error, since stdout may carry data.
if (egid >= 0) {
  if (setgid(egid)) {
    fprintf(stderr, "error: cannot change identity to gid %d\n", egid);
    exit(-EPERM);
  }
}

if (euid >= 0) {
  if (setuid(euid)) {
    fprintf(stderr, "error: cannot change identity to uid %d\n", euid);
    exit(-EPERM);
  }
}
//............................................................................
// Start the performance measurement
//............................................................................
gettimeofday(&abs_start_time, &tz);
bool got_rain_flags = false;
int raid_url_failed_count = 0;

// Stat every source (skipped in replica mode) to fill st[] with its size and
// mode. For non-RAID transfers a failed stat is fatal.
if (!replicamode) {
  for (int i = 0; i < nsrc; i++) {
    switch (src_type[i]) {
    case LOCAL_ACCESS: {
      if (debug) {
        fprintf(stdout, "[eoscp]: doing POSIX stat on %s\n",
                src_location[i].second.c_str());
      }

      // lstat so that a symbolic link is reported as such (range links)
      stat_failed = lstat(src_location[i].second.c_str(), &st[i]);
    }
    break;

    // TODO: Improve stat call for RAID_ACCESS
    // - should stat the FST file physical path
    // - stat_failed should affect even RAID access
    // Possible merge with XRD_ACCESS stat call
    case RAID_ACCESS:

      // One successful stat is enough, its result is shared by all stripes
      if (!got_rain_flags) {
        XrdCl::URL url(src_location[i].first);

        if (!url.IsValid()) {
          fprintf(stderr, "warn: the url address is not valid url=%s\n",
                  src_location[i].first.c_str());
          raid_url_failed_count++;
          continue;
        }

        XrdCl::FileSystem fs(url);
        XrdCl::StatInfo* response = 0;
        status = fs.Stat(src_location[i].second, response);

        if (!status.IsOK()) {
          stat_failed = 1;
        } else {
          stat_failed = 0;
          st[i].st_size = response->GetSize();
          st[i].st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

          if (response->TestFlags(XrdCl::StatInfo::IsWritable)) {
            st[i].st_mode |= S_IWGRP;
          }

          got_rain_flags = true;

          // Propagate size and mode to the entries of all other stripes
          for (int j = 0; j < nsrc; j++) {
            if (j != i) {
              st[j].st_size = st[i].st_size;
              st[j].st_mode = st[i].st_mode;
            }
          }
        }

        // Release the stat reply, as the XRD_ACCESS branch does
        delete response;
      }

      break;

    case XRD_ACCESS: {
      if (debug) {
        fprintf(stdout, "[eoscp]: doing XROOT/RAIDIO stat on %s\n",
                src_location[i].second.c_str());
      }

      XrdCl::URL url(src_location[i].first);

      if (!url.IsValid()) {
        fprintf(stderr, "error: the url address is not valid url=%s\n",
                src_location[i].first.c_str());
        exit(-EPERM);
      }

      XrdCl::FileSystem fs(url);
      XrdCl::StatInfo* response = 0;
      status = fs.Stat(src_location[i].second, response);

      if (!status.IsOK()) {
        stat_failed = 1;
      } else {
        stat_failed = 0;
        st[i].st_size = response->GetSize();
        st[i].st_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

        if (response->TestFlags(XrdCl::StatInfo::IsWritable)) {
          st[i].st_mode |= S_IWGRP;
        }
      }

      delete response;
    }
    break;

    // Nothing is stat'ed for these two access types
    case CONSOLE_ACCESS:
    case RIO_ACCESS:
      stat_failed = 0;
      break;
    }

    if (!isRaidTransfer && stat_failed) {
      fprintf(stderr, "error: cannot stat source %s[%s]\n",
              src_location[i].first.c_str(), src_location[i].second.c_str());
      exit(-ENOENT);
    }
  }
}
//............................................................................
// Start consistency check
//............................................................................
// A RAID read tolerates at most nparitystripes unusable stripe URLs
if (isRaidTransfer && (raid_url_failed_count > nparitystripes)) {
  fprintf(stderr,
          "error: not enough replicas for XROOT(RAIDIO) read\n");
  exit(-EINVAL);
}

// Several plain sources must all have the size of the first one
if (!isRaidTransfer && (nsrc > 1)) {
  for (int i = 1; i < nsrc; i++) {
    if (st[i].st_size != st[0].st_size) {
      fprintf(stderr, "error: source files differ in size !\n");
      exit(-EINVAL);
    }
  }
}
//............................................................................
// Check if this is a range link
//............................................................................
// A range link is a symbolic link whose target has the form
// "<path> <start>:<stop>". When such a link is found the source is replaced
// by <path> and the byte range to copy is taken from the link.
if (!replicamode) {
  for (int i = 0; i < nsrc; i++) {
    if (!S_ISLNK(st[i].st_mode)) {
      continue;
    }

    // Stays empty for every access type other than LOCAL_ACCESS, in which
    // case no range is parsed below.
    char link_target[4096] = "";

    if (src_type[i] == LOCAL_ACCESS) {
      if (debug) {
        fprintf(stdout, "[eoscp]: doing POSIX readlink on %s\n",
                src_location[i].second.c_str());
      }

      // readlink() does not NUL-terminate its result: keep one byte spare
      // and terminate the target before it is searched as a C string.
      const ssize_t link_len = readlink(src_location[i].second.c_str(),
                                        link_target, sizeof(link_target) - 1);

      if (link_len < 0) {
        fprintf(stderr, "error: cannot read the link of %s\n",
                src_location[i].second.c_str());
        exit(-errno);
      }

      link_target[link_len] = 0;
    }

    char* space = strchr(link_target, ' ');

    if (space) {
      *space = 0;
      char* colon = strchr(space + 1, ':');

      if (colon) {
        *colon = 0;
        // yep, this is a range link
        startbyte = strtoll(space + 1, 0, 0);
        stopbyte = strtoll(colon + 1, 0, 0);
        src_location[i] = std::make_pair(std::string(),
                                         std::string(link_target));

        if (debug) {
          fprintf(stdout, "[eoscp]: setting range to destination %s %lld:%lld\n",
                  src_location[i].second.c_str(), startbyte, stopbyte);
        }
      }
    }
  }
}
//............................................................................
// If transparent staging is not enabled, we need to check if files are online
//............................................................................
// None of the access types needs an actual check; in debug mode this is
// reported once per source.
if (debug && !transparentstaging) {
  for (int i = 0; i < nsrc; i++) {
    const char* proto_label = nullptr;

    switch (src_type[i]) {
    case LOCAL_ACCESS:
      proto_label = "POSIX";
      break;

    case RAID_ACCESS:
      proto_label = "XROOT(RAIDIO)";
      break;

    case XRD_ACCESS:
      proto_label = "XROOT";
      break;

    case RIO_ACCESS:
      proto_label = "RIO";
      break;

    case CONSOLE_ACCESS:
      proto_label = "STDIN";
      break;
    }

    if (proto_label != nullptr) {
      fprintf(stdout,
              "[eoscp]: %s is transparent for staging - nothing to check\n",
              proto_label);
    }
  }
}
//............................................................................
// For the '-p' flag we create the needed destination directory tree
//............................................................................
// Zero-initialised: the entries are only filled by the local stat() below but
// are read unconditionally when the destination mode is set later on.
struct stat dstst[MAXSRCDST] = {};

if ((!replicamode) && createdir) {
  mode_t mode = S_IRWXU | S_IRGRP | S_IROTH | S_IXGRP | S_IXOTH;

  //..........................................................................
  // Loop over the destination paths
  //..........................................................................
  for (int i = 0; i < ndst; i++) {
    int pos = 0;
    int mkdir_failed = 0;
    int chown_failed = 0;
    XrdOucString file_path = dst_location[i].second.c_str();
    // Opaque part ("?...") of the destination, re-appended to remote sub-paths
    XrdOucString opaque = dst_location[i].second.c_str();
    int npos;

    if ((npos = opaque.find("?")) != STR_NPOS) {
      opaque.erase(0, npos);
    } else {
      opaque = "";
    }

    // Walk over every parent directory prefix of the destination path
    while ((pos = file_path.find("/", pos + 1)) != STR_NPOS) {
      XrdOucString subpath = file_path;
      subpath.erase(pos);
      // Message reported if creating this sub-directory fails
      std::string mkdir_errmsg;

      switch (dst_type[i]) {
      case LOCAL_ACCESS: {
        if (debug) {
          fprintf(stdout, "[eoscp]: doing POSIX stat on %s\n", subpath.c_str());
        }

        stat_failed = stat(subpath.c_str(), &dstst[i]);

        if (stat_failed) {
          if (debug) {
            fprintf(stdout, "[eoscp]: doing POSIX mkdir on %s\n", subpath.c_str());
          }

          mkdir_failed = mkdir(subpath.c_str(), mode);

          if (mkdir_failed) {
            mkdir_errmsg = "cannot create destination sub-directory ";
            mkdir_errmsg += subpath.c_str();
          }

          //..................................................................
          // The root user can also set the user/group as in the target location
          //..................................................................
          if (getuid() == 0) {
            if (!subpath.beginswith("/dev/")) {
              chown_failed = chown(subpath.c_str(), st[0].st_uid,
                                   st[0].st_gid);
            }
          }
        }
      }
      break;

      case RAID_ACCESS:
      case XRD_ACCESS: {
        if (debug) {
          fprintf(stdout, "[eoscp]: doing XROOT(RAIDIO) stat on %s\n", subpath.c_str());
        }

        subpath += opaque.c_str();
        XrdCl::URL url(dst_location[i].first.c_str());
        XrdCl::FileSystem fs(url);
        XrdCl::StatInfo* response = 0;
        status = fs.Stat(subpath.c_str(), response);

        if (!status.IsOK()) {
          if (debug) {
            fprintf(stdout, "[eoscp]: doing XROOT mkdir on %s\n", subpath.c_str());
          }

          status = fs.MkDir(subpath.c_str(), XrdCl::MkDirFlags::None,
                            (XrdCl::Access::Mode)mode);

          if (!status.IsOK()) {
            mkdir_failed = 1;
            mkdir_errmsg = status.GetErrorMessage();
          }
        }

        delete response;
        // Chown not supported by the standard xroot
      }
      break;

      case RIO_ACCESS:
        break;

      case CONSOLE_ACCESS:
        break;
      }

      if (mkdir_failed) {
        fprintf(stderr, "error: %s\n", mkdir_errmsg.c_str());
        exit(-EPERM);
      }

      if (chown_failed) {
        // Report the ids that were actually applied (those of the first source)
        fprintf(stderr, "error: cannot set owner=%d/group=%d for %s\n",
                st[0].st_uid, st[0].st_gid, subpath.c_str());
        exit(-EPERM);
      }
    }
  }
}
//............................................................................
// Open source files
//............................................................................
// Every opened source is recorded in src_handler as a (file descriptor,
// object pointer) pair: POSIX and console sources use the descriptor, XRD and
// RIO sources use the pointer with a descriptor of 0. A RAID source is opened
// once through redundancyObj and does not add a src_handler entry.
for (int i = 0; i < nsrc; i++) {
switch (src_type[i]) {
case LOCAL_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing POSIX open to read %s\n",
src_location[i].second.c_str());
}
src_handler.push_back(std::make_pair(open(src_location[i].second.c_str(),
O_RDONLY),
static_cast(NULL)));
}
break;
case RAID_ACCESS: {
if (isSrcRaid) {
int flags;
mode_t mode_sfs = 0;
std::vector vectUrl;
// Store recovery needs write access to the stripes
if (doStoreRecovery) {
flags = SFS_O_RDWR;
} else {
flags = SFS_O_RDONLY;
}
// Collect the URLs of all stripes; the loop over i is left right after
// this case (see the break at the end of the loop body)
for (int j = 0; j < nsrc; j++) {
location = src_location[j].first + src_location[j].second;
vectUrl.push_back(location);
}
LayoutId::layoutid_t layout = 0;
if (replicationType == "raiddp") {
layout = LayoutId::GetId(LayoutId::kRaidDP,
1, nsrc,
LayoutId::BlockSizeEnum(stripeWidth),
LayoutId::OssXsBlockSize,
0, nparitystripes);
redundancyObj = new eos::fst::RaidDpLayout(NULL, layout, NULL, NULL,
location.c_str(),
0, doStoreRecovery);
} else if (replicationType == "reeds") {
layout = LayoutId::GetId(LayoutId::GetReedSLayoutByParity(nparitystripes),
1, nsrc,
LayoutId::BlockSizeEnum(stripeWidth),
LayoutId::OssXsBlockSize,
0, nparitystripes);
redundancyObj = new eos::fst::ReedSLayout(NULL, layout, NULL, NULL,
location.c_str(),
0, doStoreRecovery);
}
if (debug) {
fprintf(stdout, "[eoscp]: doing XROOT(RAID-PIO) open with flags: %x\n", flags);
}
// NOTE(review): redundancyObj is not assigned above when replicationType
// is neither "raiddp" nor "reeds", yet it is dereferenced here without
// the null check the destination side uses - confirm that case is
// excluded earlier.
if (redundancyObj->OpenPio(vectUrl, flags, mode_sfs, opaqueInfo.c_str())) {
fprintf(stderr, "error: can not open RAID object for read/write\n");
exit(-EIO);
}
}
}
break;
case XRD_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing XROOT open to read %s\n",
src_location[i].second.c_str());
}
location = src_location[i].first + src_location[i].second;
XrdCl::OpenFlags::Flags xrdcl_flags = XrdCl::OpenFlags::Read;
XrdCl::Access::Mode xrdcl_mode = XrdCl::Access::UR | XrdCl::Access::UW |
XrdCl::Access::GR | XrdCl::Access::OR;
// Store recovery opens for update and flags the request in the opaque info
if (doStoreRecovery) {
xrdcl_flags = XrdCl::OpenFlags::Update;
if ((location.find("?") == std::string::npos)) {
location += "?eos.rain.store=1";
} else {
location += "&eos.rain.store=1";
}
}
// Pass the value of EOS_FUSE_SECRET, if set, along as eos.key
if (getenv("EOS_FUSE_SECRET")) {
if ((location.find("?") == std::string::npos)) {
location += "?eos.key=";
} else {
location += "&eos.key=";
}
location += getenv("EOS_FUSE_SECRET");
}
XrdCl::File* file = new XrdCl::File();
status = file->Open(location, xrdcl_flags, xrdcl_mode);
if (!status.IsOK()) {
// errmsg is filled but not used, the message comes from status.ToStr()
std::string errmsg;
errmsg = status.GetErrorMessage();
fprintf(stderr, "error: %s\n", status.ToStr().c_str());
exit(-status.errNo ? -status.errNo : -EIO);
} else {
file->GetProperty("LastURL", src_lasturl);
}
src_handler.push_back(std::make_pair(0, (void*)file));
}
break;
case RIO_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing RIO open to read %s\n",
src_location[i].second.c_str());
}
location = src_location[i].first + src_location[i].second;
// A leading "xrd" in the location is rewritten to "root"
if (location.substr(0, 3) == "xrd") {
location.replace(0, 3, "root");
}
eos::fst::FileIo* file = eos::fst::FileIoPluginHelper::GetIoObject(
location.c_str());
if (!file) {
fprintf(stderr, "error: failed to get IO object for %s\n", location.c_str());
exit(-1);
}
retc = file->fileOpen(0);
if (retc) {
// presumably error_retc_map() translates the IO error into errno, which
// is what gets reported below - verify against common/XrdErrorMap.hh
eos::common::error_retc_map(file->GetLastErrNo());
fprintf(stderr, "error: source file open failed - errno=%d : %s [%s]\n", errno,
strerror(errno),
file->GetLastErrMsg().c_str());
exit(-errno);
} else {
src_lasturl = file->GetLastUrl();
}
src_handler.push_back(std::make_pair(0, (void*)file));
}
break;
case CONSOLE_ACCESS:
src_handler.push_back(std::make_pair(fileno(stdin),
static_cast(NULL)));
break;
}
// A negative descriptor without an object pointer means the open failed.
// NOTE(review): for a POSIX source this reports `status`, which open() does
// not set - errno would describe the failure; confirm intent.
if ((!isRaidTransfer) &&
(src_handler[i].first < 0) &&
(src_handler[i].second == NULL)) {
std::string errmsg;
errmsg = status.GetErrorMessage();
fprintf(stderr, "error: %s\n", status.ToStr().c_str());
exit(-status.errNo ? -status.errNo : -EIO);
}
// All stripes of a RAID source were handled in the first iteration
if (isRaidTransfer && isSrcRaid) {
break;
}
}
//............................................................................
// Seek the required start position
//............................................................................
// Only a POSIX source is really repositioned; for the other access types the
// read/checksum offsets are primed with the start byte instead.
if (startbyte > 0) {
  for (int i = 0; i < nsrc; i++) {
    const char* src_path = src_location[i].second.c_str();

    if (debug) {
      fprintf(stdout, "[eoscp]: seeking in %s to position %lld\n",
              src_path, startbyte);
    }

    const auto access_kind = src_type[i];

    if (access_kind == LOCAL_ACCESS) {
      startbyte = lseek(src_handler[i].first, startbyte, SEEK_SET);
      offsetXS = startbyte;
    } else if ((access_kind == RAID_ACCESS) || (access_kind == RIO_ACCESS)) {
      offsetXrd = startbyte;
      offsetXS = startbyte;
    } else if (access_kind == XRD_ACCESS) {
      //TODO::
      //startbyte = XrdPosixXrootd::Lseek( srcfd[i], startbyte, SEEK_SET );
      offsetXS = startbyte;
    }

    // CONSOLE_ACCESS: nothing to do

    if (startbyte < 0) {
      fprintf(stderr, "error: cannot seek start position of file %s %d\n",
              src_path, errno);
      exit(-EIO);
    }
  }
}
//............................................................................
// Open destination files
//............................................................................
// Every opened destination is recorded in dst_handler as a (file descriptor,
// object pointer) pair, mirroring src_handler. A RAID destination is opened
// once through redundancyObj and does not add a dst_handler entry.
for (int i = 0; i < ndst; i++) {
retc = 0;
switch (dst_type[i]) {
case LOCAL_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing POSIX open to write %s\n",
dst_location[i].second.c_str());
}
if (nooverwrite) {
struct stat buf;
if (!stat(dst_location[i].second.c_str(), &buf)) {
fprintf(stderr, "error: target file exists already!\n");
exit(-EEXIST);
}
}
// Append mode keeps existing content, otherwise the file is truncated.
// NOTE(review): the creation mode is taken from st[i], i.e. the source
// entry with the same index as this destination - confirm intent.
if (appendmode) {
dst_handler.push_back(std::make_pair(open(dst_location[i].second.c_str(),
O_WRONLY | O_CREAT, st[i].st_mode),
static_cast(NULL)));
} else {
dst_handler.push_back(std::make_pair(open(dst_location[i].second.c_str(),
O_WRONLY | O_TRUNC | O_CREAT, st[i].st_mode),
static_cast(NULL)));
}
}
break;
case RAID_ACCESS: {
if (!isSrcRaid) {
int flags;
std::vector vectUrl;
flags = SFS_O_CREAT | SFS_O_WRONLY;
// Collect the URLs of all stripes; the loop over i is left right after
// this case (see the break at the end of the loop body)
for (int j = 0; j < ndst; j++) {
location = dst_location[j].first + dst_location[j].second;
vectUrl.push_back(location);
}
LayoutId::layoutid_t layout = 0;
if (replicationType == "raiddp") {
layout = LayoutId::GetId(LayoutId::kRaidDP,
1, ndst,
LayoutId::BlockSizeEnum(stripeWidth),
LayoutId::OssXsBlockSize,
0, nparitystripes);
redundancyObj = new eos::fst::RaidDpLayout(NULL, layout, NULL, NULL,
location.c_str(),
0, doStoreRecovery, isStreamFile);
} else if (replicationType == "reeds") {
layout = LayoutId::GetId(LayoutId::GetReedSLayoutByParity(nparitystripes),
1, ndst,
LayoutId::BlockSizeEnum(stripeWidth),
LayoutId::OssXsBlockSize,
0, nparitystripes);
redundancyObj = new eos::fst::ReedSLayout(NULL, layout, NULL, NULL,
location.c_str(),
0, doStoreRecovery, isStreamFile);
}
if (debug) {
fprintf(stdout, "[eoscp]: doing XROOT(RAIDIO-PIO) open with flags: %x\n",
flags);
}
// The open is skipped when no layout object was created above
if (redundancyObj && redundancyObj->OpenPio(vectUrl, flags)) {
fprintf(stderr, "error: can not open RAID object for write\n");
exit(-EIO);
}
}
}
break;
case XRD_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing XROOT open to write %s\n",
dst_location[i].second.c_str());
}
location = dst_location[i].first + dst_location[i].second;
// Pass the value of EOS_FUSE_SECRET, if set, along as eos.key
if (getenv("EOS_FUSE_SECRET")) {
if ((location.find("?") == std::string::npos)) {
location += "?eos.key=";
} else {
location += "&eos.key=";
}
location += getenv("EOS_FUSE_SECRET");
}
eos::fst::XrdIo* file = new eos::fst::XrdIo(location.c_str());
// Append and no-overwrite both need to know whether the target exists
if (appendmode || nooverwrite) {
XrdCl::URL url(dst_location[i].first);
if (!url.IsValid()) {
fprintf(stderr, "error: the destination url address is not valid url=%s\n",
dst_location[i].first.c_str());
exit(-EPERM);
}
XrdCl::FileSystem fs(url);
XrdCl::StatInfo* response = 0;
status = fs.Stat(dst_location[i].second, response);
if (status.IsOK()) {
if (nooverwrite) {
fprintf(stderr, "error: target file exists already!\n");
exit(-EEXIST);
}
retc = file->fileOpen(SFS_O_RDWR, st[i].st_mode, "");
} else {
retc = file->fileOpen(SFS_O_CREAT | SFS_O_RDWR,
S_IRUSR | S_IWUSR | S_IRGRP, "");
}
// Unless a write offset was set already, start writing at the current
// size of the existing target
if (!startwritebyte && response) {
startwritebyte = response->GetSize();
}
delete response;
} else {
retc = file->fileOpen(SFS_O_CREAT | SFS_O_RDWR,
S_IRUSR | S_IWUSR | S_IRGRP, "");
}
if (retc) {
eos::common::error_retc_map(file->GetLastErrNo());
fprintf(stderr, "error: target file open failed - errno=%d : %s [%s]\n",
errno, strerror(errno),
file->GetLastErrMsg().c_str());
exit(-errno);
} else {
dst_lasturl = file->GetLastUrl();
}
dst_handler.push_back(std::make_pair(0, file));
}
break;
case RIO_ACCESS: {
if (debug) {
fprintf(stdout, "[eoscp]: doing open to write %s\n",
dst_location[i].second.c_str());
}
location = dst_location[i].first + dst_location[i].second;
// A leading "xrd" in the location is rewritten to "root"
if (location.substr(0, 3) == "xrd") {
location.replace(0, 3, "root");
}
// NOTE(review): unlike the source side, the returned object is not checked
// for null before it is used.
eos::fst::FileIo* file = eos::fst::FileIoPluginHelper::GetIoObject(
location.c_str());
// NOTE(review): this indexes src_location with the destination index i and
// the value is not read again within this loop - looks like a leftover;
// confirm before relying on `location` afterwards.
location = src_location[i].first + src_location[i].second;
// NOTE(review): presumably fileExists() returns 0 when the file exists
// (stat-like), which is what the message below implies - verify.
if (!file->fileExists()) {
if (nooverwrite) {
fprintf(stderr, " error; target file exists already!\n");
exit(-EEXIST);
}
retc = file->fileOpen(SFS_O_RDWR, st[i].st_mode, "");
} else {
retc = file->fileOpen(SFS_O_CREAT | SFS_O_RDWR,
st[i].st_mode, "");
}
if (retc) {
eos::common::error_retc_map(file->GetLastErrNo());
fprintf(stderr, "error: target file open failed - errno=%d : %s\n", errno,
strerror(errno));
exit(-errno);
} else {
dst_lasturl = file->GetLastUrl();
}
dst_handler.push_back(std::make_pair(0, file));
}
break;
case CONSOLE_ACCESS:
dst_handler.push_back(std::make_pair(fileno(stdout),
static_cast(NULL)));
break;
}
// A descriptor <= 0 without an object pointer is treated as a failed open;
// the XrdCl error is reported when status carries one, errno otherwise
if ((!isRaidTransfer) &&
(dst_handler[i].first <= 0) &&
(dst_handler[i].second == NULL)) {
std::string errmsg;
errmsg = status.GetErrorMessage();
if (status.errNo) {
fprintf(stderr, "error: errc=%d msg=\"%s\"\n", status.errNo, errmsg.c_str());
} else {
fprintf(stderr, "error: errc=%d msg=\"%s\"\n", errno ? errno : EINVAL,
strerror(errno ? errno : EINVAL));
}
exit(-status.errNo ? -status.errNo : -1);
}
// All stripes of a RAID destination were handled in the first iteration
if (isRaidTransfer && !isSrcRaid) {
break;
}
}
//............................................................................
// In case the file exists, seek the end and print the offset
//............................................................................
if (appendmode) {
  for (int i = 0; i < ndst; i++) {
    // Only POSIX destinations are positioned here; the other access types
    // leave the write offset untouched.
    if (dst_type[i] == LOCAL_ACCESS) {
      startwritebyte = lseek(dst_handler[i].first, 0, SEEK_END);
    }

    if (startwritebyte < 0) {
      fprintf(stderr, "error: cannot seek to end of file to %d of %s\n",
              dest_mode[i], dst_location[i].second.c_str());
      exit(-EIO);
    }
  }
}
//............................................................................
// Set the source mode or a specified one for the destination
//............................................................................
for (int i = 0; i < ndst; i++) {
  int chmod_failed = 0;
  int chown_failed = 0;
  // Mode and ownership are only ever applied to regular files
  const bool dst_is_regular = S_ISREG(dstst[i].st_mode);

  //........................................................................
  // If not specified on the command line, take the source mode
  //........................................................................
  if (!set_mode && dst_is_regular) {
    dest_mode[i] = st[0].st_mode;
  }

  switch (dst_type[i]) {
  case LOCAL_ACCESS: {
    if (dst_is_regular &&
        (dst_location[i].second.substr(0, 5) != "/dev/")) {
      chmod_failed = chmod(dst_location[i].second.c_str(), dest_mode[i]);

      // The root user also transfers the ownership of the first source
      if (getuid() == 0) {
        chown_failed = chown(dst_location[i].second.c_str(), st[0].st_uid,
                             st[0].st_gid);
      }
    }
  }
  break;

  case RAID_ACCESS:
  case XRD_ACCESS:
  case RIO_ACCESS:
  case CONSOLE_ACCESS:
    //........................................................................
    // Not supported, no such functionality in the standard xroot or console
    //........................................................................
    break;
  }

  if (chmod_failed) {
    fprintf(stderr, "error: cannot set permissions to %d for file %s\n",
            dest_mode[i], dst_location[i].second.c_str());
    exit(-EPERM);
  }

  if (chown_failed) {
    // Report the ids that were actually applied (those of the first source)
    fprintf(stderr, "error: cannot set owner=%d/group=%d for %s\n",
            st[0].st_uid, st[0].st_gid, dst_location[i].second.c_str());
    exit(-EPERM);
  }
}
//............................................................................
// Do the actual copy operation
//............................................................................
char* ptr_buffer = buffer;
long long totalbytes = 0;
double wait_time = 0;
struct timespec start, end;
stopwritebyte = startwritebyte;
while (1) {
if (progressFile.length()) {
write_progress(totalbytes, st[0].st_size);
}
if (progbar) {
gettimeofday(&abs_stop_time, &tz);
for (int i = 0; i < nsrc; i++) {
if ((src_type[i] == XRD_ACCESS) && (targetsize)) {
st[i].st_size = targetsize;
}
}
print_progbar(totalbytes, st[0].st_size);
}
if (bandwidth) {
gettimeofday(&abs_stop_time, &tz);
float abs_time = static_cast((abs_stop_time.tv_sec -
abs_start_time.tv_sec) * 1000 +
(abs_stop_time.tv_usec - abs_start_time.tv_usec) / 1000);
//........................................................................
// Regulate the io - sleep as desired
//........................................................................
float exp_time = totalbytes / bandwidth / 1000.0;
if (abs_time < exp_time) {
usleep((int)(1000 * (exp_time - abs_time)));
}
}
//..........................................................................
// For ranges we have to adjust the last buffersize
//..........................................................................
if ((stopbyte >= 0) &&
(((stopbyte - startbyte) - totalbytes) < buffersize)) {
buffersize = (stopbyte - startbyte) - totalbytes;
}
int nread = -1;
auto mReadStart = chrono::steady_clock::now();
switch (src_type[0]) {
case LOCAL_ACCESS:
case CONSOLE_ACCESS:
nread = read(src_handler[0].first,
static_cast(ptr_buffer),
buffersize);
break;
case RAID_ACCESS: {
nread = redundancyObj->Read(offsetXrd, ptr_buffer, buffersize);
offsetXrd += nread;
}
break;
case XRD_ACCESS: {
eos::common::Timing::GetTimeSpec(start);
uint32_t xnread = 0;
status = static_cast(src_handler[0].second)->Read(offsetXrd,
buffersize, ptr_buffer, xnread);
nread = xnread;
if (!status.IsOK()) {
fprintf(stderr, "Error while doing reading. \n");
exit(-1);
}
eos::common::Timing::GetTimeSpec(end);
wait_time = static_cast((end.tv_sec * 1000 + end.tv_nsec / 1000000) -
(start.tv_sec * 1000 + start.tv_nsec / 1000000));
read_wait += wait_time;
offsetXrd += nread;
if (debug) {
fprintf(stderr, "[eoscp] read=%d\n", nread);
}
}
break;
case RIO_ACCESS: {
eos::common::Timing::GetTimeSpec(start);
int64_t nread64;
nread64 = static_cast(src_handler[0].second)->fileRead(
offsetXrd, ptr_buffer, buffersize);
if (nread64 < 0) {
nread = -1;
} else {
nread = (int) nread64;
}
eos::common::Timing::GetTimeSpec(end);
wait_time = static_cast((end.tv_sec * 1000 + end.tv_nsec / 1000000) -
(start.tv_sec * 1000 + start.tv_nsec / 1000000));
read_wait += wait_time;
offsetXrd += nread;
if (debug) {
fprintf(stderr, "[eoscp] read=%d\n", nread);
}
}
break;
}
auto mReadStop = chrono::steady_clock::now();
ingress_microseconds += chrono::duration_cast
(mReadStop - mReadStart).count();
if (nread < 0) {
fprintf(stderr, "error: read failed on file %s - destination file "
"is incomplete!\n", src_location[0].second.c_str());
exit(-EIO);
}
if (nread == 0) {
// end of file
break;
}
if (computeXS && xsObj) {
xsObj->Add(static_cast(ptr_buffer), nread, offsetXS);
offsetXS += nread;
}
auto mWriteStart = chrono::steady_clock::now();
int64_t nwrite = 0;
for (int i = 0; i < ndst; i++) {
switch (dst_type[i]) {
case LOCAL_ACCESS:
case CONSOLE_ACCESS:
write(dst_handler[i].first, ptr_buffer, nread);
nwrite = nread;
break;
case RAID_ACCESS: {
if (i == 0) {
nwrite = redundancyObj->Write(stopwritebyte, ptr_buffer, nread);
i = ndst;
}
}
break;
case XRD_ACCESS: {
// Do writes in async mode
eos::common::Timing::GetTimeSpec(start);
nwrite = static_cast(dst_handler[i].second)->fileWriteAsync(
stopwritebyte, ptr_buffer, nread);
eos::common::Timing::GetTimeSpec(end);
wait_time = static_cast((end.tv_sec * 1000 + end.tv_nsec / 1000000) -
(start.tv_sec * 1000 + start.tv_nsec / 1000000));
write_wait += wait_time;
if (debug) {
fprintf(stderr, "[eoscp] write=%li\n", nwrite);
}
}
break;
case RIO_ACCESS: {
eos::common::Timing::GetTimeSpec(start);
int64_t nwrite64;
nwrite64 = static_cast(dst_handler[i].second)->fileWrite(
stopwritebyte, ptr_buffer, nread);
if (nwrite64 < 0) {
nwrite = -1;
} else {
nwrite = (int) nwrite64;
}
eos::common::Timing::GetTimeSpec(end);
wait_time = static_cast((end.tv_sec * 1000 + end.tv_nsec / 1000000) -
(start.tv_sec * 1000 + start.tv_nsec / 1000000));
write_wait += wait_time;
if (debug) {
fprintf(stderr, "[eoscp] write=%li\n", nwrite);
}
}
break;
}
if (nwrite != nread) {
fprintf(stderr, "error: write failed on destination file %s - "
"wrote %lld/%lld bytes - destination file is incomplete!\n",
dst_location[i].second.c_str(), (long long) nwrite, (long long) nread);
exit(-EIO);
}
}
auto mWriteStop = chrono::steady_clock::now();
egress_microseconds += chrono::duration_cast
(mWriteStop - mWriteStart).count();
totalbytes += nwrite;
stopwritebyte += nwrite;
} // end while(1)
// Wait for all async write requests before moving on
eos::common::Timing::GetTimeSpec(start);
eos::fst::AsyncMetaHandler* ptr_handler = 0;
bool write_error = false;
for (int i = 0; i < ndst; i++) {
if (dst_type[i] == XRD_ACCESS) {
if (dst_handler[i].second) {
ptr_handler = static_cast(
static_cast(dst_handler[i].second)->fileGetAsyncHandler());
if (ptr_handler) {
uint16_t error_type = ptr_handler->WaitOK();
if (error_type != XrdCl::errNone) {
fprintf(stderr, "Error while doing the async writing.\n");
write_error = true;
}
}
}
}
}
eos::common::Timing::GetTimeSpec(end);
wait_time = static_cast((end.tv_sec * 1000 + end.tv_nsec / 1000000) -
(start.tv_sec * 1000 + start.tv_nsec / 1000000));
write_wait += wait_time;
if (computeXS && xsObj) {
xsObj->Finalize();
xsValue = xsObj->GetHexChecksum();
}
if (progbar) {
gettimeofday(&abs_stop_time, &tz);
for (int i = 0; i < nsrc; i++) {
if (src_type[i] == XRD_ACCESS) {
st[i].st_size = totalbytes;
}
}
print_progbar(totalbytes, st[0].st_size);
cout << endl;
}
auto xferSummary = createXferSummary(src_location, dst_location, totalbytes);
if (jsonoutput) {
print_json_summary(xferSummary);
} else {
if (summary) {
print_summary(xferSummary);
}
}
//............................................................................
// Close all files
//............................................................................
for (int i = 0; i < nsrc; i++) {
switch (src_type[i]) {
case LOCAL_ACCESS:
close(src_handler[i].first);
break;
case RAID_ACCESS:
if (i == 0) {
redundancyObj->Close();
i = nsrc;
delete redundancyObj;
}
break;
case XRD_ACCESS:
status = static_cast(src_handler[i].second)->Close();
if (!status.IsOK()) {
fprintf(stderr,
"error: close failed on source - file modified during replication\n");
exit(-EIO);
}
delete static_cast(src_handler[i].second);
break;
case RIO_ACCESS:
retc = static_cast(src_handler[i].second)->fileClose();
if (retc) {
fprintf(stderr,
"error: close failed on source - file modified during replication\n");
exit(-EIO);
}
delete static_cast(src_handler[i].second);
break;
case CONSOLE_ACCESS:
break;
}
}
for (int i = 0; i < ndst; i++) {
switch (dst_type[i]) {
case LOCAL_ACCESS:
close(dst_handler[i].first);
break;
case RAID_ACCESS:
if (i == 0) {
errno = 0;
redundancyObj->Close();
if (errno) {
fprintf(stderr, "error: %s\n", redundancyObj->GetLastErrMsg().c_str());
}
i = ndst;
delete redundancyObj;
}
break;
case XRD_ACCESS:
retc = static_cast(dst_handler[i].second)->fileClose();
if (retc) {
fprintf(stderr, "error: %s\n", status.ToStr().c_str());
exit(-EIO);
}
delete static_cast(dst_handler[i].second);
break;
case RIO_ACCESS:
retc = static_cast(dst_handler[i].second)->fileClose();
if (retc) {
fprintf(stderr, "error: close failed on target\n");
exit(-EIO);
}
delete static_cast(dst_handler[i].second);
break;
case CONSOLE_ACCESS:
//........................................................................
// Nothing to do
//........................................................................
break;
}
}
if (dosymlink) {
int symlink_failed = 0;
char rangedestname[4096];
if (appendmode) {
sprintf(rangedestname, "%s %llu:%llu",
dst_location[0].second.c_str(),
static_cast(startwritebyte),
static_cast(stopwritebyte));
} else {
sprintf(rangedestname, "%s", dst_location[0].second.c_str());
}
if (debug) {
fprintf(stdout, "[eoscp]: creating symlink %s->%s\n", symlinkname,
rangedestname);
}
switch (dst_type[0]) {
case LOCAL_ACCESS: {
unlink(symlinkname);
symlink_failed = symlink(rangedestname, symlinkname);
}
break;
case RAID_ACCESS:
case XRD_ACCESS:
case RIO_ACCESS:
case CONSOLE_ACCESS:
//........................................................................
// Nothing to do, xrootd has no symlink support in posix
//........................................................................
break;
}
if (symlink_failed) {
fprintf(stderr, "error: cannot creat symlink from %s -> %s\n",
symlinkname, rangedestname);
exit(-ESPIPE);
}
}
if (debug) {
fprintf(stderr, "[eoscp] # Total read wait time : %f ms \n",
read_wait);
fprintf(stderr, "[eoscp] # Total write wait time : %f ms \n",
write_wait);
}
if (cksumcomparison) {
// The client asked for some checksum comparison between the source and the destination
std::string destServer = dst_location[0].first;
std::string destFilePath = dst_location[0].second;
XrdCl::URL url(destServer);
// No need to check the URL consistency as the transfer already happened
XrdCl::FileSystem fs(url);
CompareCksumResult res = compareChecksum(fs, destFilePath, xsString, xsValue);
if (res.cksumMismatch) {
// Checksum mismatch, print related error
fprintf(stderr, "%s\n", res.errMsg.c_str());
if (cksummismatchdelete) {
// The user wants to delete the file if the checksum mismatch between source and destination
fprintf(stderr, "Deleting the file from the destination %s%s\n",
destServer.c_str(), destFilePath.c_str());
status = fs.Rm(destFilePath);
if (!status.IsOK()) {
fprintf(stderr,
"error while trying to delete the file from the destination (%s): %s\n",
destFilePath.c_str(), status.ToStr().c_str());
exit(-status.errNo ? -status.errNo : -1);
}
}
// Just return the error code set during the checksum checking
exit(-res.xrdErrno ? -res.xrdErrno : -1);
}
}
// Free memory
delete[] buffer;
if (write_error) {
return -EIO;
}
return 0;
}