]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kv/RocksDBStore: add CF support
authorJianjian Huo <jianjian.huo@ssi.samsung.com>
Fri, 29 Sep 2017 22:03:05 +0000 (17:03 -0500)
committerSage Weil <sage@redhat.com>
Thu, 5 Oct 2017 03:01:02 +0000 (22:01 -0500)
- detect and use existing CFs on open

Signed-off-by: Jianjian Huo <jianjian.huo@ssi.samsung.com>
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
Signed-off-by: Sage Weil <sage@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h
src/test/objectstore/test_kv.cc

index 98e410ffdcf61510eaf9f18177f41e106b3d7678..35d312d9b5f53c3b14cfd16b0c0ee4136d531eab 100644 (file)
@@ -53,10 +53,14 @@ static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
 }
 
 //
-// One of these per rocksdb instance, implements the merge operator prefix stuff
+// One of these per rocksdb column family(includes the default CF),
+// implements the merge operator prefix stuff
 //
 class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
   RocksDBStore& store;
+  //name of the column family associated with this merge operator
+  //only for explicit CF, not for the default CF
+  std::string cf_name;
   public:
   const char *Name() const override {
     // Construct a name that rocksDB will validate against. We want to
@@ -65,7 +69,19 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     // construct a name from all of those parts.
     store.assoc_name.clear();
     map<std::string,std::string> names;
-    for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+    if (cf_name.empty()) {
+      //for default column family
+      for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+      for (auto& p : store.cf_handles) names.erase(p.first);
+    } else {
+      //for user created explicit column family
+      for (auto& p : store.merge_ops) {
+       if (p.first.compare(cf_name) == 0) {
+         names[cf_name] = p.second->name();
+         break;
+       }
+      }
+    }
     for (auto& p : names) {
       store.assoc_name += '.';
       store.assoc_name += p.first;
@@ -75,31 +91,53 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     return store.assoc_name.c_str();
   }
 
+  //for default column family
   MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+  //for user created explicit CF
+  MergeOperatorRouter(RocksDBStore &_store, const std::string &cf)
+    : store(_store), cf_name(cf) {}
 
   bool Merge(const rocksdb::Slice& key,
                      const rocksdb::Slice* existing_value,
                      const rocksdb::Slice& value,
                      std::string* new_value,
                      rocksdb::Logger* logger) const override {
-    // Check each prefix
-    for (auto& p : store.merge_ops) {
-      if (p.first.compare(0, p.first.length(),
-                         key.data(), p.first.length()) == 0 &&
-         key.data()[p.first.length()] == 0) {
-        if (existing_value) {
-          p.second->merge(existing_value->data(), existing_value->size(),
-                         value.data(), value.size(),
-                         new_value);
-        } else {
-          p.second->merge_nonexistent(value.data(), value.size(), new_value);
-        }
-        break;
+    if (cf_name.empty()) {
+      // for default column family
+      // extract prefix from key and compare against each registered merge op;
+      // even though merge operator for explicit CF is included in merge_ops,
+      // it won't be picked up, since it won't match.
+      for (auto& p : store.merge_ops) {
+       if (p.first.compare(0, p.first.length(),
+                           key.data(), p.first.length()) == 0 &&
+           key.data()[p.first.length()] == 0) {
+         if (existing_value) {
+           p.second->merge(existing_value->data(), existing_value->size(),
+                           value.data(), value.size(),
+                           new_value);
+         } else {
+           p.second->merge_nonexistent(value.data(), value.size(), new_value);
+         }
+         break;
+       }
+      }
+    } else {
+      //for user created explicit column family
+      for (auto& p : store.merge_ops) {
+       if (p.first.compare(cf_name) == 0) {
+         if (existing_value) {
+           p.second->merge(existing_value->data(), existing_value->size(),
+                           value.data(), value.size(),
+                           new_value);
+         } else {
+           p.second->merge_nonexistent(value.data(), value.size(), new_value);
+         }
+         break;
+       }
       }
     }
     return true; // OK :)
   }
-
 };
 
 int RocksDBStore::set_merge_operator(
@@ -233,7 +271,7 @@ int RocksDBStore::init(string _options_str)
   return 0;
 }
 
-int RocksDBStore::create_and_open(ostream &out)
+int RocksDBStore::create_db_dir()
 {
   if (env) {
     unique_ptr<rocksdb::Directory> dir;
@@ -248,10 +286,45 @@ int RocksDBStore::create_and_open(ostream &out)
       return r;
     }
   }
+  return 0;
+}
+
+int RocksDBStore::install_cf_mergeop(const string &cf_name,
+                                 rocksdb::ColumnFamilyOptions *cf_opt)
+{
+  assert(cf_opt != nullptr);
+  bool found_mop = false;
+  for (auto &mop : merge_ops) {
+    if (mop.first.compare(cf_name) == 0) {
+      cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name));
+      found_mop = true;
+      break;
+    }
+  }
+  if (!found_mop)
+      cf_opt->merge_operator.reset();
+  return 0;
+}
+
+int RocksDBStore::create_and_open(ostream &out)
+{
+  int r = create_db_dir();
+  if (r < 0)
+    return r;
   return do_open(out, true);
 }
 
