]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Make MergeOperator+CompactionFilter/Factory into Customizable Classes (#8481)
authormrambacher <mrambach@gmail.com>
Fri, 6 Aug 2021 15:26:23 +0000 (08:26 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 6 Aug 2021 15:27:25 +0000 (08:27 -0700)
Summary:
- Changed MergeOperator, CompactionFilter, and CompactionFilterFactory into Customizable classes.
 - Added Options/Configurable/Object Registration for TTL and Cassandra variants
 - Changed the StringAppend MergeOperators to accept a string delimiter rather than a simple char.  Made the delimiter into a configurable option
 - Added tests for new functionality

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8481

Reviewed By: zhichao-cao

Differential Revision: D30136050

Pulled By: mrambacher

fbshipit-source-id: 271d1772835935b6773abaf018ee71e42f9491af

39 files changed:
CMakeLists.txt
TARGETS
db/compaction/compaction_service_test.cc
include/rocksdb/compaction_filter.h
include/rocksdb/customizable.h
include/rocksdb/merge_operator.h
include/rocksdb/utilities/options_type.h
options/cf_options.cc
options/customizable_test.cc
options/options_helper.cc
options/options_test.cc
src.mk
test_util/testutil.h
tools/db_bench_tool.cc
utilities/cassandra/cassandra_compaction_filter.cc
utilities/cassandra/cassandra_compaction_filter.h
utilities/cassandra/cassandra_functional_test.cc
utilities/cassandra/cassandra_options.h [new file with mode: 0644]
utilities/cassandra/merge_operator.cc
utilities/cassandra/merge_operator.h
utilities/compaction_filters.cc [new file with mode: 0644]
utilities/compaction_filters/layered_compaction_filter_base.h
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.h
utilities/merge_operators.cc [new file with mode: 0644]
utilities/merge_operators.h
utilities/merge_operators/bytesxor.h
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
utilities/merge_operators/sortlist.cc
utilities/merge_operators/sortlist.h
utilities/merge_operators/string_append/stringappend.cc
utilities/merge_operators/string_append/stringappend.h
utilities/merge_operators/string_append/stringappend2.cc
utilities/merge_operators/string_append/stringappend2.h
utilities/merge_operators/uint64add.cc
utilities/ttl/db_ttl_impl.cc
utilities/ttl/db_ttl_impl.h
utilities/ttl/ttl_test.cc

index 7b587f54c5c9d7bdb62e4805d7d5e60bd70089c8..43b3f62752760cbbf929775cff4ba84bb06d1283 100644 (file)
@@ -849,6 +849,7 @@ set(SOURCES
         utilities/cassandra/format.cc
         utilities/cassandra/merge_operator.cc
         utilities/checkpoint/checkpoint_impl.cc
+        utilities/compaction_filters.cc
         utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
         utilities/debug.cc
         utilities/env_mirror.cc
@@ -857,6 +858,7 @@ set(SOURCES
         utilities/fault_injection_fs.cc
         utilities/leveldb_options/leveldb_options.cc
         utilities/memory/memory_util.cc
+        utilities/merge_operators.cc
         utilities/merge_operators/bytesxor.cc
         utilities/merge_operators/max.cc
         utilities/merge_operators/put.cc
diff --git a/TARGETS b/TARGETS
index c73bd84ab5c6400e85b164d25fa9de337789aa14..d96845938a362a440c7e29759e2c90f19bbaaf73 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -368,6 +368,7 @@ cpp_library(
         "utilities/cassandra/format.cc",
         "utilities/cassandra/merge_operator.cc",
         "utilities/checkpoint/checkpoint_impl.cc",
+        "utilities/compaction_filters.cc",
         "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
         "utilities/convenience/info_log_finder.cc",
         "utilities/debug.cc",
@@ -377,6 +378,7 @@ cpp_library(
         "utilities/fault_injection_fs.cc",
         "utilities/leveldb_options/leveldb_options.cc",
         "utilities/memory/memory_util.cc",
+        "utilities/merge_operators.cc",
         "utilities/merge_operators/bytesxor.cc",
         "utilities/merge_operators/max.cc",
         "utilities/merge_operators/put.cc",
@@ -681,6 +683,7 @@ cpp_library(
         "utilities/cassandra/format.cc",
         "utilities/cassandra/merge_operator.cc",
         "utilities/checkpoint/checkpoint_impl.cc",
+        "utilities/compaction_filters.cc",
         "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
         "utilities/convenience/info_log_finder.cc",
         "utilities/debug.cc",
@@ -690,6 +693,7 @@ cpp_library(
         "utilities/fault_injection_fs.cc",
         "utilities/leveldb_options/leveldb_options.cc",
         "utilities/memory/memory_util.cc",
+        "utilities/merge_operators.cc",
         "utilities/merge_operators/bytesxor.cc",
         "utilities/merge_operators/max.cc",
         "utilities/merge_operators/put.cc",
index 9128bba9853c107b8ceec120558783e2cad94842..e20c6f865664ced24f38fb84a3d7ddb931197fd8 100644 (file)
@@ -337,8 +337,9 @@ class PartialDeleteCompactionFilter : public CompactionFilter {
 TEST_F(CompactionServiceTest, CompactionFilter) {
   Options options = CurrentOptions();
   options.env = env_;
-  auto delete_comp_filter = PartialDeleteCompactionFilter();
-  options.compaction_filter = &delete_comp_filter;
+  std::unique_ptr<CompactionFilter> delete_comp_filter(
+      new PartialDeleteCompactionFilter());
+  options.compaction_filter = delete_comp_filter.get();
   options.compaction_service =
       std::make_shared<MyTestCompactionService>(dbname_, options);
 
index 14515976ec5baeec8b6ff853f16876d7992549b4..400f3388e0751011b4680daa21f5fdbc951a6191 100644 (file)
@@ -13,6 +13,7 @@
 #include <string>
 #include <vector>
 
+#include "rocksdb/customizable.h"
 #include "rocksdb/rocksdb_namespace.h"
 #include "rocksdb/types.h"
 
@@ -24,7 +25,7 @@ class SliceTransform;
 // CompactionFilter allows an application to modify/delete a key-value during
 // table file creation.
 
-class CompactionFilter {
+class CompactionFilter : public Customizable {
  public:
   enum ValueType {
     kValue,
@@ -59,6 +60,10 @@ class CompactionFilter {
   };
 
   virtual ~CompactionFilter() {}
+  static const char* Type() { return "CompactionFilter"; }
+  static Status CreateFromString(const ConfigOptions& config_options,
+                                 const std::string& name,
+                                 const CompactionFilter** result);
 
   // The table file creation process invokes this method before adding a kv to
   // the table file. A return value of false indicates that the kv should be
@@ -193,7 +198,7 @@ class CompactionFilter {
 
   // Returns a name that identifies this `CompactionFilter`.
   // The name will be printed to LOG file on start up for diagnosis.
-  virtual const char* Name() const = 0;
+  const char* Name() const override = 0;
 
   // Internal (BlobDB) use only. Do not override in application code.
   virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; }
@@ -214,9 +219,13 @@ class CompactionFilter {
 // `CompactionFilter` according to `ShouldFilterTableFileCreation()`. This
 // allows the application to know about the different ongoing threads of work
 // and makes it unnecessary for `CompactionFilter` to provide thread-safety.
-class CompactionFilterFactory {
+class CompactionFilterFactory : public Customizable {
  public:
   virtual ~CompactionFilterFactory() {}
+  static const char* Type() { return "CompactionFilterFactory"; }
+  static Status CreateFromString(
+      const ConfigOptions& config_options, const std::string& name,
+      std::shared_ptr<CompactionFilterFactory>* result);
 
   // Returns whether a thread creating table files for the specified `reason`
   // should invoke `CreateCompactionFilter()` and pass KVs through the returned
index c04e71360dd142d2b26c94b0c1d687c232d8e155..b168f472e5930ab8d03e7b7ff0bd1cc1f5526761 100644 (file)
@@ -87,7 +87,18 @@ class Customizable : public Configurable {
   // @param name The name of the instance to find.
   // Returns true if the class is an instance of the input name.
   virtual bool IsInstanceOf(const std::string& name) const {
-    return name == Name();
+    if (name.empty()) {
+      return false;
+    } else if (name == Name()) {
+      return true;
+    } else {
+      const char* nickname = NickName();
+      if (nickname != nullptr && name == nickname) {
+        return true;
+      } else {
+        return false;
+      }
+    }
   }
 
   // Returns the named instance of the Customizable as a T*, or nullptr if not
@@ -179,6 +190,10 @@ class Customizable : public Configurable {
   virtual const Customizable* Inner() const { return nullptr; }
 
  protected:
+  // Some classes have both a class name (e.g. PutOperator) and a nickname
+  // (e.g. put). Classes can override this method to return a
+  // nickname.  Nicknames can be used by InstanceOf and object creation.
+  virtual const char* NickName() const { return ""; }
   //  Given a name (e.g. rocksdb.my.type.opt), returns the short name (opt)
   std::string GetOptionName(const std::string& long_name) const override;
 #ifndef ROCKSDB_LITE
index 0b04ec3102b13cb8a27bd7f72a0ed1d28f7dd18e..68065454aa9fc1e93b3f88a6226f8f7b2307aa76 100644 (file)
@@ -10,6 +10,7 @@
 #include <string>
 #include <vector>
 
+#include "rocksdb/customizable.h"
 #include "rocksdb/slice.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -43,10 +44,13 @@ class Logger;
 //
 // Refer to rocksdb-merge wiki for more details and example implementations.
 //
-class MergeOperator {
+class MergeOperator : public Customizable {
  public:
   virtual ~MergeOperator() {}
   static const char* Type() { return "MergeOperator"; }
+  static Status CreateFromString(const ConfigOptions& opts,
+                                 const std::string& id,
+                                 std::shared_ptr<MergeOperator>* result);
 
   // Gives the client a way to express the read -> modify -> write semantics
   // key:      (IN)    The key that's associated with this merge operation.
index a4e36752bd6800cb0b647cbb9efd230aeca6a4e7..ac3bfcba2558f3625e956cdaa550bae2b4162c8b 100644 (file)
@@ -35,10 +35,7 @@ enum class OptionType {
   kCompactionPri,
   kSliceTransform,
   kCompressionType,
-  kCompactionFilter,
-  kCompactionFilterFactory,
   kCompactionStopStyle,
-  kMergeOperator,
   kMemTableRepFactory,
   kFilterPolicy,
   kChecksumType,
index 89b6ed43f4a771bdc29a68571a99a93b6878ddfc..830f820ef82c3d19a5105f7ae5895688afec82db 100644 (file)
@@ -15,6 +15,7 @@
 #include "options/options_helper.h"
 #include "options/options_parser.h"
 #include "port/port.h"
+#include "rocksdb/compaction_filter.h"
 #include "rocksdb/concurrent_task_limiter.h"
 #include "rocksdb/configurable.h"
 #include "rocksdb/convenience.h"
@@ -656,30 +657,18 @@ static std::unordered_map<std::string, OptionTypeInfo>
             }
           }}},
         {"compaction_filter",
-         {offset_of(&ImmutableCFOptions::compaction_filter),
-          OptionType::kCompactionFilter, OptionVerificationType::kByName,
-          OptionTypeFlags::kNone}},
+         OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
+             offset_of(&ImmutableCFOptions::compaction_filter),
+             OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
         {"compaction_filter_factory",
-         {offset_of(&ImmutableCFOptions::compaction_filter_factory),
-          OptionType::kCompactionFilterFactory, OptionVerificationType::kByName,
-          OptionTypeFlags::kNone}},
+         OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
+             offset_of(&ImmutableCFOptions::compaction_filter_factory),
+             OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
         {"merge_operator",
-         {offset_of(&ImmutableCFOptions::merge_operator),
-          OptionType::kMergeOperator,
-          OptionVerificationType::kByNameAllowFromNull,
-          OptionTypeFlags::kCompareLoose,
-          // Parses the input value as a MergeOperator, updating the value
-          [](const ConfigOptions& opts, const std::string& /*name*/,
-             const std::string& value, void* addr) {
-            auto mop = static_cast<std::shared_ptr<MergeOperator>*>(addr);
-            Status status =
-                opts.registry->NewSharedObject<MergeOperator>(value, mop);
-            // Only support static comparator for now.
-            if (status.ok()) {
-              return status;
-            }
-            return Status::OK();
-          }}},
+         OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
+             offset_of(&ImmutableCFOptions::merge_operator),
+             OptionVerificationType::kByNameAllowFromNull,
+             OptionTypeFlags::kCompareLoose | OptionTypeFlags::kAllowNull)},
         {"compaction_style",
          {offset_of(&ImmutableCFOptions::compaction_style),
           OptionType::kCompactionStyle, OptionVerificationType::kNormal,
index 814bb686db7978520c3971cfebc980dc2ce96646..5eae06175f9485ccaa5265dd0ca2891eaacc57cb 100644 (file)
@@ -29,6 +29,7 @@
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/string_util.h"
+#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
 
 #ifndef GFLAGS
 bool FLAGS_enable_print = false;
@@ -944,6 +945,26 @@ static int RegisterTestObjects(ObjectLibrary& library,
         static test::SimpleSuffixReverseComparator ssrc;
         return &ssrc;
       });
+  library.Register<MergeOperator>(
+      "Changling",
+      [](const std::string& uri, std::unique_ptr<MergeOperator>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new test::ChanglingMergeOperator(uri));
+        return guard->get();
+      });
+  library.Register<CompactionFilter>(
+      "Changling",
+      [](const std::string& uri, std::unique_ptr<CompactionFilter>* /*guard*/,
+         std::string* /* errmsg */) {
+        return new test::ChanglingCompactionFilter(uri);
+      });
+  library.Register<CompactionFilterFactory>(
+      "Changling", [](const std::string& uri,
+                      std::unique_ptr<CompactionFilterFactory>* guard,
+                      std::string* /* errmsg */) {
+        guard->reset(new test::ChanglingCompactionFilterFactory(uri));
+        return guard->get();
+      });
 
   return static_cast<int>(library.GetFactoryCount(&num_types));
 }
@@ -1136,6 +1157,58 @@ TEST_F(LoadCustomizableTest, LoadComparatorTest) {
   }
 }
 
+TEST_F(LoadCustomizableTest, LoadMergeOperatorTest) {
+  std::shared_ptr<MergeOperator> result;
+
+  ASSERT_NOK(
+      MergeOperator::CreateFromString(config_options_, "Changling", &result));
+  ASSERT_OK(MergeOperator::CreateFromString(config_options_, "put", &result));
+  ASSERT_NE(result, nullptr);
+  ASSERT_STREQ(result->Name(), "PutOperator");
+  if (RegisterTests("Test")) {
+    ASSERT_OK(
+        MergeOperator::CreateFromString(config_options_, "Changling", &result));
+    ASSERT_NE(result, nullptr);
+    ASSERT_STREQ(result->Name(), "ChanglingMergeOperator");
+  }
+}
+
+TEST_F(LoadCustomizableTest, LoadCompactionFilterFactoryTest) {
+  std::shared_ptr<CompactionFilterFactory> result;
+
+  ASSERT_NOK(CompactionFilterFactory::CreateFromString(config_options_,
+                                                       "Changling", &result));
+  if (RegisterTests("Test")) {
+    ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_,
+                                                        "Changling", &result));
+    ASSERT_NE(result, nullptr);
+    ASSERT_STREQ(result->Name(), "ChanglingCompactionFilterFactory");
+  }
+}
+
+TEST_F(LoadCustomizableTest, LoadCompactionFilterTest) {
+  const CompactionFilter* result = nullptr;
+
+  ASSERT_NOK(CompactionFilter::CreateFromString(config_options_, "Changling",
+                                                &result));
+#ifndef ROCKSDB_LITE
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options_, RemoveEmptyValueCompactionFilter::kClassName(),
+      &result));
+  ASSERT_NE(result, nullptr);
+  ASSERT_STREQ(result->Name(), RemoveEmptyValueCompactionFilter::kClassName());
+  delete result;
+  result = nullptr;
+  if (RegisterTests("Test")) {
+    ASSERT_OK(CompactionFilter::CreateFromString(config_options_, "Changling",
+                                                 &result));
+    ASSERT_NE(result, nullptr);
+    ASSERT_STREQ(result->Name(), "ChanglingCompactionFilter");
+    delete result;
+  }
+#endif  // ROCKSDB_LITE
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(LoadCustomizableTest, LoadEventListenerTest) {
   std::shared_ptr<EventListener> result;
index c11984ff9ce2c3334c06f933156d68ebb27937b5..157537923c914cf0efa3918304ecf01e870c570f 100644 (file)
@@ -562,32 +562,12 @@ bool SerializeSingleOptionHelper(const void* opt_address,
                                           : kNullptrString;
       break;
     }
-    case OptionType::kCompactionFilter: {
-      // it's a const pointer of const CompactionFilter*
-      const auto* ptr =
-          static_cast<const CompactionFilter* const*>(opt_address);
-      *value = *ptr ? (*ptr)->Name() : kNullptrString;
-      break;
-    }
-    case OptionType::kCompactionFilterFactory: {
-      const auto* ptr =
-          static_cast<const std::shared_ptr<CompactionFilterFactory>*>(
-              opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
     case OptionType::kMemTableRepFactory: {
       const auto* ptr =
           static_cast<const std::shared_ptr<MemTableRepFactory>*>(opt_address);
       *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
       break;
     }
-    case OptionType::kMergeOperator: {
-      const auto* ptr =
-          static_cast<const std::shared_ptr<MergeOperator>*>(opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
     case OptionType::kFilterPolicy: {
       const auto* ptr =
           static_cast<const std::shared_ptr<FilterPolicy>*>(opt_address);
index 0d657049486d9ae85253dad06b601027c895cba1..3905a957761d7e1a9603abb5344c037e519c8108 100644 (file)
@@ -30,6 +30,9 @@
 #include "util/stderr_logger.h"
 #include "util/string_util.h"
 #include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
 
 #ifndef GFLAGS
 bool FLAGS_enable_print = false;
@@ -395,13 +398,6 @@ TEST_F(OptionsTest, GetColumnFamilyOptionsFromStringTest) {
   // MergeOperator from object registry
   std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
   std::string kMoName = bxo->Name();
-  ObjectLibrary::Default()->Register<MergeOperator>(
-      kMoName,
-      [](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
-         std::string* /* errmsg */) {
-        guard->reset(new BytesXOROperator());
-        return guard->get();
-      });
 
   ASSERT_OK(GetColumnFamilyOptionsFromString(config_options, base_cf_opt,
                                              "merge_operator=" + kMoName + ";",
@@ -2281,14 +2277,6 @@ TEST_F(OptionsOldApiTest, GetColumnFamilyOptionsFromStringTest) {
   // MergeOperator from object registry
   std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
   std::string kMoName = bxo->Name();
-  ObjectLibrary::Default()->Register<MergeOperator>(
-      kMoName,
-      [](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
-         std::string* /* errmsg */) {
-        guard->reset(new BytesXOROperator());
-        return guard->get();
-      });
-
   ASSERT_OK(GetColumnFamilyOptionsFromString(
       base_cf_opt, "merge_operator=" + kMoName + ";", &new_cf_opt));
   ASSERT_EQ(kMoName, std::string(new_cf_opt.merge_operator->Name()));
@@ -3096,8 +3084,8 @@ void VerifyCFPointerTypedOptions(
 
   // change the name of merge operator back-and-forth
   {
-    auto* merge_operator = dynamic_cast<test::ChanglingMergeOperator*>(
-        base_cf_opt->merge_operator.get());
+    auto* merge_operator = base_cf_opt->merge_operator
+                               ->CheckedCast<test::ChanglingMergeOperator>();
     if (merge_operator != nullptr) {
       name_buffer = merge_operator->Name();
       // change the name  and expect non-ok status
@@ -3114,8 +3102,8 @@ void VerifyCFPointerTypedOptions(
   // change the name of the compaction filter factory back-and-forth
   {
     auto* compaction_filter_factory =
-        dynamic_cast<test::ChanglingCompactionFilterFactory*>(
-            base_cf_opt->compaction_filter_factory.get());
+        base_cf_opt->compaction_filter_factory
+            ->CheckedCast<test::ChanglingCompactionFilterFactory>();
     if (compaction_filter_factory != nullptr) {
       name_buffer = compaction_filter_factory->Name();
       // change the name and expect non-ok status
@@ -4186,6 +4174,90 @@ TEST_F(ConfigOptionsTest, EnvFromConfigOptions) {
 
   delete mem_env;
 }
+TEST_F(ConfigOptionsTest, MergeOperatorFromString) {
+  ConfigOptions config_options;
+  std::shared_ptr<MergeOperator> merge_op;
+
+  ASSERT_OK(MergeOperator::CreateFromString(config_options, "put", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("put"));
+  ASSERT_STREQ(merge_op->Name(), "PutOperator");
+
+  ASSERT_OK(
+      MergeOperator::CreateFromString(config_options, "put_v1", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("PutOperator"));
+
+  ASSERT_OK(
+      MergeOperator::CreateFromString(config_options, "uint64add", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("uint64add"));
+  ASSERT_STREQ(merge_op->Name(), "UInt64AddOperator");
+
+  ASSERT_OK(MergeOperator::CreateFromString(config_options, "max", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("max"));
+  ASSERT_STREQ(merge_op->Name(), "MaxOperator");
+
+  ASSERT_OK(
+      MergeOperator::CreateFromString(config_options, "bytesxor", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("bytesxor"));
+  ASSERT_STREQ(merge_op->Name(), BytesXOROperator::kClassName());
+
+  ASSERT_OK(
+      MergeOperator::CreateFromString(config_options, "sortlist", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("sortlist"));
+  ASSERT_STREQ(merge_op->Name(), SortList::kClassName());
+
+  ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappend",
+                                            &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
+  ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
+  auto delimiter = merge_op->GetOptions<std::string>("Delimiter");
+  ASSERT_NE(delimiter, nullptr);
+  ASSERT_EQ(*delimiter, ",");
+
+  ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappendtest",
+                                            &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
+  ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
+  delimiter = merge_op->GetOptions<std::string>("Delimiter");
+  ASSERT_NE(delimiter, nullptr);
+  ASSERT_EQ(*delimiter, ",");
+
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options, "id=stringappend; delimiter=||", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
+  ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
+  delimiter = merge_op->GetOptions<std::string>("Delimiter");
+  ASSERT_NE(delimiter, nullptr);
+  ASSERT_EQ(*delimiter, "||");
+
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options, "id=stringappendtest; delimiter=&&", &merge_op));
+  ASSERT_NE(merge_op, nullptr);
+  ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
+  ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
+  delimiter = merge_op->GetOptions<std::string>("Delimiter");
+  ASSERT_NE(delimiter, nullptr);
+  ASSERT_EQ(*delimiter, "&&");
+
+  std::shared_ptr<MergeOperator> copy;
+  std::string mismatch;
+  std::string opts_str = merge_op->ToString(config_options);
+
+  ASSERT_OK(MergeOperator::CreateFromString(config_options, opts_str, &copy));
+  ASSERT_TRUE(merge_op->AreEquivalent(config_options, copy.get(), &mismatch));
+  ASSERT_NE(copy, nullptr);
+  delimiter = copy->GetOptions<std::string>("Delimiter");
+  ASSERT_NE(delimiter, nullptr);
+  ASSERT_EQ(*delimiter, "&&");
+}
 #endif  // !ROCKSDB_LITE
 }  // namespace ROCKSDB_NAMESPACE
 
diff --git a/src.mk b/src.mk
index 2a36f95073db31e96690276190134506301a884d..05318171519e2f0cc019c4c12cbbf3f562fd38ec 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -232,6 +232,7 @@ LIB_SOURCES =                                                   \
   utilities/cassandra/format.cc                                 \
   utilities/cassandra/merge_operator.cc                         \
   utilities/checkpoint/checkpoint_impl.cc                       \
+  utilities/compaction_filters.cc                               \
   utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc    \
   utilities/convenience/info_log_finder.cc                      \
   utilities/debug.cc                                            \
@@ -241,6 +242,7 @@ LIB_SOURCES =                                                   \
   utilities/fault_injection_fs.cc                               \
   utilities/leveldb_options/leveldb_options.cc                  \
   utilities/memory/memory_util.cc                               \
+  utilities/merge_operators.cc                                  \
   utilities/merge_operators/max.cc                              \
   utilities/merge_operators/put.cc                              \
   utilities/merge_operators/sortlist.cc                         \
index c1dc09b0c4c05ed8f8f5d18d385f007dc2d6370c..e688b61c639129c0b6212009c63361bffa146b3f 100644 (file)
@@ -772,6 +772,15 @@ class ChanglingMergeOperator : public MergeOperator {
                                  Logger* /*logger*/) const override {
     return false;
   }
+  static const char* kClassName() { return "ChanglingMergeOperator"; }
+  virtual bool IsInstanceOf(const std::string& id) const override {
+    if (id == kClassName()) {
+      return true;
+    } else {
+      return MergeOperator::IsInstanceOf(id);
+    }
+  }
+
   virtual const char* Name() const override { return name_.c_str(); }
 
  protected:
@@ -796,6 +805,15 @@ class ChanglingCompactionFilter : public CompactionFilter {
     return false;
   }
 
+  static const char* kClassName() { return "ChanglingCompactionFilter"; }
+  virtual bool IsInstanceOf(const std::string& id) const override {
+    if (id == kClassName()) {
+      return true;
+    } else {
+      return CompactionFilter::IsInstanceOf(id);
+    }
+  }
+
   const char* Name() const override { return name_.c_str(); }
 
  private:
@@ -821,6 +839,14 @@ class ChanglingCompactionFilterFactory : public CompactionFilterFactory {
 
   // Returns a name that identifies this compaction filter factory.
   const char* Name() const override { return name_.c_str(); }
+  static const char* kClassName() { return "ChanglingCompactionFilterFactory"; }
+  virtual bool IsInstanceOf(const std::string& id) const override {
+    if (id == kClassName()) {
+      return true;
+    } else {
+      return CompactionFilterFactory::IsInstanceOf(id);
+    }
+  }
 
  protected:
   std::string name_;
index c4437092fa263e23b6547434277a2090173e20a0..1b2ad6988a6ddd00251dc6c59cb88cb0cb3e287c 100644 (file)
@@ -3929,6 +3929,7 @@ class Benchmark {
   void InitializeOptionsFromFlags(Options* opts) {
     printf("Initializing RocksDB Options from command-line flags\n");
     Options& options = *opts;
+    ConfigOptions config_options(options);
 
     assert(db_.db == nullptr);
 
@@ -4294,12 +4295,14 @@ class Benchmark {
     options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
 
     // merge operator options
-    options.merge_operator = MergeOperators::CreateFromStringId(
-        FLAGS_merge_operator);
-    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
-      fprintf(stderr, "invalid merge operator: %s\n",
-              FLAGS_merge_operator.c_str());
-      exit(1);
+    if (!FLAGS_merge_operator.empty()) {
+      Status s = MergeOperator::CreateFromString(
+          config_options, FLAGS_merge_operator, &options.merge_operator);
+      if (!s.ok()) {
+        fprintf(stderr, "invalid merge operator[%s]: %s\n",
+                FLAGS_merge_operator.c_str(), s.ToString().c_str());
+        exit(1);
+      }
     }
     options.max_successive_merges = FLAGS_max_successive_merges;
     options.report_bg_io_stats = FLAGS_report_bg_io_stats;
index f0a00e4d1f352dc3cbee5e5c7e88241f262619a8..3d49ea0ab7203a3388f3f269246d228bcaf3a5ff 100644 (file)
@@ -4,15 +4,35 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "utilities/cassandra/cassandra_compaction_filter.h"
+
 #include <string>
+
 #include "rocksdb/slice.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
 #include "utilities/cassandra/format.h"
+#include "utilities/cassandra/merge_operator.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+    cassandra_filter_type_info = {
+#ifndef ROCKSDB_LITE
+        {"purge_ttl_on_expiration",
+         {offsetof(struct CassandraOptions, purge_ttl_on_expiration),
+          OptionType::kBoolean, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"gc_grace_period_in_seconds",
+         {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+          OptionType::kUInt32T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+#endif  // ROCKSDB_LITE
+};
 
-const char* CassandraCompactionFilter::Name() const {
-  return "CassandraCompactionFilter";
+CassandraCompactionFilter::CassandraCompactionFilter(
+    bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+    : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+  RegisterOptions(&options_, &cassandra_filter_type_info);
 }
 
 CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
@@ -23,12 +43,12 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
   RowValue row_value = RowValue::Deserialize(
     existing_value.data(), existing_value.size());
   RowValue compacted =
-      purge_ttl_on_expiration_
+      options_.purge_ttl_on_expiration
           ? row_value.RemoveExpiredColumns(&value_changed)
           : row_value.ConvertExpiredColumnsToTombstones(&value_changed);
 
   if (value_type == ValueType::kValue) {
-    compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
+    compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds);
   }
 
   if(compacted.Empty()) {
@@ -43,5 +63,48 @@ CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
   return Decision::kKeep;
 }
 
+CassandraCompactionFilterFactory::CassandraCompactionFilterFactory(
+    bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+    : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+  RegisterOptions(&options_, &cassandra_filter_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+CassandraCompactionFilterFactory::CreateCompactionFilter(
+    const CompactionFilter::Context&) {
+  std::unique_ptr<CompactionFilter> result(new CassandraCompactionFilter(
+      options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds));
+  return result;
+}
+
+#ifndef ROCKSDB_LITE
+int RegisterCassandraObjects(ObjectLibrary& library,
+                             const std::string& /*arg*/) {
+  library.Register<MergeOperator>(
+      CassandraValueMergeOperator::kClassName(),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new CassandraValueMergeOperator(0));
+        return guard->get();
+      });
+  library.Register<CompactionFilter>(
+      CassandraCompactionFilter::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<CompactionFilter>* /*guard */,
+         std::string* /* errmsg */) {
+        return new CassandraCompactionFilter(false, 0);
+      });
+  library.Register<CompactionFilterFactory>(
+      CassandraCompactionFilterFactory::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<CompactionFilterFactory>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new CassandraCompactionFilterFactory(false, 0));
+        return guard->get();
+      });
+  size_t num_types;
+  return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+#endif  // ROCKSDB_LITE
 }  // namespace cassandra
 }  // namespace ROCKSDB_NAMESPACE
index ac258810621915df0643c39a6c967949bc52dadd..becadde3222bc5cf09175b3029268e16e5d3564e 100644 (file)
@@ -5,8 +5,10 @@
 
 #pragma once
 #include <string>
+
 #include "rocksdb/compaction_filter.h"
 #include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace cassandra {
@@ -25,18 +27,31 @@ namespace cassandra {
 class CassandraCompactionFilter : public CompactionFilter {
 public:
  explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
-                                    int32_t gc_grace_period_in_seconds)
-     : purge_ttl_on_expiration_(purge_ttl_on_expiration),
      gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
+                                    int32_t gc_grace_period_in_seconds);
+ static const char* kClassName() { return "CassandraCompactionFilter"; }
const char* Name() const override { return kClassName(); }
 
- const char* Name() const override;
  virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
                            const Slice& existing_value, std::string* new_value,
                            std::string* skip_until) const override;
 
 private:
-  bool purge_ttl_on_expiration_;
-  int32_t gc_grace_period_in_seconds_;
+ CassandraOptions options_;
+};
+
+class CassandraCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+  explicit CassandraCompactionFilterFactory(bool purge_ttl_on_expiration,
+                                            int32_t gc_grace_period_in_seconds);
+  ~CassandraCompactionFilterFactory() override {}
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& context) override;
+  static const char* kClassName() { return "CassandraCompactionFilterFactory"; }
+  const char* Name() const override { return kClassName(); }
+
+ private:
+  CassandraOptions options_;
 };
 }  // namespace cassandra
 }  // namespace ROCKSDB_NAMESPACE
index cd06acc92d326e779cb185c691ee085a521cad18..bde20340a5cc0c56933df54cb09528ffc12cd9af 100644 (file)
@@ -6,9 +6,10 @@
 #include <iostream>
 
 #include "db/db_impl/db_impl.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/merge_operator.h"
-#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
 #include "test_util/testharness.h"
 #include "util/cast_util.h"
 #include "util/random.h"
@@ -318,6 +319,99 @@ TEST_F(CassandraFunctionalTest, CompactionShouldRemoveTombstoneFromPut) {
   ASSERT_FALSE(std::get<0>(store.Get("k1")));
 }
 
+#ifndef ROCKSDB_LITE
+TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
+  ConfigOptions config_options;
+  std::shared_ptr<MergeOperator> mo;
+  config_options.ignore_unsupported_options = false;
+
+  ASSERT_NOK(MergeOperator::CreateFromString(
+      config_options, CassandraValueMergeOperator::kClassName(), &mo));
+
+  config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+                                      "cassandra");
+
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options, CassandraValueMergeOperator::kClassName(), &mo));
+  ASSERT_NE(mo, nullptr);
+  ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+  mo.reset();
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options,
+      std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
+          CassandraValueMergeOperator::kClassName(),
+      &mo));
+  ASSERT_NE(mo, nullptr);
+  ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+  const auto* opts = mo->GetOptions<CassandraOptions>();
+  ASSERT_NE(opts, nullptr);
+  ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+  ASSERT_EQ(opts->operands_limit, 20);
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
+  ConfigOptions config_options;
+  const CompactionFilter* filter = nullptr;
+  config_options.ignore_unsupported_options = false;
+
+  ASSERT_NOK(CompactionFilter::CreateFromString(
+      config_options, CassandraCompactionFilter::kClassName(), &filter));
+  config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+                                      "cassandra");
+
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options, CassandraCompactionFilter::kClassName(), &filter));
+  ASSERT_NE(filter, nullptr);
+  ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+  delete filter;
+  filter = nullptr;
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options,
+      std::string(
+          "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+          CassandraCompactionFilter::kClassName(),
+      &filter));
+  ASSERT_NE(filter, nullptr);
+  ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+  const auto* opts = filter->GetOptions<CassandraOptions>();
+  ASSERT_NE(opts, nullptr);
+  ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+  ASSERT_TRUE(opts->purge_ttl_on_expiration);
+  delete filter;
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
+  ConfigOptions config_options;
+  std::shared_ptr<CompactionFilterFactory> factory;
+
+  config_options.ignore_unsupported_options = false;
+  ASSERT_NOK(CompactionFilterFactory::CreateFromString(
+      config_options, CassandraCompactionFilterFactory::kClassName(),
+      &factory));
+  config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+                                      "cassandra");
+
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(
+      config_options, CassandraCompactionFilterFactory::kClassName(),
+      &factory));
+  ASSERT_NE(factory, nullptr);
+  ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+  factory.reset();
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(
+      config_options,
+      std::string(
+          "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+          CassandraCompactionFilterFactory::kClassName(),
+      &factory));
+  ASSERT_NE(factory, nullptr);
+  ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+  const auto* opts = factory->GetOptions<CassandraOptions>();
+  ASSERT_NE(opts, nullptr);
+  ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+  ASSERT_TRUE(opts->purge_ttl_on_expiration);
+}
+#endif  // ROCKSDB_LITE
+
 } // namespace cassandra
 }  // namespace ROCKSDB_NAMESPACE
 
diff --git a/utilities/cassandra/cassandra_options.h b/utilities/cassandra/cassandra_options.h
new file mode 100644 (file)
index 0000000..8cf756a
--- /dev/null
@@ -0,0 +1,40 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <cinttypes>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+namespace ROCKSDB_NAMESPACE {
+class ObjectLibrary;
+namespace cassandra {
+struct CassandraOptions {
+  static const char* kName() { return "CassandraOptions"; }
+  CassandraOptions(int32_t _gc_grace_period_in_seconds, size_t _operands_limit,
+                   bool _purge_ttl_on_expiration = false)
+      : operands_limit(_operands_limit),
+        gc_grace_period_in_seconds(_gc_grace_period_in_seconds),
+        purge_ttl_on_expiration(_purge_ttl_on_expiration) {}
+  // Limit on the number of merge operands.
+  size_t operands_limit;
+
+  // How long (in seconds) tombstoned data remains before it is purged
+  int32_t gc_grace_period_in_seconds;
+
+  // If is set to true, expired data will be directly purged.
+  // Otherwise expired data will be converted tombstones first,
+  // then be eventually removed after gc grace period. This value should
+  // only true if all writes have same ttl setting, otherwise it could bring old
+  // data back.
+  bool purge_ttl_on_expiration;
+};
+#ifndef ROCKSDB_LITE
+extern "C" {
+int RegisterCassandraObjects(ObjectLibrary& library, const std::string& arg);
+}  // extern "C"
+#endif  // ROCKSDB_LITE
+}  // namespace cassandra
+}  // namespace ROCKSDB_NAMESPACE
index 82fe5d6614213ae27b0070942c8ce5a8cb8ca275..9d0cdd38548dae906d0a1a17cb2b1c24096ac15c 100644 (file)
@@ -5,16 +5,36 @@
 
 #include "merge_operator.h"
 
-#include <memory>
 #include <assert.h>
 
-#include "rocksdb/slice.h"
+#include <memory>
+
 #include "rocksdb/merge_operator.h"
-#include "utilities/merge_operators.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
 #include "utilities/cassandra/format.h"
+#include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+    merge_operator_options_info = {
+#ifndef ROCKSDB_LITE
+        {"gc_grace_period_in_seconds",
+         {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+          OptionType::kUInt32T, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+        {"operands_limit",
+         {offsetof(struct CassandraOptions, operands_limit), OptionType::kSizeT,
+          OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
+#endif  // ROCKSDB_LITE
+};
+
+CassandraValueMergeOperator::CassandraValueMergeOperator(
+    int32_t gc_grace_period_in_seconds, size_t operands_limit)
+    : options_(gc_grace_period_in_seconds, operands_limit) {
+  RegisterOptions(&options_, &merge_operator_options_info);
+}
 
 // Implementation for the merge operation (merges two Cassandra values)
 bool CassandraValueMergeOperator::FullMergeV2(
@@ -34,7 +54,7 @@ bool CassandraValueMergeOperator::FullMergeV2(
   }
 
   RowValue merged = RowValue::Merge(std::move(row_values));
-  merged = merged.RemoveTombstones(gc_grace_period_in_seconds_);
+  merged = merged.RemoveTombstones(options_.gc_grace_period_in_seconds);
   merge_out->new_value.reserve(merged.Size());
   merged.Serialize(&(merge_out->new_value));
 
@@ -58,10 +78,6 @@ bool CassandraValueMergeOperator::PartialMergeMulti(
   return true;
 }
 
-const char* CassandraValueMergeOperator::Name() const  {
-  return "CassandraValueMergeOperator";
-}
-
 } // namespace cassandra
 
 }  // namespace ROCKSDB_NAMESPACE
index b5bf7c520eb646611c7db354fd2dc6f17cb6cdde..4bf9128098a9da44425c65cb7af0ea9cd6bc3ba3 100644 (file)
@@ -6,6 +6,7 @@
 #pragma once
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace cassandra {
@@ -16,9 +17,7 @@ namespace cassandra {
 class CassandraValueMergeOperator : public MergeOperator {
 public:
  explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
-                                      size_t operands_limit = 0)
-     : gc_grace_period_in_seconds_(gc_grace_period_in_seconds),
-       operands_limit_(operands_limit) {}
+                                      size_t operands_limit = 0);
 
  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                           MergeOperationOutput* merge_out) const override;
@@ -28,17 +27,18 @@ public:
                                 std::string* new_value,
                                 Logger* logger) const override;
 
- virtual const char* Name() const override;
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "CassandraValueMergeOperator"; }
 
  virtual bool AllowSingleOperand() const override { return true; }
 
  virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
-   return operands_limit_ > 0 && operands.size() >= operands_limit_;
+   return options_.operands_limit > 0 &&
+          operands.size() >= options_.operands_limit;
  }
 
 private:
- int32_t gc_grace_period_in_seconds_;
- size_t operands_limit_;
+ CassandraOptions options_;
 };
 } // namespace cassandra
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/utilities/compaction_filters.cc b/utilities/compaction_filters.cc
new file mode 100644 (file)
index 0000000..d527ee5
--- /dev/null
@@ -0,0 +1,56 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <memory>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/options_type.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
+#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
+
+namespace ROCKSDB_NAMESPACE {
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinCompactionFilters(ObjectLibrary& library,
+                                            const std::string& /*arg*/) {
+  library.Register<CompactionFilter>(
+      RemoveEmptyValueCompactionFilter::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<CompactionFilter>* /*guard*/,
+         std::string* /*errmsg*/) {
+        return new RemoveEmptyValueCompactionFilter();
+      });
+  return 1;
+}
+#endif  // ROCKSDB_LITE
+Status CompactionFilter::CreateFromString(const ConfigOptions& config_options,
+                                          const std::string& value,
+                                          const CompactionFilter** result) {
+#ifndef ROCKSDB_LITE
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    RegisterBuiltinCompactionFilters(*(ObjectLibrary::Default().get()), "");
+  });
+#endif  // ROCKSDB_LITE
+  CompactionFilter* filter = const_cast<CompactionFilter*>(*result);
+  Status status = LoadStaticObject<CompactionFilter>(config_options, value,
+                                                     nullptr, &filter);
+  if (status.ok()) {
+    *result = const_cast<CompactionFilter*>(filter);
+  }
+  return status;
+}
+
+Status CompactionFilterFactory::CreateFromString(
+    const ConfigOptions& config_options, const std::string& value,
+    std::shared_ptr<CompactionFilterFactory>* result) {
+  // Currently there are no builtin CompactionFilterFactories.
+  // If any are introduced, they need to be registered here.
+  Status status = LoadSharedObject<CompactionFilterFactory>(
+      config_options, value, nullptr, result);
+  return status;
+}
+}  // namespace ROCKSDB_NAMESPACE
index abde6495287f74fc48c8cccc2df15e4662cbc1e6..803fa94ae315c67812a96eb2c15911f58f7bfd72 100644 (file)
@@ -10,7 +10,7 @@
 
 namespace ROCKSDB_NAMESPACE {
 
-// Abstract base class for building layered compation filter on top of
+// Abstract base class for building layered compaction filter on top of
 // user compaction filter.
 // See BlobIndexCompactionFilter or TtlCompactionFilter for a basic usage.
 class LayeredCompactionFilterBase : public CompactionFilter {
@@ -29,8 +29,12 @@ class LayeredCompactionFilterBase : public CompactionFilter {
   // Return a pointer to user compaction filter
   const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
 
- private:
+  const Customizable* Inner() const override { return user_comp_filter_; }
+
+ protected:
   const CompactionFilter* user_comp_filter_;
+
+ private:
   std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
 };
 
index c97eef41da439784844f9f89092afb58f9966d21..f4dbce100434fcce23877eb8bc0fb572954072db 100644 (file)
 
 namespace ROCKSDB_NAMESPACE {
 
-const char* RemoveEmptyValueCompactionFilter::Name() const {
-  return "RemoveEmptyValueCompactionFilter";
-}
-
 bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
                                               const Slice& /*key*/,
                                               const Slice& existing_value,
index f5dbec900a4705d223c56e64f65e94fd15192a37..864ad15ffafbad0eb68890400b8a990de036e8d6 100644 (file)
@@ -16,12 +16,13 @@ namespace ROCKSDB_NAMESPACE {
 
 class RemoveEmptyValueCompactionFilter : public CompactionFilter {
  public:
-    const char* Name() const override;
-    bool Filter(int level,
-        const Slice& key,
-        const Slice& existing_value,
-        std::string* new_value,
-        bool* value_changed) const override;
+  static const char* kClassName() { return "RemoveEmptyValueCompactionFilter"; }
+
+  const char* Name() const override { return kClassName(); }
+
+  bool Filter(int level, const Slice& key, const Slice& existing_value,
+              std::string* new_value, bool* value_changed) const override;
 };
+
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // !ROCKSDB_LITE
diff --git a/utilities/merge_operators.cc b/utilities/merge_operators.cc
new file mode 100644 (file)
index 0000000..7fe0abf
--- /dev/null
@@ -0,0 +1,125 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "utilities/merge_operators.h"
+
+#include <memory>
+
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
+
+namespace ROCKSDB_NAMESPACE {
+static bool LoadMergeOperator(const std::string& id,
+                              std::shared_ptr<MergeOperator>* result) {
+  bool success = true;
+  // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
+  // Requires these classes be moved into a header file...
+  if (id == "put" || id == "PutOperator") {
+    *result = MergeOperators::CreatePutOperator();
+  } else if (id == "put_v1") {
+    *result = MergeOperators::CreateDeprecatedPutOperator();
+  } else if (id == "uint64add" || id == "UInt64AddOperator") {
+    *result = MergeOperators::CreateUInt64AddOperator();
+  } else if (id == "max" || id == "MaxOperator") {
+    *result = MergeOperators::CreateMaxOperator();
+#ifdef ROCKSDB_LITE
+    // The remainder of the classes are handled by the ObjectRegistry in
+    // non-LITE mode
+  } else if (id == StringAppendOperator::kNickName() ||
+             id == StringAppendOperator::kClassName()) {
+    *result = MergeOperators::CreateStringAppendOperator();
+  } else if (id == StringAppendTESTOperator::kNickName() ||
+             id == StringAppendTESTOperator::kClassName()) {
+    *result = MergeOperators::CreateStringAppendTESTOperator();
+  } else if (id == BytesXOROperator::kNickName() ||
+             id == BytesXOROperator::kClassName()) {
+    *result = MergeOperators::CreateBytesXOROperator();
+  } else if (id == SortList::kNickName() || id == SortList::kClassName()) {
+    *result = MergeOperators::CreateSortOperator();
+#endif  // ROCKSDB_LITE
+  } else {
+    success = false;
+  }
+  return success;
+}
+
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinMergeOperators(ObjectLibrary& library,
+                                         const std::string& /*arg*/) {
+  size_t num_types;
+  auto AsRegex = [](const std::string& name, const std::string& alt) {
+    std::string regex;
+    regex.append("(").append(name);
+    regex.append("|").append(alt).append(")");
+    return regex;
+  };
+
+  library.Register<MergeOperator>(
+      AsRegex(StringAppendOperator::kClassName(),
+              StringAppendOperator::kNickName()),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /*errmsg*/) {
+        guard->reset(new StringAppendOperator(","));
+        return guard->get();
+      });
+  library.Register<MergeOperator>(
+      AsRegex(StringAppendTESTOperator::kClassName(),
+              StringAppendTESTOperator::kNickName()),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /*errmsg*/) {
+        guard->reset(new StringAppendTESTOperator(","));
+        return guard->get();
+      });
+  library.Register<MergeOperator>(
+      AsRegex(SortList::kClassName(), SortList::kNickName()),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /*errmsg*/) {
+        guard->reset(new SortList());
+        return guard->get();
+      });
+  library.Register<MergeOperator>(
+      AsRegex(BytesXOROperator::kClassName(), BytesXOROperator::kNickName()),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /*errmsg*/) {
+        guard->reset(new BytesXOROperator());
+        return guard->get();
+      });
+
+  return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+#endif  // ROCKSDB_LITE
+
+Status MergeOperator::CreateFromString(const ConfigOptions& config_options,
+                                       const std::string& value,
+                                       std::shared_ptr<MergeOperator>* result) {
+#ifndef ROCKSDB_LITE
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    RegisterBuiltinMergeOperators(*(ObjectLibrary::Default().get()), "");
+  });
+#endif  // ROCKSDB_LITE
+  return LoadSharedObject<MergeOperator>(config_options, value,
+                                         LoadMergeOperator, result);
+}
+
+std::shared_ptr<MergeOperator> MergeOperators::CreateFromStringId(
+    const std::string& id) {
+  std::shared_ptr<MergeOperator> result;
+  Status s = MergeOperator::CreateFromString(ConfigOptions(), id, &result);
+  if (s.ok()) {
+    return result;
+  } else {
+    // Empty or unknown, just return nullptr
+    return nullptr;
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE
index ded5e2ee8b013dfde36c9233f34a1574f694ab33..37535cdc53af1e6001a77d6b317b299473019fd5 100644 (file)
@@ -28,30 +28,8 @@ class MergeOperators {
   static std::shared_ptr<MergeOperator> CreateSortOperator();
 
   // Will return a different merge operator depending on the string.
-  // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
   static std::shared_ptr<MergeOperator> CreateFromStringId(
-      const std::string& name) {
-    if (name == "put") {
-      return CreatePutOperator();
-    } else if (name == "put_v1") {
-      return CreateDeprecatedPutOperator();
-    } else if ( name == "uint64add") {
-      return CreateUInt64AddOperator();
-    } else if (name == "stringappend") {
-      return CreateStringAppendOperator();
-    } else if (name == "stringappendtest") {
-      return CreateStringAppendTESTOperator();
-    } else if (name == "max") {
-      return CreateMaxOperator();
-    } else if (name == "bytesxor") {
-      return CreateBytesXOROperator();
-    } else if (name == "sortlist") {
-      return CreateSortOperator();
-    } else {
-      // Empty or unknown, just return nullptr
-      return nullptr;
-    }
-  }
+      const std::string& name);
 };
 
 }  // namespace ROCKSDB_NAMESPACE
index ab0c5aecc6789ae631c0b1b3f2675db0996d9bf1..f05b6ca981faa342463921047157c2d917d3d91c 100644 (file)
@@ -28,9 +28,11 @@ class BytesXOROperator : public AssociativeMergeOperator {
                      std::string* new_value,
                      Logger* logger) const override;
 
-  virtual const char* Name() const override {
-    return "BytesXOR";
-  }
+  static const char* kClassName() { return "BytesXOR"; }
+  static const char* kNickName() { return "bytesxor"; }
+
+  const char* NickName() const override { return kNickName(); }
+  const char* Name() const override { return kClassName(); }
 
   void XOR(const Slice* existing_value, const Slice& value,
           std::string* new_value) const;
index 2270c1f0319c09c6467f1c8abb452f3ea2d51a27..de4abfa6fa7d28a5ba321ebb979a1c92511562aa 100644 (file)
@@ -64,7 +64,10 @@ class MaxOperator : public MergeOperator {
     return true;
   }
 
-  const char* Name() const override { return "MaxOperator"; }
+  static const char* kClassName() { return "MaxOperator"; }
+  static const char* kNickName() { return "max"; }
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kNickName(); }
 };
 
 }  // end of anonymous namespace
index 901d69e945261ab2e5c35a326ae80bf685846699..2133f1b57625b0799db1f90e32e4ad60f29dfe4a 100644 (file)
@@ -48,7 +48,10 @@ class PutOperator : public MergeOperator {
     return true;
   }
 
-  const char* Name() const override { return "PutOperator"; }
+  static const char* kClassName() { return "PutOperator"; }
+  static const char* kNickName() { return "put_v1"; }
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kNickName(); }
 };
 
 class PutOperatorV2 : public PutOperator {
@@ -67,6 +70,9 @@ class PutOperatorV2 : public PutOperator {
     merge_out->existing_operand = merge_in.operand_list.back();
     return true;
   }
+
+  static const char* kNickName() { return "put"; }
+  const char* NickName() const override { return kNickName(); }
 };
 
 } // end of anonymous namespace
