]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
kv/RocksDBStore: simplify CF merge operator routing
authorSage Weil <sage@redhat.com>
Mon, 2 Oct 2017 18:35:07 +0000 (13:35 -0500)
committerSage Weil <sage@redhat.com>
Thu, 5 Oct 2017 03:01:02 +0000 (22:01 -0500)
Merge operators always map to a prefix, just like a CF, so we can link
directly to the operator without doing any lookup.

Signed-off-by: Sage Weil <sage@redhat.com>
src/kv/RocksDBStore.cc
src/kv/RocksDBStore.h

index 35d312d9b5f53c3b14cfd16b0c0ee4136d531eab..d4d9c45845bd1045e37cbd1bbd1721bcccd8cc0b 100644 (file)
@@ -52,16 +52,16 @@ static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl,
   return rocksdb::SliceParts(slices->data(), slices->size());
 }
 
+
 //
-// One of these per rocksdb column family(includes the default CF),
-// implements the merge operator prefix stuff
+// One of these for the default rocksdb column family, routing each prefix
+// to the appropriate MergeOperator.
 //
-class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+class RocksDBStore::MergeOperatorRouter
+  : public rocksdb::AssociativeMergeOperator
+{
   RocksDBStore& store;
-  //name of the column family associated with this merge operator
-  //only for explicit CF, not for the default CF
-  std::string cf_name;
-  public:
+public:
   const char *Name() const override {
     // Construct a name that rocksDB will validate against. We want to
     // do this in a way that doesn't constrain the ordering of calls
@@ -69,18 +69,12 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     // construct a name from all of those parts.
     store.assoc_name.clear();
     map<std::string,std::string> names;
-    if (cf_name.empty()) {
-      //for default column family
-      for (auto& p : store.merge_ops) names[p.first] = p.second->name();
-      for (auto& p : store.cf_handles) names.erase(p.first);
-    } else {
-      //for user created explicit column family
-      for (auto& p : store.merge_ops) {
-       if (p.first.compare(cf_name) == 0) {
-         names[cf_name] = p.second->name();
-         break;
-       }
-      }
+
+    for (auto& p : store.merge_ops) {
+      names[p.first] = p.second->name();
+    }
+    for (auto& p : store.cf_handles) {
+      names.erase(p.first);
     }
     for (auto& p : names) {
       store.assoc_name += '.';
@@ -91,55 +85,67 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat
     return store.assoc_name.c_str();
   }
 
-  //for default column family
   MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
-  //for user created explicit CF
-  MergeOperatorRouter(RocksDBStore &_store, const std::string &cf)
-    : store(_store), cf_name(cf) {}
 
   bool Merge(const rocksdb::Slice& key,
-                     const rocksdb::Slice* existing_value,
-                     const rocksdb::Slice& value,
-                     std::string* new_value,
-                     rocksdb::Logger* logger) const override {
-    if (cf_name.empty()) {
-      // for default column family
-      // extract prefix from key and compare against each registered merge op;
-      // even though merge operator for explicit CF is included in merge_ops,
-      // it won't be picked up, since it won't match.
-      for (auto& p : store.merge_ops) {
-       if (p.first.compare(0, p.first.length(),
-                           key.data(), p.first.length()) == 0 &&
-           key.data()[p.first.length()] == 0) {
-         if (existing_value) {
-           p.second->merge(existing_value->data(), existing_value->size(),
-                           value.data(), value.size(),
-                           new_value);
-         } else {
-           p.second->merge_nonexistent(value.data(), value.size(), new_value);
-         }
-         break;
-       }
-      }
-    } else {
-      //for user created explicit column family
-      for (auto& p : store.merge_ops) {
-       if (p.first.compare(cf_name) == 0) {
-         if (existing_value) {
-           p.second->merge(existing_value->data(), existing_value->size(),
-                           value.data(), value.size(),
-                           new_value);
-         } else {
-           p.second->merge_nonexistent(value.data(), value.size(), new_value);
-         }
-         break;
+            const rocksdb::Slice* existing_value,
+            const rocksdb::Slice& value,
+            std::string* new_value,
+            rocksdb::Logger* logger) const override {
+    // for default column family
+    // extract prefix from key and compare against each registered merge op;
+    // even though merge operator for explicit CF is included in merge_ops,
+    // it won't be picked up, since it won't match.
+    for (auto& p : store.merge_ops) {
+      if (p.first.compare(0, p.first.length(),
+                         key.data(), p.first.length()) == 0 &&
+         key.data()[p.first.length()] == 0) {
+       if (existing_value) {
+         p.second->merge(existing_value->data(), existing_value->size(),
+                         value.data(), value.size(),
+                         new_value);
+       } else {
+         p.second->merge_nonexistent(value.data(), value.size(), new_value);
        }
+       break;
       }
     }
     return true; // OK :)
   }
 };
 
+//
+// One of these per non-default column family, linked directly to the
+// merge operator for that CF/prefix (if any).
+//
+class RocksDBStore::MergeOperatorLinker
+  : public rocksdb::AssociativeMergeOperator
+{
+private:
+  std::shared_ptr<KeyValueDB::MergeOperator> mop;
+public:
+  MergeOperatorLinker(std::shared_ptr<KeyValueDB::MergeOperator> o) : mop(o) {}
+
+  const char *Name() const override {
+    return mop->name().c_str();
+  }
+
+  bool Merge(const rocksdb::Slice& key,
+            const rocksdb::Slice* existing_value,
+            const rocksdb::Slice& value,
+            std::string* new_value,
+            rocksdb::Logger* logger) const override {
+    if (existing_value) {
+      mop->merge(existing_value->data(), existing_value->size(),
+                value.data(), value.size(),
+                new_value);
+    } else {
+      mop->merge_nonexistent(value.data(), value.size(), new_value);
+    }
+    return true;
+  }
+};
+
 int RocksDBStore::set_merge_operator(
   const string& prefix,
   std::shared_ptr<KeyValueDB::MergeOperator> mop)
@@ -289,20 +295,17 @@ int RocksDBStore::create_db_dir()
   return 0;
 }
 
-int RocksDBStore::install_cf_mergeop(const string &cf_name,
-                                 rocksdb::ColumnFamilyOptions *cf_opt)
+int RocksDBStore::install_cf_mergeop(
+  const string &cf_name,
+  rocksdb::ColumnFamilyOptions *cf_opt)
 {
   assert(cf_opt != nullptr);
-  bool found_mop = false;
-  for (auto &mop : merge_ops) {
-    if (mop.first.compare(cf_name) == 0) {
-      cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name));
-      found_mop = true;
-      break;
+  cf_opt->merge_operator.reset();
+  for (auto& i : merge_ops) {
+    if (i.first == cf_name) {
+      cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
     }
   }
-  if (!found_mop)
-      cf_opt->merge_operator.reset();
   return 0;
 }
 
index 22e733e1f90c8a87a45bda4b0d55aec7db6f27cf..031f8a6d2147b038a6ac19fde6c6d111a9fbd33d 100644 (file)
@@ -394,9 +394,11 @@ public:
   static string past_prefix(const string &prefix);
 
   class MergeOperatorRouter;
+  class MergeOperatorLinker;
   friend class MergeOperatorRouter;
-  int set_merge_operator(const std::string& prefix,
-                                std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
+  int set_merge_operator(
+    const std::string& prefix,
+    std::shared_ptr<KeyValueDB::MergeOperator> mop) override;
   string assoc_name; ///< Name of associative operator
 
   uint64_t get_estimated_size(map<string,uint64_t> &extra) override {