-int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+int RocksDBStore::create_and_open(ostream &out,
+                                 const vector<ColumnFamily>& cfs)
+{
+  int r = create_db_dir();
+  if (r < 0)
+    return r;
+  return do_open(out, true, &cfs);
+}
+
+int RocksDBStore::do_open(ostream &out, bool create_if_missing,
+                         const vector<ColumnFamily>* cfs)
 {
   rocksdb::Options opt;
   rocksdb::Status status;
@@ -368,10 +441,97 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
           << dendl;
 
   opt.merge_operator.reset(new MergeOperatorRouter(*this));
-  status = rocksdb::DB::Open(opt, path, &db);
-  if (!status.ok()) {
-    derr << status.ToString() << dendl;
-    return -EINVAL;
+  if (create_if_missing) {
+    status = rocksdb::DB::Open(opt, path, &db);
+    if (!status.ok()) {
+      derr << status.ToString() << dendl;
+      return -EINVAL;
+    }
+    // create and open column families
+    if (cfs) {
+      for (auto& p : *cfs) {
+       // copy default CF settings, block cache, merge operators as
+       // the base for new CF
+       rocksdb::ColumnFamilyOptions cf_opt(opt);
+       // user input options will override the base options
+       status = rocksdb::GetColumnFamilyOptionsFromString(
+         cf_opt, p.option, &cf_opt);
+       if (!status.ok()) {
+         derr << __func__ << " invalid db column family option string for CF: "
+              << p.name << dendl;
+         return -EINVAL;
+       }
+       install_cf_mergeop(p.name, &cf_opt);
+       rocksdb::ColumnFamilyHandle *cf;
+       status = db->CreateColumnFamily(cf_opt, p.name, &cf);
+       if (!status.ok()) {
+         derr << __func__ << " Failed to create rocksdb column family: "
+              << p.name << dendl;
+         return -EINVAL;
+       }
+       // store the new CF handle
+       add_column_family(p.name, static_cast<void*>(cf));
+      }
+    }
+  } else {
+    std::vector<string> existing_cfs;
+    status = rocksdb::DB::ListColumnFamilies(
+      rocksdb::DBOptions(opt),
+      path,
+      &existing_cfs);
+    dout(1) << __func__ << " column families: " << existing_cfs << dendl;
+    if (existing_cfs.empty()) {
+      // no column families
+      status = rocksdb::DB::Open(opt, path, &db);
+      if (!status.ok()) {
+       derr << status.ToString() << dendl;
+       return -EINVAL;
+      }
+    } else {
+      // we cannot change column families for a created database.  so, map
+      // what options we are given to whatever cf's already exist.
+      std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+      for (auto& n : existing_cfs) {
+       // copy default CF settings, block cache, merge operators as
+       // the base for new CF
+       rocksdb::ColumnFamilyOptions cf_opt(opt);
+       bool found = false;
+       if (cfs) {
+         for (auto& i : *cfs) {
+           if (i.name == n) {
+             found = true;
+             status = rocksdb::GetColumnFamilyOptionsFromString(
+               cf_opt, i.option, &cf_opt);
+             if (!status.ok()) {
+               derr << __func__ << " invalid db column family options for CF '"
+                    << i.name << "': " << i.option << dendl;
+               return -EINVAL;
+             }
+           }
+         }
+       }
+       if (n != rocksdb::kDefaultColumnFamilyName) {
+         install_cf_mergeop(n, &cf_opt);
+       }
+       column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
+       if (!found && n != rocksdb::kDefaultColumnFamilyName) {
+         dout(1) << __func__ << " column family '" << n
+                 << "' exists but not expected" << dendl;
+       }
+      }
+      std::vector<rocksdb::ColumnFamilyHandle*> handles;
+      status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+                                path, column_families, &handles, &db);
+      if (!status.ok()) {
+       derr << status.ToString() << dendl;
+       return -EINVAL;
+      }
+      for (unsigned i = 0; i < existing_cfs.size(); ++i) {
+       if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) {
+         add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+       }
+      }
+    }
   }
   
   PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