index 078b83e19747681c5cf7d0a6bb188a638985a07a..fae33e2fd7b8ce9eab50afe6a47340bd691f07cc 100644 (file)
@@ -49,8 +49,6 @@ bool SortList::PartialMergeMulti(const Slice& /*key*/,
   return true;
 }
 
-const char* SortList::Name() const { return "MergeSortOperator"; }
-
 void SortList::MakeVector(std::vector<int>& operand, Slice slice) const {
   do {
     const char* begin = slice.data_;
index 5e08bd583fa98fd145ca550a9efc9b90ef251bf6..eaa4e76fbac4f80de6cafce9b6dbee52fe4a60e4 100644 (file)
@@ -27,7 +27,11 @@ class SortList : public MergeOperator {
                          const std::deque<Slice>& operand_list,
                          std::string* new_value, Logger* logger) const override;
 
-  const char* Name() const override;
+  static const char* kClassName() { return "MergeSortOperator"; }
+  static const char* kNickName() { return "sortlist"; }
+
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kNickName(); }
 
   void MakeVector(std::vector<int>& operand, Slice slice) const;
 
index cd963b5b109423dfc8e7831d05b8494dca2c8b6f..c20d415e76adc5c65a6abad38746ddd6a9acf9d2 100644 (file)
@@ -6,21 +6,36 @@
 
 #include "stringappend.h"
 
-#include <memory>
 #include <assert.h>
 
-#include "rocksdb/slice.h"
+#include <memory>
+
 #include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
-
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo>
+    stringappend_merge_type_info = {
+#ifndef ROCKSDB_LITE
+        {"delimiter",
+         {0, OptionType::kString, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+#endif  // ROCKSDB_LITE
+};
+}  // namespace
 // Constructor: also specify the delimiter character.
 StringAppendOperator::StringAppendOperator(char delim_char)
-    : delim_(1, delim_char) {}
+    : delim_(1, delim_char) {
+  RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
+}
 
 StringAppendOperator::StringAppendOperator(const std::string& delim)
-    : delim_(delim) {}
+    : delim_(delim) {
+  RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
+}
 
 // Implementation for the merge operation (concatenates two strings)
 bool StringAppendOperator::Merge(const Slice& /*key*/,
@@ -46,9 +61,6 @@ bool StringAppendOperator::Merge(const Slice& /*key*/,
   return true;
 }
 
-const char* StringAppendOperator::Name() const  {
-  return "StringAppendOperator";
-}
 
 std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
   return std::make_shared<StringAppendOperator>(',');
index 98fc6c9980f78347ac5556660c59a5331982eb5e..3c2bb1907a486e7e49443f41ce5dd9381e878668 100644 (file)
@@ -22,7 +22,10 @@ class StringAppendOperator : public AssociativeMergeOperator {
                      std::string* new_value,
                      Logger* logger) const override;
 
-  virtual const char* Name() const override;
+  static const char* kClassName() { return "StringAppendOperator"; }
+  static const char* kNickName() { return "stringappend"; }
+  virtual const char* Name() const override { return kClassName(); }
+  virtual const char* NickName() const override { return kNickName(); }
 
  private:
   std::string delim_;  // The delimiter is inserted between elements
index 699697c43cb7acc0447ab9a28e2dad1529a3a016..36cb9ee34ea4f710ff81d2a74cbb156ab5e04a1e 100644 (file)
@@ -5,22 +5,38 @@
 
 #include "stringappend2.h"
 
+#include <assert.h>
+
 #include <memory>
 #include <string>
-#include <assert.h>
 
-#include "rocksdb/slice.h"
 #include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo>
+    stringappend2_merge_type_info = {
+#ifndef ROCKSDB_LITE
+        {"delimiter",
+         {0, OptionType::kString, OptionVerificationType::kNormal,
+          OptionTypeFlags::kNone}},
+#endif  // ROCKSDB_LITE
+};
+}  // namespace
 
 // Constructor: also specify the delimiter character.
 StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
-    : delim_(1, delim_char) {}
+    : delim_(1, delim_char) {
+  RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
+}
 
 StringAppendTESTOperator::StringAppendTESTOperator(const std::string& delim)
-    : delim_(delim) {}
+    : delim_(delim) {
+  RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
+}
 
 // Implementation for the merge operation (concatenates two strings)
 bool StringAppendTESTOperator::FullMergeV2(
@@ -37,6 +53,7 @@ bool StringAppendTESTOperator::FullMergeV2(
 
   // Compute the space needed for the final result.
   size_t numBytes = 0;
+
   for (auto it = merge_in.operand_list.begin();
        it != merge_in.operand_list.end(); ++it) {
     numBytes += it->size() + delim_.size();
@@ -107,11 +124,6 @@ bool StringAppendTESTOperator::_AssocPartialMergeMulti(
   return true;
 }
 
-const char* StringAppendTESTOperator::Name() const  {
-  return "StringAppendTESTOperator";
-}
-
-
 std::shared_ptr<MergeOperator>
 MergeOperators::CreateStringAppendTESTOperator() {
   return std::make_shared<StringAppendTESTOperator>(',');
index 2d4b554f3003cbea3627f46da099a3a9cadb9ab0..339c760bd2680d5a991b3b24412ed7cf63bb3713 100644 (file)
@@ -34,7 +34,10 @@ class StringAppendTESTOperator : public MergeOperator {
                                  std::string* new_value, Logger* logger) const
       override;
 
-  virtual const char* Name() const override;
+  static const char* kClassName() { return "StringAppendTESTOperator"; }
+  static const char* kNickName() { return "stringappendtest"; }
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kNickName(); }
 
  private:
   // A version of PartialMerge that actually performs "partial merging".
index 3ef24092824f04241c90ec2672aaa3e9afe5ed65..d8e39615e79bb90cc16438294bb302ae14cb137f 100644 (file)
@@ -36,7 +36,10 @@ class UInt64AddOperator : public AssociativeMergeOperator {
     return true;  // Return true always since corruption will be treated as 0
   }
 
-  const char* Name() const override { return "UInt64AddOperator"; }
+  static const char* kClassName() { return "UInt64AddOperator"; }
+  static const char* kNickName() { return "uint64add"; }
+  const char* Name() const override { return kClassName(); }
+  const char* NickName() const override { return kNickName(); }
 
  private:
   // Takes the string and decodes it into a uint64_t
index 917130ca10f8f144f13ef1ca81568494fe617746..c59cc93ccea5bda9563dc4449da36c9a703ed9ef 100644 (file)
 #include "rocksdb/iterator.h"
 #include "rocksdb/system_clock.h"
 #include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
 #include "util/coding.h"
 
 namespace ROCKSDB_NAMESPACE {
+static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
+    {{"user_operator",
+      OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
+          0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}};
+
+TtlMergeOperator::TtlMergeOperator(
+    const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
+    : user_merge_op_(merge_op), clock_(clock) {
+  RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
+}
+
+bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
+                                   MergeOperationOutput* merge_out) const {
+  const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+  if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
+    ROCKS_LOG_ERROR(merge_in.logger,
+                    "Error: Could not remove timestamp from existing value.");
+    return false;
+  }
+
+  // Extract time-stamp from each operand to be passed to user_merge_op_
+  std::vector<Slice> operands_without_ts;
+  for (const auto& operand : merge_in.operand_list) {
+    if (operand.size() < ts_len) {
+      ROCKS_LOG_ERROR(merge_in.logger,
+                      "Error: Could not remove timestamp from operand value.");
+      return false;
+    }
+    operands_without_ts.push_back(operand);
+    operands_without_ts.back().remove_suffix(ts_len);
+  }
+
+  // Apply the user merge operator (store result in *new_value)
+  bool good = true;
+  MergeOperationOutput user_merge_out(merge_out->new_value,
+                                      merge_out->existing_operand);
+  if (merge_in.existing_value) {
+    Slice existing_value_without_ts(merge_in.existing_value->data(),
+                                    merge_in.existing_value->size() - ts_len);
+    good = user_merge_op_->FullMergeV2(
+        MergeOperationInput(merge_in.key, &existing_value_without_ts,
+                            operands_without_ts, merge_in.logger),
+        &user_merge_out);
+  } else {
+    good = user_merge_op_->FullMergeV2(
+        MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
+                            merge_in.logger),
+        &user_merge_out);
+  }
+
+  // Return false if the user merge operator returned false
+  if (!good) {
+    return false;
+  }
+
+  if (merge_out->existing_operand.data()) {
+    merge_out->new_value.assign(merge_out->existing_operand.data(),
+                                merge_out->existing_operand.size());
+    merge_out->existing_operand = Slice(nullptr, 0);
+  }
+
+  // Augment the *new_value with the ttl time-stamp
+  int64_t curtime;
+  if (!clock_->GetCurrentTime(&curtime).ok()) {
+    ROCKS_LOG_ERROR(
+        merge_in.logger,
+        "Error: Could not get current time to be attached internally "
+        "to the new value.");
+    return false;
+  } else {
+    char ts_string[ts_len];
+    EncodeFixed32(ts_string, (int32_t)curtime);
+    merge_out->new_value.append(ts_string, ts_len);
+    return true;
+  }
+}
+
+bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
+                                         const std::deque<Slice>& operand_list,
+                                         std::string* new_value,
+                                         Logger* logger) const {
+  const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+  std::deque<Slice> operands_without_ts;
+
+  for (const auto& operand : operand_list) {
+    if (operand.size() < ts_len) {
+      ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
+      return false;
+    }
+
+    operands_without_ts.push_back(
+        Slice(operand.data(), operand.size() - ts_len));
+  }
+
+  // Apply the user partial-merge operator (store result in *new_value)
+  assert(new_value);
+  if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
+                                         logger)) {
+    return false;
+  }
+
+  // Augment the *new_value with the ttl time-stamp
+  int64_t curtime;
+  if (!clock_->GetCurrentTime(&curtime).ok()) {
+    ROCKS_LOG_ERROR(
+        logger,
+        "Error: Could not get current time to be attached internally "
+        "to the new value.");
+    return false;
+  } else {
+    char ts_string[ts_len];
+    EncodeFixed32(ts_string, (int32_t)curtime);
+    new_value->append(ts_string, ts_len);
+    return true;
+  }
+}
+
+Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
+  if (clock_ == nullptr) {
+    clock_ = config_options.env->GetSystemClock().get();
+  }
+  return MergeOperator::PrepareOptions(config_options);
+}
+
+Status TtlMergeOperator::ValidateOptions(
+    const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+  if (user_merge_op_ == nullptr) {
+    return Status::InvalidArgument(
+        "UserMergeOperator required by TtlMergeOperator");
+  } else if (clock_ == nullptr) {
+    return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
+  } else {
+    return MergeOperator::ValidateOptions(db_opts, cf_opts);
+  }
+}
 
 void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
                                     SystemClock* clock) {
@@ -34,6 +171,139 @@ void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
   }
 }
 
