return limit;
}
-RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
-{
- return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
- db->NewIterator(rocksdb::ReadOptions(), default_cf));
-}
-
class CFIteratorImpl : public KeyValueDB::IteratorImpl {
protected:
string prefix;
}
};
+
+// Whole-space iterator for a store that uses sharded column families.
+// It merges the iterator over the default column family ("main") with one
+// iterator per entry of db->cf_handles ("shards") into a single stream
+// ordered by (prefix, key).
+//
+// Invariant maintained by every positioning call: `smaller` names the source
+// holding the current element, and the other source is parked on its first
+// element greater than the current one (or is exhausted). next() therefore
+// only advances the selected source and compares again.
+class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
+private:
+  RocksDBStore* db;
+  // iterator over the default column family
+  KeyValueDB::WholeSpaceIterator main;
+  // per column family iterators, keyed by the name found in db->cf_handles;
+  // that name is reported as the prefix of every key coming from the shard
+  std::map<std::string, KeyValueDB::Iterator> shards;
+  // shard currently taking part in the merge; shards.end() when exhausted
+  std::map<std::string, KeyValueDB::Iterator>::iterator current_shard;
+  // which source holds the current element
+  enum {on_main, on_shard} smaller;
+
+public:
+  WholeMergeIteratorImpl(RocksDBStore* db)
+    : db(db)
+    , main(db->get_default_cf_iterator())
+    , smaller(on_main)
+  {
+    for (auto& e : db->cf_handles) {
+      shards.emplace(e.first, db->get_iterator(e.first));
+    }
+    // Start from a well defined "no shard selected" state, so that calling
+    // valid() before the first seek does not read an indeterminate iterator.
+    current_shard = shards.end();
+  }
+
+  // Returns true when main holds the smaller raw key of the two sources.
+  // An exhausted source compares larger than any real key. When both are
+  // exhausted main is reported as smaller, so that valid() reports false.
+  bool is_main_smaller() {
+    if (!main->valid()) {
+      // shards win only if they still have an element to offer
+      return current_shard == shards.end();
+    }
+    if (current_shard == shards.end()) {
+      return true;
+    }
+    ceph_assert(current_shard->second->valid());
+    auto main_rk = main->raw_key();
+    auto shards_rk = current_shard->second->raw_key();
+    if (main_rk.first != shards_rk.first)
+      return main_rk.first < shards_rk.first;
+    return main_rk.second < shards_rk.second;
+  }
+
+  // Positions on the first element of the whole key space.
+  // Returns 0 on success, -1 if main or any visited shard reported an error.
+  int seek_to_first() override {
+    int r0 = main->seek_to_first();
+    int r1 = shards_seek_to_first();
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  // Positions on the first element at or after `prefix`: main is asked to
+  // seek_to_first(prefix), shards are scanned starting from the first shard
+  // whose name is not less than `prefix`.
+  // Returns 0 on success, -1 if main or any visited shard reported an error.
+  int seek_to_first(const std::string &prefix) override {
+    int r0 = main->seek_to_first(prefix);
+    current_shard = shards.lower_bound(prefix);
+    int r1 = shards_scan_forward();
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  // Positions on the last element of the whole key space.
+  // Returns 0 on success, -1 if main or any visited shard reported an error.
+  int seek_to_last() override {
+    int r0 = main->seek_to_last();
+    int r1 = shards_seek_to_last();
+    if (main->valid() && shards_valid()) {
+      // two candidates: the larger one is the result, the smaller one is
+      // pushed past its end so it is parked after the current element
+      if (is_main_smaller()) {
+        smaller = on_shard;
+        main->next();
+      } else {
+        smaller = on_main;
+        shards_next();
+      }
+    } else if (shards_valid()) {
+      smaller = on_shard;
+    } else {
+      // only main has data, or nothing has; valid() then follows main
+      smaller = on_main;
+    }
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  // Positions on the last element belonging to `prefix` or to anything
+  // sorted before it: main is asked to seek_to_last(prefix), shards are
+  // searched backwards starting from the last shard whose name is not
+  // greater than `prefix`.
+  // Returns 0 on success, -1 if main or any visited shard reported an error.
+  int seek_to_last(const std::string &prefix) override {
+    int r0 = main->seek_to_last(prefix);
+    int r1 = 0;
+    bool found = false;
+    // upper_bound() is the first shard sorted after `prefix`; stepping back
+    // before each probe visits `prefix` itself first and then earlier
+    // shards, and never dereferences shards.end().
+    current_shard = shards.upper_bound(prefix);
+    while (current_shard != shards.begin()) {
+      --current_shard;
+      r1 = current_shard->second->seek_to_last();
+      if (r1 != 0)
+        break;
+      if (current_shard->second->valid()) {
+        found = true;
+        break;
+      }
+    }
+    // two candidates: park the smaller one after the larger one
+    if (main->valid() && found) {
+      if (is_main_smaller()) {
+        main->next();
+      } else {
+        shards_next();
+      }
+    }
+    if (!found) {
+      // set shards state that properly represents eof
+      // NOTE(review): shards sorted after `prefix` are not parked here, so a
+      // following next() will not visit them - confirm whether callers rely
+      // on next() after seek_to_last(prefix).
+      current_shard = shards.end();
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  // Positions via upper_bound(): main->upper_bound(prefix, after) and, for
+  // the shard named `prefix`, upper_bound(after); later shards are used when
+  // that shard has nothing left.
+  // Returns 0 on success or the error code of the failing source.
+  int upper_bound(const std::string &prefix, const std::string &after) override {
+    int r = main->upper_bound(prefix, after);
+    if (r != 0)
+      return r;
+    r = shards_seek_bound(prefix, after, true);
+    if (r != 0)
+      return r;
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  // Positions via lower_bound(): main->lower_bound(prefix, to) and, for the
+  // shard named `prefix`, lower_bound(to); later shards are used when that
+  // shard has nothing left.
+  // Returns 0 on success or the error code of the failing source.
+  int lower_bound(const std::string &prefix, const std::string &to) override {
+    int r = main->lower_bound(prefix, to);
+    if (r != 0)
+      return r;
+    r = shards_seek_bound(prefix, to, false);
+    if (r != 0)
+      return r;
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  // True when the selected source is positioned on an element.
+  bool valid() override {
+    return smaller == on_main ? main->valid() : shards_valid();
+  }
+
+  // Advances the selected source and re-selects the smaller of the two.
+  // Returns 0 on success or the error code of the advanced source.
+  int next() override {
+    int r = (smaller == on_main) ? main->next() : shards_next();
+    if (r != 0)
+      return r;
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  // Moves to the element preceding the current one.
+  // Both sources are stepped back (an exhausted source restarts from its
+  // last element); the larger of the two results becomes current and the
+  // other one is moved forward again so it stays parked after it.
+  // Returns 0 on success or the error code of the failing source.
+  int prev() override {
+    int r = main->valid() ? main->prev() : main->seek_to_last();
+    if (r != 0)
+      return r;
+    r = shards_valid() ? shards_prev() : shards_seek_to_last();
+    if (r != 0)
+      return r;
+
+    const bool have_main = main->valid();
+    const bool have_shards = shards_valid();
+    if (!have_main && !have_shards) {
+      //end, no previous. set marker so valid() can work
+      smaller = on_main;
+      return 0;
+    }
+    if (!have_main) {
+      // main ran off its beginning; park it on its first element
+      smaller = on_shard;
+      return main->seek_to_first();
+    }
+    if (!have_shards) {
+      // shards ran off their beginning; park them on their first element
+      smaller = on_main;
+      return shards_seek_to_first();
+    }
+    // both are valid: select the larger, advance the other past it
+    if (is_main_smaller()) {
+      smaller = on_shard;
+      return main->next();
+    }
+    smaller = on_main;
+    return shards_next();
+  }
+
+  // Key of the current element, without prefix.
+  std::string key() override
+  {
+    return smaller == on_main ? main->key() : current_shard->second->key();
+  }
+
+  // (prefix, key) of the current element; for shards the prefix is the name
+  // under which the shard is registered.
+  std::pair<std::string,std::string> raw_key() override
+  {
+    if (smaller == on_main) {
+      return main->raw_key();
+    }
+    return { current_shard->first, current_shard->second->key() };
+  }
+
+  // True when the current element belongs to `prefix`.
+  bool raw_key_is_prefixed(const std::string &prefix) override
+  {
+    if (smaller == on_main) {
+      return main->raw_key_is_prefixed(prefix);
+    }
+    return current_shard->first == prefix;
+  }
+
+  // Value of the current element.
+  ceph::buffer::list value() override
+  {
+    return smaller == on_main ? main->value() : current_shard->second->value();
+  }
+
+  int status() override
+  {
+    //because we already had to inspect key, it must be ok
+    return 0;
+  }
+
+  // Size of the current key, without prefix.
+  size_t key_size() override
+  {
+    if (smaller == on_main) {
+      return main->key_size();
+    }
+    return current_shard->second->key().size();
+  }
+
+  // Size of the current value.
+  size_t value_size() override
+  {
+    if (smaller == on_main) {
+      return main->value_size();
+    }
+    return current_shard->second->value().length();
+  }
+
+  // True when a shard is selected and its iterator is on an element.
+  bool shards_valid() {
+    return current_shard != shards.end() && current_shard->second->valid();
+  }
+
+  // Advances within the shards, moving on to the next shard that has data
+  // when the current one is exhausted.
+  // Returns -1 when no shard is selected, otherwise the shard's result.
+  int shards_next() {
+    if (current_shard == shards.end()) {
+      //illegal to next() on !valid()
+      return -1;
+    }
+    int r = current_shard->second->next();
+    if (r != 0)
+      return r;
+    if (current_shard->second->valid())
+      return 0;
+    //current shard exhausted, search for a key in the following shards
+    ++current_shard;
+    //either we find a key or not, but it is success
+    return shards_scan_forward();
+  }
+
+  // Steps back within the shards, moving to the last element of an earlier
+  // shard when the current one runs off its beginning.
+  // Returns -1 when no shard is selected, otherwise the shard's result.
+  int shards_prev() {
+    if (current_shard == shards.end()) {
+      //illegal to prev() on !valid()
+      return -1;
+    }
+    int r = current_shard->second->prev();
+    while (r == 0) {
+      if (current_shard->second->valid()) {
+        break;
+      }
+      if (current_shard == shards.begin()) {
+        //we have reached pre-first element
+        //this makes it !valid(), but guarantees next() moves to first element
+        break;
+      }
+      --current_shard;
+      r = current_shard->second->seek_to_last();
+    }
+    return r;
+  }
+
+  // Selects the last element of the last shard that has data; leaves
+  // current_shard == shards.end() when no shard has any.
+  int shards_seek_to_last() {
+    current_shard = shards.end();
+    while (current_shard != shards.begin()) {
+      --current_shard;
+      int r = current_shard->second->seek_to_last();
+      if (r != 0)
+        return r;
+      if (current_shard->second->valid()) {
+        return 0;
+      }
+    }
+    //no shards or no keys at all
+    current_shard = shards.end();
+    return 0;
+  }
+
+  // Selects the first element of the first shard that has data; leaves
+  // current_shard == shards.end() when no shard has any.
+  int shards_seek_to_first() {
+    current_shard = shards.begin();
+    return shards_scan_forward();
+  }
+
+private:
+  // Starting with current_shard, seeks each shard to its first element until
+  // one yields data. Stops on the failing shard and returns its error code;
+  // otherwise returns 0 with current_shard on the shard found, or on
+  // shards.end() when none has data.
+  int shards_scan_forward() {
+    while (current_shard != shards.end()) {
+      int r = current_shard->second->seek_to_first();
+      if (r != 0)
+        return r;
+      if (current_shard->second->valid())
+        break;
+      ++current_shard;
+    }
+    return 0;
+  }
+
+  // Shard half of upper_bound()/lower_bound(). When a shard named `prefix`
+  // exists it is asked for upper_bound(key) (strictly_after) or
+  // lower_bound(key); if it has nothing to offer the search continues with
+  // the shards sorted AFTER it. Restarting the scan on the matched shard
+  // itself would seek it to its first element, which lies before `key`.
+  int shards_seek_bound(const std::string &prefix, const std::string &key,
+                        bool strictly_after) {
+    current_shard = shards.lower_bound(prefix);
+    if (current_shard == shards.end())
+      return 0;
+    if (current_shard->first == prefix) {
+      KeyValueDB::Iterator& it = current_shard->second;
+      int r = strictly_after ? it->upper_bound(key) : it->lower_bound(key);
+      if (r != 0)
+        return r;
+      if (it->valid())
+        return 0;
+      ++current_shard;
+    }
+    return shards_scan_forward();
+  }
+};
+
class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
private:
struct KeyLess {
{
return db->NewIterator(rocksdb::ReadOptions(), cf);
}
+
+// Returns an iterator over the whole key space. With sharded column families
+// present the merging iterator is used; otherwise iterating the default
+// column family alone is sufficient.
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
+{
+  if (cf_handles.size() != 0) {
+    return std::make_shared<WholeMergeIteratorImpl>(this);
+  }
+  return get_default_cf_iterator();
+}
+
+// Returns an iterator restricted to the default column family, created with
+// default read options.
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
+{
+  auto raw_iter = db->NewIterator(rocksdb::ReadOptions(), default_cf);
+  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(raw_iter);
+}
}
+
+// Fixture for whole-space iterator tests on a RocksDB store created with a
+// sharding definition. The test parameter is the sharding definition string
+// handed to create_and_open().
+class RocksDBShardingTest : public ::testing::TestWithParam<const char*> {
+public:
+  boost::scoped_ptr<KeyValueDB> db;
+
+  RocksDBShardingTest() : db(0) {}
+
+  // Copies the content of a bufferlist into a string.
+  string _bl_to_str(bufferlist val) {
+    string str(val.c_str(), val.length());
+    return str;
+  }
+
+  // Removes `path` recursively via the shell; failure is reported, not fatal.
+  void rm_r(string path) {
+    string cmd = string("rm -r ") + path;
+    if (verbose)
+      cout << "==> " << cmd << std::endl;
+    int r = ::system(cmd.c_str());
+    if (r) {
+      cerr << "failed with exit code " << r
+           << ", continuing anyway" << std::endl;
+    }
+  }
+
+  // Creates the scratch directory and a fresh store sharded per GetParam().
+  void SetUp() override {
+    const char* env_verbose = getenv("VERBOSE");
+    verbose = env_verbose && strcmp(env_verbose, "1") == 0;
+
+    int r = ::mkdir("kv_test_temp_dir", 0777);
+    if (r < 0 && errno != EEXIST) {
+      r = -errno;
+      // Fail the test here: returning quietly would leave db unset and the
+      // test body would dereference a null pointer.
+      FAIL() << __func__ << ": unable to create kv_test_temp_dir: "
+             << cpp_strerror(r);
+    }
+    db.reset(KeyValueDB::create(g_ceph_context, "rocksdb",
+                                "kv_test_temp_dir"));
+    ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
+    if (verbose)
+      cout << "Creating database with sharding: " << GetParam() << std::endl;
+    ASSERT_EQ(0, db->create_and_open(cout, GetParam()));
+  }
+
+  // Closes the store and removes the scratch directory.
+  void TearDown() override {
+    db.reset(nullptr);
+    rm_r("kv_test_temp_dir");
+  }
+
+  /*
+    Key space layout, one entry of `prefixes` per slot of a test_id.
+    Each slot is populated with 0, 1 or 10 keys (see generate_data):
+    Ad         - not named in any sharding definition
+    Betelgeuse - sharded, "(3)" variants of the definition included
+    C          - not named in any sharding definition
+    D          - sharded, "(3)" variants of the definition included
+    Evade      - not named in any sharding definition
+  */
+  bool verbose = false;
+  std::vector<std::string> sharding_defs = {
+    "Betelgeuse D",
+    "Betelgeuse(3) D",
+    "Betelgeuse D(3)",
+    "Betelgeuse(3) D(3)"};
+  std::vector<std::string> prefixes = {"Ad", "Betelgeuse", "C", "D", "Evade"};
+  // word pool used for both keys and values
+  std::vector<std::string> randoms = {"0", "1", "2", "3", "4", "5",
+    "found", "brain", "fully", "pen", "worth", "race",
+    "stand", "nodded", "whenever", "surrounded", "industrial", "skin",
+    "this", "direction", "family", "beginning", "whenever", "held",
+    "metal", "year", "like", "valuable", "softly", "whistle",
+    "perfectly", "broken", "idea", "also", "coffee", "branch",
+    "tongue", "immediately", "bent", "partly", "burn", "include",
+    "certain", "burst", "final", "smoke", "positive", "perfectly"
+  };
+  int R = randoms.size();
+
+  // Test case selector: slots 0..4 hold a value 0..2 for the matching entry
+  // of `prefixes`; slot 5 becomes non-zero once every combination was seen.
+  typedef int test_id[6];
+
+  // Resets the selector and the key/value word cursors.
+  void zero(test_id& x) {
+    k = 0;
+    v = 0;
+    for (auto& i:x)
+      i = 0;
+  }
+
+  // True once next() has carried out of the last prefix slot.
+  bool end(const test_id& x) {
+    return x[5] != 0;
+  }
+
+  // Advances the selector like a base-3 counter, slot 0 being the lowest.
+  void next(test_id& x) {
+    x[0]++;
+    for (int i = 0; i < 5; i++) {
+      if (x[i] == 3) {
+        x[i] = 0;
+        ++x[i + 1];
+      }
+    }
+  }
+
+  // expected content of the store: combined prefix+key -> value
+  std::map<std::string, std::string> data;
+  // cursors into `randoms` for the next key and the next value
+  int k = 0;
+  int v = 0;
+
+  // Fills `data` according to selector `x`: per prefix nothing (0), a single
+  // key (1) or ten keys sharing a common base (2).
+  void generate_data(const test_id& x) {
+    data.clear();
+    for (int i = 0; i < 5; i++) {
+      if (verbose)
+        std::cout << x[i] << "-";
+      switch (x[i]) {
+      case 0:
+        break;
+      case 1:
+        data[RocksDBStore::combine_strings(prefixes[i], randoms[k++ % R])] = randoms[v++ % R];
+        break;
+      case 2: {
+        // scoped, so that adding a later case cannot jump over `base`
+        std::string base = randoms[k++ % R];
+        for (int j = 0; j < 10; j++) {
+          data[RocksDBStore::combine_strings(prefixes[i], base + "." + randoms[k++ % R])] = randoms[v++ % R];
+        }
+        break;
+      }
+      }
+    }
+  }
+
+  // Writes every entry of `data` to the store in one synchronous transaction.
+  void data_to_db() {
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (auto &d : data) {
+      bufferlist v1;
+      v1.append(d.second);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(d.first, &prefix, &key);
+      t->set(prefix, key, v1);
+      if (verbose)
+        std::cout << "SET " << prefix << " " << key << std::endl;
+    }
+    ASSERT_EQ(db->submit_transaction_sync(t), 0);
+  }
+
+  // Removes every entry of `data` from the store and verifies that the
+  // whole-space iterator finds nothing afterwards.
+  void clear_db() {
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (auto &d : data) {
+      string prefix;
+      string key;
+      RocksDBStore::split_key(d.first, &prefix, &key);
+      t->rmkey(prefix, key);
+    }
+    ASSERT_EQ(db->submit_transaction_sync(t), 0);
+    //paranoid, check if db empty
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    ASSERT_EQ(it->seek_to_first(), 0);
+    ASSERT_EQ(it->valid(), false);
+  }
+};
+
+// For every data layout: walk the whole-space iterator forward from
+// seek_to_first() and expect exactly the entries of `data`, in order.
+TEST_P(RocksDBShardingTest, wholespace_next) {
+  test_id X;
+  for (zero(X); !end(X); next(X)) {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    //move forward
+    ASSERT_EQ(it->seek_to_first(), 0);
+    ASSERT_EQ(it->valid(), !data.empty());
+
+    for (auto expected = data.begin(); expected != data.end(); ++expected) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(expected->first, &prefix, &key);
+      auto raw_key = it->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      ASSERT_EQ(it->value().to_str(), expected->second);
+      if (verbose)
+        std::cout << "next " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->next(), 0);
+    }
+    // iterator must be exhausted together with the expected data
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+  }
+}
+
+// For every data layout: walk the whole-space iterator backward from
+// seek_to_last() and expect exactly the entries of `data`, in reverse order.
+TEST_P(RocksDBShardingTest, wholespace_prev) {
+  test_id X;
+  for (zero(X); !end(X); next(X)) {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    ASSERT_EQ(it->seek_to_last(), 0);
+    ASSERT_EQ(it->valid(), !data.empty());
+
+    for (auto expected = data.rbegin(); expected != data.rend(); ++expected) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(expected->first, &prefix, &key);
+      auto raw_key = it->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      ASSERT_EQ(it->value().to_str(), expected->second);
+      if (verbose)
+        std::cout << "prev " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->prev(), 0);
+    }
+    // stepping back from the first element must invalidate the iterator
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+  }
+}
+
+// For every data layout and every stored key: lower_bound() on a fresh
+// whole-space iterator with that exact key must land on that key. A second
+// iterator is walked in step to check that it stays valid for every entry.
+TEST_P(RocksDBShardingTest, wholespace_lower_bound) {
+  test_id X;
+  for (zero(X); !end(X); next(X)) {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    ASSERT_EQ(it->seek_to_first(), 0);
+    ASSERT_EQ(it->valid(), !data.empty());
+
+    for (auto expected = data.begin(); expected != data.end(); ++expected) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(expected->first, &prefix, &key);
+      KeyValueDB::WholeSpaceIterator probe = db->get_wholespace_iterator();
+      ASSERT_EQ(probe->lower_bound(prefix, key), 0);
+      ASSERT_EQ(probe->valid(), true);
+      auto raw_key = probe->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      if (verbose)
+        std::cout << "lower_bound " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->next(), 0);
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+  }
+}
+
+// For every data layout and every stored key: upper_bound() on a fresh
+// whole-space iterator with a key just below the stored one must land on
+// the stored key. A second iterator is walked in step to check that it stays
+// valid for every entry.
+TEST_P(RocksDBShardingTest, wholespace_upper_bound) {
+  test_id X;
+  for (zero(X); !end(X); next(X)) {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    ASSERT_EQ(it->seek_to_first(), 0);
+    ASSERT_EQ(it->valid(), !data.empty());
+
+    for (auto expected = data.begin(); expected != data.end(); ++expected) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(expected->first, &prefix, &key);
+      //decrement key minimally: same key with its last character lowered by one
+      const std::string::size_type last = key.length() - 1;
+      string key_minus_1 = key.substr(0, last);
+      key_minus_1 += std::string(1, key[last] - 1);
+      KeyValueDB::WholeSpaceIterator probe = db->get_wholespace_iterator();
+      ASSERT_EQ(probe->upper_bound(prefix, key_minus_1), 0);
+      ASSERT_EQ(probe->valid(), true);
+      auto raw_key = probe->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      if (verbose)
+        std::cout << "upper_bound " << prefix << " " << key_minus_1 << std::endl;
+      ASSERT_EQ(it->next(), 0);
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+  }
+}
+
+// For every data layout: lower_bound() with (" ", " ") must land on the
+// first stored key, and lower_bound() with ("~", "~") must end up invalid.
+TEST_P(RocksDBShardingTest, wholespace_lookup_limits) {
+  test_id X;
+  for (zero(X); !end(X); next(X)) {
+    generate_data(X);
+    data_to_db();
+
+    //lookup before first
+    if (!data.empty()) {
+      string prefix;
+      string key;
+      RocksDBStore::split_key(data.begin()->first, &prefix, &key);
+      KeyValueDB::WholeSpaceIterator before = db->get_wholespace_iterator();
+      ASSERT_EQ(before->lower_bound(" ", " "), 0);
+      ASSERT_EQ(before->valid(), true);
+      auto raw_key = before->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+    }
+    //lookup after last
+    KeyValueDB::WholeSpaceIterator after = db->get_wholespace_iterator();
+    ASSERT_EQ(after->lower_bound("~", "~"), 0);
+    ASSERT_EQ(after->valid(), false);
+
+    clear_db();
+  }
+}
+
+
+
INSTANTIATE_TEST_SUITE_P(
KeyValueDB,
KVTest,
::testing::Values("leveldb", "rocksdb", "memdb"));
+// Run the sharding tests once per sharding definition.
+INSTANTIATE_TEST_SUITE_P(
+  KeyValueDB,
+  RocksDBShardingTest,
+  ::testing::Values(
+    "Betelgeuse D",
+    "Betelgeuse(3) D",
+    "Betelgeuse D(3)",
+    "Betelgeuse(3) D(3)"));
+
int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);