@@ -418,6 +578,11 @@ RocksDBStore::~RocksDBStore()
   delete logger;
 
   // Ensure db is destroyed before dependent db_cache and filterpolicy
+  for (auto& p : cf_handles) {
+    db->DestroyColumnFamilyHandle(
+      static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
+    p.second = nullptr;
+  }
   delete db;
   db = nullptr;
 
@@ -586,20 +751,23 @@ RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db)
   db = _db;
 }
 
-static void put_bat(
-  rocksdb::WriteBatch& bat, 
-  const string &key, 
+void RocksDBStore::RocksDBTransactionImpl::put_bat(
+  rocksdb::WriteBatch& bat,
+  rocksdb::ColumnFamilyHandle *cf,
+  const string &key,
   const bufferlist &to_set_bl)
 {
   // bufferlist::c_str() is non-constant, so we can't call c_str()
   if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
-    bat.Put(rocksdb::Slice(key),
-            rocksdb::Slice(to_set_bl.buffers().front().c_str(),
-                           to_set_bl.length()));
+    bat.Put(cf,
+           rocksdb::Slice(key),
+           rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+                          to_set_bl.length()));
   } else {
     rocksdb::Slice key_slice(key);
     vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
-    bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
+    bat.Put(cf,
+           rocksdb::SliceParts(&key_slice, 1),
             prepare_sliceparts(to_set_bl, &value_slices));
   }
 }