+static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
+    {"ttl", {0, OptionType::kInt32T}},
+};
+
+static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
+    {"user_filter_factory",
+     OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
+         0, OptionVerificationType::kByNameAllowFromNull,
+         OptionTypeFlags::kNone)}};
+static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
+    {"user_filter",
+     OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
+         0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
+
+TtlCompactionFilter::TtlCompactionFilter(
+    int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
+    std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
+    : LayeredCompactionFilterBase(_user_comp_filter,
+                                  std::move(_user_comp_filter_from_factory)),
+      ttl_(ttl),
+      clock_(clock) {
+  RegisterOptions("TTL", &ttl_, &ttl_type_info);
+  RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
+}
+
+bool TtlCompactionFilter::Filter(int level, const Slice& key,
+                                 const Slice& old_val, std::string* new_val,
+                                 bool* value_changed) const {
+  if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
+    return true;
+  }
+  if (user_comp_filter() == nullptr) {
+    return false;
+  }
+  assert(old_val.size() >= DBWithTTLImpl::kTSLength);
+  Slice old_val_without_ts(old_val.data(),
+                           old_val.size() - DBWithTTLImpl::kTSLength);
+  if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
+                                 value_changed)) {
+    return true;
+  }
+  if (*value_changed) {
+    new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
+                    DBWithTTLImpl::kTSLength);
+  }
+  return false;
+}
+
+Status TtlCompactionFilter::PrepareOptions(
+    const ConfigOptions& config_options) {
+  if (clock_ == nullptr) {
+    clock_ = config_options.env->GetSystemClock().get();
+  }
+  return LayeredCompactionFilterBase::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilter::ValidateOptions(
+    const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+  if (clock_ == nullptr) {
+    return Status::InvalidArgument(
+        "SystemClock required by TtlCompactionFilter");
+  } else {
+    return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
+  }
+}
+
+TtlCompactionFilterFactory::TtlCompactionFilterFactory(
+    int32_t ttl, SystemClock* clock,
+    std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
+    : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
+  RegisterOptions("UserOptions", &user_comp_filter_factory_,
+                  &ttl_cff_type_info);
+  RegisterOptions("TTL", &ttl_, &ttl_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+TtlCompactionFilterFactory::CreateCompactionFilter(
+    const CompactionFilter::Context& context) {
+  std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
+      nullptr;
+  if (user_comp_filter_factory_) {
+    user_comp_filter_from_factory =
+        user_comp_filter_factory_->CreateCompactionFilter(context);
+  }
+
+  return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
+      ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
+}
+
+Status TtlCompactionFilterFactory::PrepareOptions(
+    const ConfigOptions& config_options) {
+  if (clock_ == nullptr) {
+    clock_ = config_options.env->GetSystemClock().get();
+  }
+  return CompactionFilterFactory::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilterFactory::ValidateOptions(
+    const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+  if (clock_ == nullptr) {
+    return Status::InvalidArgument(
+        "SystemClock required by TtlCompactionFilterFactory");
+  } else {
+    return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
+  }
+}
+
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
+  library.Register<MergeOperator>(
+      TtlMergeOperator::kClassName(),
+      [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new TtlMergeOperator(nullptr, nullptr));
+        return guard->get();
+      });
+  library.Register<CompactionFilterFactory>(
+      TtlCompactionFilterFactory::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<CompactionFilterFactory>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
+        return guard->get();
+      });
+  library.Register<CompactionFilter>(
+      TtlCompactionFilter::kClassName(),
+      [](const std::string& /*uri*/,
+         std::unique_ptr<CompactionFilter>* /*guard*/,
+         std::string* /* errmsg */) {
+        return new TtlCompactionFilter(0, nullptr, nullptr);
+      });
+  size_t num_types;
+  return static_cast<int>(library.GetFactoryCount(&num_types));
+}
 // Open the db inside DBWithTTLImpl because options needs pointer to its ttl
 DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
 
@@ -68,9 +338,15 @@ Status UtilityDB::OpenTtlDB(const Options& options, const std::string& dbname,
   return s;
 }
 
+void DBWithTTLImpl::RegisterTtlClasses() {
+  static std::once_flag once;
+  std::call_once(once, [&]() {
+    ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
+  });
+}
+
 Status DBWithTTL::Open(const Options& options, const std::string& dbname,
                        DBWithTTL** dbptr, int32_t ttl, bool read_only) {
-
   DBOptions db_options(options);
   ColumnFamilyOptions cf_options(options);
   std::vector<ColumnFamilyDescriptor> column_families;
@@ -93,6 +369,7 @@ Status DBWithTTL::Open(
     const std::vector<ColumnFamilyDescriptor>& column_families,
     std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
     const std::vector<int32_t>& ttls, bool read_only) {
+  DBWithTTLImpl::RegisterTtlClasses();
   if (ttls.size() != column_families.size()) {
     return Status::InvalidArgument(
         "ttls size has to be the same as number of column families");
@@ -128,6 +405,7 @@ Status DBWithTTL::Open(
 Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
     const ColumnFamilyOptions& options, const std::string& column_family_name,
     ColumnFamilyHandle** handle, int ttl) {
+  RegisterTtlClasses();
   ColumnFamilyOptions sanitized_options = options;
   DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
                                  GetEnv()->GetSystemClock().get());
index ab3ff3729d3967c20ec0c9e3317c6fb766b641dd..8f53bc497eff81e8df29514fb2e8f77918ada340 100644 (file)
 #endif
 
 namespace ROCKSDB_NAMESPACE {
-
+struct ConfigOptions;
+class ObjectLibrary;
+class ObjectRegistry;
 class DBWithTTLImpl : public DBWithTTL {
  public:
   static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
                               SystemClock* clock);
 
+  static void RegisterTtlClasses();
   explicit DBWithTTLImpl(DB* db);
 
   virtual ~DBWithTTLImpl();
@@ -155,37 +158,24 @@ class TtlCompactionFilter : public LayeredCompactionFilterBase {
   TtlCompactionFilter(int32_t ttl, SystemClock* clock,
                       const CompactionFilter* _user_comp_filter,
                       std::unique_ptr<const CompactionFilter>
-                          _user_comp_filter_from_factory = nullptr)
-      : LayeredCompactionFilterBase(_user_comp_filter,
-                                    std::move(_user_comp_filter_from_factory)),
-        ttl_(ttl),
-        clock_(clock) {}
+                          _user_comp_filter_from_factory = nullptr);
 
   virtual bool Filter(int level, const Slice& key, const Slice& old_val,
-                      std::string* new_val, bool* value_changed) const
-      override {
-    if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
-      return true;
-    }
-    if (user_comp_filter() == nullptr) {
-      return false;
-    }
-    assert(old_val.size() >= DBWithTTLImpl::kTSLength);
-    Slice old_val_without_ts(old_val.data(),
-                             old_val.size() - DBWithTTLImpl::kTSLength);
-    if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
-                                   value_changed)) {
+                      std::string* new_val, bool* value_changed) const override;
+
+  const char* Name() const override { return kClassName(); }
+  static const char* kClassName() { return "TtlCompactionFilter"; }
+  bool IsInstanceOf(const std::string& name) const override {
+    if (name == "Delete By TTL") {
       return true;
+    } else {
+      return LayeredCompactionFilterBase::IsInstanceOf(name);
     }
-    if (*value_changed) {
-      new_val->append(
-          old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
-          DBWithTTLImpl::kTSLength);
-    }
-    return false;
   }
 
-  virtual const char* Name() const override { return "Delete By TTL"; }
+  Status PrepareOptions(const ConfigOptions& config_options) override;
+  Status ValidateOptions(const DBOptions& db_opts,
+                         const ColumnFamilyOptions& cf_opts) const override;
 
  private:
   int32_t ttl_;
@@ -196,30 +186,21 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
  public:
   TtlCompactionFilterFactory(
       int32_t ttl, SystemClock* clock,
-      std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
-      : ttl_(ttl),
-        clock_(clock),
-        user_comp_filter_factory_(comp_filter_factory) {}
-
-  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
-    std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
-        nullptr;
-    if (user_comp_filter_factory_) {
-      user_comp_filter_from_factory =
-          user_comp_filter_factory_->CreateCompactionFilter(context);
-    }
-
-    return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
-        ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
-  }
+      std::shared_ptr<CompactionFilterFactory> comp_filter_factory);
 
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& context) override;
   void SetTtl(int32_t ttl) {
     ttl_ = ttl;
   }
 
-  virtual const char* Name() const override {
-    return "TtlCompactionFilterFactory";
+  const char* Name() const override { return kClassName(); }
+  static const char* kClassName() { return "TtlCompactionFilterFactory"; }
+  Status PrepareOptions(const ConfigOptions& config_options) override;
+  Status ValidateOptions(const DBOptions& db_opts,
+                         const ColumnFamilyOptions& cf_opts) const override;
+  const Customizable* Inner() const override {
+    return user_comp_filter_factory_.get();
   }
 
  private:
@@ -232,125 +213,38 @@ class TtlMergeOperator : public MergeOperator {
 
  public:
   explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
-                            SystemClock* clock)
-      : user_merge_op_(merge_op), clock_(clock) {
-    assert(merge_op);
-    assert(clock);
-  }
+                            SystemClock* clock);
 
-  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
-                           MergeOperationOutput* merge_out) const override {
-    const uint32_t ts_len = DBWithTTLImpl::kTSLength;
-    if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
-      ROCKS_LOG_ERROR(merge_in.logger,
-                      "Error: Could not remove timestamp from existing value.");
-      return false;
-    }
+  bool FullMergeV2(const MergeOperationInput& merge_in,
+                   MergeOperationOutput* merge_out) const override;
 
