// ----------------------------------------------------------------------
// File: bulkload.cc
// Author: Georgios Bitzes - CERN
// ----------------------------------------------------------------------
/************************************************************************
* quarkdb - a redis-like highly available key-value store *
* Copyright (C) 2016 CERN/Switzerland *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation, either version 3 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program. If not, see .*
************************************************************************/
#include "raft/RaftDispatcher.hh"
#include "raft/RaftReplicator.hh"
#include "raft/RaftTimeouts.hh"
#include "raft/RaftCommitTracker.hh"
#include "raft/RaftReplicator.hh"
#include "ShardDirectory.hh"
#include "Configuration.hh"
#include "QuarkDBNode.hh"
#include "../test-utils.hh"
#include "RedisParser.hh"
#include
#include
#include "utils/AssistedThread.hh"
#include "../test-reply-macros.hh"
#define ASSERT_OK(msg) ASSERT_TRUE(msg.ok())
using namespace quarkdb;
TEST(BulkLoad, BasicSanity) {
if (system("rm -rf /tmp/quarkdb-bulkload-test")) {
FAIL() << "Failed to remove directory /tmp/quarkdb-bulkload-test";
}
{
StateMachine stateMachine("/tmp/quarkdb-bulkload-test", false, true);
for(size_t i = 0; i < 100; i++) {
bool created;
ASSERT_OK(stateMachine.hset("some-key", SSTR("field-" << i), "value", created));
ASSERT_TRUE(created);
ASSERT_OK(stateMachine.hset(SSTR("some-key-" << i), "field", "value", created));
ASSERT_TRUE(created);
ASSERT_OK(stateMachine.hset(SSTR("some-key-" << i), "field", "value", created));
ASSERT_TRUE(created);
ASSERT_OK(stateMachine.set(SSTR("a-" << i), SSTR("v-" << i)));
ASSERT_OK(stateMachine.set(SSTR("z#|#-" << i), SSTR("vz-" << i)));
RedisRequest items;
items.push_back(SSTR(i));
items.push_back(SSTR(i+1));
items.push_back(SSTR(i+200));
int64_t ignored;
ASSERT_OK(stateMachine.sadd(SSTR("some-set-" << i), items.begin(), items.end(), ignored));
ASSERT_OK(stateMachine.sadd("some-set", items.begin(), items.end(), ignored));
ASSERT_OK(stateMachine.lhset("locality-hash-1", SSTR("field-" << i), SSTR("hint-" << i), SSTR("lh1-value-" << i), created));
ASSERT_TRUE(created);
ASSERT_OK(stateMachine.lhset("locality-hash-2", "field", "hint", SSTR("lh2-value-" << i), created));
ASSERT_TRUE(created);
ASSERT_OK(stateMachine.lhset("locality-hash-3", SSTR("field-" << i), "hint", SSTR("lh3-value-" << i), created));
ASSERT_TRUE(created);
}
stateMachine.finalizeBulkload();
}
size_t len;
StateMachine stateMachine("/tmp/quarkdb-bulkload-test");
ASSERT_OK(stateMachine.hlen("some-key", len));
ASSERT_EQ(len, 100u);
ASSERT_OK(stateMachine.scard("some-set", len));
ASSERT_EQ(len, 201u);
ASSERT_OK(stateMachine.lhlen("locality-hash-1", len));
ASSERT_EQ(len, 100u);
ASSERT_OK(stateMachine.lhlen("locality-hash-2", len));
ASSERT_EQ(len, 1u);
ASSERT_OK(stateMachine.lhlen("locality-hash-3", len));
ASSERT_EQ(len, 100u);
std::string contents;
ASSERT_OK(stateMachine.lhget("locality-hash-2", "field", "", contents));
ASSERT_EQ(contents, "lh2-value-99");
ASSERT_OK(stateMachine.lhget("locality-hash-2", "field", "wrong-hint", contents));
ASSERT_EQ(contents, "lh2-value-99");
ASSERT_OK(stateMachine.lhget("locality-hash-2", "field", "hint", contents));
ASSERT_EQ(contents, "lh2-value-99");
for(size_t i = 0; i < 100; i++) {
ASSERT_OK(stateMachine.hlen(SSTR("some-key-" << i), len));
ASSERT_EQ(len, 1u);
std::string v;
ASSERT_OK(stateMachine.get(SSTR("a-" << i), v));
ASSERT_EQ(v, SSTR("v-" << i));
ASSERT_OK(stateMachine.get(SSTR("z#|#-" << i), v));
ASSERT_EQ(v, SSTR("vz-" << i));
ASSERT_OK(stateMachine.scard(SSTR("some-set-" << i), len));
ASSERT_EQ(len, 3u);
ASSERT_OK(stateMachine.lhget("locality-hash-1", SSTR("field-" << i), "", contents));
ASSERT_EQ(contents, SSTR("lh1-value-" << i));
ASSERT_OK(stateMachine.lhget("locality-hash-1", SSTR("field-" << i), "wrong-hint", contents));
ASSERT_EQ(contents, SSTR("lh1-value-" << i));
ASSERT_OK(stateMachine.lhget("locality-hash-1", SSTR("field-" << i), SSTR("hint-" << i), contents));
ASSERT_EQ(contents, SSTR("lh1-value-" << i));
ASSERT_OK(stateMachine.lhget("locality-hash-3", SSTR("field-" << i), "", contents));
ASSERT_EQ(contents, SSTR("lh3-value-" << i));
ASSERT_OK(stateMachine.lhget("locality-hash-3", SSTR("field-" << i), "wrong-hint", contents));
ASSERT_EQ(contents, SSTR("lh3-value-" << i));
ASSERT_OK(stateMachine.lhget("locality-hash-3", SSTR("field-" << i), "hint", contents));
ASSERT_EQ(contents, SSTR("lh3-value-" << i));
}
}
TEST(BulkLoad, PanicWhenOpeningUnfinalizedStateMachine) {
ASSERT_EQ(system("rm -rf /tmp/quarkdb-bulkload-test"), 0);
{
StateMachine stateMachine("/tmp/quarkdb-bulkload-test", false, true);
for(size_t i = 0; i < 100; i++) {
bool created;
ASSERT_OK(stateMachine.hset("some-key", SSTR("field-" << i), "value", created));
ASSERT_TRUE(created);
}
}
ASSERT_THROW(StateMachine("/tmp/quarkdb-bulkload-test"), FatalException);
}
TEST(Bulkload, RaftJournalAtNonZeroIndex) {
ASSERT_EQ(system("rm -rf /tmp/quarkdb-tests-raft-journal"), 0);
{
RaftServer srv {"localhost", 2222};
RaftJournal journal("/tmp/quarkdb-tests-raft-journal", "some-uuid", {srv}, 1337, FsyncPolicy::kAsync);
ASSERT_EQ(journal.getLogSize(), 1338);
ASSERT_EQ(journal.getLogStart(), 1337);
ASSERT_EQ(journal.getCommitIndex(), 1337);
ASSERT_EQ(journal.getEpoch(), 1337);
RaftEntry entry;
ASSERT_TRUE(journal.fetch(1337, entry).ok());
ASSERT_EQ(entry, RaftEntry(0, "JOURNAL_UPDATE_MEMBERS", "localhost:2222|", "some-uuid"));
}
}
TEST(Bulkload, CreateConsensusShardFromExistingSM) {
ASSERT_EQ(system("rm -rf /tmp/quarkdb-tests-shard-from-existing-sm"), 0);
ASSERT_EQ(system("mkdir /tmp/quarkdb-tests-shard-from-existing-sm"), 0);
std::unique_ptr sm;
sm.reset(new StateMachine("/tmp/quarkdb-tests-shard-from-existing-sm/original-sm"));
ASSERT_EQ(sm->getLastApplied(), 0);
ASSERT_EQ(sm->getPhysicalLocation(), "/tmp/quarkdb-tests-shard-from-existing-sm/original-sm");
ASSERT_TRUE(sm->set("my-key", "123", 1).ok());
std::string value;
ASSERT_TRUE(sm->get("my-key", value).ok());
ASSERT_EQ(value, "123");
ASSERT_EQ(sm->getLastApplied(), 1);
std::unique_ptr shardDir;
RaftServer srv("localhost", 123);
quarkdb::Status st;
shardDir.reset(ShardDirectory::create("/tmp/quarkdb-tests-shard-from-existing-sm/shard", "cluster-id", "shar-id", {srv}, 99, FsyncPolicy::kAsync, std::move(sm), st ));
st.assertOk();
ASSERT_EQ(sm.get(), nullptr);
ASSERT_EQ(shardDir->getStateMachine()->getPhysicalLocation(), "/tmp/quarkdb-tests-shard-from-existing-sm/shard/current/state-machine");
ASSERT_EQ(shardDir->getStateMachine()->getLastApplied(), 99);
ASSERT_TRUE(shardDir->getStateMachine()->get("my-key", value).ok());
}