@@ -609,9 +777,13 @@ void RocksDBStore::RocksDBTransactionImpl::set(
   const string &k,
   const bufferlist &to_set_bl)
 {
-  string key = combine_strings(prefix, k);
-
-  put_bat(bat, key, to_set_bl);
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    put_bat(bat, cf, k, to_set_bl);
+  } else {
+    string key = combine_strings(prefix, k);
+    put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl);
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::set(
@@ -619,46 +791,81 @@ void RocksDBStore::RocksDBTransactionImpl::set(
   const char *k, size_t keylen,
   const bufferlist &to_set_bl)
 {
-  string key;
-  combine_strings(prefix, k, keylen, &key);
-
-  put_bat(bat, key, to_set_bl);
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    string key(k, keylen);  // fixme?
+    put_bat(bat, cf, key, to_set_bl);
+  } else {
+    string key;
+    combine_strings(prefix, k, keylen, &key);
+    put_bat(bat, cf, key, to_set_bl);
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const string &k)
 {
-  bat.Delete(combine_strings(prefix, k));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.Delete(cf, rocksdb::Slice(k));
+  } else {
+    bat.Delete(combine_strings(prefix, k));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
                                                 const char *k,
                                                 size_t keylen)
 {
-  string key;
-  combine_strings(prefix, k, keylen, &key);
-  bat.Delete(key);
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.Delete(cf, rocksdb::Slice(k, keylen));
+  } else {
+    string key;
+    combine_strings(prefix, k, keylen, &key);
+    bat.Delete(rocksdb::Slice(key));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
                                                         const string &k)
 {
-  bat.SingleDelete(combine_strings(prefix, k));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    bat.SingleDelete(cf, k);
+  } else {
+    bat.SingleDelete(combine_strings(prefix, k));
+  }
 }
 
 void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
 {
-  if (db->enable_rmrange) {
-    string endprefix = prefix;
-    endprefix.push_back('\x01');
-    bat.DeleteRange(combine_strings(prefix, string()),
-                   combine_strings(endprefix, string()));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    if (db->enable_rmrange) {
+      string endprefix("\xff\xff\xff\xff");  // FIXME: this is cheating...
+      bat.DeleteRange(cf, string(), endprefix);
+    } else {
+      auto it = db->get_iterator(prefix);
+      for (it->seek_to_first();
+          it->valid();
+          it->next()) {
+       bat.Delete(cf, rocksdb::Slice(it->key()));
+      }
+    }
   } else {
-    KeyValueDB::Iterator it = db->get_iterator(prefix);
-    for (it->seek_to_first();
-        it->valid();
-        it->next()) {
-      bat.Delete(combine_strings(prefix, it->key()));
+    if (db->enable_rmrange) {
+      string endprefix = prefix;
+      endprefix.push_back('\x01');
+      bat.DeleteRange(combine_strings(prefix, string()),
+                     combine_strings(endprefix, string()));
+    } else {
+      auto it = db->get_iterator(prefix);
+      for (it->seek_to_first();
+          it->valid();
+          it->next()) {
+       bat.Delete(combine_strings(prefix, it->key()));
+      }
     }
   }
 }
@@ -667,17 +874,37 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix,
                                                          const string &start,
                                                          const string &end)
 {
-  if (db->enable_rmrange) {
-    bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    if (db->enable_rmrange) {
+      bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+    } else {
+      auto it = db->get_iterator(prefix);
+      it->lower_bound(start);
+      while (it->valid()) {
+       if (it->key() >= end) {
+         break;
+       }
+       bat.Delete(cf, rocksdb::Slice(it->key()));
+       it->next();
+      }
+    }
   } else {
-    auto it = db->get_iterator(prefix);
-    it->lower_bound(start);
-    while (it->valid()) {
-      if (it->key() >= end) {
-        break;
+    if (db->enable_rmrange) {
+      bat.DeleteRange(
+       db->db->DefaultColumnFamily(),
+       rocksdb::Slice(combine_strings(prefix, start)),
+       rocksdb::Slice(combine_strings(prefix, end)));
+    } else {
+      auto it = db->get_iterator(prefix);
+      it->lower_bound(start);
+      while (it->valid()) {
+       if (it->key() >= end) {
+         break;
+       }
+       bat.Delete(combine_strings(prefix, it->key()));
+       it->next();
       }
-      bat.Delete(combine_strings(prefix, it->key()));
-      it->next();
     }
   }
 }
@@ -687,40 +914,75 @@ void RocksDBStore::RocksDBTransactionImpl::merge(
   const string &k,
   const bufferlist &to_set_bl)
 {
-  string key = combine_strings(prefix, k);
-
-  // bufferlist::c_str() is non-constant, so we can't call c_str()
-  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
-    bat.Merge(rocksdb::Slice(key),
-              rocksdb::Slice(to_set_bl.buffers().front().c_str(),
-                           to_set_bl.length()));
+  auto cf = db->get_cf_handle(prefix);
+  if (cf) {
+    // bufferlist::c_str() is non-constant, so we can't call c_str()
+    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+      bat.Merge(
+       cf,
+       rocksdb::Slice(k),
+       rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+    } else {
+      // make a copy
+      rocksdb::Slice key_slice(k);
+      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+      bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
+               prepare_sliceparts(to_set_bl, &value_slices));
+    }
   } else {
-    // make a copy
-    rocksdb::Slice key_slice(key);
-    vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
-    bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
-              prepare_sliceparts(to_set_bl, &value_slices));
+    string key = combine_strings(prefix, k);
+    // bufferlist::c_str() is non-constant, so we can't call c_str()
+    if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+      bat.Merge(
+       db->db->DefaultColumnFamily(),
+       rocksdb::Slice(key),
+       rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+    } else {
+      // make a copy
+      rocksdb::Slice key_slice(key);
+      vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+      bat.Merge(
+       db->db->DefaultColumnFamily(),
+       rocksdb::SliceParts(&key_slice, 1),
+       prepare_sliceparts(to_set_bl, &value_slices));
+    }
   }
 }
 