-    // Extract time-stamp from each operand to be passed to user_merge_op_
-    std::vector<Slice> operands_without_ts;
-    for (const auto& operand : merge_in.operand_list) {
-      if (operand.size() < ts_len) {
-        ROCKS_LOG_ERROR(
-            merge_in.logger,
-            "Error: Could not remove timestamp from operand value.");
-        return false;
-      }
-      operands_without_ts.push_back(operand);
-      operands_without_ts.back().remove_suffix(ts_len);
-    }
+  bool PartialMergeMulti(const Slice& key,
+                         const std::deque<Slice>& operand_list,
+                         std::string* new_value, Logger* logger) const override;
 
-    // Apply the user merge operator (store result in *new_value)
-    bool good = true;
-    MergeOperationOutput user_merge_out(merge_out->new_value,
-                                        merge_out->existing_operand);
-    if (merge_in.existing_value) {
-      Slice existing_value_without_ts(merge_in.existing_value->data(),
-                                      merge_in.existing_value->size() - ts_len);
-      good = user_merge_op_->FullMergeV2(
-          MergeOperationInput(merge_in.key, &existing_value_without_ts,
-                              operands_without_ts, merge_in.logger),
-          &user_merge_out);
-    } else {
-      good = user_merge_op_->FullMergeV2(
-          MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
-                              merge_in.logger),
-          &user_merge_out);
-    }
+  static const char* kClassName() { return "TtlMergeOperator"; }
 
