//-----------------------------------------------------------------------
// File: Dispatcher.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "storage/StagingArea.hh"
#include "utils/CommandParsing.hh"
#include "utils/ParseUtils.hh"
#include "redis/Transaction.hh"
#include "redis/ArrayResponseBuilder.hh"
#include "pubsub/Publisher.hh"
#include "StateMachine.hh"
#include "Dispatcher.hh"
#include "Utils.hh"
#include "Formatter.hh"
using namespace quarkdb;
RedisEncodedResponse Dispatcher::handleConversion(RedisRequest &request) {
// Provide simple commands for conversion between binary-string-encoded
// integers and human-readable ASCII representation.
//
// This is strictly for interactive use, to simplify life when debugging and
// dealing with binary-string integers, especially when using the recovery
// tool, or inspecting the contents of the raft journal.
//
// Please don't use this in scripts.. Using QDB as "binary-string conversion"
// service would be really silly.
switch(request.getCommand()) {
case RedisCommand::CONVERT_STRING_TO_INT: {
if(request.size() != 2u) return Formatter::errArgs(request[0]);
if(request[1].size() != 8u) return Formatter::err(SSTR("expected string with 8 characters, was given " << request[1].size() << " instead"));
std::vector reply;
reply.emplace_back(SSTR("Interpreted as int64_t: " << binaryStringToInt(request[1])));
reply.emplace_back(SSTR("Interpreted as uint64_t: " << binaryStringToUnsignedInt(request[1])));
return Formatter::statusVector(reply);
}
case RedisCommand::CONVERT_INT_TO_STRING: {
if(request.size() != 2u) return Formatter::errArgs(request[0]);
int64_t value;
if(!ParseUtils::parseInt64(request[1], value)) {
return Formatter::err("cannot parse integer");
}
std::vector reply;
reply.emplace_back(SSTR("As int64_t: " << intToBinaryString(value)));
reply.emplace_back(SSTR("As uint64_t: " << unsignedIntToBinaryString(value)));
return Formatter::vector(reply);
}
default: {
qdb_throw("internal dispatching error for " << request.toPrintableString());
}
}
}
RedisEncodedResponse Dispatcher::handlePing(RedisRequest &request) {
qdb_assert(request.getCommand() == RedisCommand::PING);
if(request.size() > 2) return Formatter::errArgs(request[0]);
if(request.size() == 1) return Formatter::pong();
return Formatter::string(request[1]);
}
RedisDispatcher::RedisDispatcher(StateMachine &rocksdb, Publisher &pub) : store(rocksdb),
publisher(pub) {
}
LinkStatus RedisDispatcher::dispatch(Connection *conn, Transaction &transaction) {
return conn->raw(dispatch(transaction, 0));
}
LinkStatus RedisDispatcher::dispatch(Connection *conn, RedisRequest &req) {
return conn->raw(dispatch(req, 0));
}
RedisEncodedResponse RedisDispatcher::errArgs(RedisRequest &request) {
return Formatter::errArgs(request[0]);
}
RedisEncodedResponse RedisDispatcher::dispatchingError(RedisRequest &request, LogIndex commit) {
std::string msg = SSTR("internal dispatching error for " << quotes(request[0]));
qdb_critical(msg);
if(commit != 0) qdb_throw("Could not dispatch request " << quotes(request[0]) << " with positive commit index: " << commit);
return Formatter::err(msg);
}
RedisEncodedResponse RedisDispatcher::dispatchReadOnly(StagingArea &stagingArea, Transaction &transaction) {
qdb_assert(!transaction.containsWrites());
ArrayResponseBuilder builder(transaction.size(), transaction.isPhantom());
for(size_t i = 0; i < transaction.size(); i++) {
builder.push_back(dispatchRead(stagingArea, transaction[i]));
}
return builder.buildResponse();
}
RedisEncodedResponse RedisDispatcher::dispatch(StagingArea &stagingArea, Transaction &transaction) {
ArrayResponseBuilder builder(transaction.size(), transaction.isPhantom());
for(size_t i = 0; i < transaction.size(); i++) {
builder.push_back(dispatchReadWrite(stagingArea, transaction[i]));
}
return builder.buildResponse();
}
RedisEncodedResponse RedisDispatcher::dispatch(Transaction &transaction, LogIndex commit) {
StagingArea stagingArea(store, !transaction.containsWrites());
RedisEncodedResponse resp = dispatch(stagingArea, transaction);
if(transaction.containsWrites()) {
stagingArea.commit(commit);
VersionedHashRevisionTracker &revisionTracker = stagingArea.getRevisionTracker();
if(!revisionTracker.empty()) {
publisher.schedulePublishing(std::move(revisionTracker));
}
}
store.getRequestCounter().account(transaction);
return resp;
}
RedisEncodedResponse RedisDispatcher::dispatchLHSET(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint, std::string_view value) {
bool fieldcreated;
rocksdb::Status st = store.lhset(stagingArea, key, field, hint, value, fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
}
RedisEncodedResponse RedisDispatcher::dispatchHDEL(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end) {
int64_t count = 0;
rocksdb::Status st = store.hdel(stagingArea, key, start, end, count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
RedisEncodedResponse RedisDispatcher::dispatchLHDEL(StagingArea &stagingArea, std::string_view key, const ReqIterator &start, const ReqIterator &end) {
int64_t count = 0;
rocksdb::Status st = store.lhdel(stagingArea, key, start, end, count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
RedisEncodedResponse RedisDispatcher::dispatchWrite(StagingArea &stagingArea, RedisRequest &request) {
qdb_assert(request.getCommandType() == CommandType::WRITE);
switch(request.getCommand()) {
case RedisCommand::FLUSHALL: {
if(request.size() != 1) return errArgs(request);
rocksdb::Status st = store.flushall(stagingArea);
return Formatter::fromStatus(st);
}
case RedisCommand::SET: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.set(stagingArea, request[1], request[2]);
return Formatter::fromStatus(st);
}
case RedisCommand::DEL: {
if(request.size() <= 1) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.del(stagingArea, request.begin()+1, request.end(), count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::HSET: {
if(request.size() != 4) return errArgs(request);
bool fieldcreated;
rocksdb::Status st = store.hset(stagingArea, request[1], request[2], request[3], fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
}
case RedisCommand::HSETNX: {
if(request.size() != 4) return errArgs(request);
bool fieldcreated;
rocksdb::Status st = store.hsetnx(stagingArea, request[1], request[2], request[3], fieldcreated);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(fieldcreated);
}
case RedisCommand::HMSET: {
if(request.size() <= 3 || request.size() % 2 != 0) return Formatter::errArgs(request[0]);
rocksdb::Status st = store.hmset(stagingArea, request[1], request.begin()+2, request.end());
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::HINCRBY: {
if(request.size() != 4) return errArgs(request);
int64_t ret = 0;
rocksdb::Status st = store.hincrby(stagingArea, request[1], request[2], request[3], ret);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(ret);
}
case RedisCommand::HINCRBYMULTI: {
if(request.size() < 4 || ( ((request.size()-1) % 3)) != 0) return errArgs(request);
size_t index = 1;
int64_t ret = 0;
while(index < request.size()) {
int64_t tmpret = 0;
store.hincrby(stagingArea, request[index], request[index+1], request[index+2], tmpret);
ret += tmpret;
index += 3;
}
return Formatter::integer(ret);
}
case RedisCommand::HINCRBYFLOAT: {
if(request.size() != 4) return errArgs(request);
double ret = 0;
rocksdb::Status st = store.hincrbyfloat(stagingArea, request[1], request[2], request[3], ret);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(std::to_string(ret));
}
case RedisCommand::HDEL: {
if(request.size() <= 2) return errArgs(request);
return dispatchHDEL(stagingArea, request[1], request.begin()+2, request.end());
}
case RedisCommand::HCLONE: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.hclone(stagingArea, request[1], request[2]);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::SADD: {
if(request.size() <= 2) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.sadd(stagingArea, request[1], request.begin()+2, request.end(), count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::SREM: {
if(request.size() <= 2) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.srem(stagingArea, request[1], request.begin()+2, request.end(), count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::SMOVE: {
if(request.size() != 4) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.smove(stagingArea, request[1], request[2], request[3], count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::DEQUE_PUSH_FRONT: {
if(request.size() < 3) return errArgs(request);
int64_t length;
rocksdb::Status st = store.dequePushFront(stagingArea, request[1], request.begin()+2, request.end(), length);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(length);
}
case RedisCommand::DEQUE_PUSH_BACK: {
if(request.size() < 3) return errArgs(request);
int64_t length;
rocksdb::Status st = store.dequePushBack(stagingArea, request[1], request.begin()+2, request.end(), length);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(length);
}
case RedisCommand::DEQUE_POP_FRONT: {
if(request.size() != 2) return errArgs(request);
std::string item;
rocksdb::Status st = store.dequePopFront(stagingArea, request[1], item);
if(st.IsNotFound()) return Formatter::null();
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(item);
}
case RedisCommand::DEQUE_POP_BACK: {
if(request.size() != 2) return errArgs(request);
std::string item;
rocksdb::Status st = store.dequePopBack(stagingArea, request[1], item);
if(st.IsNotFound()) return Formatter::null();
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(item);
}
case RedisCommand::DEQUE_TRIM_FRONT: {
if(request.size() != 3) return errArgs(request);
int64_t itemsRemoved;
rocksdb::Status st = store.dequeTrimFront(stagingArea, request[1], request[2], itemsRemoved);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(itemsRemoved);
}
case RedisCommand::DEQUE_CLEAR: {
if(request.size() != 2) return errArgs(request);
int64_t itemsRemoved;
rocksdb::Status st = store.dequeTrimFront(stagingArea, request[1], "0", itemsRemoved);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(itemsRemoved);
}
case RedisCommand::CONFIG_SET: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.configSet(stagingArea, request[1], request[2]);
return Formatter::fromStatus(st);
}
case RedisCommand::LHSET: {
if(request.size() != 5) return errArgs(request);
return dispatchLHSET(stagingArea, request[1], request[2], request[3], request[4]);
}
case RedisCommand::LHDEL: {
if(request.size() <= 2) return errArgs(request);
return dispatchLHDEL(stagingArea, request[1], request.begin()+2, request.end());
}
case RedisCommand::LHLOCDEL: {
if(request.size() != 4) return errArgs(request);
int64_t itemsRemoved;
rocksdb::Status st = store.lhlocdel(stagingArea, request[1], request[2], request[3], itemsRemoved);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(itemsRemoved);
}
case RedisCommand::LHMSET: {
if(request.size() <= 4 || (request.size()-2) % 3 != 0) return Formatter::errArgs(request[0]);
rocksdb::Status st = store.lhmset(stagingArea, request[1], request.begin()+2, request.end());
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::LHSET_AND_DEL_FALLBACK: {
if(request.size() != 6) return errArgs(request);
RedisEncodedResponse resp = dispatchLHSET(stagingArea, request[1], request[2], request[3], request[4]);
dispatchHDEL(stagingArea, request[5], request.begin()+2, request.begin()+3);
return resp;
}
case RedisCommand::LHDEL_WITH_FALLBACK: {
if(request.size() != 4) return errArgs(request);
RedisEncodedResponse resp = dispatchLHDEL(stagingArea, request[1], request.begin()+2, request.begin()+3);
if(resp.val != Formatter::integer(1).val) {
return dispatchHDEL(stagingArea, request[3], request.begin()+2, request.begin()+3);
}
return resp;
}
case RedisCommand::CONVERT_HASH_FIELD_TO_LHASH: {
if(request.size() != 6) return errArgs(request);
std::string shouldBeEmpty;
rocksdb::Status st = store.lhget(stagingArea, request[3], request[4], request[5], shouldBeEmpty);
if(st.ok()) return Formatter::err("Destination field already exists!");
std::string value;
st = store.hget(stagingArea, request[1], request[2], value);
if(!st.ok()) return Formatter::fromStatus(st);
bool fieldCreated = false;
st = store.lhset(stagingArea, request[3], request[4], request[5], value, fieldCreated);
if(!st.ok()) return Formatter::fromStatus(st);
if(!fieldCreated) qdb_throw("should never happen");
int64_t removed = 0;
st = store.hdel(stagingArea, request[1], request.begin()+2, request.begin()+3, removed);
if(!st.ok()) qdb_throw("should never happen");
qdb_assert(removed == 1);
return Formatter::ok();
}
case RedisCommand::TIMESTAMPED_LEASE_ACQUIRE: {
if(request.size() != 5) return Formatter::errArgs("lease_acquire");
int64_t duration = 0;
if(!ParseUtils::parseInt64(request[3], duration) || duration < 1) {
return Formatter::err("value is not an integer or out of range");
}
qdb_assert(request[4].size() == 8u);
ClockValue timestamp = binaryStringToUnsignedInt(request[4]);
LeaseInfo leaseInfo;
LeaseAcquisitionStatus status = store.lease_acquire(stagingArea, request[1], request[2], timestamp, duration, leaseInfo);
if(status == LeaseAcquisitionStatus::kKeyTypeMismatch) {
return Formatter::err("Invalid Argument: WRONGTYPE Operation against a key holding the wrong kind of value");
}
else if(status == LeaseAcquisitionStatus::kAcquired) {
return Formatter::status("ACQUIRED");
}
else if(status == LeaseAcquisitionStatus::kRenewed) {
return Formatter::status("RENEWED");
}
else {
qdb_assert(status == LeaseAcquisitionStatus::kFailedDueToOtherOwner);
return Formatter::err(SSTR("lease held by '" << leaseInfo.getValue() << "', time remaining " << leaseInfo.getDeadline() - timestamp << " ms"));
}
}
case RedisCommand::TIMESTAMPED_LEASE_GET: {
if(request.size() != 3) return Formatter::errArgs("lease_get");
qdb_assert(request[2].size() == 8u);
ClockValue timestamp = binaryStringToUnsignedInt(request[2]);
LeaseInfo leaseInfo;
rocksdb::Status st = store.lease_get(stagingArea, request[1], timestamp, leaseInfo);
if(st.IsNotFound()) {
return Formatter::null();
}
qdb_assert(st.ok());
std::vector reply;
reply.emplace_back(SSTR("HOLDER: " << leaseInfo.getValue()));
reply.emplace_back(SSTR("REMAINING: " << leaseInfo.getDeadline() - timestamp << " ms"));
return Formatter::statusVector(reply);
}
case RedisCommand::TIMESTAMPED_LEASE_RELEASE: {
if(request.size() != 3) return Formatter::errArgs("lease_release");
qdb_assert(request[2].size() == 8u);
ClockValue timestamp = binaryStringToUnsignedInt(request[2]);
rocksdb::Status st = store.lease_release(stagingArea, request[1], timestamp);
if(st.IsNotFound()) {
return Formatter::null();
}
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::ok();
}
case RedisCommand::ARTIFICIALLY_SLOW_WRITE_NEVER_USE_THIS: {
if(request.size() != 2) return errArgs(request);
rocksdb::Status st = store.artificiallySlowWriteNeverUseThis(stagingArea, request[1]);
return Formatter::fromStatus(st);
}
case RedisCommand::VHSET: {
if(request.size() != 4) return errArgs(request);
uint64_t version;
rocksdb::Status st = store.vhset(stagingArea, request[1], request[2], request[3], version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(version);
}
case RedisCommand::VHDEL: {
if(request.size() <= 2) return errArgs(request);
uint64_t version = 0;
rocksdb::Status st = store.vhdel(stagingArea, request[1], request.begin()+2, request.end(), version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(version);
}
case RedisCommand::TX_READWRITE: {
// Unpack transaction and process
Transaction transaction;
qdb_assert(transaction.deserialize(request));
return dispatch(stagingArea, transaction);
}
default: {
qdb_throw("internal dispatching error in RedisDispatcher for " << request);
}
}
}
RedisEncodedResponse RedisDispatcher::dispatchHGET(StagingArea &stagingArea, std::string_view key, std::string_view field) {
std::string value;
rocksdb::Status st = store.hget(stagingArea, key, field, value);
if(st.IsNotFound()) return Formatter::null();
else if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(value);
}
RedisEncodedResponse RedisDispatcher::dispatchLHGET(StagingArea &stagingArea, std::string_view key, std::string_view field, std::string_view hint) {
std::string value;
rocksdb::Status st = store.lhget(stagingArea, key, field, hint, value);
if(st.IsNotFound()) return Formatter::null();
else if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(value);
}
RedisEncodedResponse RedisDispatcher::dispatchRead(StagingArea &stagingArea, RedisRequest &request) {
switch(request.getCommand()) {
case RedisCommand::TYPE: {
if(request.size() != 2) return errArgs(request);
std::string retval;
store.getType(stagingArea, request[1], retval);
return Formatter::status(retval);
}
case RedisCommand::GET: {
if(request.size() != 2) return errArgs(request);
std::string value;
rocksdb::Status st = store.get(stagingArea, request[1], value);
if(st.IsNotFound()) return Formatter::null();
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(value);
}
case RedisCommand::EXISTS: {
if(request.size() <= 1) return errArgs(request);
int64_t count = 0;
rocksdb::Status st = store.exists(stagingArea, request.begin()+1, request.end(), count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::KEYS: {
if(request.size() != 2) return errArgs(request);
std::vector ret;
rocksdb::Status st = store.keys(stagingArea, request[1], ret);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(ret);
}
case RedisCommand::SCAN: {
if(request.size() < 2) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+1, request.end(), true);
if(!args.error.empty()) {
return Formatter::err(args.error);
}
std::string newcursor;
std::vector vec;
rocksdb::Status st = store.scan(stagingArea, args.cursor, args.match, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::HGET: {
if(request.size() != 3) return errArgs(request);
return dispatchHGET(stagingArea, request[1], request[2]);
}
case RedisCommand::HEXISTS: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.hexists(stagingArea, request[1], request[2]);
if(st.ok()) return Formatter::integer(1);
if(st.IsNotFound()) return Formatter::integer(0);
return Formatter::fromStatus(st);
}
case RedisCommand::HKEYS: {
if(request.size() != 2) return errArgs(request);
std::vector keys;
rocksdb::Status st = store.hkeys(stagingArea, request[1], keys);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(keys);
}
case RedisCommand::HGETALL: {
if(request.size() != 2) return errArgs(request);
std::vector vec;
rocksdb::Status st = store.hgetall(stagingArea, request[1], vec);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(vec);
}
case RedisCommand::HLEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.hlen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
case RedisCommand::HVALS: {
if(request.size() != 2) return errArgs(request);
std::vector values;
rocksdb::Status st = store.hvals(stagingArea, request[1], values);
return Formatter::vector(values);
}
case RedisCommand::HSCAN: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end(), false);
if(!args.error.empty()) {
return Formatter::err(args.error);
}
std::string newcursor;
std::vector vec;
rocksdb::Status st = store.hscan(stagingArea, request[1], args.cursor, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::SISMEMBER: {
if(request.size() != 3) return errArgs(request);
rocksdb::Status st = store.sismember(stagingArea, request[1], request[2]);
if(st.ok()) return Formatter::integer(1);
if(st.IsNotFound()) return Formatter::integer(0);
return Formatter::fromStatus(st);
}
case RedisCommand::SMEMBERS: {
if(request.size() != 2) return errArgs(request);
std::vector members;
rocksdb::Status st = store.smembers(stagingArea, request[1], members);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(members);
}
case RedisCommand::SCARD: {
if(request.size() != 2) return errArgs(request);
size_t count;
rocksdb::Status st = store.scard(stagingArea, request[1], count);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(count);
}
case RedisCommand::SSCAN: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end(), false);
if(!args.error.empty()) {
return Formatter::err(args.error);
}
std::string newcursor;
std::vector vec;
rocksdb::Status st = store.sscan(stagingArea, request[1], args.cursor, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::DEQUE_LEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.dequeLen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
case RedisCommand::DEQUE_SCAN_BACK: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end(), false);
if(!args.error.empty()) {
return Formatter::err(args.error);
}
std::string newcursor;
std::vector vec;
rocksdb::Status st = store.dequeScanBack(stagingArea, request[1], args.cursor, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::CONFIG_GET: {
if(request.size() != 2) return errArgs(request);
std::string value;
rocksdb::Status st = store.configGet(stagingArea, request[1], value);
if(st.IsNotFound()) return Formatter::null();
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::string(value);
}
case RedisCommand::CONFIG_GETALL: {
if(request.size() != 1) return errArgs(request);
std::vector ret;
rocksdb::Status st = store.configGetall(stagingArea, ret);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(ret);
}
case RedisCommand::LHGET: {
if(request.size() == 3) {
return dispatchLHGET(stagingArea, request[1], request[2], "");
}
if(request.size() == 4) {
return dispatchLHGET(stagingArea, request[1], request[2], request[3]);
}
return errArgs(request);
}
case RedisCommand::LHSCAN: {
if(request.size() < 3) return errArgs(request);
ScanCommandArguments args = parseScanCommand(request.begin()+2, request.end(), false, true);
if(!args.error.empty()) {
return Formatter::err(args.error);
}
std::string newcursor;
std::vector vec;
rocksdb::Status st = store.lhscan(stagingArea, request[1], args.cursor, args.matchloc, args.count, newcursor, vec);
if(!st.ok()) return Formatter::fromStatus(st);
if(newcursor == "") newcursor = "0";
else newcursor = "next:" + newcursor;
return Formatter::scan(newcursor, vec);
}
case RedisCommand::LHGET_WITH_FALLBACK: {
// First, try LHGET...
RedisEncodedResponse resp;
if(request.size() == 4) {
resp = dispatchLHGET(stagingArea, request[1], request[2], "");
}
else if(request.size() == 5) {
resp = dispatchLHGET(stagingArea, request[1], request[2], request[3]);
}
// Did we succeed?
if(resp.val != Formatter::null().val) return resp;
// Nope, fallback. Look for the same field, but in the hash specified as
// last argument.
return dispatchHGET(stagingArea, request[request.size()-1], request[2]);
}
case RedisCommand::LHLEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.lhlen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
case RedisCommand::RAW_SCAN: {
case RedisCommand::RAW_SCAN_TOMBSTONES:
bool withTombstones = request.getCommand() == RedisCommand::RAW_SCAN_TOMBSTONES;
if(request.size() != 1 && request.size() != 2 && request.size() != 4) return errArgs(request);
std::string_view cursor;
if(request.size() > 1) {
cursor = request[1];
}
rocksdb::Status st;
std::vector results;
if(request.size() == 1 || request.size() == 2) {
st = store.rawScanMaybeTombstones(stagingArea, cursor, 50, results, withTombstones);
}
else {
if(!caseInsensitiveEquals(request[2], "count")) {
return Formatter::err("syntax error");
}
int64_t count;
if(!ParseUtils::parseInt64(request[3], count) || count <= 0) {
return Formatter::err("syntax error");
}
st = store.rawScanMaybeTombstones(stagingArea, cursor, count, results, withTombstones);
}
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::vector(results);
}
case RedisCommand::RAW_GET_ALL_VERSIONS: {
if(request.size() != 2) return errArgs(request);
std::vector versions;
rocksdb::Status st = store.rawGetAllVersions(request[1], versions);
if(!st.ok()) return Formatter::fromStatus(st);
std::vector reply;
for(const rocksdb::KeyVersion& ver : versions) {
reply.emplace_back(SSTR("KEY: " << ver.user_key));
reply.emplace_back(SSTR("VALUE: " << ver.value));
reply.emplace_back(SSTR("SEQUENCE: " << ver.sequence));
reply.emplace_back(SSTR("TYPE: " << ver.type));
}
return Formatter::vector(reply);
}
case RedisCommand::CLOCK_GET: {
if(request.size() != 1) return errArgs(request);
std::vector reply;
ClockValue staticClock;
store.getClock(stagingArea, staticClock);
ClockValue dynamicClock = store.getDynamicClock();
reply.emplace_back(SSTR("STATIC-CLOCK: " << staticClock));
reply.emplace_back(SSTR("DYNAMIC-CLOCK: " << dynamicClock));
return Formatter::statusVector(reply);
}
case RedisCommand::LEASE_GET_PENDING_EXPIRATION_EVENTS: {
if(request.size() != 1) return errArgs(request);
std::vector events;
ClockValue staticClock, dynamicClock;
store.lease_get_pending_expiration_events(stagingArea, staticClock, dynamicClock, events);
std::vector reply;
reply.emplace_back(SSTR("STATIC-CLOCK: " << staticClock));
reply.emplace_back(SSTR("DYNAMIC-CLOCK: " << dynamicClock));
for(auto it = events.begin(); it != events.end(); it++) {
reply.emplace_back(SSTR(it->deadline << ": " << it->key));
}
return Formatter::vector(reply);
}
case RedisCommand::VHKEYS: {
if(request.size() != 2) return errArgs(request);
std::vector vec;
uint64_t version = 0u;
rocksdb::Status st = store.vhkeys(stagingArea, request[1], vec, version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::versionedVector(version, vec);
}
case RedisCommand::VHGETALL: {
if(request.size() != 2) return errArgs(request);
std::vector vec;
uint64_t version = 0u;
rocksdb::Status st = store.vhgetall(stagingArea, request[1], vec, version);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::versionedVector(version, vec);
}
case RedisCommand::VHLEN: {
if(request.size() != 2) return errArgs(request);
size_t len;
rocksdb::Status st = store.vhlen(stagingArea, request[1], len);
if(!st.ok()) return Formatter::fromStatus(st);
return Formatter::integer(len);
}
case RedisCommand::TX_READONLY: {
// Unpack transaction and process
Transaction transaction;
qdb_assert(transaction.deserialize(request));
return dispatchReadOnly(stagingArea, transaction);
}
default: {
return dispatchingError(request, 0);
}
}
}
RedisEncodedResponse RedisDispatcher::dispatch(RedisRequest &request, LogIndex commit) {
if(request.getCommand() == RedisCommand::INVALID) {
if(StringUtils::startsWith(request[0], "JOURNAL_")) {
if(request[0] == "JOURNAL_LEADERSHIP_MARKER") {
// Hard-synchronize our dynamic clock to the static one. The dynamic
// clock is only used in leaders to timestamp incoming lease requests.
// So, strictly speaking, synchronizing the clock is only necessary for
// leader nodes, but it's so cheap to do that we don't care. Let's
// synchronize all nodes.
store.hardSynchronizeDynamicClock();
}
store.noop(commit);
return Formatter::ok();
}
qdb_assert(commit == 0);
return Formatter::err(SSTR("unknown command " << quotes(request[0])));
}
if(commit > 0 && request.getCommandType() != CommandType::WRITE) {
qdb_throw("attempted to dispatch non-write command '" << request[0] << "' with a positive commit index: " << commit);
}
if(request.getCommand() == RedisCommand::PING) {
return handlePing(request);
}
if(request.getCommandType() != CommandType::READ && request.getCommandType() != CommandType::WRITE) {
return dispatchingError(request, commit);
}
return dispatchReadWriteAndCommit(request, commit);
}
RedisEncodedResponse RedisDispatcher::dispatchReadWrite(StagingArea &stagingArea, RedisRequest &request) {
if(request.getCommandType() == CommandType::WRITE) {
return dispatchWrite(stagingArea, request);
}
return dispatchRead(stagingArea, request);
}
RedisEncodedResponse RedisDispatcher::dispatchReadWriteAndCommit(RedisRequest &request, LogIndex commit) {
StagingArea stagingArea(store, request.getCommandType() == CommandType::READ);
RedisEncodedResponse response = dispatchReadWrite(stagingArea, request);
// Handle writes in a separate function, use batch write API
if(request.getCommandType() == CommandType::WRITE) {
stagingArea.commit(commit);
}
store.getRequestCounter().account(request);
return response;
}