-//gets will bypass RocksDB row cache, since it uses iterator
 int RocksDBStore::get(
     const string &prefix,
     const std::set<string> &keys,
     std::map<string, bufferlist> *out)
 {
   utime_t start = ceph_clock_now();
-  for (std::set<string>::const_iterator i = keys.begin();
-       i != keys.end(); ++i) {
-    std::string value;
-    std::string bound = combine_strings(prefix, *i);
-    auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
-    if (status.ok()) {
-      (*out)[*i].append(value);
-    } else if (status.IsIOError()) {
-      ceph_abort_msg(cct, status.ToString());
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    for (auto& key : keys) {
+      std::string value;
+      auto status = db->Get(rocksdb::ReadOptions(),
+                           cf,
+                           rocksdb::Slice(key),
+                           &value);
+      if (status.ok()) {
+       (*out)[key].append(value);
+      } else if (status.IsIOError()) {
+       ceph_abort_msg(cct, status.ToString());
+      }
+    }
+  } else {
+    for (auto& key : keys) {
+      std::string value;
+      string k = combine_strings(prefix, key);
+      auto status = db->Get(rocksdb::ReadOptions(),
+                           db->DefaultColumnFamily(),
+                           rocksdb::Slice(k),
+                           &value);
+      if (status.ok()) {
+       (*out)[key].append(value);
+      } else if (status.IsIOError()) {
+       ceph_abort_msg(cct, status.ToString());
+      }
     }
-
   }
   utime_t lat = ceph_clock_now() - start;
   logger->inc(l_rocksdb_gets);
@@ -736,10 +998,21 @@ int RocksDBStore::get(
   assert(out && (out->length() == 0));
   utime_t start = ceph_clock_now();
   int r = 0;
-  string value, k;
+  string value;
   rocksdb::Status s;
-  k = combine_strings(prefix, key);
-  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    s = db->Get(rocksdb::ReadOptions(),
+               cf,
+               rocksdb::Slice(key),
+               &value);
+  } else {
+    string k = combine_strings(prefix, key);
+    s = db->Get(rocksdb::ReadOptions(),
+               db->DefaultColumnFamily(),
+               rocksdb::Slice(k),
+               &value);
+  }
   if (s.ok()) {
     out->append(value);
   } else if (s.IsNotFound()) {
@@ -762,10 +1035,22 @@ int RocksDBStore::get(
   assert(out && (out->length() == 0));
   utime_t start = ceph_clock_now();
   int r = 0;
-  string value, k;
-  combine_strings(prefix, key, keylen, &k);
+  string value;
   rocksdb::Status s;
-  s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+  auto cf = get_cf_handle(prefix);
+  if (cf) {
+    s = db->Get(rocksdb::ReadOptions(),
+               cf,
+               rocksdb::Slice(key, keylen),
+               &value);
+  } else {
+    string k;
+    combine_strings(prefix, key, keylen, &k);
+    s = db->Get(rocksdb::ReadOptions(),
+               db->DefaultColumnFamily(),
+               rocksdb::Slice(k),
+               &value);
+  }
   if (s.ok()) {
     out->append(value);
   } else if (s.IsNotFound()) {
@@ -882,6 +1167,7 @@ void RocksDBStore::compact_range(const string& start, const string& end)
   rocksdb::Slice cend(end);
   db->CompactRange(options, &cstart, &cend);
 }
+
 RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
 {
   delete dbiter;
@@ -1017,3 +1303,80 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
         db->NewIterator(rocksdb::ReadOptions()));
 }
 
+class CFIteratorImpl : public KeyValueDB::IteratorImpl {
+protected:
+  string prefix;
+  rocksdb::Iterator *dbiter;
+public:
+  explicit CFIteratorImpl(const std::string& p,
+                                rocksdb::Iterator *iter)
+    : prefix(p), dbiter(iter) { }
+  ~CFIteratorImpl() {
+    delete dbiter;
+  }
+
+  int seek_to_first() override {
+    dbiter->SeekToFirst();
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int seek_to_last() override {
+    dbiter->SeekToLast();
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int upper_bound(const string &after) override {
+    lower_bound(after);
+    if (valid() && (key() == after)) {
+      next();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int lower_bound(const string &to) override {
+    rocksdb::Slice slice_bound(to);
+    dbiter->Seek(slice_bound);
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int next(bool validate=true) {
+    if (valid()) {
+      dbiter->Next();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  int prev(bool validate=true) {
+    if (valid()) {
+      dbiter->Prev();
+    }
+    return dbiter->status().ok() ? 0 : -1;
+  }
+  bool valid() override {
+    return dbiter->Valid();
+  }
+  string key() override {
+    return dbiter->key().ToString();
+  }
+  std::pair<std::string, std::string> raw_key() {
+    return make_pair(prefix, key());
+  }
+  bufferlist value() override {
+    return to_bufferlist(dbiter->value());
+  }
+  bufferptr value_as_ptr() override {
+    rocksdb::Slice val = dbiter->value();
+    return bufferptr(val.data(), val.size());
+  }
+  int status() override {
+    return dbiter->status().ok() ? 0 : -1;
+  }
+};
+
+KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
+{
+  rocksdb::ColumnFamilyHandle *cf_handle =
+    static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
+  if (cf_handle) {
+    return std::make_shared<CFIteratorImpl>(
+      prefix,
+      db->NewIterator(rocksdb::ReadOptions(), cf_handle));
+  } else {
+    return KeyValueDB::get_iterator(prefix);
+  }
+}
index e0b25229d5c9eb0523d77e0525cc43df55662b55..22e733e1f90c8a87a45bda4b0d55aec7db6f27cf 100644 (file)
@@ -55,8 +55,11 @@ namespace rocksdb{
   class WriteBatch;
   class Iterator;
   class Logger;
+  class ColumnFamilyHandle;
   struct Options;
   struct BlockBasedTableOptions;
+  struct DBOptions;
+  struct ColumnFamilyOptions;
 }
 
 extern rocksdb::Logger *create_rocksdb_ceph_logger();
@@ -79,7 +82,10 @@ class RocksDBStore : public KeyValueDB {
   bool set_cache_flag = false;
 
   int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t);
-  int do_open(ostream &out, bool create_if_missing);
+  int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt);
+  int create_db_dir();
+  int do_open(ostream &out, bool create_if_missing,
+             const vector<ColumnFamily>* cfs = nullptr);
 
   // manage async compactions
   Mutex compact_queue_lock;
@@ -151,11 +157,24 @@ public:
   int open(ostream &out) override {
     return do_open(out, false);
   }
+  int open(ostream &out, const vector<ColumnFamily>& cfs) override {
+    return do_open(out, false, &cfs);
+  }
   /// Creates underlying db if missing and opens it
   int create_and_open(ostream &out) override;
+  int create_and_open(ostream &out,
+                     const vector<ColumnFamily>& cfs) override;
 
   void close() override;
 
+  rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) {
+    auto iter = cf_handles.find(cf_name);
+    if (iter == cf_handles.end())
+      return nullptr;
+    else
+      return static_cast<rocksdb::ColumnFamilyHandle*>(iter->second);
+  }
+
   void split_stats(const std::string &s, char delim, std::vector<std::string> &elems);
   void get_statistics(Formatter *f) override;
 
@@ -255,13 +274,19 @@ public:
 
   };
 
-
   class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl {
   public:
     rocksdb::WriteBatch bat;
     RocksDBStore *db;
 
     explicit RocksDBTransactionImpl(RocksDBStore *_db);
+  private:
+    void put_bat(
+      rocksdb::WriteBatch& bat,
+      rocksdb::ColumnFamilyHandle *cf,
+      const string &k,
+      const bufferlist &to_set_bl);
+  public:
     void set(
       const string &prefix,
       const string &k,
@@ -346,6 +371,8 @@ public:
     size_t value_size() override;
   };
 
+  Iterator get_iterator(const std::string& prefix) override;
+
   /// Utility
   static string combine_strings(const string &prefix, const string &value) {
     string out = prefix;
index 46a8a8be32d90625366e35f2a72b512467718419..1d47b2737533fed5b99ec44d9d2fcddf7bd1b6c4 100644 (file)
@@ -35,6 +35,11 @@ public:
 
   KVTest() : db(0) {}
 
+  string _bl_to_str(bufferlist val) {
+    string str(val.c_str(), val.length());
+    return str;
+  }
+
   void rm_r(string path) {
     string cmd = string("rm -r ") + path;
     cout << "==> " << cmd << std::endl;
@@ -199,7 +204,6 @@ struct AppendMOP : public KeyValueDB::MergeOperator {
     const char *ldata, size_t llen,
     const char *rdata, size_t rlen,
     std::string *new_value) override {
-
     *new_value = std::string(ldata, llen) + std::string(rdata, rlen);
   }
   // We use each operator name and each prefix to construct the
@@ -308,6 +312,160 @@ TEST_P(KVTest, RMRange) {
   fini();
 }
 
+TEST_P(KVTest, RocksDBColumnFamilyTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  cfs.push_back(KeyValueDB::ColumnFamily("cf2", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating two column families and opening them" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist value;
+    value.append("value");
+    cout << "write a transaction includes three keys in different CFs" << std::endl;
+    t->set("prefix", "key", value);
+    t->set("cf1", "key", value);
+    t->set("cf2", "key2", value);
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  fini();
+
+  init();
+  ASSERT_EQ(0, db->open(cout, cfs));
+  {
+    bufferlist v1, v2, v3;
+    cout << "reopen db and read those keys" << std::endl;
+    ASSERT_EQ(0, db->get("prefix", "key", &v1));
+    ASSERT_EQ(0, _bl_to_str(v1) != "value");
+    ASSERT_EQ(0, db->get("cf1", "key", &v2));
+    ASSERT_EQ(0, _bl_to_str(v2) != "value");
+    ASSERT_EQ(0, db->get("cf2", "key2", &v3));
+    ASSERT_EQ(0, _bl_to_str(v2) != "value");
+  }
+  {
+    cout << "delete two keys in CFs" << std::endl;
+    KeyValueDB::Transaction t = db->get_transaction();
+    t->rmkey("prefix", "key");
+    t->rmkey("cf2", "key2");
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  fini();
+
+  init();
+  ASSERT_EQ(0, db->open(cout, cfs));
+  {
+    cout << "reopen db and read keys again." << std::endl;
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v1));
+    ASSERT_EQ(0, db->get("cf1", "key", &v2));
+    ASSERT_EQ(0, _bl_to_str(v2) != "value");
+    ASSERT_EQ(-ENOENT, db->get("cf2", "key2", &v3));
+  }
+  fini();
+}
+
+TEST_P(KVTest, RocksDBIteratorTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist bl1;
+    bl1.append("hello");
+    bufferlist bl2;
+    bl2.append("world");
+    cout << "write some kv pairs into default and new CFs" << std::endl;
+    t->set("prefix", "key1", bl1);
+    t->set("prefix", "key2", bl2);
+    t->set("cf1", "key1", bl1);
+    t->set("cf1", "key2", bl2);
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  {
+    cout << "iterating the default CF" << std::endl;
+    KeyValueDB::Iterator iter = db->get_iterator("prefix");
+    iter->seek_to_first();
+    ASSERT_EQ(1, iter->valid());
+    ASSERT_EQ("key1", iter->key());
+    ASSERT_EQ("hello", _bl_to_str(iter->value()));
+    ASSERT_EQ(0, iter->next());
+    ASSERT_EQ(1, iter->valid());
+    ASSERT_EQ("key2", iter->key());
+    ASSERT_EQ("world", _bl_to_str(iter->value()));
+  }
+  {
+    cout << "iterating the new CF" << std::endl;
+    KeyValueDB::Iterator iter = db->get_iterator("cf1");
+    iter->seek_to_first();
+    ASSERT_EQ(1, iter->valid());
+    ASSERT_EQ("key1", iter->key());
+    ASSERT_EQ("hello", _bl_to_str(iter->value()));
+    ASSERT_EQ(0, iter->next());
+    ASSERT_EQ(1, iter->valid());
+    ASSERT_EQ("key2", iter->key());
+    ASSERT_EQ("world", _bl_to_str(iter->value()));
+  }
+  fini();
+}
+
+TEST_P(KVTest, RocksDBCFMerge) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
+  int r = db->set_merge_operator("cf1",p);
+  if (r < 0)
+    return; // No merge operators for this database type
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1, v2, v3;
+    v1.append(string("1"));
+    v2.append(string("2"));
+    v3.append(string("3"));
+    t->set("P", "K1", v1);
+    t->set("cf1", "A1", v2);
+    t->rmkey("cf1", "A2");
+    t->merge("cf1", "A2", v3);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(0, db->get("P", "K1", &v1));
+    ASSERT_EQ(tostr(v1), "1");
+    ASSERT_EQ(0, db->get("cf1", "A1", &v2));
+    ASSERT_EQ(tostr(v2), "2");
+    ASSERT_EQ(0, db->get("cf1", "A2", &v3));
+    ASSERT_EQ(tostr(v3), "?3");
+  }
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1;
+    v1.append(string("1"));
+    t->merge("cf1", "A2", v1);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v;
+    ASSERT_EQ(0, db->get("cf1", "A2", &v));
+    ASSERT_EQ(tostr(v), "?31");
+  }
+  fini();
+}
 
 INSTANTIATE_TEST_CASE_P(
   KeyValueDB,