//------------------------------------------------------------------------------
//! @file EosFstHttpHandler.cc
//! @author Andreas-Joachim Peters & Elvin Sindrilaru - CERN
//------------------------------------------------------------------------------
/************************************************************************
* EOS - the CERN Disk Storage System *
* Copyright (C) 2021 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see <http://www.gnu.org/licenses/>. *
************************************************************************/
#include
#include "XrdSfs/XrdSfsInterface.hh"
#include "common/Logging.hh"
#include "fst/XrdFstOfs.hh"
#include "fst/http/HttpServer.hh"
#include "common/http/ProtocolHandler.hh"
#include "common/StringConversion.hh"
#include "common/Timing.hh"
#include "EosFstHttpHandler.hh"
// XRootD plugin version stamp for the symbol XrdSfsGetFileSystem under the
// library name "EosFstHttp" (presumably consumed by the XRootD plugin loader
// for version compatibility checks).
// NOTE(review): this file implements an XrdHttp external handler - confirm
// that XrdSfsGetFileSystem is the intended symbol for this version stamp.
XrdVERSIONINFO(XrdSfsGetFileSystem, EosFstHttp);
//------------------------------------------------------------------------------
// Helper function converting a single hexadecimal digit to its value
//
// Returns 0-15 for '0'-'9', 'a'-'f' and 'A'-'F', and -1 for any other input,
// including negative values.
//------------------------------------------------------------------------------
static int decode_hex(int ch)
{
  if (ch >= '0' && ch <= '9') {
    return ch - '0';
  }

  // Setting bit 0x20 maps ASCII 'A'-'F' onto 'a'-'f' and leaves 'a'-'f'
  // unchanged; no other input value can land in the 'a'-'f' range
  const int lower = ch | 0x20;

  if (lower >= 'a' && lower <= 'f') {
    return lower - 'a' + 10;
  }

  return -1;
}
//------------------------------------------------------------------------------
// Initialize handler
//------------------------------------------------------------------------------
int
EosFstHttpHandler::Init(const char* cfgfile)
{
if (getenv("EOSFSTOFS")) {
OFS = (eos::fst::XrdFstOfs*)(strtoull(getenv("EOSFSTOFS"), 0, 10));
}
std::string cfg;
eos::common::StringConversion::LoadFileIntoString(cfgfile, cfg);
size_t fpos = cfg.find("xrd.protocol XrdHttp:");
if (fpos != std::string::npos) {
size_t epos = cfg.find(" ", fpos + 21);
if (epos != std::string::npos) {
std::string port = cfg.substr(fpos + 21, epos - fpos - 21);
setenv("EOSFSTXRDHTTP", port.c_str(), 1);
eos_static_notice("publishing XrdHttp port: %s", port.c_str());
}
}
return 0;
}
//------------------------------------------------------------------------------
// Check if request should be handled by the current handler
//
// Every verb is claimed by this handler except COPY and OPTIONS, which are
// left to the XrdHttpTPC plugin. The path is only used for debug logging.
//------------------------------------------------------------------------------
bool
EosFstHttpHandler::MatchesPath(const char* verb, const char* path)
{
  if (EOS_LOGS_DEBUG) {
    eos_static_debug("verb=%s path=%s", verb, path);
  }

  const bool is_tpc_verb = !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
  return !is_tpc_verb;
}
//------------------------------------------------------------------------------
// Process current request
//
// A protocol handler is created through OFS->mHttpd for the request; a PUT is
// opened with the CREATE verb so that the data is written afterwards.
// - HEAD: the handler response is sent as is.
// - GET : on 200/206 the headers are sent first, then the body is streamed
//         in blocks of at most 1 MiB using FileReader.
// - PUT : the body (plain with Content-Length, or chunked) is forwarded to
//         FileWriter. An empty FileWriter call triggers the close handler.
// - any other verb: the handler response is forwarded to the client.
// Returns the value of the last SendSimpleResp call, or -1 on local failure.
//------------------------------------------------------------------------------
int
EosFstHttpHandler::ProcessReq(XrdHttpExtReq& req)
{
  using eos::common::HttpResponse;
  std::string body;

  if (!OFS) {
    eos_static_crit("%s", "msg=\"OFS not accessible\"");
    return -1;
  }

  std::map<std::string, std::string> cookies;
  std::map<std::string, std::string> normalized_headers;

  // Normalize the input header keys to lower-case. Header values can carry
  // credentials, so they are only logged at debug level and the value of the
  // authorization header is never written to the logs.
  for (const auto& hdr : req.headers) {
    const std::string key = LC_STRING(hdr.first);

    if (EOS_LOGS_DEBUG) {
      eos_static_debug("msg=\"normalize hdr\" key=\"%s\" value=\"%s\"",
                       hdr.first.c_str(), (key == "authorization") ?
                       "<redacted>" : hdr.second.c_str());
    }

    normalized_headers[key] = hdr.second;
  }

  std::string verb = req.verb;
  std::string query = normalized_headers.count("xrd-http-query") ?
                      normalized_headers["xrd-http-query"] : "";

  if (req.verb == "PUT") {
    // CREATE makes sure the handler just opens the file and all writes
    // are done later
    verb = "CREATE";
  }

  auto handler = OFS->mHttpd->XrdHttpHandler(verb, req.resource,
                 normalized_headers, query, cookies, body, req.GetSecEntity());

  if (handler == nullptr) {
    std::string errmsg = "failed to create handler";
    return req.SendSimpleResp(500, errmsg.c_str(), nullptr, errmsg.c_str(),
                              errmsg.length());
  }

  HttpResponse* response = handler->GetResponse();

  if (!response) {
    std::string errmsg = "failed to create response object";
    return req.SendSimpleResp(500, errmsg.c_str(), nullptr, errmsg.c_str(),
                              errmsg.length());
  }

  response->AddHeader("Date", eos::common::Timing::utctime(time(NULL)));
  eos_static_debug("response-header: %s",
                   response->GetHdrsWithFilter({}).c_str());

  if (req.verb == "HEAD") {
    return req.SendSimpleResp(response->GetResponseCode(),
                              response->GetResponseCodeDescription().c_str(),
                              response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                              response->GetBody().c_str(),
                              response->GetBody().length());
  }

  if (req.verb == "GET") {
    if ((response->GetResponseCode() != response->OK) &&
        (response->GetResponseCode() != response->PARTIAL_CONTENT)) {
      return req.SendSimpleResp(response->GetResponseCode(),
                                response->GetResponseCodeDescription().c_str(),
                                response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                                response->GetBody().c_str(),
                                response->GetBody().length());
    }

    int retc = 0;
    // Need to update the content length determined while opening the file
    long long content_length = 0ll;
    {
      // Bind once: find() and end() must come from the same map object
      const auto& resp_hdrs = response->GetHeaders();
      auto it_hd = resp_hdrs.find("Content-Length");

      if (it_hd != resp_hdrs.end()) {
        try {
          content_length = std::stoll(it_hd->second);
        } catch (...) {}
      }
    }

    if (content_length < 0) {
      content_length = 0;
    }

    if (response->GetResponseCode() == response->PARTIAL_CONTENT) {
      retc = req.SendSimpleResp(response->GetResponseCode(),
                                response->GetResponseCodeDescription().c_str(),
                                response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                                nullptr, content_length);
    } else {
      retc = req.SendSimpleResp(0, response->GetResponseCodeDescription().c_str(),
                                response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                                nullptr, content_length);
    }

    if (retc) {
      return retc;
    }

    // Allocate an IO buffer of 1M or, if smaller, the required content length
    const long long max_io_sz = 1024 * 1024;
    std::vector<char> buffer(static_cast<size_t>(std::min(content_length,
                             max_io_sz)));
    off_t pos = 0;

    while ((pos < content_length) && !retc) {
      // Never read (and send) more than the announced Content-Length
      const long long left = content_length - pos;
      const size_t to_read = static_cast<size_t>(std::min(left, max_io_sz));
      eos_static_debug("pos=%lld size=%zu", static_cast<long long>(pos), to_read);
      const ssize_t nread = OFS->mHttpd->FileReader(handler.get(), pos,
                            buffer.data(), to_read);

      if (nread < 0) {
        retc = -1;
      } else if (nread == 0) {
        // The client was promised content_length bytes, so a truncated body
        // must be reported as a failure rather than as success
        eos_static_err("msg=\"premature end of data\" pos=%lld "
                       "content_length=%lld", static_cast<long long>(pos),
                       content_length);
        retc = -1;
      } else {
        pos += nread;
        retc |= req.SendSimpleResp(1, nullptr, nullptr, buffer.data(), nread);
        eos_static_debug("retc=%d", retc);
      }
    }

    OFS->mHttpd->FileClose(handler.get(), retc);
    return retc;
  }

  if (req.verb == "PUT") {
    const bool is_chunked = (normalized_headers.count("transfer-encoding") &&
                             (normalized_headers["transfer-encoding"] == "chunked"));

    // If no content-length provided then return an error
    if ((normalized_headers.count("content-length") == 0) && !is_chunked) {
      response->SetResponseCode(eos::common::HttpResponse::LENGTH_REQUIRED);
    }

    eos_static_debug("response-code=%d", response->GetResponseCode());

    if ((response->GetResponseCode() != 0) &&
        (response->GetResponseCode() != 200)) {
      return req.SendSimpleResp(response->GetResponseCode(),
                                response->GetResponseCodeDescription().c_str(),
                                response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                                response->GetBody().c_str(),
                                response->GetBody().length());
    }

    if (is_chunked) {
      if (!HandleChunkUpload(req, handler.get(), normalized_headers, cookies,
                             query)) {
        // The third SendSimpleResp argument is an extra header line, so the
        // error text belongs in the body
        const std::string errmsg = "fatal internal error during chunk upload";
        return req.SendSimpleResp(500, "fatal internal error", nullptr,
                                  errmsg.c_str(), errmsg.length());
      }
    } else {
      const std::string& clen_hdr = normalized_headers["content-length"];
      long long content_length = -1ll;

      try {
        content_length = std::stoll(clen_hdr);
      } catch (...) {}

      // The Content-Length comes from the client: reject malformed or
      // negative values instead of using them to size buffers
      if (content_length < 0) {
        eos_static_err("msg=\"invalid content-length\" value=\"%s\"",
                       clen_hdr.c_str());
        const std::string errmsg = "invalid content-length";
        return req.SendSimpleResp(400, "bad request", nullptr, errmsg.c_str(),
                                  errmsg.length());
      }

      if ((response->GetResponseCode() == 0) &&
          (normalized_headers.count("expect") &&
           (normalized_headers["expect"] == "100-continue"))) {
        // reply to 100-CONTINUE request
        eos_static_debug("%s", "msg=\"sending 100-continue\"");
        req.SendSimpleResp(100, nullptr,
                           response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                           nullptr, 0);
      }

      int retc = 0;
      long long content_left = content_length;
      const long long eoshttp_sz = 1024 * 1024;
      const long long xrdhttp_sz = 256 * 1024;
      std::string payload;

      // A zero-length upload has nothing to read. The close trigger below
      // must then be the only (empty) FileWriter call.
      while (!retc && (content_left > 0)) {
        const long long content_read = std::min(eoshttp_sz, content_left);
        payload.clear();
        payload.reserve(content_read);
        char* ptr = nullptr;
        long long read_len = 0;

        do {
          const int chunk_len = static_cast<int>(std::min(xrdhttp_sz,
                                                content_read - read_len));
          int rb = req.BuffgetData(chunk_len, &ptr, true);
          eos_static_debug("content-read=%lld rb=%d body=%zu content_left=%lld",
                           content_read, rb, payload.size(), content_left);

          if (rb > 0) {
            payload.append(ptr, rb);
            read_len += rb;
          } else {
            break;
          }
        } while (read_len < content_read);

        if (read_len != content_read) {
          eos_static_crit("msg=\"short read during PUT, expected %lld bytes"
                          " but got %lld bytes\"", content_read, read_len);
          retc = -1;
        } else {
          retc |= OFS->mHttpd->FileWriter(handler.get(), req.verb, req.resource,
                                          normalized_headers, query, cookies,
                                          payload);

          if (!retc) {
            content_left -= content_read;
          }
        }
      }

      eos_static_debug("retc=%d", retc);

      if (!retc) {
        // trigger the close handler by calling with empty body
        payload.clear();
        retc |= OFS->mHttpd->FileWriter(handler.get(), req.verb, req.resource,
                                        normalized_headers, query, cookies,
                                        payload);
      }
    }

    // The upload may have updated the handler's response object
    response = handler->GetResponse();

    if (response && response->GetResponseCode()) {
      return req.SendSimpleResp(response->GetResponseCode(),
                                response->GetResponseCodeDescription().c_str(),
                                response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                                response->GetBody().c_str(),
                                response->GetBody().length());
    } else {
      return req.SendSimpleResp(500, "fatal internal error", nullptr, nullptr, 0);
    }
  }

  // Any other verb: forward what the protocol handler answered instead of
  // leaving the client without a response
  if (response->GetResponseCode()) {
    return req.SendSimpleResp(response->GetResponseCode(),
                              response->GetResponseCodeDescription().c_str(),
                              response->GetHdrsWithFilter({HttpResponse::kContentLength}).c_str(),
                              response->GetBody().c_str(),
                              response->GetBody().length());
  }

  return req.SendSimpleResp(501, "not implemented", nullptr, nullptr, 0);
}
//------------------------------------------------------------------------------
// Handle chunk upload operation
//
// Consumes a "Transfer-Encoding: chunked" request body from XrdHttp. For
// every chunk it parses the hexadecimal size line (terminated by CRLF), reads
// exactly that many data bytes plus the trailing CRLF, and hands the data to
// FileWriter. The zero-sized last chunk results in an empty FileWriter call,
// which triggers the close handler. Returns true only if that last chunk was
// processed and written successfully.
//------------------------------------------------------------------------------
bool
EosFstHttpHandler::HandleChunkUpload(XrdHttpExtReq& req,
                                     eos::common::ProtocolHandler* handler,
                                     std::map<std::string, std::string>& norm_hdrs,
                                     std::map<std::string, std::string>& cookies,
                                     std::string& query)
{
  bool success = false;
  const unsigned long long xrdhttp_sz = 256 * 1024;
  // Cap for the up-front buffer reservation. The chunk size comes from the
  // client and must not drive a single unbounded allocation; the buffer still
  // grows as real data arrives.
  const unsigned long long max_reserve = 16 * 1024 * 1024;
  const size_t max_size = 4096;
  std::string ssize;
  std::string chunk;
  char* ptr = nullptr;
  eos::common::Timing tm("ChunkUpload");
  COMMONTIMING("START", &tm);

  while (true) {
    bool has_size = false;
    ssize.clear();
    // Counter of consecutive reads that returned no data from the client. If
    // it exceeds the threshold, we stop trying to read the chunk size line.
    size_t noDataReceivedCpt = 0;
    const size_t noDataReceivedCptThreshold = max_size * 2;

    // Read in line containing the chunk size
    while ((ssize.length() < max_size) &&
           (noDataReceivedCpt <= noDataReceivedCptThreshold)) {
      if (req.BuffgetData(1, &ptr, true) != 1) {
        ++noDataReceivedCpt;
        continue;
      }

      noDataReceivedCpt = 0;
      ssize.append(ptr, 1);
      const size_t len = ssize.length();

      if ((len >= 2) && (ssize[len - 2] == '\r') && (ssize[len - 1] == '\n')) {
        ssize.erase(len - 2);
        has_size = true;
        break;
      }
    }

    if (!has_size) {
      if (noDataReceivedCpt > noDataReceivedCptThreshold) {
        std::stringstream ss;
        ss << "msg=\"no data received from the client after "
           << noDataReceivedCptThreshold << " attempts\"";
        eos_static_err("%s", ss.str().c_str());
      } else {
        eos_static_err("msg=\"chunk size line too long\" max_size=%zu",
                       max_size);
      }

      break;
    }

    // Get numeric value for the chunk size
    unsigned long long chunk_sz = 0ull;

    try {
      size_t pos = 0;
      chunk_sz = std::stoull(ssize, &pos, 16);

      if (pos != ssize.length()) {
        throw std::runtime_error("failed to convert chunk size");
      }
    } catch (...) {
      eos_static_err("msg=\"chunk size is not a number\" data=\"%s\"",
                     ssize.c_str());
      break;
    }

    chunk.clear();
    chunk.reserve(std::min(chunk_sz, max_reserve));

    if (chunk_sz == 0) {
      // This is the final chunk, read in the CRLF ("\r\n")
      if (req.BuffgetData(2, &ptr, true) != 2) {
        eos_static_err("%s", "msg=\"failed reading end message for chunk upload\"");
        break;
      }

      if ((*ptr != '\r') || (*(++ptr) != '\n')) {
        eos_static_err("%s", "msg=\"chunk upload end message not what we expected\"");
        break;
      }
    } else { // This is normal chunk with data, read it in and write to the file
      unsigned long long read_len = 0ull;

      do {
        // Never ask for more than what is left of the current chunk, otherwise
        // the CRLF and the start of the next chunk would be consumed too
        const size_t block_len = std::min(xrdhttp_sz, chunk_sz - read_len);
        const int rb = req.BuffgetData(block_len, &ptr, true);

        if (rb > 0) {
          chunk.append(ptr, rb);
          read_len += rb;
        } else {
          eos_static_err("msg=\"failed to read chunk block\" block_len=%zu",
                         block_len);
          break;
        }
      } while (read_len < chunk_sz);

      // We read less than we expected, malformed chunk request
      if (read_len != chunk_sz) {
        eos_static_err("msg=\"chunk size less than what we expected\" len=%llu "
                       "expected=%llu", read_len, chunk_sz);
        break;
      }

      // Read also the line separator CRLF ("\r\n")
      if (req.BuffgetData(2, &ptr, true) != 2) {
        eos_static_err("%s", "msg=\"failed reading end message for chunk upload\"");
        break;
      }

      if ((*ptr != '\r') || (*(++ptr) != '\n')) {
        eos_static_err("%s", "msg=\"chunk upload end message not what we expected\"");
        break;
      }
    }

    // Write the chunk to the file. Last chunk with size 0 will trigger the
    // close handler
    const auto wrc = OFS->mHttpd->FileWriter(handler, req.verb, req.resource,
                     norm_hdrs, query, cookies, chunk);

    if (wrc) {
      eos_static_err("msg=\"failed writing chunk to file\" chunk_sz=%zu",
                     chunk.length());
      break;
    }

    if (chunk.length() == 0) {
      success = true;
      break;
    }
  }

  COMMONTIMING("done", &tm);

  if (EOS_LOGS_DEBUG) {
    tm.Print();
  }

  return success;
}
//------------------------------------------------------------------------------
// Handle chunk upload operation - optimised version
//
// Same purpose as HandleChunkUpload, but the request body is fetched in blocks
// of up to 256 KiB with non-blocking BuffgetData calls and parsed by a state
// machine:
//   CHUNK_SIZE  - hex digits of the chunk size
//   CHUNK_CLRF1 - '\r' ending the size line or the chunk data
//   CHUNK_CLRF2 - '\n' ending the size line or the chunk data
//   CHUNK_DATA  - chunk payload. Reaching it with a zero size marks the final
//                 chunk, and it is also the success state.
//   ERROR       - malformed input, socket error, or too many empty reads
// Data is accumulated and handed to FileWriter once at least 1 MiB is
// buffered or the final chunk is seen. An extra empty write then triggers the
// close handler. Returns true on success.
// NOTE(review): the CRLF (and any trailers) following the final "0\r\n" line
// are not consumed here.
//------------------------------------------------------------------------------
bool
EosFstHttpHandler::HandleChunkUpload2(XrdHttpExtReq& req,
                                      eos::common::ProtocolHandler* handler,
                                      std::map<std::string, std::string>& norm_hdrs,
                                      std::map<std::string, std::string>& cookies,
                                      std::string& query)
{
  enum {CHUNK_SIZE, CHUNK_CLRF1, CHUNK_CLRF2, CHUNK_DATA, ERROR};
  int retries = 0;
  const int max_retries = 5;
  // At most 15 hex digits (60 bits), so the size accumulation below can never
  // overflow a signed 64-bit integer whatever the client sends
  const int max_hex_digits = 15;
  const unsigned long long xrdhttp_sz = 256 * 1024;
  const unsigned long long eoshttp_sz = 1024 * 1024;
  char* ptr = nullptr;
  char* end_ptr = nullptr;
  std::string chunk;
  chunk.reserve(eoshttp_sz);
  eos::common::Timing tm("ChunkUpload");
  COMMONTIMING("START", &tm);
  int state = CHUNK_SIZE;
  int nread = 0;
  int hex_count = 0;
  long long chunk_sz = 0;
  bool final_chunk = false;

  while (true) {
    eos_static_debug("%s", "msg=\"calling BuffgetData\"");
    nread = req.BuffgetData(xrdhttp_sz, &ptr, false);
    eos_static_debug("msg=\"http read\" nread=%i", nread);

    if (nread < 0) {
      eos_static_err("%s", "msg=\"got a socket error from XrdHttp\"");
      state = ERROR;
      break;
    } else if (nread == 0) {
      ++retries;

      if (retries > max_retries) {
        eos_static_err("%s", "msg=\"reached the maximum number of retries\"");
        state = ERROR;
        break;
      }

      eos_static_warning("msg=\"wait for more data\" retry=%i", retries);
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
      continue;
    }

    // Data arrived: only consecutive empty reads count against the limit
    retries = 0;
    end_ptr = ptr + nread;

    while (ptr != end_ptr) {
      switch (state) {
      case CHUNK_SIZE: {
        const int v = decode_hex(*ptr);

        if (v == -1) {
          if (hex_count == 0) {
            state = ERROR;
          } else {
            eos_static_debug("msg=\"got chunk size\" chunk_sz=%lld", chunk_sz);
            state = CHUNK_CLRF1;
          }
        } else if (hex_count >= max_hex_digits) {
          eos_static_err("msg=\"chunk size has too many digits\" max=%i",
                         max_hex_digits);
          state = ERROR;
        } else {
          chunk_sz = chunk_sz * 16 + v;
          ++hex_count;
          ++ptr;
        }

        break;
      }

      case CHUNK_CLRF1:
        if (*ptr != '\r') {
          state = ERROR;
        } else {
          state = CHUNK_CLRF2;
          ++ptr;
        }

        break;

      case CHUNK_CLRF2:
        if (*ptr != '\n') {
          state = ERROR;
        } else {
          eos_static_debug("%s", "msg=\"done reading CLRF\"");
          ++ptr;

          if (hex_count) {
            // Entering after CHUNK_SIZE
            hex_count = 0;
            state = CHUNK_DATA;
          } else {
            // Entering after CHUNK_DATA
            state = CHUNK_SIZE;
          }
        }

        break;

      case CHUNK_DATA:
        if (chunk_sz == 0) {
          // Zero-sized chunk: this is the final chunk. The state stays
          // CHUNK_DATA, which is the success state.
          eos_static_debug("%s", "msg=\"do read final chunk\"");
          final_chunk = true;
        } else if (chunk_sz <= end_ptr - ptr) {
          eos_static_debug("msg=\"add data to chunk [1]\" sz=%lld", chunk_sz);
          chunk.append(ptr, chunk_sz);
          ptr += chunk_sz;
          chunk_sz = 0;
          state = CHUNK_CLRF1;
        } else {
          const long long avail = end_ptr - ptr;
          eos_static_debug("msg=\"add data to chunk [2]\" sz=%lld", avail);
          chunk.append(ptr, avail);
          chunk_sz -= avail;
          ptr = end_ptr;
        }

        break;

      case ERROR:
        break;
      }

      // Stop parsing on error or once the final chunk has been seen
      if ((state == ERROR) || final_chunk) {
        break;
      }
    }

    if (state == ERROR) {
      eos_static_err("%s", "msg=\"error state\"");
      break;
    }

    // Write the chunk to the file. Last chunk with size 0 will trigger the
    // close handler
    if ((final_chunk && (state == CHUNK_DATA)) ||
        (chunk.size() >= eoshttp_sz)) {
      eos_static_debug("msg=\"writing chunk\" len=%zu", chunk.length());
      auto wrc = OFS->mHttpd->FileWriter(handler, req.verb, req.resource,
                                         norm_hdrs, query, cookies, chunk);

      if (wrc) {
        eos_static_err("msg=\"failed writing chunk to file\" chunk_sz=%zu",
                       chunk.length());
        state = ERROR;
        break;
      }

      chunk.clear();

      // For final chunk also trigger write of 0 length which closes the file
      if (final_chunk) {
        wrc = OFS->mHttpd->FileWriter(handler, req.verb, req.resource,
                                      norm_hdrs, query, cookies, chunk);

        if (wrc) {
          eos_static_err("msg=\"failed writing chunk to file\" chunk_sz=%zu",
                         chunk.length());
          state = ERROR;
        }

        break;
      }
    }
  }

  COMMONTIMING("done", &tm);

  if (EOS_LOGS_DEBUG) {
    tm.Print();
  }

  return (state == CHUNK_DATA);
}