-    // Return false if the user merge operator returned false
-    if (!good) {
-      return false;
-    }
-
-    if (merge_out->existing_operand.data()) {
-      merge_out->new_value.assign(merge_out->existing_operand.data(),
-                                  merge_out->existing_operand.size());
-      merge_out->existing_operand = Slice(nullptr, 0);
-    }
-
-    // Augment the *new_value with the ttl time-stamp
-    int64_t curtime;
-    if (!clock_->GetCurrentTime(&curtime).ok()) {
-      ROCKS_LOG_ERROR(
-          merge_in.logger,
-          "Error: Could not get current time to be attached internally "
-          "to the new value.");
-      return false;
-    } else {
-      char ts_string[ts_len];
-      EncodeFixed32(ts_string, (int32_t)curtime);
-      merge_out->new_value.append(ts_string, ts_len);
+  const char* Name() const override { return kClassName(); }
+  bool IsInstanceOf(const std::string& name) const override {
+    if (name == "Merge By TTL") {
       return true;
-    }
-  }
-
-  virtual bool PartialMergeMulti(const Slice& key,
-                                 const std::deque<Slice>& operand_list,
-                                 std::string* new_value, Logger* logger) const
-      override {
-    const uint32_t ts_len = DBWithTTLImpl::kTSLength;
-    std::deque<Slice> operands_without_ts;
-
-    for (const auto& operand : operand_list) {
-      if (operand.size() < ts_len) {
-        ROCKS_LOG_ERROR(logger,
-                        "Error: Could not remove timestamp from value.");
-        return false;
-      }
-
-      operands_without_ts.push_back(
-          Slice(operand.data(), operand.size() - ts_len));
-    }
-
-    // Apply the user partial-merge operator (store result in *new_value)
-    assert(new_value);
-    if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
-                                           logger)) {
-      return false;
-    }
-
-    // Augment the *new_value with the ttl time-stamp
-    int64_t curtime;
-    if (!clock_->GetCurrentTime(&curtime).ok()) {
-      ROCKS_LOG_ERROR(
-          logger,
-          "Error: Could not get current time to be attached internally "
-          "to the new value.");
-      return false;
     } else {
-      char ts_string[ts_len];
-      EncodeFixed32(ts_string, (int32_t)curtime);
-      new_value->append(ts_string, ts_len);
-      return true;
+      return MergeOperator::IsInstanceOf(name);
     }
   }
 
