//-----------------------------------------------------------------------
// File: Formatter.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "RedisRequest.hh"
#include "raft/RaftCommon.hh"
#include "Common.hh"
#include "Formatter.hh"
#include "redis/ArrayResponseBuilder.hh"
#include "redis/Transaction.hh"
#include "utils/Statistics.hh"
using namespace quarkdb;
RedisEncodedResponse Formatter::moved(int64_t shardId, const RaftServer &location) {
return RedisEncodedResponse(SSTR("-MOVED " << shardId << " " << location.toString() << "\r\n"));
}
RedisEncodedResponse Formatter::err(std::string_view err) {
return RedisEncodedResponse(SSTR("-ERR " << err << "\r\n"));
}
RedisEncodedResponse Formatter::errArgs(std::string_view cmd) {
qdb_warn("Received malformed " << quotes(cmd) << " command - wrong number of arguments");
return RedisEncodedResponse(SSTR("-ERR wrong number of arguments for '" << cmd << "' command\r\n"));
}
RedisEncodedResponse Formatter::pong() {
return RedisEncodedResponse(SSTR("+PONG\r\n"));
}
void Formatter::string(std::ostringstream &ss, std::string_view str) {
ss << "$" << str.length() << "\r\n" << str << "\r\n";
}
RedisEncodedResponse Formatter::string(std::string_view str) {
std::ostringstream ss;
Formatter::string(ss, str);
return RedisEncodedResponse(ss.str());
}
void Formatter::status(std::ostringstream &ss, std::string_view str) {
ss << "+" << str << "\r\n";
}
RedisEncodedResponse Formatter::status(std::string_view str) {
std::ostringstream ss;
status(ss, str);
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::ok() {
return RedisEncodedResponse("+OK\r\n");
}
RedisEncodedResponse Formatter::null() {
return RedisEncodedResponse("$-1\r\n");
}
void Formatter::integer(std::ostringstream &ss, int64_t number) {
ss << ":" << number << "\r\n";
}
void Formatter::uint64(std::ostringstream &ss, uint64_t number) {
ss << ":" << number << "\r\n";
}
RedisEncodedResponse Formatter::integer(int64_t number) {
std::ostringstream ss;
integer(ss, number);
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::fromStatus(const rocksdb::Status &status) {
if(status.ok()) return Formatter::ok();
return Formatter::err(status.ToString());
}
RedisEncodedResponse Formatter::vector(const std::vector &vec) {
std::stringstream ss;
ss << "*" << vec.size() << "\r\n";
for(std::vector::const_iterator it = vec.begin(); it != vec.end(); it++) {
ss << "$" << it->length() << "\r\n";
ss << *it << "\r\n";
}
return RedisEncodedResponse(ss.str());
}
void Formatter::statusVector(std::ostringstream &ss, const std::vector &vec) {
ss << "*" << vec.size() << "\r\n";
for(std::vector::const_iterator it = vec.begin(); it != vec.end(); it++) {
ss << "+" << *it << "\r\n";
}
}
RedisEncodedResponse Formatter::statusVector(const std::vector &vec) {
std::ostringstream ss;
statusVector(ss, vec);
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::scan(std::string_view marker, const std::vector &vec) {
std::stringstream ss;
ss << "*2\r\n";
ss << "$" << marker.length() << "\r\n";
ss << marker << "\r\n";
ss << "*" << vec.size() << "\r\n";
for(std::vector::const_iterator it = vec.begin(); it != vec.end(); it++) {
ss << "$" << it->length() << "\r\n";
ss << *it << "\r\n";
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::simpleRedisRequest(const RedisRequest &req) {
std::vector vec;
for(size_t i = 0; i < req.size(); i++) {
vec.emplace_back(req[i]);
}
return Formatter::vector(vec);
}
RedisEncodedResponse Formatter::redisRequest(const RedisRequest &req) {
if(req.getCommand() == RedisCommand::TX_READWRITE || req.getCommand() == RedisCommand::TX_READONLY) {
Transaction transaction;
transaction.deserialize(req[1]);
ArrayResponseBuilder builder(transaction.size() + 1);
builder.push_back(Formatter::string(req[0]));
for(size_t i = 0; i < transaction.size(); i++) {
builder.push_back(simpleRedisRequest(transaction[i]));
}
return builder.buildResponse();
}
// Simple case, no transactions.
return simpleRedisRequest(req);
}
RedisEncodedResponse Formatter::raftEntry(const RaftEntry &entry, bool raw, LogIndex idx) {
// Very inefficient with copying, but this function is only to help
// debugging, so we don't really mind.
bool hasIndex = (idx != -1);
ArrayResponseBuilder builder(2 + hasIndex);
if(idx != -1) {
builder.push_back(Formatter::string(SSTR("INDEX: " << idx)));
}
builder.push_back(Formatter::string(SSTR("TERM: " << entry.term)));
if(raw) {
builder.push_back(simpleRedisRequest(entry.request));
}
else {
builder.push_back(redisRequest(entry.request));
}
return builder.buildResponse();
}
RedisEncodedResponse Formatter::raftEntries(const std::vector &entries, bool raw) {
std::stringstream ss;
ss << "*" << entries.size() << "\r\n";
for(size_t i = 0; i < entries.size(); i++) {
ss << Formatter::raftEntry(entries[i], raw).val;
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::journalScan(LogIndex cursor, const std::vector &entries) {
std::string marker;
if(cursor == 0) {
marker = "0";
}
else {
marker = SSTR("next:" << cursor);
}
std::stringstream ss;
ss << "*2\r\n";
ss << "$" << marker.length() << "\r\n";
ss << marker << "\r\n";
ss << "*" << entries.size() << "\r\n";
for(size_t i = 0; i < entries.size(); i++) {
ss << Formatter::raftEntry(entries[i].entry, false, entries[i].index).val;
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::noauth(std::string_view str) {
return RedisEncodedResponse(SSTR("-NOAUTH " << str << "\r\n"));
}
RedisEncodedResponse Formatter::versionedVector(uint64_t num, const std::vector &vec) {
std::ostringstream ss;
ss << "*2\r\n";
ss << ":" << num << "\r\n";
ss << "*" << vec.size() << "\r\n";
for(auto it = vec.begin(); it != vec.end(); it++) {
ss << "$" << it->length() << "\r\n";
ss << *it << "\r\n";
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::vhashRevision(uint64_t rev, const std::vector> &contents) {
std::ostringstream ss;
ss << "*2\r\n";
Formatter::uint64(ss, rev);
ss << "*" << contents.size()*2 << "\r\n";
for(size_t i = 0; i < contents.size(); i++) {
Formatter::string(ss, contents[i].first);
Formatter::string(ss, contents[i].second);
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::multiply(const RedisEncodedResponse &resp, size_t factor) {
qdb_assert(factor >= 1);
std::ostringstream ss;
for(size_t i = 0; i < factor; i++) {
ss << resp.val;
}
return RedisEncodedResponse(ss.str());
}
//------------------------------------------------------------------------------
// Produce a vector of vectors, where each vector has its own header. No binary
// data, only text is safe.
//
// 1) 1) SECTION 1
// 2) 1) one
// 2) two
// 3) three
// 2) 1) SECTION 2
// 2) 1) four
// 2) five
// 3) six
//------------------------------------------------------------------------------
RedisEncodedResponse Formatter::vectorsWithHeaders(const std::vector &headers,
const std::vector> &data) {
qdb_assert(headers.size() == data.size());
std::ostringstream ss;
ss << "*" << headers.size() << "\r\n";
for(size_t i = 0; i < headers.size(); i++) {
ss << "*2\r\n";
ss << "+" << headers[i] << "\r\n";
ss << "*" << data[i].size() << "\r\n";
for(size_t j = 0; j < data[i].size(); j++) {
ss << "+" << data[i][j] << "\r\n";
}
}
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::stats(const Statistics &stats) {
std::vector arr;
arr.emplace_back(SSTR("TOTAL-READS " << stats.reads));
arr.emplace_back(SSTR("TOTAL-WRITES " << stats.writes));
arr.emplace_back(SSTR("TOTAL-TXREAD " << stats.txread));
arr.emplace_back(SSTR("TOTAL-TXREADWRITE " << stats.txreadwrite));
return statusVector(arr);
}
RedisEncodedResponse Formatter::subscribe(bool pushType, std::string_view channel, size_t active) {
if(pushType) {
return pushStrstrstrint("pubsub", "subscribe", channel, active);
}
return strstrint("subscribe", channel, active);
}
RedisEncodedResponse Formatter::psubscribe(bool pushType, std::string_view pattern, size_t active) {
if(pushType) {
return pushStrstrstrint("pubsub", "psubscribe", pattern, active);
}
return strstrint("psubscribe", pattern, active);
}
RedisEncodedResponse Formatter::unsubscribe(bool pushType, std::string_view channel, size_t active) {
if(pushType) {
return pushStrstrstrint("pubsub", "unsubscribe", channel, active);
}
return strstrint("unsubscribe", channel, active);
}
RedisEncodedResponse Formatter::punsubscribe(bool pushType, std::string_view pattern, size_t active) {
if(pushType) {
return pushStrstrstrint("pubsub", "punsubscribe", pattern, active);
}
return strstrint("punsubscribe", pattern, active);
}
RedisEncodedResponse Formatter::message(bool pushType, std::string_view channel, std::string_view payload) {
std::ostringstream ss;
if(pushType) {
ss << ">4\r\n" "$6\r\npubsub\r\n";
}
else {
ss << "*3\r\n";
}
ss << "$7\r\nmessage\r\n";
ss << "$" << channel.size() << "\r\n";
ss << channel << "\r\n";
ss << "$" << payload.size() << "\r\n";
ss << payload << "\r\n";
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::pmessage(bool pushType, std::string_view pattern, std::string_view channel, std::string_view payload) {
std::ostringstream ss;
if(pushType) {
ss << ">5\r\n" "$6\r\npubsub\r\n";
}
else {
ss << "*4\r\n";
}
ss << "$8\r\npmessage\r\n";
ss << "$" << pattern.size() << "\r\n";
ss << pattern << "\r\n";
ss << "$" << channel.size() << "\r\n";
ss << channel << "\r\n";
ss << "$" << payload.size() << "\r\n";
ss << payload << "\r\n";
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::strstrint(std::string_view str1, std::string_view str2, int num) {
std::ostringstream ss;
ss << "*3\r\n";
Formatter::string(ss, str1);
Formatter::string(ss, str2);
Formatter::integer(ss, num);
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::pushStrstrstrint(std::string_view str1, std::string_view str2, std::string_view str3, int num) {
std::ostringstream ss;
ss << ">4\r\n";
Formatter::string(ss, str1);
Formatter::string(ss, str2);
Formatter::string(ss, str3);
Formatter::integer(ss, num);
return RedisEncodedResponse(ss.str());
}
RedisEncodedResponse Formatter::nodeHealth(const NodeHealth &nh) {
std::vector output;
output.emplace_back(SSTR("NODE-HEALTH " << healthStatusAsString(chooseWorstHealth(nh.getIndicators()))));
if(!nh.getNode().empty()) {
output.emplace_back(SSTR("NODE " << nh.getNode()));
}
output.emplace_back(SSTR("VERSION " << nh.getVersion()));
output.emplace_back("----------");
std::vector indicators = healthIndicatorsAsStrings(nh.getIndicators());
output.insert(output.end(), indicators.begin(), indicators.end());
return statusVector(output);
}