From 456812065ad20fdf105325493f7f750e727ad469 Mon Sep 17 00:00:00 2001 From: Adam Kupczyk Date: Wed, 25 Mar 2020 15:00:00 +0100 Subject: [PATCH] kv/RocksDBStore: Added proper WholeSpaceIterator Added WholeSpaceIterator that properly assembles sharded and unsharded portions of RocksDB database into one consistent iterator. Signed-off-by: Adam Kupczyk --- src/kv/RocksDBStore.cc | 467 +++++++++++++++++++++++++++++++- src/kv/RocksDBStore.h | 5 +- src/test/objectstore/test_kv.cc | 330 ++++++++++++++++++++++ 3 files changed, 794 insertions(+), 8 deletions(-) diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 6442273c762..07d7a7f7118 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -1906,12 +1906,6 @@ string RocksDBStore::past_prefix(const string &prefix) return limit; } -RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() -{ - return std::make_shared( - db->NewIterator(rocksdb::ReadOptions(), default_cf)); -} - class CFIteratorImpl : public KeyValueDB::IteratorImpl { protected: string prefix; @@ -1977,6 +1971,451 @@ public: } }; + +//merges the default-CF iterator (main) with one iterator per cf_handles prefix (shards) into a single ordered view +class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl { +private: + RocksDBStore* db; + KeyValueDB::WholeSpaceIterator main; + std::map shards; + std::map::iterator current_shard = shards.end(); + enum {on_main, on_shard} smaller = on_main; + +public: + WholeMergeIteratorImpl(RocksDBStore* db) + : db(db) + , main(db->get_default_cf_iterator()) + { + for (auto& e : db->cf_handles) { + shards.emplace(e.first, db->get_iterator(e.first)); + } + } + + // returns true if key in main is smaller than key in current shard + // an invalid side counts as larger than any actual key + bool is_main_smaller() { + if (main->valid()) { + if (current_shard != shards.end()) { + auto main_rk = main->raw_key(); + ceph_assert(current_shard->second->valid()); + auto shards_rk = current_shard->second->raw_key(); + if (main_rk.first < shards_rk.first) + return true; + if (main_rk.first > 
shards_rk.first) + return false; + return main_rk.second < shards_rk.second; + } else { + return true; + } + } else { + if (current_shard != shards.end()) { + return false; + } else { + //neither main nor shards is valid + //report main as smaller, so valid() asks main and signals end properly + return true; + } + } + } + + int seek_to_first() override { + int r0 = main->seek_to_first(); + int r1 = 0; + // find first shard that has some data + current_shard = shards.begin(); + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0 || current_shard->second->valid()) { + //stop on error, or on the first shard that will yield some keys + break; + } + ++current_shard; + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 0 : -1; + } + + int seek_to_first(const std::string &prefix) override { + int r0 = main->seek_to_first(prefix); + int r1 = 0; + // find first shard, named at or after prefix, that has some data + current_shard = shards.lower_bound(prefix); + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0 || current_shard->second->valid()) { + //stop on error, or on the first shard that will yield some keys + break; + } + ++current_shard; + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 0 : -1; + }; + + int seek_to_last() override { + int r0 = main->seek_to_last(); + int r1 = 0; + r1 = shards_seek_to_last(); + //if we have 2 candidates, the larger one becomes current and the other is stepped past it + if (main->valid()) { + if (shards_valid()) { + if (is_main_smaller()) { + smaller = on_shard; + main->next(); + } else { + smaller = on_main; + shards_next(); + } + } else { + smaller = on_main; + } + } else { + if (shards_valid()) { + smaller = on_shard; + } else { + smaller = on_main; + } + } + return r0 == 0 && r1 == 0 ? 
0 : -1; + } + + int seek_to_last(const std::string &prefix) override { + int r0 = main->seek_to_last(prefix); + int r1 = 0; + // find last shard, named at or before prefix, that has some data + bool found = false; + current_shard = shards.upper_bound(prefix); + while (current_shard != shards.begin()) { + r1 = (--current_shard)->second->seek_to_last(); + if (r1 != 0) + break; + if (current_shard->second->valid()) { + found = true; + break; + } + } + //if we have 2 candidates, the larger one is current; step the other past it + if (main->valid() && found) { + if (is_main_smaller()) { + main->next(); + } else { + shards_next(); + } + } + if (!found) { + //set shards state that properly represents eof + current_shard = shards.end(); + } + smaller = is_main_smaller() ? on_main : on_shard; + return r0 == 0 && r1 == 0 ? 0 : -1; + } + + int upper_bound(const std::string &prefix, const std::string &after) override { + int r0 = main->upper_bound(prefix, after); + int r1 = 0; + if (r0 != 0) + return r0; + current_shard = shards.lower_bound(prefix); + if (current_shard != shards.end()) { + bool located = false; + if (current_shard->first == prefix) { + r1 = current_shard->second->upper_bound(after); + if (r1 != 0) + return r1; + located = current_shard->second->valid(); //false: this shard has no key past 'after' + if (!located) + ++current_shard; //so resume from the first key of the next shard + } + if (!located) { + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) + break; + ++current_shard; + } + } + } + smaller = is_main_smaller() ? 
on_main : on_shard; + return 0; + } + + int lower_bound(const std::string &prefix, const std::string &to) override { + int r0 = main->lower_bound(prefix, to); + int r1 = 0; + if (r0 != 0) + return r0; + current_shard = shards.lower_bound(prefix); + if (current_shard != shards.end()) { + bool located = false; + if (current_shard->first == prefix) { + r1 = current_shard->second->lower_bound(to); + if (r1 != 0) + return r1; + located = current_shard->second->valid(); //false: this shard has no key at or after 'to' + if (!located) + ++current_shard; //so resume from the first key of the next shard + } + if (!located) { + while (current_shard != shards.end()) { + r1 = current_shard->second->seek_to_first(); + if (r1 != 0) + return r1; + if (current_shard->second->valid()) + break; + ++current_shard; + } + } + } + smaller = is_main_smaller() ? on_main : on_shard; + return 0; + } + + bool valid() override { + if (smaller == on_main) { + return main->valid(); + } else { + if (current_shard == shards.end()) + return false; + return current_shard->second->valid(); + } + }; + + int next() override { + int r; + if (smaller == on_main) { + r = main->next(); + } else { + r = shards_next(); + } + if (r != 0) + return r; + smaller = is_main_smaller() ? on_main : on_shard; + return 0; + } + + int prev() override { + int r; + bool main_was_valid = false; + if (main->valid()) { + main_was_valid = true; + r = main->prev(); + } else { + r = main->seek_to_last(); + } + if (r != 0) + return r; + + bool shards_was_valid = false; + if (shards_valid()) { + shards_was_valid = true; + r = shards_prev(); + } else { + r = shards_seek_to_last(); + } + if (r != 0) + return r; + + if (!main->valid() && !shards_valid()) { + //end, no previous. 
set marker so valid() can work + smaller = on_main; + return 0; + } + + //if 1 is valid, select it + //if 2 are valid select larger and advance the other + if (main->valid()) { + if (shards_valid()) { + if (is_main_smaller()) { + smaller = on_shard; + if (main_was_valid) { + if (main->valid()) { + r = main->next(); + } else { + r = main->seek_to_first(); + } + } else { + //if we have resurrected main, kill it + if (main->valid()) { + main->next(); + } + } + } else { + smaller = on_main; + if (shards_was_valid) { + if (shards_valid()) { + r = shards_next(); + } else { + r = shards_seek_to_first(); + } + } else { + //if we have resurrected shards, kill it + if (shards_valid()) { + shards_next(); + } + } + } + } else { + smaller = on_main; + r = shards_seek_to_first(); + } + } else { + smaller = on_shard; + r = main->seek_to_first(); + } + return r; + } + + std::string key() override + { + if (smaller == on_main) { + return main->key(); + } else { + return current_shard->second->key(); + } + } + + std::pair raw_key() override + { + if (smaller == on_main) { + return main->raw_key(); + } else { + return { current_shard->first, current_shard->second->key() }; + } + } + + bool raw_key_is_prefixed(const std::string &prefix) override + { + if (smaller == on_main) { + return main->raw_key_is_prefixed(prefix); + } else { + return current_shard->first == prefix; + } + } + + ceph::buffer::list value() override + { + if (smaller == on_main) { + return main->value(); + } else { + return current_shard->second->value(); + } + } + + int status() override + { + //because we already had to inspect key, it must be ok + return 0; + } + + size_t key_size() override + { + if (smaller == on_main) { + return main->key_size(); + } else { + return current_shard->second->key().size(); + } + } + size_t value_size() override + { + if (smaller == on_main) { + return main->value_size(); + } else { + return current_shard->second->value().length(); + } + } + + int shards_valid() { + if 
(current_shard == shards.end()) + return false; + return current_shard->second->valid(); + } + + int shards_next() { + if (current_shard == shards.end()) { + //illegal to next() on !valid() + return -1; + } + int r = 0; + r = current_shard->second->next(); + if (r != 0) + return r; + if (current_shard->second->valid()) + return 0; + //current shard exhausted, search following shards for a key + ++current_shard; + while (current_shard != shards.end()) { + r = current_shard->second->seek_to_first(); + if (r != 0) + return r; + if (current_shard->second->valid()) + break; + ++current_shard; + } + //either we found key or not, but it is success + return 0; + } + + int shards_prev() { + if (current_shard == shards.end()) { + //illegal to prev() on !valid() + return -1; + } + int r = current_shard->second->prev(); + while (r == 0) { + if (current_shard->second->valid()) { + break; + } + if (current_shard == shards.begin()) { + //we have reached pre-first element + //this makes it !valid(), but guarantees next() moves to first element + break; + } + --current_shard; + r = current_shard->second->seek_to_last(); + } + return r; + } + + int shards_seek_to_last() { + int r = 0; + current_shard = shards.end(); + if (current_shard == shards.begin()) { + //no shards at all + return 0; + } + while (current_shard != shards.begin()) { + --current_shard; + r = current_shard->second->seek_to_last(); + if (r != 0) + return r; + if (current_shard->second->valid()) { + return 0; + } + } + //no keys at all + current_shard = shards.end(); + return r; + } + + int shards_seek_to_first() { + int r = 0; + current_shard = shards.begin(); + while (current_shard != shards.end()) { + r = current_shard->second->seek_to_first(); + if (r != 0) + break; + if (current_shard->second->valid()) { + //this is the first shard that will yield some keys + break; + } + ++current_shard; + } + return r; + } +}; + class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl { private: struct KeyLess { @@ -2209,3 +2648,19 @@ 
rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* { return db->NewIterator(rocksdb::ReadOptions(), cf); } + +RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() +{ + if (cf_handles.empty()) { + //no sharded column families: the default column family iterator is the whole space + return get_default_cf_iterator(); + } else { + return std::make_shared(this); + } +} + +RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator() +{ + return std::make_shared( + db->NewIterator(rocksdb::ReadOptions(), default_cf)); +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index ca36bdb7df1..fe749145b32 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -86,6 +86,7 @@ class RocksDBStore : public KeyValueDB { uint64_t cache_size = 0; bool set_cache_flag = false; friend class ShardMergeIteratorImpl; + friend class WholeMergeIteratorImpl; /* * See RocksDB's definition of a column family(CF) and how to use it. * The interfaces of KeyValueDB is extended, when a column family is created. 
@@ -565,8 +566,8 @@ err: } WholeSpaceIterator get_wholespace_iterator() override; +private: + WholeSpaceIterator get_default_cf_iterator(); }; - - #endif diff --git a/src/test/objectstore/test_kv.cc b/src/test/objectstore/test_kv.cc index b011c0efce4..24053c7fbf8 100644 --- a/src/test/objectstore/test_kv.cc +++ b/src/test/objectstore/test_kv.cc @@ -665,11 +665,341 @@ TEST_P(KVTest, RocksDB_parse_sharding_def) { } + +class RocksDBShardingTest : public ::testing::TestWithParam { +public: + boost::scoped_ptr db; + + RocksDBShardingTest() : db(0) {} + + string _bl_to_str(bufferlist val) { + string str(val.c_str(), val.length()); + return str; + } + + void rm_r(string path) { + string cmd = string("rm -r ") + path; + if (verbose) + cout << "==> " << cmd << std::endl; + int r = ::system(cmd.c_str()); + if (r) { + cerr << "failed with exit code " << r + << ", continuing anyway" << std::endl; + } + } + + void SetUp() override { + verbose = getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") == 0; + + int r = ::mkdir("kv_test_temp_dir", 0777); + if (r < 0 && errno != EEXIST) { + r = -errno; + cerr << __func__ << ": unable to create kv_test_temp_dir: " + << cpp_strerror(r) << std::endl; + FAIL(); + } + db.reset(KeyValueDB::create(g_ceph_context, "rocksdb", + "kv_test_temp_dir")); + ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options)); + if (verbose) + cout << "Creating database with sharding: " << GetParam() << std::endl; + ASSERT_EQ(0, db->create_and_open(cout, GetParam())); + } + void TearDown() override { + db.reset(nullptr); + rm_r("kv_test_temp_dir"); + } + + /* + A - main 0/1/10 + B - shard 1/3 x 0/1/10 + C - main 0/1/10 + D - shard 1/3 x 0/1/10 + E - main 0/1/10 + */ + bool verbose; + std::vector sharding_defs = { + "Betelgeuse D", + "Betelgeuse(3) D", + "Betelgeuse D(3)", + "Betelgeuse(3) D(3)"}; + std::vector prefixes = {"Ad", "Betelgeuse", "C", "D", "Evade"}; + std::vector randoms = {"0", "1", "2", "3", "4", "5", + "found", "brain", "fully", "pen", "worth", 
"race", + "stand", "nodded", "whenever", "surrounded", "industrial", "skin", + "this", "direction", "family", "beginning", "whenever", "held", + "metal", "year", "like", "valuable", "softly", "whistle", + "perfectly", "broken", "idea", "also", "coffee", "branch", + "tongue", "immediately", "bent", "partly", "burn", "include", + "certain", "burst", "final", "smoke", "positive", "perfectly" + }; + int R = randoms.size(); + + typedef int test_id[6]; + void zero(test_id& x) { + k = 0; + v = 0; + for (auto& i:x) + i = 0; + } + bool end(const test_id& x) { + return x[5] != 0; + } + void next(test_id& x) { + x[0]++; + for (int i = 0; i < 5; i++) { + if (x[i] == 3) { + x[i] = 0; + ++x[i + 1]; + } + } + } + + std::map data; + int k = 0; + int v = 0; + + void generate_data(const test_id& x) { + data.clear(); + for (int i = 0; i < 5; i++) { + if (verbose) + std::cout << x[i] << "-"; + switch (x[i]) { + case 0: + break; + case 1: + data[RocksDBStore::combine_strings(prefixes[i], randoms[k++ % R])] = randoms[v++ % R]; + break; + case 2: + std::string base = randoms[k++ % R]; + for (int j = 0; j < 10; j++) { + data[RocksDBStore::combine_strings(prefixes[i], base + "." 
+ randoms[k++ % R])] = randoms[v++ % R]; + } + break; + } + } + } + + void data_to_db() { + KeyValueDB::Transaction t = db->get_transaction(); + for (auto &d : data) { + bufferlist v1; + v1.append(d.second); + string prefix; + string key; + RocksDBStore::split_key(d.first, &prefix, &key); + t->set(prefix, key, v1); + if (verbose) + std::cout << "SET " << prefix << " " << key << std::endl; + } + ASSERT_EQ(db->submit_transaction_sync(t), 0); + } + + void clear_db() { + KeyValueDB::Transaction t = db->get_transaction(); + for (auto &d : data) { + string prefix; + string key; + RocksDBStore::split_key(d.first, &prefix, &key); + t->rmkey(prefix, key); + } + ASSERT_EQ(db->submit_transaction_sync(t), 0); + //paranoid, check if db empty + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + ASSERT_EQ(it->seek_to_first(), 0); + ASSERT_EQ(it->valid(), false); + } +}; + +TEST_P(RocksDBShardingTest, wholespace_next) { + test_id X; + zero(X); + do { + generate_data(X); + data_to_db(); + + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + //move forward + auto dit = data.begin(); + int r = it->seek_to_first(); + ASSERT_EQ(r, 0); + ASSERT_EQ(it->valid(), (dit != data.end())); + + while (dit != data.end()) { + ASSERT_EQ(it->valid(), true); + string prefix; + string key; + RocksDBStore::split_key(dit->first, &prefix, &key); + auto raw_key = it->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + ASSERT_EQ(it->value().to_str(), dit->second); + if (verbose) + std::cout << "next " << prefix << " " << key << std::endl; + ASSERT_EQ(it->next(), 0); + ++dit; + } + ASSERT_EQ(it->valid(), false); + + clear_db(); + next(X); + } while (!end(X)); +} + +TEST_P(RocksDBShardingTest, wholespace_prev) { + test_id X; + zero(X); + do { + generate_data(X); + data_to_db(); + + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + auto dit = data.rbegin(); + int r = it->seek_to_last(); + ASSERT_EQ(r, 0); + 
ASSERT_EQ(it->valid(), (dit != data.rend())); + + while (dit != data.rend()) { + ASSERT_EQ(it->valid(), true); + string prefix; + string key; + RocksDBStore::split_key(dit->first, &prefix, &key); + auto raw_key = it->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + ASSERT_EQ(it->value().to_str(), dit->second); + if (verbose) + std::cout << "prev " << prefix << " " << key << std::endl; + ASSERT_EQ(it->prev(), 0); + ++dit; + } + ASSERT_EQ(it->valid(), false); + + clear_db(); + next(X); + } while (!end(X)); +} + +TEST_P(RocksDBShardingTest, wholespace_lower_bound) { + test_id X; + zero(X); + do { + generate_data(X); + data_to_db(); + + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + auto dit = data.begin(); + int r = it->seek_to_first(); + ASSERT_EQ(r, 0); + ASSERT_EQ(it->valid(), (dit != data.end())); + + while (dit != data.end()) { + ASSERT_EQ(it->valid(), true); + string prefix; + string key; + RocksDBStore::split_key(dit->first, &prefix, &key); + KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator(); + ASSERT_EQ(it1->lower_bound(prefix, key), 0); + ASSERT_EQ(it1->valid(), true); + auto raw_key = it1->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + if (verbose) + std::cout << "lower_bound " << prefix << " " << key << std::endl; + ASSERT_EQ(it->next(), 0); + ++dit; + } + ASSERT_EQ(it->valid(), false); + + clear_db(); + next(X); + } while (!end(X)); +} + +TEST_P(RocksDBShardingTest, wholespace_upper_bound) { + test_id X; + zero(X); + do { + generate_data(X); + data_to_db(); + + KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator(); + auto dit = data.begin(); + int r = it->seek_to_first(); + ASSERT_EQ(r, 0); + ASSERT_EQ(it->valid(), (dit != data.end())); + + while (dit != data.end()) { + ASSERT_EQ(it->valid(), true); + string prefix; + string key; + string key_minus_1; + RocksDBStore::split_key(dit->first, &prefix, &key); + //decrement key minimally + 
key_minus_1 = key.substr(0, key.length() - 1) + std::string(1, key[key.length() - 1] - 1); + KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator(); + ASSERT_EQ(it1->upper_bound(prefix, key_minus_1), 0); + ASSERT_EQ(it1->valid(), true); + auto raw_key = it1->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + if (verbose) + std::cout << "upper_bound " << prefix << " " << key_minus_1 << std::endl; + ASSERT_EQ(it->next(), 0); + ++dit; + } + ASSERT_EQ(it->valid(), false); + + clear_db(); + next(X); + } while (!end(X)); +} + +TEST_P(RocksDBShardingTest, wholespace_lookup_limits) { + test_id X; + zero(X); + do { + generate_data(X); + data_to_db(); + + //lookup before first + if (data.size() > 0) { + auto dit = data.begin(); + string prefix; + string key; + RocksDBStore::split_key(dit->first, &prefix, &key); + KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator(); + ASSERT_EQ(it1->lower_bound(" ", " "), 0); + ASSERT_EQ(it1->valid(), true); + auto raw_key = it1->raw_key(); + ASSERT_EQ(raw_key.first, prefix); + ASSERT_EQ(raw_key.second, key); + } + //lookup after last + KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator(); + ASSERT_EQ(it1->lower_bound("~", "~"), 0); + ASSERT_EQ(it1->valid(), false); + + clear_db(); + next(X); + } while (!end(X)); +} + + + INSTANTIATE_TEST_SUITE_P( KeyValueDB, KVTest, ::testing::Values("leveldb", "rocksdb", "memdb")); +INSTANTIATE_TEST_SUITE_P( + KeyValueDB, + RocksDBShardingTest, + ::testing::Values("Betelgeuse D", + "Betelgeuse(3) D", + "Betelgeuse D(3)", + "Betelgeuse(3) D(3)")); + int main(int argc, char **argv) { vector args; argv_to_vec(argc, (const char **)argv, args); -- 2.39.5