]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kv/KeyValueDB: add merge operator interface and test
authorAllen Samuels <allen.samuels@sandisk.com>
Fri, 22 Apr 2016 20:39:09 +0000 (16:39 -0400)
committerSage Weil <sage@redhat.com>
Fri, 22 Apr 2016 20:41:46 +0000 (16:41 -0400)
Signed-off-by: Allen Samuels <allen.samuels@sandisk.com>
Signed-off-by: Sage Weil <sage@redhat.com>
src/kv/KeyValueDB.h
src/test/objectstore/test_kv.cc

index c7a83a253230330e0c9e8d36da5f6ab27eb55497..aef93a3eb3c422f155d8bc88004b9da83e70a22d 100644 (file)
@@ -93,6 +93,13 @@ public:
       const std::string &prefix ///< [in] Prefix by which to remove keys
       ) = 0;
 
+    /// Merge value into key
+    virtual void merge(
+      const std::string &prefix,   ///< [in] Prefix ==> MUST match some established merge operator
+      const std::string &key,      ///< [in] Key to be merged
+      const bufferlist  &value     ///< [in] value to be merged into key
+    ) { assert(0 == "Not implemented"); }
+
     virtual ~TransactionImpl() {}
   };
   typedef ceph::shared_ptr< TransactionImpl > Transaction;
@@ -277,7 +284,36 @@ public:
   virtual void compact_range_async(const std::string& prefix,
                                   const std::string& start, const std::string& end) {}
 
+  // See RocksDB merge operator definition, we support the basic
+  // associative merge only right now.
+  class MergeOperator {
+    public:
+    /// Merge into a key that doesn't exist
+    virtual void merge_nonexistant(
+      const char *rdata, size_t rlen,
+      std::string *new_value) = 0;
+    /// Merge into a key that does exist
+    virtual void merge(
+      const char *ldata, size_t llen,
+      const char *rdata, size_t rlen,
+      std::string *new_value) = 0;
+    /// We use each operator name and each prefix to construct the overall RocksDB operator name for consistency check at open time.
+    virtual string name() const = 0;
+
+    virtual ~MergeOperator() {}
+  };
+
+  /// Setup one or more operators, this needs to be done BEFORE the DB is opened.
+  virtual int set_merge_operator(const std::string& prefix,
+                                std::shared_ptr<MergeOperator> mop) {
+    return -EOPNOTSUPP;
+  }
+
 protected:
+  /// List of matching prefixes and merge operators
+  std::vector<std::pair<std::string,
+                       std::shared_ptr<MergeOperator> > > merge_ops;
+
   virtual WholeSpaceIterator _get_iterator() = 0;
   virtual WholeSpaceIterator _get_snapshot_iterator() = 0;
 };
index 06c45e0cb7e18e7d491eb9c46a15a2c6b8f0eeed..828411d2a354a3f1d29f978b1bdba86f4be1e9ed 100644 (file)
@@ -159,6 +159,70 @@ TEST_P(KVTest, BenchCommit) {
   fini();
 }
 
+struct AppendMOP : public KeyValueDB::MergeOperator {
+  virtual void merge_nonexistant(
+    const char *rdata, size_t rlen, std::string *new_value) override {
+    *new_value = "?" + std::string(rdata, rlen);
+  }
+  virtual void merge(
+    const char *ldata, size_t llen,
+    const char *rdata, size_t rlen,
+    std::string *new_value) {
+    *new_value = std::string(ldata, llen) + std::string(rdata, rlen);
+  }
+  // We use each operator name and each prefix to construct the
+  // overall RocksDB operator name for consistency check at open time.
+  virtual string name() const {
+    return "Append";
+  }
+};
+
+string tostr(bufferlist& b) {
+  return string(b.c_str(),b.length());
+}
+
+TEST_P(KVTest, Merge) {
+  shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
+  int r = db->set_merge_operator("A",p);
+  if (r < 0)
+    return; // No merge operators for this database type
+  ASSERT_EQ(0, db->create_and_open(cout));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1, v2, v3;
+    v1.append(string("1"));
+    v2.append(string("2"));
+    v3.append(string("3"));
+    t->set("P", "K1", v1);
+    t->set("A", "A1", v2);
+    t->rmkey("A", "A2");
+    t->merge("A", "A2", v3);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(0, db->get("P", "K1", &v1));
+    ASSERT_EQ(tostr(v1), "1");
+    ASSERT_EQ(0, db->get("A", "A1", &v2));
+    ASSERT_EQ(tostr(v2), "2");
+    ASSERT_EQ(0, db->get("A", "A2", &v3));
+    ASSERT_EQ(tostr(v3), "?3");
+  }
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1;
+    v1.append(string("1"));
+    t->merge("A", "A2", v1);
+    db->submit_transaction_sync(t);
+  }
+  {
+    bufferlist v;
+    ASSERT_EQ(0, db->get("A", "A2", &v));
+    ASSERT_EQ(tostr(v), "?31");
+  }
+  fini();
+}
+
 
 INSTANTIATE_TEST_CASE_P(
   KeyValueDB,