-  virtual const char* Name() const override { return "Merge By TTL"; }
+  Status PrepareOptions(const ConfigOptions& config_options) override;
+  Status ValidateOptions(const DBOptions& db_opts,
+                         const ColumnFamilyOptions& cf_opts) const override;
+  const Customizable* Inner() const override { return user_merge_op_.get(); }
 
  private:
   std::shared_ptr<MergeOperator> user_merge_op_;
   SystemClock* clock_;
 };
+extern "C" {
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/);
+}  // extern "C"
+
 }  // namespace ROCKSDB_NAMESPACE
 #endif  // ROCKSDB_LITE
index 8e60fbc763f600020227e16ab6847d7dc7d79f7b..c657dfe2b54caf590ed04f937a7442d8653c951d 100644 (file)
@@ -7,10 +7,16 @@
 
 #include <map>
 #include <memory>
+
 #include "rocksdb/compaction_filter.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/merge_operator.h"
 #include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
 #include "test_util/testharness.h"
 #include "util/string_util.h"
+#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/ttl/db_ttl_impl.h"
 #ifndef OS_WIN
 #include <unistd.h>
 #endif
@@ -719,6 +725,171 @@ TEST_F(TtlTest, DeleteRangeTest) {
   CloseTtl();
 }
 
+class DummyFilter : public CompactionFilter {
+ public:
+  bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+              std::string* /*new_value*/,
+              bool* /*value_changed*/) const override {
+    return false;
+  }
+
+  const char* Name() const override { return kClassName(); }
+  static const char* kClassName() { return "DummyFilter"; }
+};
+
+class DummyFilterFactory : public CompactionFilterFactory {
+ public:
+  const char* Name() const override { return kClassName(); }
+  static const char* kClassName() { return "DummyFilterFactory"; }
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context&) override {
+    std::unique_ptr<CompactionFilter> f(new DummyFilter());
+    return f;
+  }
+};
+
+static int RegisterTestObjects(ObjectLibrary& library,
+                               const std::string& /*arg*/) {
+  library.Register<CompactionFilter>(
+      "DummyFilter", [](const std::string& /*uri*/,
+                        std::unique_ptr<CompactionFilter>* /*guard*/,
+                        std::string* /* errmsg */) {
+        static DummyFilter dummy;
+        return &dummy;
+      });
+  library.Register<CompactionFilterFactory>(
+      "DummyFilterFactory", [](const std::string& /*uri*/,
+                               std::unique_ptr<CompactionFilterFactory>* guard,
+                               std::string* /* errmsg */) {
+        guard->reset(new DummyFilterFactory());
+        return guard->get();
+      });
+  return 2;
+}
+
+class TtlOptionsTest : public testing::Test {
+ public:
+  TtlOptionsTest() {
+    config_options_.registry->AddLibrary("RegisterTtlObjects",
+                                         RegisterTtlObjects, "");
+    config_options_.registry->AddLibrary("RegisterTtlTestObjects",
+                                         RegisterTestObjects, "");
+  }
+  ConfigOptions config_options_;
+};
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilter) {
+  const CompactionFilter* filter = nullptr;
+
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options_, TtlCompactionFilter::kClassName(), &filter));
+  ASSERT_NE(filter, nullptr);
+  ASSERT_STREQ(filter->Name(), TtlCompactionFilter::kClassName());
+  auto ttl = filter->GetOptions<int32_t>("TTL");
+  ASSERT_NE(ttl, nullptr);
+  ASSERT_EQ(*ttl, 0);
+  ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+  delete filter;
+  filter = nullptr;
+
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options_, "id=TtlCompactionFilter; ttl=123", &filter));
+  ASSERT_NE(filter, nullptr);
+  ttl = filter->GetOptions<int32_t>("TTL");
+  ASSERT_NE(ttl, nullptr);
+  ASSERT_EQ(*ttl, 123);
+  ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+  delete filter;
+  filter = nullptr;
+
+  ASSERT_OK(CompactionFilter::CreateFromString(
+      config_options_,
+      "id=TtlCompactionFilter; ttl=456; user_filter=DummyFilter;", &filter));
+  ASSERT_NE(filter, nullptr);
+  auto inner = filter->CheckedCast<DummyFilter>();
+  ASSERT_NE(inner, nullptr);
+  ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+  std::string mismatch;
+  std::string opts_str = filter->ToString(config_options_);
+  const CompactionFilter* copy = nullptr;
+  ASSERT_OK(
+      CompactionFilter::CreateFromString(config_options_, opts_str, &copy));
+  ASSERT_TRUE(filter->AreEquivalent(config_options_, copy, &mismatch));
+  delete filter;
+  delete copy;
+}
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilterFactory) {
+  std::shared_ptr<CompactionFilterFactory> cff;
+
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(
+      config_options_, TtlCompactionFilterFactory::kClassName(), &cff));
+  ASSERT_NE(cff.get(), nullptr);
+  ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+  auto ttl = cff->GetOptions<int32_t>("TTL");
+  ASSERT_NE(ttl, nullptr);
+  ASSERT_EQ(*ttl, 0);
+  ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(
+      config_options_, "id=TtlCompactionFilterFactory; ttl=123", &cff));
+  ASSERT_NE(cff.get(), nullptr);
+  ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+  ttl = cff->GetOptions<int32_t>("TTL");
+  ASSERT_NE(ttl, nullptr);
+  ASSERT_EQ(*ttl, 123);
+  ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(
+      config_options_,
+      "id=TtlCompactionFilterFactory; ttl=456; "
+      "user_filter_factory=DummyFilterFactory;",
+      &cff));
+  ASSERT_NE(cff.get(), nullptr);
+  auto filter = cff->CreateCompactionFilter(CompactionFilter::Context());
+  ASSERT_NE(filter.get(), nullptr);
+  auto ttlf = filter->CheckedCast<TtlCompactionFilter>();
+  ASSERT_EQ(filter.get(), ttlf);
+  auto user = filter->CheckedCast<DummyFilter>();
+  ASSERT_NE(user, nullptr);
+  ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+  std::string opts_str = cff->ToString(config_options_);
+  std::string mismatch;
+  std::shared_ptr<CompactionFilterFactory> copy;
+  ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, opts_str,
+                                                      &copy));
+  ASSERT_TRUE(cff->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
+
+TEST_F(TtlOptionsTest, LoadTtlMergeOperator) {
+  std::shared_ptr<MergeOperator> mo;
+
+  config_options_.invoke_prepare_options = false;
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options_, TtlMergeOperator::kClassName(), &mo));
+  ASSERT_NE(mo.get(), nullptr);
+  ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+  ASSERT_NOK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+  config_options_.invoke_prepare_options = true;
+  ASSERT_OK(MergeOperator::CreateFromString(
+      config_options_, "id=TtlMergeOperator; user_operator=bytesxor", &mo));
+  ASSERT_NE(mo.get(), nullptr);
+  ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+  ASSERT_OK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+  auto ttl_mo = mo->CheckedCast<TtlMergeOperator>();
+  ASSERT_EQ(mo.get(), ttl_mo);
+  auto user = ttl_mo->CheckedCast<BytesXOROperator>();
+  ASSERT_NE(user, nullptr);
+
+  std::string mismatch;
+  std::string opts_str = mo->ToString(config_options_);
+  std::shared_ptr<MergeOperator> copy;
+  ASSERT_OK(MergeOperator::CreateFromString(config_options_, opts_str, &copy));
+  ASSERT_TRUE(mo->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 // A black-box test for the ttl wrapper around rocksdb