]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: Added proper WholeSpaceIterator
authorAdam Kupczyk <akupczyk@redhat.com>
Wed, 25 Mar 2020 14:00:00 +0000 (15:00 +0100)
committerAdam Kupczyk <akupczyk@redhat.com>
Fri, 17 Apr 2020 06:58:06 +0000 (08:58 +0200)
Added WholeSpaceIterator that properly assembles sharded and unsharded
portions of RocksDB database into one consistent iterator.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h
src/test/objectstore/test_kv.cc

index 6442273c7621d1c81cae14411ea6f2f2b8d00070..07d7a7f71184fedd00f05a86ef6183c3d7034e0d 100644 (file)
@@ -1906,12 +1906,6 @@ string RocksDBStore::past_prefix(const string &prefix)
   return limit;
 }
 
-RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
-{
-  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
-    db->NewIterator(rocksdb::ReadOptions(), default_cf));
-}
-
 class CFIteratorImpl : public KeyValueDB::IteratorImpl {
 protected:
   string prefix;
@@ -1977,6 +1971,451 @@ public:
   }
 };
 
+
+//merge column iterators and rest iterator
+class WholeMergeIteratorImpl : public KeyValueDB::WholeSpaceIteratorImpl {
+private:
+  RocksDBStore* db;
+  KeyValueDB::WholeSpaceIterator main;
+  std::map<std::string, KeyValueDB::Iterator> shards;
+  std::map<std::string, KeyValueDB::Iterator>::iterator current_shard;
+  enum {on_main, on_shard} smaller;
+
+public:
+  WholeMergeIteratorImpl(RocksDBStore* db)
+    : db(db)
+    , main(db->get_default_cf_iterator())
+  {
+    for (auto& e : db->cf_handles) {
+      shards.emplace(e.first, db->get_iterator(e.first));
+    }
+  }
+
+  // returns true if value in main is smaller then in shards
+  // invalid is larger then actual value
+  bool is_main_smaller() {
+    if (main->valid()) {
+      if (current_shard != shards.end()) {
+       auto main_rk = main->raw_key();
+       ceph_assert(current_shard->second->valid());
+       auto shards_rk = current_shard->second->raw_key();
+       if (main_rk.first < shards_rk.first)
+         return true;
+       if (main_rk.first > shards_rk.first)
+         return false;
+       return main_rk.second < shards_rk.second;
+      } else {
+       return true;
+      }
+    } else {
+      if (current_shard != shards.end()) {
+       return false;
+      } else {
+       //this means that neither is valid
+       //we select main to be smaller, so valid() will signal properly
+       return true;
+      }
+    }
+  }
+
+  int seek_to_first() override {
+    int r0 = main->seek_to_first();
+    int r1 = 0;
+    // find first shard that has some data
+    current_shard = shards.begin();
+    while (current_shard != shards.end()) {
+      r1 = current_shard->second->seek_to_first();
+      if (r1 != 0 || current_shard->second->valid()) {
+       //this is the first shard that will yield some keys
+       break;
+      }
+      ++current_shard;
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  int seek_to_first(const std::string &prefix) override {
+    int r0 = main->seek_to_first(prefix);
+    int r1 = 0;
+    // find first shard that has some data
+    current_shard = shards.lower_bound(prefix);
+    while (current_shard != shards.end()) {
+      r1 = current_shard->second->seek_to_first();
+      if (r1 != 0 || current_shard->second->valid()) {
+       //this is the first shard that will yield some keys
+       break;
+      }
+      ++current_shard;
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  };
+
+  int seek_to_last() override {
+    int r0 = main->seek_to_last();
+    int r1 = 0;
+    r1 = shards_seek_to_last();
+    //if we have 2 candidates, we need to select
+    if (main->valid()) {
+      if (shards_valid()) {
+       if (is_main_smaller()) {
+         smaller = on_shard;
+         main->next();
+       } else {
+         smaller = on_main;
+         shards_next();
+       }
+      } else {
+       smaller = on_main;
+      }
+    } else {
+      if (shards_valid()) {
+       smaller = on_shard;
+      } else {
+       smaller = on_main;
+      }
+    }
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  int seek_to_last(const std::string &prefix) override {
+    int r0 = main->seek_to_last(prefix);
+    int r1 = 0;
+    // find last shard that has some data
+    bool found = false;
+    current_shard = shards.lower_bound(prefix);
+    while (current_shard != shards.begin()) {
+      r1 = current_shard->second->seek_to_last();
+      if (r1 != 0)
+       break;
+      if (current_shard->second->valid()) {
+       found = true;
+       break;
+      }
+    }
+    //if we have 2 candidates, we need to select
+    if (main->valid() && found) {
+      if (is_main_smaller()) {
+       main->next();
+      } else {
+       shards_next();
+      }
+    }
+    if (!found) {
+      //set shards state that properly represents eof
+      current_shard = shards.end();
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return r0 == 0 && r1 == 0 ? 0 : -1;
+  }
+
+  int upper_bound(const std::string &prefix, const std::string &after) override {
+    int r0 = main->upper_bound(prefix, after);
+    int r1 = 0;
+    if (r0 != 0)
+      return r0;
+    current_shard = shards.lower_bound(prefix);
+    if (current_shard != shards.end()) {
+      bool located = false;
+      if (current_shard->first == prefix) {
+       r1 = current_shard->second->upper_bound(after);
+       if (r1 != 0)
+         return r1;
+        if (current_shard->second->valid()) {
+         located = true;
+       }
+      }
+      if (!located) {
+       while (current_shard != shards.end()) {
+         r1 = current_shard->second->seek_to_first();
+         if (r1 != 0)
+           return r1;
+         if (current_shard->second->valid())
+           break;
+         ++current_shard;
+       }
+      }
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  int lower_bound(const std::string &prefix, const std::string &to) override {
+    int r0 = main->lower_bound(prefix, to);
+    int r1 = 0;
+    if (r0 != 0)
+      return r0;
+    current_shard = shards.lower_bound(prefix);
+    if (current_shard != shards.end()) {
+      bool located = false;
+      if (current_shard->first == prefix) {
+       r1 = current_shard->second->lower_bound(to);
+       if (r1 != 0)
+         return r1;
+       if (current_shard->second->valid()) {
+         located = true;
+       }
+      }
+      if (!located) {
+       while (current_shard != shards.end()) {
+         r1 = current_shard->second->seek_to_first();
+         if (r1 != 0)
+           return r1;
+         if (current_shard->second->valid())
+           break;
+         ++current_shard;
+       }
+      }
+    }
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  bool valid() override {
+    if (smaller == on_main) {
+      return main->valid();
+    } else {
+      if (current_shard == shards.end())
+       return false;
+      return current_shard->second->valid();
+    }
+  };
+
+  int next() override {
+    int r;
+    if (smaller == on_main) {
+      r = main->next();
+    } else {
+      r = shards_next();
+    }
+    if (r != 0)
+      return r;
+    smaller = is_main_smaller() ? on_main : on_shard;
+    return 0;
+  }
+
+  int prev() override {
+    int r;
+    bool main_was_valid = false;
+    if (main->valid()) {
+      main_was_valid = true;
+      r = main->prev();
+    } else {
+      r = main->seek_to_last();
+    }
+    if (r != 0)
+      return r;
+
+    bool shards_was_valid = false;
+    if (shards_valid()) {
+      shards_was_valid = true;
+      r = shards_prev();
+    } else {
+      r = shards_seek_to_last();
+    }
+    if (r != 0)
+      return r;
+
+    if (!main->valid() && !shards_valid()) {
+      //end, no previous. set marker so valid() can work
+      smaller = on_main;
+      return 0;
+    }
+
+    //if 1 is valid, select it
+    //if 2 are valid select larger and advance the other
+    if (main->valid()) {
+      if (shards_valid()) {
+       if (is_main_smaller()) {
+         smaller = on_shard;
+         if (main_was_valid) {
+           if (main->valid()) {
+             r = main->next();
+           } else {
+             r = main->seek_to_first();
+           }
+         } else {
+           //if we have resurrected main, kill it
+           if (main->valid()) {
+             main->next();
+           }
+         }
+       } else {
+         smaller = on_main;
+         if (shards_was_valid) {
+           if (shards_valid()) {
+             r = shards_next();
+           } else {
+             r = shards_seek_to_first();
+           }
+         } else {
+           //if we have resurected shards, kill it
+           if (shards_valid()) {
+             shards_next();
+           }
+         }
+       }
+      } else {
+       smaller = on_main;
+       r = shards_seek_to_first();
+      }
+    } else {
+      smaller = on_shard;
+      r = main->seek_to_first();
+    }
+    return r;
+  }
+
+  std::string key() override
+  {
+    if (smaller == on_main) {
+      return main->key();
+    } else {
+      return current_shard->second->key();
+    }
+  }
+
+  std::pair<std::string,std::string> raw_key() override
+  {
+    if (smaller == on_main) {
+      return main->raw_key();
+    } else {
+      return { current_shard->first, current_shard->second->key() };
+    }
+  }
+
+  bool raw_key_is_prefixed(const std::string &prefix) override
+  {
+    if (smaller == on_main) {
+      return main->raw_key_is_prefixed(prefix);
+    } else {
+      return current_shard->first == prefix;
+    }
+  }
+
+  ceph::buffer::list value() override
+  {
+    if (smaller == on_main) {
+      return main->value();
+    } else {
+      return current_shard->second->value();
+    }
+  }
+
+  int status() override
+  {
+    //because we already had to inspect key, it must be ok
+    return 0;
+  }
+
+  size_t key_size() override
+  {
+    if (smaller == on_main) {
+      return main->key_size();
+    } else {
+      return current_shard->second->key().size();
+    }
+  }
+  size_t value_size() override
+  {
+    if (smaller == on_main) {
+      return main->value_size();
+    } else {
+      return current_shard->second->value().length();
+    }
+  }
+
+  int shards_valid() {
+    if (current_shard == shards.end())
+      return false;
+    return current_shard->second->valid();
+  }
+
+  int shards_next() {
+    if (current_shard == shards.end()) {
+      //illegal to next() on !valid()
+      return -1;
+    }
+    int r = 0;
+    r = current_shard->second->next();
+    if (r != 0)
+      return r;
+    if (current_shard->second->valid())
+      return 0;
+    //current shard exhaused, search for key
+    ++current_shard;
+    while (current_shard != shards.end()) {
+      r = current_shard->second->seek_to_first();
+      if (r != 0)
+       return r;
+      if (current_shard->second->valid())
+       break;
+      ++current_shard;
+    }
+    //either we found key or not, but it is success
+    return 0;
+  }
+
+  int shards_prev() {
+    if (current_shard == shards.end()) {
+      //illegal to prev() on !valid()
+      return -1;
+    }
+    int r = current_shard->second->prev();
+    while (r == 0) {
+      if (current_shard->second->valid()) {
+       break;
+      }
+      if (current_shard == shards.begin()) {
+       //we have reached pre-first element
+       //this makes it !valid(), but guarantees next() moves to first element
+       break;
+      }
+      --current_shard;
+      r = current_shard->second->seek_to_last();
+    }
+    return r;
+  }
+
+  int shards_seek_to_last() {
+    int r = 0;
+    current_shard = shards.end();
+    if (current_shard == shards.begin()) {
+      //no shards at all
+      return 0;
+    }
+    while (current_shard != shards.begin()) {
+      --current_shard;
+      r = current_shard->second->seek_to_last();
+      if (r != 0)
+       return r;
+      if (current_shard->second->valid()) {
+       return 0;
+      }
+    }
+    //no keys at all
+    current_shard = shards.end();
+    return r;
+  }
+
+  int shards_seek_to_first() {
+    int r = 0;
+    current_shard = shards.begin();
+    while (current_shard != shards.end()) {
+      r = current_shard->second->seek_to_first();
+      if (r != 0)
+       break;
+      if (current_shard->second->valid()) {
+       //this is the first shard that will yield some keys
+       break;
+      }
+      ++current_shard;
+    }
+    return r;
+  }
+};
+
 class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
 private:
   struct KeyLess {
@@ -2209,3 +2648,19 @@ rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle*
 {
   return db->NewIterator(rocksdb::ReadOptions(), cf);
 }
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
+{
+  if (cf_handles.size() == 0) {
+    return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
+      db->NewIterator(rocksdb::ReadOptions(), default_cf));
+  } else {
+    return std::make_shared<WholeMergeIteratorImpl>(this);
+  }
+}
+
+RocksDBStore::WholeSpaceIterator RocksDBStore::get_default_cf_iterator()
+{
+  return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
+    db->NewIterator(rocksdb::ReadOptions(), default_cf));
+}
index ca36bdb7df19822044b8980c0de947f8266135ed..fe749145b32df039112ac0d8dd404904bdeebf86 100644 (file)
@@ -86,6 +86,7 @@ class RocksDBStore : public KeyValueDB {
   uint64_t cache_size = 0;
   bool set_cache_flag = false;
   friend class ShardMergeIteratorImpl;
+  friend class WholeMergeIteratorImpl;
   /*
    *  See RocksDB's definition of a column family(CF) and how to use it.
    *  The interfaces of KeyValueDB is extended, when a column family is created.
@@ -565,8 +566,8 @@ err:
   }
 
   WholeSpaceIterator get_wholespace_iterator() override;
+private:
+  WholeSpaceIterator get_default_cf_iterator();
 };
 
-
-
 #endif
index b011c0efce4586bdec16929c298880d10f2d012c..24053c7fbf8ed7aa5dd9ff329e974f993ce3ac79 100644 (file)
@@ -665,11 +665,341 @@ TEST_P(KVTest, RocksDB_parse_sharding_def) {
 }
 
 
+
+class RocksDBShardingTest : public ::testing::TestWithParam<const char*> {
+public:
+  boost::scoped_ptr<KeyValueDB> db;
+
+  RocksDBShardingTest() : db(0) {}
+
+  string _bl_to_str(bufferlist val) {
+    string str(val.c_str(), val.length());
+    return str;
+  }
+
+  void rm_r(string path) {
+    string cmd = string("rm -r ") + path;
+    if (verbose)
+      cout << "==> " << cmd << std::endl;
+    int r = ::system(cmd.c_str());
+    if (r) {
+      cerr << "failed with exit code " << r
+          << ", continuing anyway" << std::endl;
+    }
+  }
+
+  void SetUp() override {
+    verbose = getenv("VERBOSE") && strcmp(getenv("VERBOSE"), "1") == 0;
+
+    int r = ::mkdir("kv_test_temp_dir", 0777);
+    if (r < 0 && errno != EEXIST) {
+      r = -errno;
+      cerr << __func__ << ": unable to create kv_test_temp_dir: "
+          << cpp_strerror(r) << std::endl;
+      return;
+    }
+    db.reset(KeyValueDB::create(g_ceph_context, "rocksdb",
+                               "kv_test_temp_dir"));
+    ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
+    if (verbose)
+      cout << "Creating database with sharding: " << GetParam() << std::endl;
+    ASSERT_EQ(0, db->create_and_open(cout, GetParam()));
+  }
+  void TearDown() override {
+    db.reset(nullptr);
+    rm_r("kv_test_temp_dir");
+  }
+
+  /*
+    A - main 0/1/20
+    B - shard 1/3 x 0/1/20
+    C - main 0/1/20
+    D - shard 1/3 x 0/1/20
+    E - main 0/1/20
+  */
+  bool verbose;
+  std::vector<std::string> sharding_defs = {
+    "Betelgeuse D",
+    "Betelgeuse(3) D",
+    "Betelgeuse D(3)",
+    "Betelgeuse(3) D(3)"};
+  std::vector<std::string> prefixes = {"Ad", "Betelgeuse", "C", "D", "Evade"};
+  std::vector<std::string> randoms = {"0", "1", "2", "3", "4", "5",
+                                     "found", "brain", "fully", "pen", "worth", "race",
+                                     "stand", "nodded", "whenever", "surrounded", "industrial", "skin",
+                                     "this", "direction", "family", "beginning", "whenever", "held",
+                                     "metal", "year", "like", "valuable", "softly", "whistle",
+                                     "perfectly", "broken", "idea", "also", "coffee", "branch",
+                                     "tongue", "immediately", "bent", "partly", "burn", "include",
+                                     "certain", "burst", "final", "smoke", "positive", "perfectly"
+  };
+  int R = randoms.size();
+
+  typedef int test_id[6];
+  void zero(test_id& x) {
+    k = 0;
+    v = 0;
+    for (auto& i:x)
+      i = 0;
+  }
+  bool end(const test_id& x) {
+    return x[5] != 0;
+  }
+  void next(test_id& x) {
+    x[0]++;
+    for (int i = 0; i < 5; i++) {
+      if (x[i] == 3) {
+       x[i] = 0;
+       ++x[i + 1];
+      }
+    }
+  }
+
+  std::map<std::string, std::string> data;
+  int k = 0;
+  int v = 0;
+
+  void generate_data(const test_id& x) {
+    data.clear();
+    for (int i = 0; i < 5; i++) {
+      if (verbose)
+       std::cout << x[i] << "-";
+      switch (x[i]) {
+      case 0:
+       break;
+      case 1:
+       data[RocksDBStore::combine_strings(prefixes[i], randoms[k++ % R])] = randoms[v++ % R];
+       break;
+      case 2:
+       std::string base = randoms[k++ % R];
+       for (int j = 0; j < 10; j++) {
+         data[RocksDBStore::combine_strings(prefixes[i], base + "." + randoms[k++ % R])] = randoms[v++ % R];
+       }
+       break;
+      }
+    }
+  }
+
+  void data_to_db() {
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (auto &d : data) {
+      bufferlist v1;
+      v1.append(d.second);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(d.first, &prefix, &key);
+      t->set(prefix, key, v1);
+      if (verbose)
+       std::cout << "SET " << prefix << " " << key << std::endl;
+    }
+    ASSERT_EQ(db->submit_transaction_sync(t), 0);
+  }
+
+  void clear_db() {
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (auto &d : data) {
+      string prefix;
+      string key;
+      RocksDBStore::split_key(d.first, &prefix, &key);
+      t->rmkey(prefix, key);
+    }
+    ASSERT_EQ(db->submit_transaction_sync(t), 0);
+    //paranoid, check if db empty
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    ASSERT_EQ(it->seek_to_first(), 0);
+    ASSERT_EQ(it->valid(), false);
+  }
+};
+
+TEST_P(RocksDBShardingTest, wholespace_next) {
+  test_id X;
+  zero(X);
+  do {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    //move forward
+    auto dit = data.begin();
+    int r = it->seek_to_first();
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(it->valid(), (dit != data.end()));
+
+    while (dit != data.end()) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(dit->first, &prefix, &key);
+      auto raw_key = it->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      ASSERT_EQ(it->value().to_str(), dit->second);
+      if (verbose)
+       std::cout << "next " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->next(), 0);
+      ++dit;
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+    next(X);
+  } while (!end(X));
+}
+
+TEST_P(RocksDBShardingTest, wholespace_prev) {
+  test_id X;
+  zero(X);
+  do {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    auto dit = data.rbegin();
+    int r = it->seek_to_last();
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(it->valid(), (dit != data.rend()));
+
+    while (dit != data.rend()) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(dit->first, &prefix, &key);
+      auto raw_key = it->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      ASSERT_EQ(it->value().to_str(), dit->second);
+      if (verbose)
+       std::cout << "prev " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->prev(), 0);
+      ++dit;
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+    next(X);
+  } while (!end(X));
+}
+
+TEST_P(RocksDBShardingTest, wholespace_lower_bound) {
+  test_id X;
+  zero(X);
+  do {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    auto dit = data.begin();
+    int r = it->seek_to_first();
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(it->valid(), (dit != data.end()));
+
+    while (dit != data.end()) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      RocksDBStore::split_key(dit->first, &prefix, &key);
+      KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator();
+      ASSERT_EQ(it1->lower_bound(prefix, key), 0);
+      ASSERT_EQ(it1->valid(), true);
+      auto raw_key = it1->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      if (verbose)
+       std::cout << "lower_bound " << prefix << " " << key << std::endl;
+      ASSERT_EQ(it->next(), 0);
+      ++dit;
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+    next(X);
+  } while (!end(X));
+}
+
+TEST_P(RocksDBShardingTest, wholespace_upper_bound) {
+  test_id X;
+  zero(X);
+  do {
+    generate_data(X);
+    data_to_db();
+
+    KeyValueDB::WholeSpaceIterator it = db->get_wholespace_iterator();
+    auto dit = data.begin();
+    int r = it->seek_to_first();
+    ASSERT_EQ(r, 0);
+    ASSERT_EQ(it->valid(), (dit != data.end()));
+
+    while (dit != data.end()) {
+      ASSERT_EQ(it->valid(), true);
+      string prefix;
+      string key;
+      string key_minus_1;
+      RocksDBStore::split_key(dit->first, &prefix, &key);
+      //decrement key minimally
+      key_minus_1 = key.substr(0, key.length() - 1) + std::string(1, key[key.length() - 1] - 1);
+      KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator();
+      ASSERT_EQ(it1->upper_bound(prefix, key_minus_1), 0);
+      ASSERT_EQ(it1->valid(), true);
+      auto raw_key = it1->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+      if (verbose)
+       std::cout << "upper_bound " << prefix << " " << key_minus_1 << std::endl;
+      ASSERT_EQ(it->next(), 0);
+      ++dit;
+    }
+    ASSERT_EQ(it->valid(), false);
+
+    clear_db();
+    next(X);
+  } while (!end(X));
+}
+
+TEST_P(RocksDBShardingTest, wholespace_lookup_limits) {
+  test_id X;
+  zero(X);
+  do {
+    generate_data(X);
+    data_to_db();
+
+    //lookup before first
+    if (data.size() > 0) {
+      auto dit = data.begin();
+      string prefix;
+      string key;
+      RocksDBStore::split_key(dit->first, &prefix, &key);
+      KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator();
+      ASSERT_EQ(it1->lower_bound(" ", " "), 0);
+      ASSERT_EQ(it1->valid(), true);
+      auto raw_key = it1->raw_key();
+      ASSERT_EQ(raw_key.first, prefix);
+      ASSERT_EQ(raw_key.second, key);
+    }
+    //lookup after last
+    KeyValueDB::WholeSpaceIterator it1 = db->get_wholespace_iterator();
+    ASSERT_EQ(it1->lower_bound("~", "~"), 0);
+    ASSERT_EQ(it1->valid(), false);
+
+    clear_db();
+    next(X);
+  } while (!end(X));
+}
+
+
+
 INSTANTIATE_TEST_SUITE_P(
   KeyValueDB,
   KVTest,
   ::testing::Values("leveldb", "rocksdb", "memdb"));
 
+INSTANTIATE_TEST_SUITE_P(
+  KeyValueDB,
+  RocksDBShardingTest,
+  ::testing::Values("Betelgeuse D",
+                   "Betelgeuse(3) D",
+                   "Betelgeuse D(3)",
+                   "Betelgeuse(3) D(3)"));
+
 int main(int argc, char **argv) {
   vector<const char*> args;
   argv_to_vec(argc, (const char **)argv, args);