]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: add sharding to column families.
authorAdam Kupczyk <akupczyk@redhat.com>
Tue, 4 Feb 2020 14:10:08 +0000 (15:10 +0100)
committerAdam Kupczyk <akupczyk@redhat.com>
Fri, 17 Apr 2020 06:55:54 +0000 (08:55 +0200)
Column families (defined by prefix) can now be split into multiple shards.
Each shard is a separate column family.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
src/common/options.cc
src/kv/KeyValueDB.h
src/kv/LevelDBStore.cc
src/kv/LevelDBStore.h
src/kv/MemDB.cc
src/kv/MemDB.h
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h
src/os/bluestore/BlueStore.cc
src/test/ObjectMap/KeyValueDBMemory.h
src/test/objectstore/test_kv.cc

index 330f165328463b8e5b20fbe93d50e2f35e9dcfff..380dd16402766014184021527a702658fd80f0ec 100644 (file)
@@ -4417,8 +4417,12 @@ std::vector<Option> get_global_options() {
     .set_description("Enable use of rocksdb column families for bluestore metadata"),
 
     Option("bluestore_rocksdb_cfs", Option::TYPE_STR, Option::LEVEL_DEV)
-    .set_default("M= P= L=")
-    .set_description("List of whitespace-separate key/value pairs where key is CF name and value is CF options"),
+    .set_default("m(5,0-8) O(7,0-13) L")
+    .set_description("Definition of column families and their sharding")
+    .set_long_description("Space separated list of elements: column_def [ '=' rocksdb_options ]. "
+                         "column_def := column_name [ '(' shard_count [ ',' hash_begin '-' [ hash_end ] ] ')' ]. "
+                         "Example: 'I=write_buffer_size=1048576 O(6) m(7,10-)'. "
+                         "Interval [hash_begin..hash_end) defines characters to use for hash calculation."),
 
     Option("bluestore_fsck_on_mount", Option::TYPE_BOOL, Option::LEVEL_DEV)
     .set_default(false)
index cdf10d49b02d9dbda85575aa11f491d290daa02b..ad93631121accc64212dd54c2cf1e3c39f1bd413 100644 (file)
  */
 class KeyValueDB {
 public:
-  /*
-   *  See RocksDB's definition of a column family(CF) and how to use it.
-   *  The interfaces of KeyValueDB is extended, when a column family is created.
-   *  Prefix will be the name of column family to use.
-   */
-  struct ColumnFamily {
-    std::string name;      //< name of this individual column family
-    std::string option;    //< configure option string for this CF
-    ColumnFamily(const std::string &name, const std::string &option)
-      : name(name), option(option) {}
-  };
-
   class TransactionImpl {
   public:
     /// Set Keys
@@ -155,12 +143,11 @@ public:
   /// test whether we can successfully initialize; may have side effects (e.g., create)
   static int test_init(const std::string& type, const std::string& dir);
   virtual int init(std::string option_str="") = 0;
-  virtual int open(std::ostream &out, const std::vector<ColumnFamily>& cfs = {}) = 0;
+  virtual int open(std::ostream &out, const std::string& cfs="") = 0;
   // std::vector cfs contains column families to be created when db is created.
-  virtual int create_and_open(std::ostream &out,
-                             const std::vector<ColumnFamily>& cfs = {}) = 0;
+  virtual int create_and_open(std::ostream &out, const std::string& cfs="") = 0;
 
-  virtual int open_read_only(std::ostream &out, const std::vector<ColumnFamily>& cfs = {}) {
+  virtual int open_read_only(std::ostream &out, const std::string& cfs="") {
     return -ENOTSUP;
   }
 
@@ -333,14 +320,6 @@ public:
       get_wholespace_iterator());
   }
 
-  void add_column_family(const std::string& cf_name, void *handle) {
-    cf_handles.insert(std::make_pair(cf_name, handle));
-  }
-
-  bool is_column_family(const std::string& prefix) {
-    return cf_handles.count(prefix);
-  }
-
   virtual uint64_t get_estimated_size(std::map<std::string,uint64_t> &extra) = 0;
   virtual int get_statfs(struct store_statfs_t *buf) {
     return -EOPNOTSUPP;
@@ -438,8 +417,6 @@ protected:
   std::vector<std::pair<std::string,
                        std::shared_ptr<MergeOperator> > > merge_ops;
 
-  /// column families in use, name->handle
-  std::unordered_map<std::string, void *> cf_handles;
 };
 
 #endif
index 89cd8b00c912b6a3f5dd0f9b73d25f3687aac0b7..8c7d8ce6087c985e4ea5d7ad208bec517957ea44 100644 (file)
@@ -66,14 +66,14 @@ int LevelDBStore::init(string option_str)
   return 0;
 }
 
-int LevelDBStore::open(ostream &out, const vector<ColumnFamily>& cfs)  {
+int LevelDBStore::open(ostream &out, const std::string& cfs)  {
   if (!cfs.empty()) {
     ceph_abort_msg("Not implemented");
   }
   return do_open(out, false);
 }
 
-int LevelDBStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) {
+int LevelDBStore::create_and_open(ostream &out, const std::string& cfs) {
   if (!cfs.empty()) {
     ceph_abort_msg("Not implemented");
   }
index 44ec2f6a0a5c25fcc0dda3c135f388128b0cb5b9..7876904e82db62cf0b3d0f0ef612053628274750 100644 (file)
@@ -177,9 +177,9 @@ public:
   int init(std::string option_str="") override;
 
   /// Opens underlying db
-  int open(std::ostream &out, const std::vector<ColumnFamily>& = {}) override;
+  int open(std::ostream &out, const std::string& cfs="") override;
   /// Creates underlying db if missing and opens it
-  int create_and_open(std::ostream &out, const std::vector<ColumnFamily>& = {}) override;
+  int create_and_open(std::ostream &out, const std::string& cfs="") override;
 
   void close() override;
 
index 9d16b344816a0d98ebe612b4830ef748c45ff1e7..1f5740a34a592e5dada24eedf6c8e8577b04528b 100644 (file)
@@ -187,14 +187,14 @@ int MemDB::do_open(ostream &out, bool create)
   return _init(create);
 }
 
-int MemDB::open(ostream &out, const vector<ColumnFamily>& cfs) {
+int MemDB::open(ostream &out, const std::string& cfs) {
   if (!cfs.empty()) {
     ceph_abort_msg("Not implemented");
   }
   return do_open(out, false);
 }
 
-int MemDB::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) {
+int MemDB::create_and_open(ostream &out, const std::string& cfs) {
   if (!cfs.empty()) {
     ceph_abort_msg("Not implemented");
   }
index 08ce1684daa1b430fc7ffcf5569501def1efab7f..71558ad0c8b4ff3d40e68ebbfa06d59cf69eb476 100644 (file)
@@ -132,8 +132,8 @@ public:
   int _init(bool format);
 
   int do_open(std::ostream &out, bool create);
-  int open(std::ostream &out, const std::vector<ColumnFamily>&) override;
-  int create_and_open(std::ostream &out, const std::vector<ColumnFamily>&) override;
+  int open(std::ostream &out, const std::string& cfs="") override;
+  int create_and_open(std::ostream &out, const std::string& cfs="") override;
   using KeyValueDB::create_and_open;
 
   KeyValueDB::Transaction get_transaction() override {
index 500df339ea9617add4cf8331ce078690872b86d9..01aab1bdf33eff14eaf101d0aa02fd1fb8f5e1f0 100644 (file)
@@ -49,6 +49,8 @@ using ceph::bufferlist;
 using ceph::bufferptr;
 using ceph::Formatter;
 
+static const char* sharding_def_file = "sharding/def";
+
 static bufferlist to_bufferlist(rocksdb::Slice in) {
   bufferlist bl;
   bl.append(bufferptr(in.data(), in.size()));
@@ -331,13 +333,13 @@ int RocksDBStore::create_db_dir()
 }
 
 int RocksDBStore::install_cf_mergeop(
-  const string &cf_name,
+  const string &key_prefix,
   rocksdb::ColumnFamilyOptions *cf_opt)
 {
   ceph_assert(cf_opt != nullptr);
   cf_opt->merge_operator.reset();
   for (auto& i : merge_ops) {
-    if (i.first == cf_name) {
+    if (i.first == key_prefix) {
       cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
     }
   }
@@ -345,16 +347,12 @@ int RocksDBStore::install_cf_mergeop(
 }
 
 int RocksDBStore::create_and_open(ostream &out,
-                                 const vector<ColumnFamily>& cfs)
+                                 const std::string& cfs)
 {
   int r = create_db_dir();
   if (r < 0)
     return r;
-  if (cfs.empty()) {
-    return do_open(out, true, false, nullptr);
-  } else {
-    return do_open(out, true, false, &cfs);
-  }
+  return do_open(out, true, false, cfs);
 }
 
 int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt)
@@ -495,14 +493,234 @@ int RocksDBStore::load_rocksdb_options(bool create_if_missing, rocksdb::Options&
           << dendl;
 
   opt.merge_operator.reset(new MergeOperatorRouter(*this));
+  comparator = opt.comparator;
+  return 0;
+}
+
+void RocksDBStore::add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
+                                    size_t shard_idx, rocksdb::ColumnFamilyHandle *handle) {
+  dout(10) << __func__ << " column_name=" << cf_name << " shard_idx=" << shard_idx <<
+    " hash_l=" << hash_l << " hash_h=" << hash_h << " handle=" << (void*) handle << dendl;
+  bool exists = cf_handles.count(cf_name) > 0;
+  auto& column = cf_handles[cf_name];
+  if (exists) {
+    ceph_assert(hash_l == column.hash_l);
+    ceph_assert(hash_h == column.hash_h);
+  } else {
+    ceph_assert(hash_l < hash_h);
+    column.hash_l = hash_l;
+    column.hash_h = hash_h;
+  }
+  if (column.handles.size() <= shard_idx)
+    column.handles.resize(shard_idx + 1);
+  column.handles[shard_idx] = handle;
+}
+
+bool RocksDBStore::is_column_family(const std::string& prefix) {
+  return cf_handles.count(prefix);
+}
 
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const std::string& key) {
+  auto iter = cf_handles.find(prefix);
+  if (iter == cf_handles.end())
+    return nullptr;
+  else {
+    if (iter->second.handles.size() == 1) {
+      return iter->second.handles[0];
+    } else {
+      uint32_t hash_l = std::min<uint32_t>(iter->second.hash_l, key.size());
+      uint32_t hash_h = std::min<uint32_t>(iter->second.hash_h, key.size());
+      uint32_t hash = ceph_str_hash_rjenkins(&key[hash_l], hash_h - hash_l);
+      return iter->second.handles[hash % iter->second.handles.size()];
+    }
+  }
+}
+
+rocksdb::ColumnFamilyHandle *RocksDBStore::get_cf_handle(const std::string& prefix, const char* key, size_t keylen) {
+  auto iter = cf_handles.find(prefix);
+  if (iter == cf_handles.end())
+    return nullptr;
+  else {
+    if (iter->second.handles.size() == 1) {
+      return iter->second.handles[0];
+    } else {
+      uint32_t hash_l = std::min<uint32_t>(iter->second.hash_l, keylen);
+      uint32_t hash_h = std::min<uint32_t>(iter->second.hash_h, keylen);
+      uint32_t hash = ceph_str_hash_rjenkins(&key[hash_l], hash_h - hash_l);
+      return iter->second.handles[hash % iter->second.handles.size()];
+    }
+  }
+}
+
+/**
+ * Definition of sharding:
+ * space-separated list of: column_def [ '=' options ]
+ * column_def := column_name '(' shard_count ')'
+ * column_def := column_name '(' shard_count ',' hash_begin '-' ')'
+ * column_def := column_name '(' shard_count ',' hash_begin '-' hash_end ')'
+ * I=write_buffer_size=1048576 O(6) m(7,10-) prefix(4,0-10)=disable_auto_compactions=true,max_bytes_for_level_base=1048576
+ */
+bool RocksDBStore::parse_sharding_def(const std::string_view text_def_in,
+                                    std::vector<ColumnFamily>& sharding_def,
+                                    char const* *error_position,
+                                    std::string *error_msg)
+{
+  std::string_view text_def = text_def_in;
+  char const* error_position_local = nullptr;
+  std::string error_msg_local;
+  if (error_position == nullptr) {
+    error_position = &error_position_local;
+    *error_position = nullptr;
+  }
+  if (error_msg == nullptr) {
+    error_msg = &error_msg_local;
+    error_msg->clear();
+  }
+
+  sharding_def.clear();
+  while (!text_def.empty()) {
+    std::string_view options;
+    std::string_view name;
+    size_t shard_cnt = 1;
+    uint32_t l_bound = 0;
+    uint32_t h_bound = std::numeric_limits<uint32_t>::max();
+
+    std::string_view column_def;
+    size_t spos = text_def.find(' ');
+    if (spos == std::string_view::npos) {
+      column_def = text_def;
+      text_def = std::string_view(text_def.end(), 0);
+    } else {
+      column_def = text_def.substr(0, spos);
+      text_def = text_def.substr(spos + 1);
+    }
+    size_t eqpos = column_def.find('=');
+    if (eqpos != std::string_view::npos) {
+      options = column_def.substr(eqpos + 1);
+      column_def = column_def.substr(0, eqpos);
+    }
+
+    std::string_view shards_def;
+    size_t bpos = column_def.find('(');
+    if (bpos != std::string_view::npos) {
+      name = column_def.substr(0, bpos);
+      const char* nptr = &column_def[bpos + 1];
+      char* endptr;
+      shard_cnt = strtol(nptr, &endptr, 10);
+      if (nptr == endptr) {
+       *error_position = nptr;
+       *error_msg = "expecting integer";
+       break;
+      }
+      nptr = endptr;
+      if (*nptr == ',') {
+       nptr++;
+       l_bound = strtol(nptr, &endptr, 10);
+       if (nptr == endptr) {
+         *error_position = nptr;
+         *error_msg = "expecting integer";
+         break;
+       }
+       nptr = endptr;
+       if (*nptr != '-') {
+         *error_position = nptr;
+         *error_msg = "expecting '-'";
+         break;
+       }
+       nptr++;
+       h_bound = strtol(nptr, &endptr, 10);
+       if (nptr == endptr) {
+         h_bound = std::numeric_limits<uint32_t>::max();
+       }
+       nptr = endptr;
+      }
+      if (*nptr != ')') {
+       *error_position = nptr;
+       *error_msg = "expecting ')'";
+       break;
+      }
+    } else {
+      name = column_def;
+    }
+    sharding_def.emplace_back(std::string(name), shard_cnt, std::string(options), l_bound, h_bound);
+  }
+  return *error_position == nullptr;
+}
+
+void RocksDBStore::sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
+                                         std::vector<std::string>& columns)
+{
+  columns.clear();
+  for (size_t i = 0; i < sharding_def.size(); i++) {
+    if (sharding_def[i].shard_cnt == 1) {
+       columns.push_back(sharding_def[i].name);
+    } else {
+      for (size_t j = 0; j < sharding_def[i].shard_cnt; j++) {
+       columns.push_back(sharding_def[i].name + "-" + to_string(j));
+      }
+    }
+  }
+}
+
+int RocksDBStore::create_shards(const rocksdb::Options& opt,
+                               const vector<ColumnFamily>& sharding_def)
+{
+  for (auto& p : sharding_def) {
+    // copy default CF settings, block cache, merge operators as
+    // the base for new CF
+    rocksdb::ColumnFamilyOptions cf_opt(opt);
+    // user input options will override the base options
+    rocksdb::Status status;
+    status = rocksdb::GetColumnFamilyOptionsFromString(
+                                                      cf_opt, p.options, &cf_opt);
+    if (!status.ok()) {
+      derr << __func__ << " invalid db column family option string for CF: "
+          << p.name << dendl;
+      return -EINVAL;
+    }
+    install_cf_mergeop(p.name, &cf_opt);
+    for (size_t idx = 0; idx < p.shard_cnt; idx++) {
+      std::string cf_name;
+      if (p.shard_cnt == 1)
+       cf_name = p.name;
+      else
+       cf_name = p.name + "-" + to_string(idx);
+      rocksdb::ColumnFamilyHandle *cf;
+      status = db->CreateColumnFamily(cf_opt, cf_name, &cf);
+      if (!status.ok()) {
+       derr << __func__ << " Failed to create rocksdb column family: "
+            << cf_name << dendl;
+       return -EINVAL;
+      }
+      // store the new CF handle
+      add_column_family(p.name, p.hash_l, p.hash_h, idx, cf);
+    }
+  }
   return 0;
 }
 
+std::ostream& operator<<(std::ostream& out, const RocksDBStore::ColumnFamily& cf)
+{
+  out << "(";
+  out << cf.name;
+  out << ",";
+  out << cf.shard_cnt;
+  out << ",";
+  out << cf.hash_l;
+  out << "-";
+  if (cf.hash_h != std::numeric_limits<uint32_t>::max()) {
+    out << cf.hash_h;
+  }
+  out << ",";
+  out << cf.options;
+  out << ")";
+  return out;
+}
+
 int RocksDBStore::do_open(ostream &out,
                          bool create_if_missing,
                          bool open_readonly,
-                         const vector<ColumnFamily>* cfs)
+                         const std::string& sharding_text)
 {
   ceph_assert(!(create_if_missing && open_readonly));
   rocksdb::Options opt;
@@ -519,40 +737,99 @@ int RocksDBStore::do_open(ostream &out,
       return -EINVAL;
     }
     // create and open column families
-    if (cfs) {
-      for (auto& p : *cfs) {
-       // copy default CF settings, block cache, merge operators as
-       // the base for new CF
-       rocksdb::ColumnFamilyOptions cf_opt(opt);
-       // user input options will override the base options
-       status = rocksdb::GetColumnFamilyOptionsFromString(
-         cf_opt, p.option, &cf_opt);
-       if (!status.ok()) {
-         derr << __func__ << " invalid db column family option string for CF: "
-              << p.name << dendl;
-         return -EINVAL;
-       }
-       install_cf_mergeop(p.name, &cf_opt);
-       rocksdb::ColumnFamilyHandle *cf;
-       status = db->CreateColumnFamily(cf_opt, p.name, &cf);
-       if (!status.ok()) {
-         derr << __func__ << " Failed to create rocksdb column family: "
-              << p.name << dendl;
-         return -EINVAL;
-       }
-       // store the new CF handle
-       add_column_family(p.name, static_cast<void*>(cf));
+    if (!sharding_text.empty()) {
+      bool b;
+      std::vector<ColumnFamily> sharding_def;
+      b = parse_sharding_def(sharding_text, sharding_def);
+      if (!b) {
+       dout(1) << __func__ << " bad sharding: " << sharding_text << dendl;
+       return -EINVAL;
+      }
+      r = create_shards(opt, sharding_def);
+      if (r != 0 ) {
+       return r;
+      }
+      opt.env->CreateDir("sharding");
+      status = rocksdb::WriteStringToFile(opt.env, sharding_text,
+                                         sharding_def_file, true);
+      if (!status.ok()) {
+       derr << __func__ << " cannot write to " << sharding_def_file << dendl;
+       return -EIO;
       }
+    } else {
+      status = opt.env->DeleteFile(sharding_def_file);
     }
     default_cf = db->DefaultColumnFamily();
   } else {
+    std::string stored_sharding_text;
+    status = opt.env->FileExists(sharding_def_file);
+    if (status.ok()) {
+      status = rocksdb::ReadFileToString(opt.env,
+                                        sharding_def_file,
+                                        &stored_sharding_text);
+      if(!status.ok()) {
+       derr << __func__ << " cannot read from " << sharding_def_file << dendl;
+       return -EIO;
+      }
+    } else {
+      //no "sharding_def" present
+    }
+    //check if sharding_def matches stored_sharding_def
+    std::vector<ColumnFamily> sharding_def;
+    parse_sharding_def(sharding_text, sharding_def);
+    std::vector<ColumnFamily> stored_sharding_def;
+    parse_sharding_def(stored_sharding_text, stored_sharding_def);
+
+    std::sort(sharding_def.begin(), sharding_def.end(),
+             [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
+    std::sort(stored_sharding_def.begin(), stored_sharding_def.end(),
+             [](ColumnFamily& a, ColumnFamily&b) {return a.name < b.name;});
+
+    bool match = true;
+    if (sharding_def.size() != stored_sharding_def.size()) {
+      match = false;
+    } else {
+      for (size_t i = 0; i < sharding_def.size(); i++) {
+       auto& a = sharding_def[i];
+       auto& b = stored_sharding_def[i];
+       if ( (a.name != b.name) ||
+            (a.shard_cnt != b.shard_cnt) ||
+            (a.hash_l != b.hash_l) ||
+            (a.hash_h != b.hash_h) ) {
+         match = false;
+         break;
+       }
+      }
+    }
+    if (!match) {
+      derr << __func__ << " mismatch on sharding. requested = " << sharding_def
+          << " stored = " << stored_sharding_def << dendl;
+      return -EIO;
+    }
+
     std::vector<string> existing_cfs;
     status = rocksdb::DB::ListColumnFamilies(
       rocksdb::DBOptions(opt),
       path,
       &existing_cfs);
-    dout(1) << __func__ << " column families: " << existing_cfs << dendl;
-    if (existing_cfs.empty()) {
+    dout(5) << __func__ << " column families from rocksdb: " << existing_cfs << dendl;
+
+    std::sort(existing_cfs.begin(), existing_cfs.end(),
+             [&](const std::string& a, const std::string&b) {
+               return a < b;}
+             );
+
+    std::vector<std::string> columns_from_stored;
+    sharding_def_to_columns(stored_sharding_def, columns_from_stored);
+    columns_from_stored.push_back(rocksdb::kDefaultColumnFamilyName);
+    std::sort(columns_from_stored.begin(), columns_from_stored.end());
+
+    if (existing_cfs != columns_from_stored) {
+      derr << __func__ << " mismatch on sharding. rocksdb columns = " << existing_cfs
+          << " stored columns = " << columns_from_stored << dendl;
+    }
+
+    if (columns_from_stored.empty()) {
       // no column families
       if (open_readonly) {
        status = rocksdb::DB::Open(opt, path, &db);
@@ -565,37 +842,27 @@ int RocksDBStore::do_open(ostream &out,
       }
       default_cf = db->DefaultColumnFamily();
     } else {
-      // we cannot change column families for a created database.  so, map
-      // what options we are given to whatever cf's already exist.
       std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
-      for (auto& n : existing_cfs) {
-       // copy default CF settings, block cache, merge operators as
-       // the base for new CF
+      for (auto& column : stored_sharding_def) {
        rocksdb::ColumnFamilyOptions cf_opt(opt);
-       bool found = false;
-       if (cfs) {
-         for (auto& i : *cfs) {
-           if (i.name == n) {
-             found = true;
-             status = rocksdb::GetColumnFamilyOptionsFromString(
-               cf_opt, i.option, &cf_opt);
-             if (!status.ok()) {
-               derr << __func__ << " invalid db column family options for CF '"
-                    << i.name << "': " << i.option << dendl;
-               return -EINVAL;
-             }
-           }
-         }
-       }
-       if (n != rocksdb::kDefaultColumnFamilyName) {
-         install_cf_mergeop(n, &cf_opt);
+       status = rocksdb::GetColumnFamilyOptionsFromString(
+         cf_opt, column.options, &cf_opt);
+       if (!status.ok()) {
+         derr << __func__ << " invalid db column family options for CF '"
+              << column.name << "': " << column.options << dendl;
+         return -EINVAL;
        }
-       column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
-       if (!found && n != rocksdb::kDefaultColumnFamilyName) {
-         dout(1) << __func__ << " column family '" << n
-                 << "' exists but not expected" << dendl;
+       install_cf_mergeop(column.name, &cf_opt);
+       if (column.shard_cnt == 1) {
+         column_families.push_back(rocksdb::ColumnFamilyDescriptor(column.name, cf_opt));
+       } else {
+         for (size_t i = 0; i < column.shard_cnt; i++) {
+           std::string cf_name = column.name + "-" + to_string(i);
+           column_families.push_back(rocksdb::ColumnFamilyDescriptor(cf_name, cf_opt));
+         }
        }
       }
+      column_families.push_back(rocksdb::ColumnFamilyDescriptor("default",opt));
       std::vector<rocksdb::ColumnFamilyHandle*> handles;
       if (open_readonly) {
         status = rocksdb::DB::OpenForReadOnly(rocksdb::DBOptions(opt),
@@ -609,14 +876,17 @@ int RocksDBStore::do_open(ostream &out,
        derr << status.ToString() << dendl;
        return -EINVAL;
       }
-      for (unsigned i = 0; i < existing_cfs.size(); ++i) {
-       if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
-         default_cf = handles[i];
-         must_close_default_cf = true;
-       } else {
-         add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+
+      size_t pos = 0;
+      for (auto& column : stored_sharding_def) {
+       for (size_t i = 0; i < column.shard_cnt; i++) {
+         add_column_family(column.name, column.hash_l, column.hash_h, i, handles[pos]);
+         pos++;
        }
       }
+      ceph_assert(pos == handles.size() - 1);
+      default_cf = handles[pos];
+      must_close_default_cf = true;
     }
   }
   ceph_assert(default_cf != nullptr);
@@ -691,10 +961,11 @@ void RocksDBStore::close()
 
   // Ensure db is destroyed before dependent db_cache and filterpolicy
   for (auto& p : cf_handles) {
-    db->DestroyColumnFamilyHandle(
-      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
-    p.second = nullptr;
+    for (size_t i = 0; i < p.second.handles.size(); i++) {
+      db->DestroyColumnFamilyHandle(p.second.handles[i]);
+    }
   }
+  cf_handles.clear();
   if (must_close_default_cf) {
     db->DestroyColumnFamilyHandle(default_cf);
     must_close_default_cf = false;
@@ -741,16 +1012,20 @@ bool RocksDBStore::get_property(
 int64_t RocksDBStore::estimate_prefix_size(const string& prefix,
                                           const string& key_prefix)
 {
-  auto cf = get_cf_handle(prefix);
   uint64_t size = 0;
   uint8_t flags =
     //rocksdb::DB::INCLUDE_MEMTABLES |  // do not include memtables...
     rocksdb::DB::INCLUDE_FILES;
-  if (cf) {
-    string start = key_prefix + string(1, '\x00');
-    string limit = key_prefix + string("\xff\xff\xff\xff");
-    rocksdb::Range r(start, limit);
-    db->GetApproximateSizes(cf, &r, 1, &size, flags);
+  auto p_iter = cf_handles.find(prefix);
+  if (p_iter != cf_handles.end()) {
+    for (auto cf : p_iter->second.handles) {
+      uint64_t s = 0;
+      string start = key_prefix + string(1, '\x00');
+      string limit = key_prefix + string("\xff\xff\xff\xff");
+      rocksdb::Range r(start, limit);
+      db->GetApproximateSizes(cf, &r, 1, &s, flags);
+      size += s;
+    }
   } else {
     string start = combine_strings(prefix , key_prefix);
     string limit = combine_strings(prefix , key_prefix + "\xff\xff\xff\xff");
@@ -927,7 +1202,7 @@ void RocksDBStore::RocksDBTransactionImpl::set(
   const string &k,
   const bufferlist &to_set_bl)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k);
   if (cf) {
     put_bat(bat, cf, k, to_set_bl);
   } else {
@@ -941,7 +1216,7 @@ void RocksDBStore::RocksDBTransactionImpl::set(
   const char *k, size_t keylen,
   const bufferlist &to_set_bl)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k, keylen);
   if (cf) {
     string key(k, keylen);  // fixme?
     put_bat(bat, cf, key, to_set_bl);
@@ -955,7 +1230,7 @@ void RocksDBStore::RocksDBTransactionImpl::set(
 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const string &k)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k);
   if (cf) {
     bat.Delete(cf, rocksdb::Slice(k));
   } else {
@@ -967,7 +1242,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const char *k,
                                                 size_t keylen)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k, keylen);
   if (cf) {
     bat.Delete(cf, rocksdb::Slice(k, keylen));
   } else {
@@ -980,7 +1255,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
                                                         const string &k)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k);
   if (cf) {
     bat.SingleDelete(cf, k);
   } else {
@@ -990,69 +1265,87 @@ void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
 
 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
 {
-  auto cf = db->get_cf_handle(prefix);
-  uint64_t cnt = db->delete_range_threshold;
-  bat.SetSavePoint();
-  auto it = db->get_iterator(prefix);
-  for (it->seek_to_first(); it->valid(); it->next()) {
-    if (!cnt) {
-      bat.RollbackToSavePoint();
-      if (cf) {
-        string endprefix = "\xff\xff\xff\xff";  // FIXME: this is cheating...
-        bat.DeleteRange(cf, string(), endprefix);
-      } else {
-        string endprefix = prefix;
+  auto p_iter = db->cf_handles.find(prefix);
+  if (p_iter == db->cf_handles.end()) {
+    uint64_t cnt = db->delete_range_threshold;
+    bat.SetSavePoint();
+    auto it = db->get_iterator(prefix);
+    for (it->seek_to_first(); it->valid() && (--cnt) != 0; it->next()) {
+      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+    }
+    if (cnt == 0) {
+       bat.RollbackToSavePoint();
+       string endprefix = prefix;
         endprefix.push_back('\x01');
        bat.DeleteRange(db->default_cf,
                         combine_strings(prefix, string()),
                         combine_strings(endprefix, string()));
-      }
-      return;
-    }
-    if (cf) {
-      bat.Delete(cf, rocksdb::Slice(it->key()));
     } else {
-      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
+      bat.PopSavePoint();
+    }
+  } else {
+    ceph_assert(p_iter->second.handles.size() >= 1);
+    for (auto cf : p_iter->second.handles) {
+      uint64_t cnt = db->delete_range_threshold;
+      bat.SetSavePoint();
+      auto it = db->new_shard_iterator(cf);
+      for (it->SeekToFirst(); it->Valid() && (--cnt) != 0; it->Next()) {
+       bat.Delete(cf, it->key());
+      }
+      if (cnt == 0) {
+       bat.RollbackToSavePoint();
+       string endprefix = "\xff\xff\xff\xff";  // FIXME: this is cheating...
+       bat.DeleteRange(cf, string(), endprefix);
+      } else {
+       bat.PopSavePoint();
+      }
     }
-    --cnt;
   }
-  bat.PopSavePoint();
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                          const string &start,
                                                          const string &end)
 {
-  auto cf = db->get_cf_handle(prefix);
-
-  uint64_t cnt = db->delete_range_threshold;
-  auto it = db->get_iterator(prefix);
-  bat.SetSavePoint();
-  it->lower_bound(start);
-  while (it->valid()) {
-    if (it->key() >= end) {
-      break;
+  auto p_iter = db->cf_handles.find(prefix);
+  if (p_iter == db->cf_handles.end()) {
+    uint64_t cnt = db->delete_range_threshold;
+    bat.SetSavePoint();
+    auto it = db->get_iterator(prefix);
+    for (it->lower_bound(start);
+        it->valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
+        it->next()) {
+      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
     }
-    if (!cnt) {
+    if (cnt == 0) {
       bat.RollbackToSavePoint();
-      if (cf) {
-        bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+      bat.DeleteRange(db->default_cf,
+                     rocksdb::Slice(combine_strings(prefix, start)),
+                     rocksdb::Slice(combine_strings(prefix, end)));
+    } else {
+      bat.PopSavePoint();
+    }
+  } else {
+    ceph_assert(p_iter->second.handles.size() >= 1);
+    for (auto cf : p_iter->second.handles) {
+      uint64_t cnt = db->delete_range_threshold;
+      bat.SetSavePoint();
+      rocksdb::Iterator* it = db->new_shard_iterator(cf);
+      ceph_assert(it != nullptr);
+      for (it->Seek(start);
+          it->Valid() && db->comparator->Compare(it->key(), end) < 0 && (--cnt) != 0;
+          it->Next()) {
+       bat.Delete(cf, it->key());
+      }
+      if (cnt == 0) {
+       bat.RollbackToSavePoint();
+       bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
       } else {
-        bat.DeleteRange(db->default_cf,
-                        rocksdb::Slice(combine_strings(prefix, start)),
-                        rocksdb::Slice(combine_strings(prefix, end)));
+       bat.PopSavePoint();
       }
-      return;
+      delete it;
     }
-    if (cf) {
-      bat.Delete(cf, rocksdb::Slice(it->key()));
-    } else {
-      bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
-    }
-    it->next();
-    --cnt;
   }
-  bat.PopSavePoint();
 }
 
 void RocksDBStore::RocksDBTransactionImpl::merge(
@@ -1060,7 +1353,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
   const string &k,
   const bufferlist &to_set_bl)
 {
-  auto cf = db->get_cf_handle(prefix);
+  auto cf = db->get_cf_handle(prefix, k);
   if (cf) {
     // bufferlist::c_str() is non-constant, so we can't call c_str()
     if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
@@ -1102,11 +1395,11 @@ int RocksDBStore::get(
 {
   rocksdb::PinnableSlice value;
   utime_t start = ceph_clock_now();
-  auto cf = get_cf_handle(prefix);
-  if (cf) {
+  if (cf_handles.count(prefix) > 0) {
     for (auto& key : keys) {
+      auto cf_handle = get_cf_handle(prefix, key);
       auto status = db->Get(rocksdb::ReadOptions(),
-                           cf,
+                           cf_handle,
                            rocksdb::Slice(key),
                            &value);
       if (status.ok()) {
@@ -1147,7 +1440,7 @@ int RocksDBStore::get(
   int r = 0;
   rocksdb::PinnableSlice value;
   rocksdb::Status s;
-  auto cf = get_cf_handle(prefix);
+  auto cf = get_cf_handle(prefix, key);
   if (cf) {
     s = db->Get(rocksdb::ReadOptions(),
                cf,
@@ -1184,7 +1477,7 @@ int RocksDBStore::get(
   int r = 0;
   rocksdb::PinnableSlice value;
   rocksdb::Status s;
-  auto cf = get_cf_handle(prefix);
+  auto cf = get_cf_handle(prefix, key, keylen);
   if (cf) {
     s = db->Get(rocksdb::ReadOptions(),
                cf,
@@ -1214,7 +1507,7 @@ int RocksDBStore::get(
 int RocksDBStore::split_key(rocksdb::Slice in, string *prefix, string *key)
 {
   size_t prefix_len = 0;
-  
+
   // Find separator inside Slice
   char* separator = (char*) memchr(in.data(), 0, in.size());
   if (separator == NULL)
@@ -1237,14 +1530,15 @@ void RocksDBStore::compact()
   rocksdb::CompactRangeOptions options;
   db->CompactRange(options, default_cf, nullptr, nullptr);
   for (auto cf : cf_handles) {
-    db->CompactRange(
-      options,
-      static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
-      nullptr, nullptr);
+    for (auto shard_cf : cf.second.handles) {
+      db->CompactRange(
+       options,
+       shard_cf,
+       nullptr, nullptr);
+    }
   }
 }
 
-
 void RocksDBStore::compact_thread_entry()
 {
   std::unique_lock l{compact_queue_lock};
@@ -1530,15 +1824,225 @@ public:
   }
 };
 
+class ShardMergeIteratorImpl : public KeyValueDB::IteratorImpl {
+private:
+  struct KeyLess {
+  private:
+    const rocksdb::Comparator* comparator;
+  public:
+    KeyLess(const rocksdb::Comparator* comparator) : comparator(comparator) { };
+
+    bool operator()(rocksdb::Iterator* a, rocksdb::Iterator* b) const
+    {
+      if (a->Valid()) {
+       if (b->Valid()) {
+         return comparator->Compare(a->key(), b->key()) < 0;
+       } else {
+         return true;
+       }
+      } else {
+       if (b->Valid()) {
+         return false;
+       } else {
+         return static_cast<void*>(a) < static_cast<void*>(b);
+       }
+      }
+    }
+  };
+
+  const RocksDBStore* db;
+  KeyLess keyless;
+  string prefix;
+  std::vector<rocksdb::Iterator*> iters;
+public:
+  explicit ShardMergeIteratorImpl(const RocksDBStore* db,
+                                 const std::string& prefix,
+                                 const std::vector<rocksdb::ColumnFamilyHandle*>& shards)
+    : db(db), keyless(db->comparator), prefix(prefix)
+  {
+    iters.reserve(shards.size());
+    for (auto& s : shards) {
+      iters.push_back(db->db->NewIterator(rocksdb::ReadOptions(), s));
+    }
+  }
+  ~ShardMergeIteratorImpl() {
+    for (auto& it : iters) {
+      delete it;
+    }
+  }
+  int seek_to_first() override {
+    for (auto& it : iters) {
+      it->SeekToFirst();
+      if (!it->status().ok()) {
+       return -1;
+      }
+    }
+    //all iterators seeked, sort
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  int seek_to_last() override {
+    for (auto& it : iters) {
+      it->SeekToLast();
+      if (!it->status().ok()) {
+       return -1;
+      }
+    }
+    for (size_t i = 0; i < iters.size(); i++) {
+      if (keyless(iters[0], iters[i])) {
+       swap(iters[0], iters[i]);
+      }
+      //it might happen that cf was empty
+      if (iters[i]->Valid()) {
+       iters[i]->Next();
+      }
+    }
+    //no need to sort, as at most 1 iterator is valid now
+    return 0;
+  }
+  int upper_bound(const string &after) override {
+    rocksdb::Slice slice_bound(after);
+    for (auto& it : iters) {
+      it->Seek(slice_bound);
+      if (it->Valid() && it->key() == after) {
+       it->Next();
+      }
+      if (!it->status().ok()) {
+       return -1;
+      }
+    }
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  int lower_bound(const string &to) override {
+    rocksdb::Slice slice_bound(to);
+    for (auto& it : iters) {
+      it->Seek(slice_bound);
+      if (!it->status().ok()) {
+       return -1;
+      }
+    }
+    std::sort(iters.begin(), iters.end(), keyless);
+    return 0;
+  }
+  int next() override {
+    int r = -1;
+    if (iters[0]->Valid()) {
+      iters[0]->Next();
+      if (iters[0]->status().ok()) {
+       r = 0;
+       //bubble up
+       for (size_t i = 0; i < iters.size() - 1; i++) {
+         if (keyless(iters[i], iters[i + 1])) {
+           //matches, fixed
+           break;
+         }
+         std::swap(iters[i], iters[i + 1]);
+       }
+      }
+    }
+    return r;
+  }
+  // iters are sorted, so
+  // a[0] < b[0] < c[0] < d[0]
+  // a[0] > a[-1], a[0] > b[-1], a[0] > c[-1], a[0] > d[-1]
+  // so, prev() will be one of:
+  // a[-1], b[-1], c[-1], d[-1]
+  // prev() will be the one that is *largest* of them
+  //
+  // alg:
+  // 1. go prev() on each iterator we can
+  // 2. select largest key from those iterators
+  // 3. go next() on all iterators except (2)
+  // 4. sort
+  int prev() override {
+    std::vector<rocksdb::Iterator*> prev_done;
+    //1
+    for (auto it: iters) {
+      if (it->Valid()) {
+       it->Prev();
+       if (it->Valid()) {
+         prev_done.push_back(it);
+       } else {
+         it->SeekToFirst();
+       }
+      } else {
+       it->SeekToLast();
+       if (it->Valid()) {
+         prev_done.push_back(it);
+       }
+      }
+    }
+    if (prev_done.size() == 0) {
+      /* there is no previous element */
+      if (iters[0]->Valid()) {
+       iters[0]->Prev();
+       ceph_assert(!iters[0]->Valid());
+      }
+      return -1;
+    }
+    //2,3
+    rocksdb::Iterator* highest = prev_done[0];
+    for (size_t i = 1; i < prev_done.size(); i++) {
+      if (keyless(highest, prev_done[i])) {
+       highest->Next();
+       highest = prev_done[i];
+      } else {
+       prev_done[i]->Next();
+      }
+    }
+    //4
+    //insert highest in the beginning, and shift values until we pick highest
+    //untouched rest is sorted - we just prev()/next() them
+    rocksdb::Iterator* hold = highest;
+    for (size_t i = 0; i < iters.size(); i++) {
+      std::swap(hold, iters[i]);
+      if (hold == highest) break;
+    }
+    ceph_assert(hold == highest);
+    return 0;
+  }
+  bool valid() override {
+    return iters[0]->Valid();
+  }
+  string key() override {
+    return iters[0]->key().ToString();
+  }
+  std::pair<std::string, std::string> raw_key() override {
+    return make_pair(prefix, key());
+  }
+  bufferlist value() override {
+    return to_bufferlist(iters[0]->value());
+  }
+  bufferptr value_as_ptr() override {
+    rocksdb::Slice val = iters[0]->value();
+    return bufferptr(val.data(), val.size());
+  }
+  int status() override {
+    return iters[0]->status().ok() ? 0 : -1;
+  }
+};
+
 KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
 {
-  rocksdb::ColumnFamilyHandle *cf_handle =
-    static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
-  if (cf_handle) {
-    return std::make_shared<CFIteratorImpl>(
-      prefix,
-      db->NewIterator(rocksdb::ReadOptions(), cf_handle));
+  auto cf_it = cf_handles.find(prefix);
+  if (cf_it != cf_handles.end()) {
+    if (cf_it->second.handles.size() == 1) {
+      return std::make_shared<CFIteratorImpl>(
+        prefix,
+        db->NewIterator(rocksdb::ReadOptions(), cf_it->second.handles[0]));
+    } else {
+      return std::make_shared<ShardMergeIteratorImpl>(
+        this,
+        prefix,
+        cf_it->second.handles);
+    }
   } else {
     return KeyValueDB::get_iterator(prefix);
   }
 }
+
+rocksdb::Iterator* RocksDBStore::new_shard_iterator(rocksdb::ColumnFamilyHandle* cf)
+{
+  return db->NewIterator(rocksdb::ReadOptions(), cf);
+}
index 4aa9c1cd61909612946096ecb3f19c8d60c968a4..f78b23bc55a71829829f4e76af82516ec185d249 100644 (file)
@@ -77,23 +77,70 @@ class RocksDBStore : public KeyValueDB {
   void *priv;
   rocksdb::DB *db;
   rocksdb::Env *env;
+  const rocksdb::Comparator* comparator;
   std::shared_ptr<rocksdb::Statistics> dbstats;
   rocksdb::BlockBasedTableOptions bbt_opts;
   std::string options_str;
 
   uint64_t cache_size = 0;
   bool set_cache_flag = false;
+  friend class ShardMergeIteratorImpl;
+  /*
+   *  See RocksDB's definition of a column family(CF) and how to use it.
+   *  The interfaces of KeyValueDB is extended, when a column family is created.
+   *  Prefix will be the name of column family to use.
+   */
+public:
+  struct ColumnFamily {
+    string name;      //< name of this individual column family
+    size_t shard_cnt; //< count of shards
+    string options;   //< configure option string for this CF
+    uint32_t hash_l;  //< first character to take for hash calc.
+    uint32_t hash_h;  //< last character to take for hash calc.
+    ColumnFamily(const string &name, size_t shard_cnt, const string &options,
+                uint32_t hash_l, uint32_t hash_h)
+      : name(name), shard_cnt(shard_cnt), options(options), hash_l(hash_l), hash_h(hash_h) {}
+  };
+private:
+  friend std::ostream& operator<<(std::ostream& out, const ColumnFamily& cf);
 
   bool must_close_default_cf = false;
   rocksdb::ColumnFamilyHandle *default_cf = nullptr;
 
+  /// column families in use, name->handles
+  struct prefix_shards {
+    uint32_t hash_l;  //< first character to take for hash calc.
+    uint32_t hash_h;  //< last character to take for hash calc.
+    std::vector<rocksdb::ColumnFamilyHandle *> handles;
+  };
+  std::unordered_map<std::string, prefix_shards> cf_handles;
+
+  void add_column_family(const std::string& cf_name, uint32_t hash_l, uint32_t hash_h,
+                        size_t shard_idx, rocksdb::ColumnFamilyHandle *handle);
+  bool is_column_family(const std::string& prefix);
+  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const std::string& key);
+  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& prefix, const char* key, size_t keylen);
+
   int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
   int install_cf_mergeop(const std::string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
   int create_db_dir();
   int do_open(std::ostream &out, bool create_if_missing, bool open_readonly,
-             const std::vector<ColumnFamily>* cfs = nullptr);
+             const std::string& cfs="");
   int load_rocksdb_options(bool create_if_missing, rocksdb::Options& opt);
+public:
+  static bool parse_sharding_def(const std::string_view text_def,
+                               std::vector<ColumnFamily>& sharding_def,
+                               char const* *error_position = nullptr,
+                               std::string *error_msg = nullptr);
+  const rocksdb::Comparator* get_comparator() const {
+    return comparator;
+  }
 
+private:
+  static void sharding_def_to_columns(const std::vector<ColumnFamily>& sharding_def,
+                                     std::vector<std::string>& columns);
+  int create_shards(const rocksdb::Options& opt,
+                   const vector<ColumnFamily>& sharding_def);
   // manage async compactions
   ceph::mutex compact_queue_lock =
     ceph::make_mutex("RocksDBStore::compact_thread_lock");
@@ -162,6 +209,7 @@ public:
     priv(p),
     db(NULL),
     env(static_cast<rocksdb::Env*>(p)),
+    comparator(nullptr),
     dbstats(NULL),
     compact_queue_stop(false),
     compact_thread(this),
@@ -174,26 +222,19 @@ public:
 
   static bool check_omap_dir(std::string &omap_dir);
   /// Opens underlying db
-  int open(std::ostream &out, const std::vector<ColumnFamily>& cfs = {}) override {
-    return do_open(out, false, false, &cfs);
+  int open(std::ostream &out, const std::string& cfs="") override {
+    return do_open(out, false, false, cfs);
   }
   /// Creates underlying db if missing and opens it
   int create_and_open(std::ostream &out,
-                     const std::vector<ColumnFamily>& cfs = {}) override;
+                     const std::string& cfs="") override;
 
-  int open_read_only(std::ostream &out, const std::vector<ColumnFamily>& cfs = {}) override {
-    return do_open(out, false, true, &cfs);
+  int open_read_only(std::ostream &out, const std::string& cfs="") override {
+    return do_open(out, false, true, cfs);
   }
 
   void close() override;
 
-  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) {
-    auto iter = cf_handles.find(cf_name);
-    if (iter == cf_handles.end())
-      return nullptr;
-    else
-      return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second);
-  }
   int repair(std::ostream &out) override;
   void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
   void get_statistics(ceph::Formatter *f) override;
@@ -399,7 +440,10 @@ public:
   };
 
   Iterator get_iterator(const std::string& prefix) override;
-
+private:
+  /// this iterator spans single cf
+  rocksdb::Iterator* new_shard_iterator(rocksdb::ColumnFamilyHandle* cf);
+public:
   /// Utility
   static std::string combine_strings(const std::string &prefix, const std::string &value) {
     std::string out = prefix;
index de18468d474526ca3855ecabb398ba3aa57b42fc..0d35780b9eed32563de89760c783ca85ce0739ba 100644 (file)
@@ -5694,7 +5694,7 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   std::shared_ptr<Int64ArrayMergeOperator> merge_op(new Int64ArrayMergeOperator);
 
   string kv_backend;
-  std::vector<KeyValueDB::ColumnFamily> cfs;
+  std::string sharding_def;
 
   if (create) {
     kv_backend = cct->_conf->bluestore_kvbackend;
@@ -5838,15 +5838,8 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
 
   if (kv_backend == "rocksdb") {
     options = cct->_conf->bluestore_rocksdb_options;
-
-    map<string,string> cf_map;
-    cct->_conf.with_val<string>("bluestore_rocksdb_cfs",
-                                 get_str_map,
-                                 &cf_map,
-                                 " \t");
-    for (auto& i : cf_map) {
-      dout(10) << "column family " << i.first << ": " << i.second << dendl;
-      cfs.push_back(KeyValueDB::ColumnFamily(i.first, i.second));
+    if (cct->_conf.get_val<bool>("bluestore_rocksdb_cf")) {
+      sharding_def = cct->_conf.get_val<std::string>("bluestore_rocksdb_cfs");
     }
   }
 
@@ -5854,17 +5847,13 @@ int BlueStore::_open_db(bool create, bool to_repair_db, bool read_only)
   if (to_repair_db)
     return 0;
   if (create) {
-    if (cct->_conf.get_val<bool>("bluestore_rocksdb_cf")) {
-      r = db->create_and_open(err, cfs);
-    } else {
-      r = db->create_and_open(err);
-    }
+    r = db->create_and_open(err, sharding_def);
   } else {
     // we pass in cf list here, but it is only used if the db already has
     // column families created.
     r = read_only ?
-      db->open_read_only(err, cfs) :
-      db->open(err, cfs);
+      db->open_read_only(err, sharding_def) :
+      db->open(err, sharding_def);
   }
   if (r) {
     derr << __func__ << " erroring opening db: " << err.str() << dendl;
index 415399e0a30dce4e44472080512b126bc23eee4b..75e5d64232675b3d8da5377d6ee76934b32b5cfe 100644 (file)
@@ -21,10 +21,10 @@ public:
   int init(string _opt) override {
     return 0;
   }
-  int open(std::ostream &out, const vector<ColumnFamily>& cfs = {}) override {
+  int open(std::ostream &out, const std::string& cfs="") override {
     return 0;
   }
-  int create_and_open(ostream &out, const vector<ColumnFamily>& cfs = {}) override {
+  int create_and_open(ostream &out, const std::string& cfs="") override {
     return 0;
   }
 
index 39209e796b4d3345c78850ac42c97de7e1f50f97..7cb73b5b46684bc853bd99c7404a7bbfccc55ab2 100644 (file)
@@ -18,6 +18,7 @@
 #include <time.h>
 #include <sys/mount.h>
 #include "kv/KeyValueDB.h"
+#include "kv/RocksDBStore.h"
 #include "include/Context.h"
 #include "common/ceph_argparse.h"
 #include "global/global_init.h"
@@ -315,9 +316,7 @@ TEST_P(KVTest, RocksDBColumnFamilyTest) {
   if(string(GetParam()) != "rocksdb")
     return;
 
-  std::vector<KeyValueDB::ColumnFamily> cfs;
-  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
-  cfs.push_back(KeyValueDB::ColumnFamily("cf2", ""));
+  std::string cfs("cf1 cf2");
   ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
   cout << "creating two column families and opening them" << std::endl;
   ASSERT_EQ(0, db->create_and_open(cout, cfs));
@@ -371,8 +370,7 @@ TEST_P(KVTest, RocksDBIteratorTest) {
   if(string(GetParam()) != "rocksdb")
     return;
 
-  std::vector<KeyValueDB::ColumnFamily> cfs;
-  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  std::string cfs("cf1");
   ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
   cout << "creating one column family and opening it" << std::endl;
   ASSERT_EQ(0, db->create_and_open(cout, cfs));
@@ -416,6 +414,48 @@ TEST_P(KVTest, RocksDBIteratorTest) {
   fini();
 }
 
+TEST_P(KVTest, RocksDBShardingIteratorTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  std::string cfs("A(6)");
+  ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    for (int v = 100; v <= 999; v++) {
+      std::string str = to_string(v);
+      bufferlist val;
+      val.append(str);
+      t->set("A", str, val);
+    }
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  {
+    KeyValueDB::Iterator it = db->get_iterator("A");
+    int pos = 0;
+    ASSERT_EQ(it->lower_bound(to_string(pos)), 0);
+    for (pos = 100; pos <= 999; pos++) {
+      ASSERT_EQ(it->valid(), true);
+      ASSERT_EQ(it->key(), to_string(pos));
+      ASSERT_EQ(it->value().to_str(), to_string(pos));
+      it->next();
+    }
+    ASSERT_EQ(it->valid(), false);
+    pos = 999;
+    ASSERT_EQ(it->lower_bound(to_string(pos)), 0);
+    for (pos = 999; pos >= 100; pos--) {
+      ASSERT_EQ(it->valid(), true);
+      ASSERT_EQ(it->key(), to_string(pos));
+      ASSERT_EQ(it->value().to_str(), to_string(pos));
+      it->prev();
+    }
+    ASSERT_EQ(it->valid(), false);
+  }
+  fini();
+}
+
 TEST_P(KVTest, RocksDBCFMerge) {
   if(string(GetParam()) != "rocksdb")
     return;
@@ -424,8 +464,7 @@ TEST_P(KVTest, RocksDBCFMerge) {
   int r = db->set_merge_operator("cf1",p);
   if (r < 0)
     return; // No merge operators for this database type
-  std::vector<KeyValueDB::ColumnFamily> cfs;
-  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  std::string cfs("cf1");
   ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
   cout << "creating one column family and opening it" << std::endl;
   ASSERT_EQ(0, db->create_and_open(cout, cfs));
@@ -470,8 +509,7 @@ TEST_P(KVTest, RocksDB_estimate_size) {
   if(string(GetParam()) != "rocksdb")
     GTEST_SKIP();
 
-  std::vector<KeyValueDB::ColumnFamily> cfs;
-  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  std::string cfs("cf1");
   ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
   cout << "creating one column family and opening it" << std::endl;
   ASSERT_EQ(0, db->create_and_open(cout));
@@ -503,8 +541,7 @@ TEST_P(KVTest, RocksDB_estimate_size_column_family) {
   if(string(GetParam()) != "rocksdb")
     GTEST_SKIP();
 
-  std::vector<KeyValueDB::ColumnFamily> cfs;
-  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  std::string cfs("cf1");
   ASSERT_EQ(0, db->init(g_conf()->bluestore_rocksdb_options));
   cout << "creating one column family and opening it" << std::endl;
   ASSERT_EQ(0, db->create_and_open(cout, cfs));
@@ -532,6 +569,65 @@ TEST_P(KVTest, RocksDB_estimate_size_column_family) {
   fini();
 }
 
+TEST_P(KVTest, RocksDB_parse_sharding_def) {
+  if(string(GetParam()) != "rocksdb")
+    GTEST_SKIP();
+
+  bool result;
+  std::vector<RocksDBStore::ColumnFamily> sharding_def;
+  char const* error_position = nullptr;
+  std::string error_msg;
+
+  std::string_view text_def = "A(10,0-30) B(6)=option1,option2=aaaa C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                           sharding_def,
+                                           &error_position,
+                                           &error_msg);
+
+  ASSERT_EQ(result, true);
+  ASSERT_EQ(error_position, nullptr);
+  ASSERT_EQ(error_msg, "");
+  std::cout << text_def << std::endl;
+  if (error_position) std::cout << std::string(error_position - text_def.begin(), ' ') << "^" << error_msg << std::endl;
+
+  ASSERT_EQ(sharding_def.size(), 3);
+  ASSERT_EQ(sharding_def[0].name, "A");
+  ASSERT_EQ(sharding_def[0].shard_cnt, 10);
+  ASSERT_EQ(sharding_def[0].hash_l, 0);
+  ASSERT_EQ(sharding_def[0].hash_h, 30);
+
+  ASSERT_EQ(sharding_def[1].name, "B");
+  ASSERT_EQ(sharding_def[1].shard_cnt, 6);
+  ASSERT_EQ(sharding_def[1].options, "option1,option2=aaaa");
+  ASSERT_EQ(sharding_def[2].name, "C");
+  ASSERT_EQ(sharding_def[2].shard_cnt, 1);
+
+
+  text_def = "A(10 B(6)=option C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                           sharding_def,
+                                           &error_position,
+                                           &error_msg);
+  std::cout << text_def << std::endl;
+  if (error_position)
+    std::cout << std::string(error_position - text_def.begin(), ' ') << "^" << error_msg << std::endl;
+  ASSERT_EQ(result, false);
+  ASSERT_NE(error_position, nullptr);
+  ASSERT_NE(error_msg, "");
+
+  text_def = "A(10,1) B(6)=option C";
+  result = RocksDBStore::parse_sharding_def(text_def,
+                                           sharding_def,
+                                           &error_position,
+                                           &error_msg);
+  std::cout << text_def << std::endl;
+  std::cout << std::string(error_position - text_def.begin(), ' ') << "^" << error_msg << std::endl;
+  ASSERT_EQ(result, false);
+  ASSERT_NE(error_position, nullptr);
+  ASSERT_NE(error_msg, "");
+}
+
+
 INSTANTIATE_TEST_SUITE_P(
   KeyValueDB,
   KVTest,