utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc
utilities/checkpoint/checkpoint_impl.cc
+ utilities/compaction_filters.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/debug.cc
utilities/env_mirror.cc
utilities/fault_injection_fs.cc
utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc
+ utilities/merge_operators.cc
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
utilities/merge_operators/put.cc
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
+ "utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
"utilities/debug.cc",
"utilities/fault_injection_fs.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
+ "utilities/merge_operators.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
+ "utilities/compaction_filters.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
"utilities/debug.cc",
"utilities/fault_injection_fs.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
+ "utilities/merge_operators.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
TEST_F(CompactionServiceTest, CompactionFilter) {
Options options = CurrentOptions();
options.env = env_;
- auto delete_comp_filter = PartialDeleteCompactionFilter();
- options.compaction_filter = &delete_comp_filter;
+ std::unique_ptr<CompactionFilter> delete_comp_filter(
+ new PartialDeleteCompactionFilter());
+ options.compaction_filter = delete_comp_filter.get();
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
#include <string>
#include <vector>
+#include "rocksdb/customizable.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/types.h"
// CompactionFilter allows an application to modify/delete a key-value during
// table file creation.
-class CompactionFilter {
+class CompactionFilter : public Customizable {
public:
enum ValueType {
kValue,
};
virtual ~CompactionFilter() {}
+ static const char* Type() { return "CompactionFilter"; }
+ static Status CreateFromString(const ConfigOptions& config_options,
+ const std::string& name,
+ const CompactionFilter** result);
// The table file creation process invokes this method before adding a kv to
// the table file. A return value of false indicates that the kv should be
// Returns a name that identifies this `CompactionFilter`.
// The name will be printed to LOG file on start up for diagnosis.
- virtual const char* Name() const = 0;
+ const char* Name() const override = 0;
// Internal (BlobDB) use only. Do not override in application code.
virtual bool IsStackedBlobDbInternalCompactionFilter() const { return false; }
// `CompactionFilter` according to `ShouldFilterTableFileCreation()`. This
// allows the application to know about the different ongoing threads of work
// and makes it unnecessary for `CompactionFilter` to provide thread-safety.
-class CompactionFilterFactory {
+class CompactionFilterFactory : public Customizable {
public:
virtual ~CompactionFilterFactory() {}
+ static const char* Type() { return "CompactionFilterFactory"; }
+ static Status CreateFromString(
+ const ConfigOptions& config_options, const std::string& name,
+ std::shared_ptr<CompactionFilterFactory>* result);
// Returns whether a thread creating table files for the specified `reason`
// should invoke `CreateCompactionFilter()` and pass KVs through the returned
// @param name The name of the instance to find.
// Returns true if the class is an instance of the input name.
virtual bool IsInstanceOf(const std::string& name) const {
- return name == Name();
+ if (name.empty()) {
+ return false;
+ } else if (name == Name()) {
+ return true;
+ } else {
+ const char* nickname = NickName();
+ if (nickname != nullptr && name == nickname) {
+ return true;
+ } else {
+ return false;
+ }
+ }
}
// Returns the named instance of the Customizable as a T*, or nullptr if not
virtual const Customizable* Inner() const { return nullptr; }
protected:
+ // Some classes have both a class name (e.g. PutOperator) and a nickname
+ // (e.g. put). Classes can override this method to return a
+ // nickname. Nicknames can be used by InstanceOf and object creation.
+ virtual const char* NickName() const { return ""; }
// Given a name (e.g. rocksdb.my.type.opt), returns the short name (opt)
std::string GetOptionName(const std::string& long_name) const override;
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
+#include "rocksdb/customizable.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
//
// Refer to rocksdb-merge wiki for more details and example implementations.
//
-class MergeOperator {
+class MergeOperator : public Customizable {
public:
virtual ~MergeOperator() {}
static const char* Type() { return "MergeOperator"; }
+ static Status CreateFromString(const ConfigOptions& opts,
+ const std::string& id,
+ std::shared_ptr<MergeOperator>* result);
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
kCompactionPri,
kSliceTransform,
kCompressionType,
- kCompactionFilter,
- kCompactionFilterFactory,
kCompactionStopStyle,
- kMergeOperator,
kMemTableRepFactory,
kFilterPolicy,
kChecksumType,
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
+#include "rocksdb/compaction_filter.h"
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/configurable.h"
#include "rocksdb/convenience.h"
}
}}},
{"compaction_filter",
- {offset_of(&ImmutableCFOptions::compaction_filter),
- OptionType::kCompactionFilter, OptionVerificationType::kByName,
- OptionTypeFlags::kNone}},
+ OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
+ offset_of(&ImmutableCFOptions::compaction_filter),
+ OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"compaction_filter_factory",
- {offset_of(&ImmutableCFOptions::compaction_filter_factory),
- OptionType::kCompactionFilterFactory, OptionVerificationType::kByName,
- OptionTypeFlags::kNone}},
+ OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
+ offset_of(&ImmutableCFOptions::compaction_filter_factory),
+ OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)},
{"merge_operator",
- {offset_of(&ImmutableCFOptions::merge_operator),
- OptionType::kMergeOperator,
- OptionVerificationType::kByNameAllowFromNull,
- OptionTypeFlags::kCompareLoose,
- // Parses the input value as a MergeOperator, updating the value
- [](const ConfigOptions& opts, const std::string& /*name*/,
- const std::string& value, void* addr) {
- auto mop = static_cast<std::shared_ptr<MergeOperator>*>(addr);
- Status status =
- opts.registry->NewSharedObject<MergeOperator>(value, mop);
- // Only support static comparator for now.
- if (status.ok()) {
- return status;
- }
- return Status::OK();
- }}},
+ OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
+ offset_of(&ImmutableCFOptions::merge_operator),
+ OptionVerificationType::kByNameAllowFromNull,
+ OptionTypeFlags::kCompareLoose | OptionTypeFlags::kAllowNull)},
{"compaction_style",
{offset_of(&ImmutableCFOptions::compaction_style),
OptionType::kCompactionStyle, OptionVerificationType::kNormal,
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
+#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
#ifndef GFLAGS
bool FLAGS_enable_print = false;
static test::SimpleSuffixReverseComparator ssrc;
return &ssrc;
});
+ library.Register<MergeOperator>(
+ "Changling",
+ [](const std::string& uri, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new test::ChanglingMergeOperator(uri));
+ return guard->get();
+ });
+ library.Register<CompactionFilter>(
+ "Changling",
+ [](const std::string& uri, std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ return new test::ChanglingCompactionFilter(uri);
+ });
+ library.Register<CompactionFilterFactory>(
+ "Changling", [](const std::string& uri,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new test::ChanglingCompactionFilterFactory(uri));
+ return guard->get();
+ });
return static_cast<int>(library.GetFactoryCount(&num_types));
}
}
}
+TEST_F(LoadCustomizableTest, LoadMergeOperatorTest) {
+ std::shared_ptr<MergeOperator> result;
+
+ ASSERT_NOK(
+ MergeOperator::CreateFromString(config_options_, "Changling", &result));
+ ASSERT_OK(MergeOperator::CreateFromString(config_options_, "put", &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), "PutOperator");
+ if (RegisterTests("Test")) {
+ ASSERT_OK(
+ MergeOperator::CreateFromString(config_options_, "Changling", &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), "ChanglingMergeOperator");
+ }
+}
+
+TEST_F(LoadCustomizableTest, LoadCompactionFilterFactoryTest) {
+ std::shared_ptr<CompactionFilterFactory> result;
+
+ ASSERT_NOK(CompactionFilterFactory::CreateFromString(config_options_,
+ "Changling", &result));
+ if (RegisterTests("Test")) {
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_,
+ "Changling", &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), "ChanglingCompactionFilterFactory");
+ }
+}
+
+TEST_F(LoadCustomizableTest, LoadCompactionFilterTest) {
+ const CompactionFilter* result = nullptr;
+
+ ASSERT_NOK(CompactionFilter::CreateFromString(config_options_, "Changling",
+ &result));
+#ifndef ROCKSDB_LITE
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_, RemoveEmptyValueCompactionFilter::kClassName(),
+ &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), RemoveEmptyValueCompactionFilter::kClassName());
+ delete result;
+ result = nullptr;
+ if (RegisterTests("Test")) {
+ ASSERT_OK(CompactionFilter::CreateFromString(config_options_, "Changling",
+ &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), "ChanglingCompactionFilter");
+ delete result;
+ }
+#endif // ROCKSDB_LITE
+}
+
#ifndef ROCKSDB_LITE
TEST_F(LoadCustomizableTest, LoadEventListenerTest) {
std::shared_ptr<EventListener> result;
: kNullptrString;
break;
}
- case OptionType::kCompactionFilter: {
- // it's a const pointer of const CompactionFilter*
- const auto* ptr =
- static_cast<const CompactionFilter* const*>(opt_address);
- *value = *ptr ? (*ptr)->Name() : kNullptrString;
- break;
- }
- case OptionType::kCompactionFilterFactory: {
- const auto* ptr =
- static_cast<const std::shared_ptr<CompactionFilterFactory>*>(
- opt_address);
- *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
- break;
- }
case OptionType::kMemTableRepFactory: {
const auto* ptr =
static_cast<const std::shared_ptr<MemTableRepFactory>*>(opt_address);
*value = ptr->get() ? ptr->get()->Name() : kNullptrString;
break;
}
- case OptionType::kMergeOperator: {
- const auto* ptr =
- static_cast<const std::shared_ptr<MergeOperator>*>(opt_address);
- *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
- break;
- }
case OptionType::kFilterPolicy: {
const auto* ptr =
static_cast<const std::shared_ptr<FilterPolicy>*>(opt_address);
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
#ifndef GFLAGS
bool FLAGS_enable_print = false;
// MergeOperator from object registry
std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
std::string kMoName = bxo->Name();
- ObjectLibrary::Default()->Register<MergeOperator>(
- kMoName,
- [](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
- std::string* /* errmsg */) {
- guard->reset(new BytesXOROperator());
- return guard->get();
- });
ASSERT_OK(GetColumnFamilyOptionsFromString(config_options, base_cf_opt,
"merge_operator=" + kMoName + ";",
// MergeOperator from object registry
std::unique_ptr<BytesXOROperator> bxo(new BytesXOROperator());
std::string kMoName = bxo->Name();
- ObjectLibrary::Default()->Register<MergeOperator>(
- kMoName,
- [](const std::string& /*name*/, std::unique_ptr<MergeOperator>* guard,
- std::string* /* errmsg */) {
- guard->reset(new BytesXOROperator());
- return guard->get();
- });
-
ASSERT_OK(GetColumnFamilyOptionsFromString(
base_cf_opt, "merge_operator=" + kMoName + ";", &new_cf_opt));
ASSERT_EQ(kMoName, std::string(new_cf_opt.merge_operator->Name()));
// change the name of merge operator back-and-forth
{
- auto* merge_operator = dynamic_cast<test::ChanglingMergeOperator*>(
- base_cf_opt->merge_operator.get());
+ auto* merge_operator = base_cf_opt->merge_operator
+ ->CheckedCast<test::ChanglingMergeOperator>();
if (merge_operator != nullptr) {
name_buffer = merge_operator->Name();
// change the name and expect non-ok status
// change the name of the compaction filter factory back-and-forth
{
auto* compaction_filter_factory =
- dynamic_cast<test::ChanglingCompactionFilterFactory*>(
- base_cf_opt->compaction_filter_factory.get());
+ base_cf_opt->compaction_filter_factory
+ ->CheckedCast<test::ChanglingCompactionFilterFactory>();
if (compaction_filter_factory != nullptr) {
name_buffer = compaction_filter_factory->Name();
// change the name and expect non-ok status
delete mem_env;
}
+TEST_F(ConfigOptionsTest, MergeOperatorFromString) {
+ ConfigOptions config_options;
+ std::shared_ptr<MergeOperator> merge_op;
+
+ ASSERT_OK(MergeOperator::CreateFromString(config_options, "put", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("put"));
+ ASSERT_STREQ(merge_op->Name(), "PutOperator");
+
+ ASSERT_OK(
+ MergeOperator::CreateFromString(config_options, "put_v1", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("PutOperator"));
+
+ ASSERT_OK(
+ MergeOperator::CreateFromString(config_options, "uint64add", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("uint64add"));
+ ASSERT_STREQ(merge_op->Name(), "UInt64AddOperator");
+
+ ASSERT_OK(MergeOperator::CreateFromString(config_options, "max", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("max"));
+ ASSERT_STREQ(merge_op->Name(), "MaxOperator");
+
+ ASSERT_OK(
+ MergeOperator::CreateFromString(config_options, "bytesxor", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("bytesxor"));
+ ASSERT_STREQ(merge_op->Name(), BytesXOROperator::kClassName());
+
+ ASSERT_OK(
+ MergeOperator::CreateFromString(config_options, "sortlist", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("sortlist"));
+ ASSERT_STREQ(merge_op->Name(), SortList::kClassName());
+
+ ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappend",
+ &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
+ ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
+ auto delimiter = merge_op->GetOptions<std::string>("Delimiter");
+ ASSERT_NE(delimiter, nullptr);
+ ASSERT_EQ(*delimiter, ",");
+
+ ASSERT_OK(MergeOperator::CreateFromString(config_options, "stringappendtest",
+ &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
+ ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
+ delimiter = merge_op->GetOptions<std::string>("Delimiter");
+ ASSERT_NE(delimiter, nullptr);
+ ASSERT_EQ(*delimiter, ",");
+
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options, "id=stringappend; delimiter=||", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("stringappend"));
+ ASSERT_STREQ(merge_op->Name(), StringAppendOperator::kClassName());
+ delimiter = merge_op->GetOptions<std::string>("Delimiter");
+ ASSERT_NE(delimiter, nullptr);
+ ASSERT_EQ(*delimiter, "||");
+
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options, "id=stringappendtest; delimiter=&&", &merge_op));
+ ASSERT_NE(merge_op, nullptr);
+ ASSERT_TRUE(merge_op->IsInstanceOf("stringappendtest"));
+ ASSERT_STREQ(merge_op->Name(), StringAppendTESTOperator::kClassName());
+ delimiter = merge_op->GetOptions<std::string>("Delimiter");
+ ASSERT_NE(delimiter, nullptr);
+ ASSERT_EQ(*delimiter, "&&");
+
+ std::shared_ptr<MergeOperator> copy;
+ std::string mismatch;
+ std::string opts_str = merge_op->ToString(config_options);
+
+ ASSERT_OK(MergeOperator::CreateFromString(config_options, opts_str, ©));
+ ASSERT_TRUE(merge_op->AreEquivalent(config_options, copy.get(), &mismatch));
+ ASSERT_NE(copy, nullptr);
+ delimiter = copy->GetOptions<std::string>("Delimiter");
+ ASSERT_NE(delimiter, nullptr);
+ ASSERT_EQ(*delimiter, "&&");
+}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \
utilities/checkpoint/checkpoint_impl.cc \
+ utilities/compaction_filters.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
utilities/debug.cc \
utilities/fault_injection_fs.cc \
utilities/leveldb_options/leveldb_options.cc \
utilities/memory/memory_util.cc \
+ utilities/merge_operators.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
utilities/merge_operators/sortlist.cc \
Logger* /*logger*/) const override {
return false;
}
+ static const char* kClassName() { return "ChanglingMergeOperator"; }
+ virtual bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return MergeOperator::IsInstanceOf(id);
+ }
+ }
+
virtual const char* Name() const override { return name_.c_str(); }
protected:
return false;
}
+ static const char* kClassName() { return "ChanglingCompactionFilter"; }
+ virtual bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return CompactionFilter::IsInstanceOf(id);
+ }
+ }
+
const char* Name() const override { return name_.c_str(); }
private:
// Returns a name that identifies this compaction filter factory.
const char* Name() const override { return name_.c_str(); }
+ static const char* kClassName() { return "ChanglingCompactionFilterFactory"; }
+ virtual bool IsInstanceOf(const std::string& id) const override {
+ if (id == kClassName()) {
+ return true;
+ } else {
+ return CompactionFilterFactory::IsInstanceOf(id);
+ }
+ }
protected:
std::string name_;
void InitializeOptionsFromFlags(Options* opts) {
printf("Initializing RocksDB Options from command-line flags\n");
Options& options = *opts;
+ ConfigOptions config_options(options);
assert(db_.db == nullptr);
options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
// merge operator options
- options.merge_operator = MergeOperators::CreateFromStringId(
- FLAGS_merge_operator);
- if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
- fprintf(stderr, "invalid merge operator: %s\n",
- FLAGS_merge_operator.c_str());
- exit(1);
+ if (!FLAGS_merge_operator.empty()) {
+ Status s = MergeOperator::CreateFromString(
+ config_options, FLAGS_merge_operator, &options.merge_operator);
+ if (!s.ok()) {
+ fprintf(stderr, "invalid merge operator[%s]: %s\n",
+ FLAGS_merge_operator.c_str(), s.ToString().c_str());
+ exit(1);
+ }
}
options.max_successive_merges = FLAGS_max_successive_merges;
options.report_bg_io_stats = FLAGS_report_bg_io_stats;
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/cassandra/cassandra_compaction_filter.h"
+
#include <string>
+
#include "rocksdb/slice.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
#include "utilities/cassandra/format.h"
+#include "utilities/cassandra/merge_operator.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+ cassandra_filter_type_info = {
+#ifndef ROCKSDB_LITE
+ {"purge_ttl_on_expiration",
+ {offsetof(struct CassandraOptions, purge_ttl_on_expiration),
+ OptionType::kBoolean, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+ {"gc_grace_period_in_seconds",
+ {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+ OptionType::kUInt32T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
-const char* CassandraCompactionFilter::Name() const {
- return "CassandraCompactionFilter";
+CassandraCompactionFilter::CassandraCompactionFilter(
+ bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+ : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+ RegisterOptions(&options_, &cassandra_filter_type_info);
}
CompactionFilter::Decision CassandraCompactionFilter::FilterV2(
RowValue row_value = RowValue::Deserialize(
existing_value.data(), existing_value.size());
RowValue compacted =
- purge_ttl_on_expiration_
+ options_.purge_ttl_on_expiration
? row_value.RemoveExpiredColumns(&value_changed)
: row_value.ConvertExpiredColumnsToTombstones(&value_changed);
if (value_type == ValueType::kValue) {
- compacted = compacted.RemoveTombstones(gc_grace_period_in_seconds_);
+ compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds);
}
if(compacted.Empty()) {
return Decision::kKeep;
}
+CassandraCompactionFilterFactory::CassandraCompactionFilterFactory(
+ bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds)
+ : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) {
+ RegisterOptions(&options_, &cassandra_filter_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+CassandraCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context&) {
+ std::unique_ptr<CompactionFilter> result(new CassandraCompactionFilter(
+ options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds));
+ return result;
+}
+
+#ifndef ROCKSDB_LITE
+int RegisterCassandraObjects(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.Register<MergeOperator>(
+ CassandraValueMergeOperator::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new CassandraValueMergeOperator(0));
+ return guard->get();
+ });
+ library.Register<CompactionFilter>(
+ CassandraCompactionFilter::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard */,
+ std::string* /* errmsg */) {
+ return new CassandraCompactionFilter(false, 0);
+ });
+ library.Register<CompactionFilterFactory>(
+ CassandraCompactionFilterFactory::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new CassandraCompactionFilterFactory(false, 0));
+ return guard->get();
+ });
+ size_t num_types;
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+#endif // ROCKSDB_LITE
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
#pragma once
#include <string>
+
#include "rocksdb/compaction_filter.h"
#include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
class CassandraCompactionFilter : public CompactionFilter {
public:
explicit CassandraCompactionFilter(bool purge_ttl_on_expiration,
- int32_t gc_grace_period_in_seconds)
- : purge_ttl_on_expiration_(purge_ttl_on_expiration),
- gc_grace_period_in_seconds_(gc_grace_period_in_seconds) {}
+ int32_t gc_grace_period_in_seconds);
+ static const char* kClassName() { return "CassandraCompactionFilter"; }
+ const char* Name() const override { return kClassName(); }
- const char* Name() const override;
virtual Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& existing_value, std::string* new_value,
std::string* skip_until) const override;
private:
- bool purge_ttl_on_expiration_;
- int32_t gc_grace_period_in_seconds_;
+ CassandraOptions options_;
+};
+
+class CassandraCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ explicit CassandraCompactionFilterFactory(bool purge_ttl_on_expiration,
+ int32_t gc_grace_period_in_seconds);
+ ~CassandraCompactionFilterFactory() override {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
+ static const char* kClassName() { return "CassandraCompactionFilterFactory"; }
+ const char* Name() const override { return kClassName(); }
+
+ private:
+ CassandraOptions options_;
};
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
#include <iostream>
#include "db/db_impl/db_impl.h"
+#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
-#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
#include "test_util/testharness.h"
#include "util/cast_util.h"
#include "util/random.h"
ASSERT_FALSE(std::get<0>(store.Get("k1")));
}
+#ifndef ROCKSDB_LITE
+TEST_F(CassandraFunctionalTest, LoadMergeOperator) {
+ ConfigOptions config_options;
+ std::shared_ptr<MergeOperator> mo;
+ config_options.ignore_unsupported_options = false;
+
+ ASSERT_NOK(MergeOperator::CreateFromString(
+ config_options, CassandraValueMergeOperator::kClassName(), &mo));
+
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options, CassandraValueMergeOperator::kClassName(), &mo));
+ ASSERT_NE(mo, nullptr);
+ ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+ mo.reset();
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options,
+ std::string("operands_limit=20;gc_grace_period_in_seconds=42;id=") +
+ CassandraValueMergeOperator::kClassName(),
+ &mo));
+ ASSERT_NE(mo, nullptr);
+ ASSERT_STREQ(mo->Name(), CassandraValueMergeOperator::kClassName());
+ const auto* opts = mo->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_EQ(opts->operands_limit, 20);
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilter) {
+ ConfigOptions config_options;
+ const CompactionFilter* filter = nullptr;
+ config_options.ignore_unsupported_options = false;
+
+ ASSERT_NOK(CompactionFilter::CreateFromString(
+ config_options, CassandraCompactionFilter::kClassName(), &filter));
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options, CassandraCompactionFilter::kClassName(), &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+ delete filter;
+ filter = nullptr;
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options,
+ std::string(
+ "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+ CassandraCompactionFilter::kClassName(),
+ &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), CassandraCompactionFilter::kClassName());
+ const auto* opts = filter->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_TRUE(opts->purge_ttl_on_expiration);
+ delete filter;
+}
+
+TEST_F(CassandraFunctionalTest, LoadCompactionFilterFactory) {
+ ConfigOptions config_options;
+ std::shared_ptr<CompactionFilterFactory> factory;
+
+ config_options.ignore_unsupported_options = false;
+ ASSERT_NOK(CompactionFilterFactory::CreateFromString(
+ config_options, CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ config_options.registry->AddLibrary("cassandra", RegisterCassandraObjects,
+ "cassandra");
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options, CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ ASSERT_NE(factory, nullptr);
+ ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+ factory.reset();
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options,
+ std::string(
+ "purge_ttl_on_expiration=true;gc_grace_period_in_seconds=42;id=") +
+ CassandraCompactionFilterFactory::kClassName(),
+ &factory));
+ ASSERT_NE(factory, nullptr);
+ ASSERT_STREQ(factory->Name(), CassandraCompactionFilterFactory::kClassName());
+ const auto* opts = factory->GetOptions<CassandraOptions>();
+ ASSERT_NE(opts, nullptr);
+ ASSERT_EQ(opts->gc_grace_period_in_seconds, 42);
+ ASSERT_TRUE(opts->purge_ttl_on_expiration);
+}
+#endif // ROCKSDB_LITE
+
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#include <cinttypes>
+
+#include "rocksdb/rocksdb_namespace.h"
+
+namespace ROCKSDB_NAMESPACE {
+class ObjectLibrary;
+namespace cassandra {
+struct CassandraOptions {
+ static const char* kName() { return "CassandraOptions"; }
+ CassandraOptions(int32_t _gc_grace_period_in_seconds, size_t _operands_limit,
+ bool _purge_ttl_on_expiration = false)
+ : operands_limit(_operands_limit),
+ gc_grace_period_in_seconds(_gc_grace_period_in_seconds),
+ purge_ttl_on_expiration(_purge_ttl_on_expiration) {}
+ // Limit on the number of merge operands.
+ size_t operands_limit;
+
+ // How long (in seconds) tombstoned data remains before it is purged
+ int32_t gc_grace_period_in_seconds;
+
+ // If is set to true, expired data will be directly purged.
+ // Otherwise expired data will be converted tombstones first,
+ // then be eventually removed after gc grace period. This value should
+ // only true if all writes have same ttl setting, otherwise it could bring old
+ // data back.
+ bool purge_ttl_on_expiration;
+};
+#ifndef ROCKSDB_LITE
+extern "C" {
+int RegisterCassandraObjects(ObjectLibrary& library, const std::string& arg);
+} // extern "C"
+#endif // ROCKSDB_LITE
+} // namespace cassandra
+} // namespace ROCKSDB_NAMESPACE
#include "merge_operator.h"
-#include <memory>
#include <assert.h>
-#include "rocksdb/slice.h"
+#include <memory>
+
#include "rocksdb/merge_operator.h"
-#include "utilities/merge_operators.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
#include "utilities/cassandra/format.h"
+#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
+static std::unordered_map<std::string, OptionTypeInfo>
+ merge_operator_options_info = {
+#ifndef ROCKSDB_LITE
+ {"gc_grace_period_in_seconds",
+ {offsetof(struct CassandraOptions, gc_grace_period_in_seconds),
+ OptionType::kUInt32T, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+ {"operands_limit",
+ {offsetof(struct CassandraOptions, operands_limit), OptionType::kSizeT,
+ OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+
+CassandraValueMergeOperator::CassandraValueMergeOperator(
+ int32_t gc_grace_period_in_seconds, size_t operands_limit)
+ : options_(gc_grace_period_in_seconds, operands_limit) {
+ RegisterOptions(&options_, &merge_operator_options_info);
+}
// Implementation for the merge operation (merges two Cassandra values)
bool CassandraValueMergeOperator::FullMergeV2(
}
RowValue merged = RowValue::Merge(std::move(row_values));
- merged = merged.RemoveTombstones(gc_grace_period_in_seconds_);
+ merged = merged.RemoveTombstones(options_.gc_grace_period_in_seconds);
merge_out->new_value.reserve(merged.Size());
merged.Serialize(&(merge_out->new_value));
return true;
}
-const char* CassandraValueMergeOperator::Name() const {
- return "CassandraValueMergeOperator";
-}
-
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
#pragma once
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
+#include "utilities/cassandra/cassandra_options.h"
namespace ROCKSDB_NAMESPACE {
namespace cassandra {
class CassandraValueMergeOperator : public MergeOperator {
public:
explicit CassandraValueMergeOperator(int32_t gc_grace_period_in_seconds,
- size_t operands_limit = 0)
- : gc_grace_period_in_seconds_(gc_grace_period_in_seconds),
- operands_limit_(operands_limit) {}
+ size_t operands_limit = 0);
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
std::string* new_value,
Logger* logger) const override;
- virtual const char* Name() const override;
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "CassandraValueMergeOperator"; }
virtual bool AllowSingleOperand() const override { return true; }
virtual bool ShouldMerge(const std::vector<Slice>& operands) const override {
- return operands_limit_ > 0 && operands.size() >= operands_limit_;
+ return options_.operands_limit > 0 &&
+ operands.size() >= options_.operands_limit;
}
private:
- int32_t gc_grace_period_in_seconds_;
- size_t operands_limit_;
+ CassandraOptions options_;
};
} // namespace cassandra
} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <memory>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/options_type.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
+#include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
+
+namespace ROCKSDB_NAMESPACE {
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinCompactionFilters(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.Register<CompactionFilter>(
+ RemoveEmptyValueCompactionFilter::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /*errmsg*/) {
+ return new RemoveEmptyValueCompactionFilter();
+ });
+ return 1;
+}
+#endif // ROCKSDB_LITE
+Status CompactionFilter::CreateFromString(const ConfigOptions& config_options,
+ const std::string& value,
+ const CompactionFilter** result) {
+#ifndef ROCKSDB_LITE
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ RegisterBuiltinCompactionFilters(*(ObjectLibrary::Default().get()), "");
+ });
+#endif // ROCKSDB_LITE
+ CompactionFilter* filter = const_cast<CompactionFilter*>(*result);
+ Status status = LoadStaticObject<CompactionFilter>(config_options, value,
+ nullptr, &filter);
+ if (status.ok()) {
+ *result = const_cast<CompactionFilter*>(filter);
+ }
+ return status;
+}
+
+Status CompactionFilterFactory::CreateFromString(
+ const ConfigOptions& config_options, const std::string& value,
+ std::shared_ptr<CompactionFilterFactory>* result) {
+ // Currently there are no builtin CompactionFilterFactories.
+ // If any are introduced, they need to be registered here.
+ Status status = LoadSharedObject<CompactionFilterFactory>(
+ config_options, value, nullptr, result);
+ return status;
+}
+} // namespace ROCKSDB_NAMESPACE
namespace ROCKSDB_NAMESPACE {
-// Abstract base class for building layered compation filter on top of
+// Abstract base class for building layered compaction filter on top of
// user compaction filter.
// See BlobIndexCompactionFilter or TtlCompactionFilter for a basic usage.
class LayeredCompactionFilterBase : public CompactionFilter {
// Return a pointer to user compaction filter
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
- private:
+ const Customizable* Inner() const override { return user_comp_filter_; }
+
+ protected:
const CompactionFilter* user_comp_filter_;
+
+ private:
std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory_;
};
namespace ROCKSDB_NAMESPACE {
-const char* RemoveEmptyValueCompactionFilter::Name() const {
- return "RemoveEmptyValueCompactionFilter";
-}
-
bool RemoveEmptyValueCompactionFilter::Filter(int /*level*/,
const Slice& /*key*/,
const Slice& existing_value,
class RemoveEmptyValueCompactionFilter : public CompactionFilter {
public:
- const char* Name() const override;
- bool Filter(int level,
- const Slice& key,
- const Slice& existing_value,
- std::string* new_value,
- bool* value_changed) const override;
+ static const char* kClassName() { return "RemoveEmptyValueCompactionFilter"; }
+
+ const char* Name() const override { return kClassName(); }
+
+ bool Filter(int level, const Slice& key, const Slice& existing_value,
+ std::string* new_value, bool* value_changed) const override;
};
+
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE
--- /dev/null
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "utilities/merge_operators.h"
+
+#include <memory>
+
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/merge_operators/sortlist.h"
+#include "utilities/merge_operators/string_append/stringappend.h"
+#include "utilities/merge_operators/string_append/stringappend2.h"
+
+namespace ROCKSDB_NAMESPACE {
+static bool LoadMergeOperator(const std::string& id,
+ std::shared_ptr<MergeOperator>* result) {
+ bool success = true;
+ // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
+ // Requires these classes be moved into a header file...
+ if (id == "put" || id == "PutOperator") {
+ *result = MergeOperators::CreatePutOperator();
+ } else if (id == "put_v1") {
+ *result = MergeOperators::CreateDeprecatedPutOperator();
+ } else if (id == "uint64add" || id == "UInt64AddOperator") {
+ *result = MergeOperators::CreateUInt64AddOperator();
+ } else if (id == "max" || id == "MaxOperator") {
+ *result = MergeOperators::CreateMaxOperator();
+#ifdef ROCKSDB_LITE
+ // The remainder of the classes are handled by the ObjectRegistry in
+ // non-LITE mode
+ } else if (id == StringAppendOperator::kNickName() ||
+ id == StringAppendOperator::kClassName()) {
+ *result = MergeOperators::CreateStringAppendOperator();
+ } else if (id == StringAppendTESTOperator::kNickName() ||
+ id == StringAppendTESTOperator::kClassName()) {
+ *result = MergeOperators::CreateStringAppendTESTOperator();
+ } else if (id == BytesXOROperator::kNickName() ||
+ id == BytesXOROperator::kClassName()) {
+ *result = MergeOperators::CreateBytesXOROperator();
+ } else if (id == SortList::kNickName() || id == SortList::kClassName()) {
+ *result = MergeOperators::CreateSortOperator();
+#endif // ROCKSDB_LITE
+ } else {
+ success = false;
+ }
+ return success;
+}
+
+#ifndef ROCKSDB_LITE
+static int RegisterBuiltinMergeOperators(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ size_t num_types;
+ auto AsRegex = [](const std::string& name, const std::string& alt) {
+ std::string regex;
+ regex.append("(").append(name);
+ regex.append("|").append(alt).append(")");
+ return regex;
+ };
+
+ library.Register<MergeOperator>(
+ AsRegex(StringAppendOperator::kClassName(),
+ StringAppendOperator::kNickName()),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /*errmsg*/) {
+ guard->reset(new StringAppendOperator(","));
+ return guard->get();
+ });
+ library.Register<MergeOperator>(
+ AsRegex(StringAppendTESTOperator::kClassName(),
+ StringAppendTESTOperator::kNickName()),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /*errmsg*/) {
+ guard->reset(new StringAppendTESTOperator(","));
+ return guard->get();
+ });
+ library.Register<MergeOperator>(
+ AsRegex(SortList::kClassName(), SortList::kNickName()),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /*errmsg*/) {
+ guard->reset(new SortList());
+ return guard->get();
+ });
+ library.Register<MergeOperator>(
+ AsRegex(BytesXOROperator::kClassName(), BytesXOROperator::kNickName()),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /*errmsg*/) {
+ guard->reset(new BytesXOROperator());
+ return guard->get();
+ });
+
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+#endif // ROCKSDB_LITE
+
+Status MergeOperator::CreateFromString(const ConfigOptions& config_options,
+ const std::string& value,
+ std::shared_ptr<MergeOperator>* result) {
+#ifndef ROCKSDB_LITE
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ RegisterBuiltinMergeOperators(*(ObjectLibrary::Default().get()), "");
+ });
+#endif // ROCKSDB_LITE
+ return LoadSharedObject<MergeOperator>(config_options, value,
+ LoadMergeOperator, result);
+}
+
+std::shared_ptr<MergeOperator> MergeOperators::CreateFromStringId(
+ const std::string& id) {
+ std::shared_ptr<MergeOperator> result;
+ Status s = MergeOperator::CreateFromString(ConfigOptions(), id, &result);
+ if (s.ok()) {
+ return result;
+ } else {
+ // Empty or unknown, just return nullptr
+ return nullptr;
+ }
+}
+
+} // namespace ROCKSDB_NAMESPACE
static std::shared_ptr<MergeOperator> CreateSortOperator();
// Will return a different merge operator depending on the string.
- // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
static std::shared_ptr<MergeOperator> CreateFromStringId(
- const std::string& name) {
- if (name == "put") {
- return CreatePutOperator();
- } else if (name == "put_v1") {
- return CreateDeprecatedPutOperator();
- } else if ( name == "uint64add") {
- return CreateUInt64AddOperator();
- } else if (name == "stringappend") {
- return CreateStringAppendOperator();
- } else if (name == "stringappendtest") {
- return CreateStringAppendTESTOperator();
- } else if (name == "max") {
- return CreateMaxOperator();
- } else if (name == "bytesxor") {
- return CreateBytesXOROperator();
- } else if (name == "sortlist") {
- return CreateSortOperator();
- } else {
- // Empty or unknown, just return nullptr
- return nullptr;
- }
- }
+ const std::string& name);
};
} // namespace ROCKSDB_NAMESPACE
std::string* new_value,
Logger* logger) const override;
- virtual const char* Name() const override {
- return "BytesXOR";
- }
+ static const char* kClassName() { return "BytesXOR"; }
+ static const char* kNickName() { return "bytesxor"; }
+
+ const char* NickName() const override { return kNickName(); }
+ const char* Name() const override { return kClassName(); }
void XOR(const Slice* existing_value, const Slice& value,
std::string* new_value) const;
return true;
}
- const char* Name() const override { return "MaxOperator"; }
+ static const char* kClassName() { return "MaxOperator"; }
+ static const char* kNickName() { return "max"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kNickName(); }
};
} // end of anonymous namespace
return true;
}
- const char* Name() const override { return "PutOperator"; }
+ static const char* kClassName() { return "PutOperator"; }
+ static const char* kNickName() { return "put_v1"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kNickName(); }
};
class PutOperatorV2 : public PutOperator {
merge_out->existing_operand = merge_in.operand_list.back();
return true;
}
+
+ static const char* kNickName() { return "put"; }
+ const char* NickName() const override { return kNickName(); }
};
} // end of anonymous namespace
return true;
}
-const char* SortList::Name() const { return "MergeSortOperator"; }
-
void SortList::MakeVector(std::vector<int>& operand, Slice slice) const {
do {
const char* begin = slice.data_;
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const override;
- const char* Name() const override;
+ static const char* kClassName() { return "MergeSortOperator"; }
+ static const char* kNickName() { return "sortlist"; }
+
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kNickName(); }
void MakeVector(std::vector<int>& operand, Slice slice) const;
#include "stringappend.h"
-#include <memory>
#include <assert.h>
-#include "rocksdb/slice.h"
+#include <memory>
+
#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
-
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo>
+ stringappend_merge_type_info = {
+#ifndef ROCKSDB_LITE
+ {"delimiter",
+ {0, OptionType::kString, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+} // namespace
// Constructor: also specify the delimiter character.
StringAppendOperator::StringAppendOperator(char delim_char)
- : delim_(1, delim_char) {}
+ : delim_(1, delim_char) {
+ RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
+}
StringAppendOperator::StringAppendOperator(const std::string& delim)
- : delim_(delim) {}
+ : delim_(delim) {
+ RegisterOptions("Delimiter", &delim_, &stringappend_merge_type_info);
+}
// Implementation for the merge operation (concatenates two strings)
bool StringAppendOperator::Merge(const Slice& /*key*/,
return true;
}
-const char* StringAppendOperator::Name() const {
- return "StringAppendOperator";
-}
std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
return std::make_shared<StringAppendOperator>(',');
std::string* new_value,
Logger* logger) const override;
- virtual const char* Name() const override;
+ static const char* kClassName() { return "StringAppendOperator"; }
+ static const char* kNickName() { return "stringappend"; }
+ virtual const char* Name() const override { return kClassName(); }
+ virtual const char* NickName() const override { return kNickName(); }
private:
std::string delim_; // The delimiter is inserted between elements
#include "stringappend2.h"
+#include <assert.h>
+
#include <memory>
#include <string>
-#include <assert.h>
-#include "rocksdb/slice.h"
#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/utilities/options_type.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
+namespace {
+static std::unordered_map<std::string, OptionTypeInfo>
+ stringappend2_merge_type_info = {
+#ifndef ROCKSDB_LITE
+ {"delimiter",
+ {0, OptionType::kString, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+#endif // ROCKSDB_LITE
+};
+} // namespace
// Constructor: also specify the delimiter character.
StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
- : delim_(1, delim_char) {}
+ : delim_(1, delim_char) {
+ RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
+}
StringAppendTESTOperator::StringAppendTESTOperator(const std::string& delim)
- : delim_(delim) {}
+ : delim_(delim) {
+ RegisterOptions("Delimiter", &delim_, &stringappend2_merge_type_info);
+}
// Implementation for the merge operation (concatenates two strings)
bool StringAppendTESTOperator::FullMergeV2(
// Compute the space needed for the final result.
size_t numBytes = 0;
+
for (auto it = merge_in.operand_list.begin();
it != merge_in.operand_list.end(); ++it) {
numBytes += it->size() + delim_.size();
return true;
}
-const char* StringAppendTESTOperator::Name() const {
- return "StringAppendTESTOperator";
-}
-
-
std::shared_ptr<MergeOperator>
MergeOperators::CreateStringAppendTESTOperator() {
return std::make_shared<StringAppendTESTOperator>(',');
std::string* new_value, Logger* logger) const
override;
- virtual const char* Name() const override;
+ static const char* kClassName() { return "StringAppendTESTOperator"; }
+ static const char* kNickName() { return "stringappendtest"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kNickName(); }
private:
// A version of PartialMerge that actually performs "partial merging".
return true; // Return true always since corruption will be treated as 0
}
- const char* Name() const override { return "UInt64AddOperator"; }
+ static const char* kClassName() { return "UInt64AddOperator"; }
+ static const char* kNickName() { return "uint64add"; }
+ const char* Name() const override { return kClassName(); }
+ const char* NickName() const override { return kNickName(); }
private:
// Takes the string and decodes it into a uint64_t
#include "rocksdb/iterator.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
+static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
+ {{"user_operator",
+ OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}};
+
+TtlMergeOperator::TtlMergeOperator(
+ const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
+ : user_merge_op_(merge_op), clock_(clock) {
+ RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
+}
+
+bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const {
+ const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+ if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
+ ROCKS_LOG_ERROR(merge_in.logger,
+ "Error: Could not remove timestamp from existing value.");
+ return false;
+ }
+
+ // Extract time-stamp from each operand to be passed to user_merge_op_
+ std::vector<Slice> operands_without_ts;
+ for (const auto& operand : merge_in.operand_list) {
+ if (operand.size() < ts_len) {
+ ROCKS_LOG_ERROR(merge_in.logger,
+ "Error: Could not remove timestamp from operand value.");
+ return false;
+ }
+ operands_without_ts.push_back(operand);
+ operands_without_ts.back().remove_suffix(ts_len);
+ }
+
+ // Apply the user merge operator (store result in *new_value)
+ bool good = true;
+ MergeOperationOutput user_merge_out(merge_out->new_value,
+ merge_out->existing_operand);
+ if (merge_in.existing_value) {
+ Slice existing_value_without_ts(merge_in.existing_value->data(),
+ merge_in.existing_value->size() - ts_len);
+ good = user_merge_op_->FullMergeV2(
+ MergeOperationInput(merge_in.key, &existing_value_without_ts,
+ operands_without_ts, merge_in.logger),
+ &user_merge_out);
+ } else {
+ good = user_merge_op_->FullMergeV2(
+ MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
+ merge_in.logger),
+ &user_merge_out);
+ }
+
+ // Return false if the user merge operator returned false
+ if (!good) {
+ return false;
+ }
+
+ if (merge_out->existing_operand.data()) {
+ merge_out->new_value.assign(merge_out->existing_operand.data(),
+ merge_out->existing_operand.size());
+ merge_out->existing_operand = Slice(nullptr, 0);
+ }
+
+ // Augment the *new_value with the ttl time-stamp
+ int64_t curtime;
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
+ ROCKS_LOG_ERROR(
+ merge_in.logger,
+ "Error: Could not get current time to be attached internally "
+ "to the new value.");
+ return false;
+ } else {
+ char ts_string[ts_len];
+ EncodeFixed32(ts_string, (int32_t)curtime);
+ merge_out->new_value.append(ts_string, ts_len);
+ return true;
+ }
+}
+
+bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value,
+ Logger* logger) const {
+ const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+ std::deque<Slice> operands_without_ts;
+
+ for (const auto& operand : operand_list) {
+ if (operand.size() < ts_len) {
+ ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
+ return false;
+ }
+
+ operands_without_ts.push_back(
+ Slice(operand.data(), operand.size() - ts_len));
+ }
+
+ // Apply the user partial-merge operator (store result in *new_value)
+ assert(new_value);
+ if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
+ logger)) {
+ return false;
+ }
+
+ // Augment the *new_value with the ttl time-stamp
+ int64_t curtime;
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
+ ROCKS_LOG_ERROR(
+ logger,
+ "Error: Could not get current time to be attached internally "
+ "to the new value.");
+ return false;
+ } else {
+ char ts_string[ts_len];
+ EncodeFixed32(ts_string, (int32_t)curtime);
+ new_value->append(ts_string, ts_len);
+ return true;
+ }
+}
+
+Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return MergeOperator::PrepareOptions(config_options);
+}
+
+Status TtlMergeOperator::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (user_merge_op_ == nullptr) {
+ return Status::InvalidArgument(
+ "UserMergeOperator required by TtlMergeOperator");
+ } else if (clock_ == nullptr) {
+ return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
+ } else {
+ return MergeOperator::ValidateOptions(db_opts, cf_opts);
+ }
+}
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
SystemClock* clock) {
}
}
+static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
+ {"ttl", {0, OptionType::kInt32T}},
+};
+
+static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
+ {"user_filter_factory",
+ OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
+ 0, OptionVerificationType::kByNameAllowFromNull,
+ OptionTypeFlags::kNone)}};
+static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
+ {"user_filter",
+ OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
+
+TtlCompactionFilter::TtlCompactionFilter(
+ int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
+ : LayeredCompactionFilterBase(_user_comp_filter,
+ std::move(_user_comp_filter_from_factory)),
+ ttl_(ttl),
+ clock_(clock) {
+ RegisterOptions("TTL", &ttl_, &ttl_type_info);
+ RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
+}
+
+bool TtlCompactionFilter::Filter(int level, const Slice& key,
+ const Slice& old_val, std::string* new_val,
+ bool* value_changed) const {
+ if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
+ return true;
+ }
+ if (user_comp_filter() == nullptr) {
+ return false;
+ }
+ assert(old_val.size() >= DBWithTTLImpl::kTSLength);
+ Slice old_val_without_ts(old_val.data(),
+ old_val.size() - DBWithTTLImpl::kTSLength);
+ if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
+ value_changed)) {
+ return true;
+ }
+ if (*value_changed) {
+ new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
+ DBWithTTLImpl::kTSLength);
+ }
+ return false;
+}
+
+Status TtlCompactionFilter::PrepareOptions(
+ const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return LayeredCompactionFilterBase::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilter::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (clock_ == nullptr) {
+ return Status::InvalidArgument(
+ "SystemClock required by TtlCompactionFilter");
+ } else {
+ return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
+ }
+}
+
+TtlCompactionFilterFactory::TtlCompactionFilterFactory(
+ int32_t ttl, SystemClock* clock,
+ std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
+ : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
+ RegisterOptions("UserOptions", &user_comp_filter_factory_,
+ &ttl_cff_type_info);
+ RegisterOptions("TTL", &ttl_, &ttl_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+TtlCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context& context) {
+ std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
+ nullptr;
+ if (user_comp_filter_factory_) {
+ user_comp_filter_from_factory =
+ user_comp_filter_factory_->CreateCompactionFilter(context);
+ }
+
+ return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
+ ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
+}
+
+Status TtlCompactionFilterFactory::PrepareOptions(
+ const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return CompactionFilterFactory::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilterFactory::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (clock_ == nullptr) {
+ return Status::InvalidArgument(
+ "SystemClock required by TtlCompactionFilterFactory");
+ } else {
+ return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
+ }
+}
+
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
+ library.Register<MergeOperator>(
+ TtlMergeOperator::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TtlMergeOperator(nullptr, nullptr));
+ return guard->get();
+ });
+ library.Register<CompactionFilterFactory>(
+ TtlCompactionFilterFactory::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
+ return guard->get();
+ });
+ library.Register<CompactionFilter>(
+ TtlCompactionFilter::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ return new TtlCompactionFilter(0, nullptr, nullptr);
+ });
+ size_t num_types;
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
return s;
}
+void DBWithTTLImpl::RegisterTtlClasses() {
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
+ });
+}
+
Status DBWithTTL::Open(const Options& options, const std::string& dbname,
DBWithTTL** dbptr, int32_t ttl, bool read_only) {
-
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
const std::vector<int32_t>& ttls, bool read_only) {
+ DBWithTTLImpl::RegisterTtlClasses();
if (ttls.size() != column_families.size()) {
return Status::InvalidArgument(
"ttls size has to be the same as number of column families");
Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle, int ttl) {
+ RegisterTtlClasses();
ColumnFamilyOptions sanitized_options = options;
DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
GetEnv()->GetSystemClock().get());
#endif
namespace ROCKSDB_NAMESPACE {
-
+struct ConfigOptions;
+class ObjectLibrary;
+class ObjectRegistry;
class DBWithTTLImpl : public DBWithTTL {
public:
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
SystemClock* clock);
+ static void RegisterTtlClasses();
explicit DBWithTTLImpl(DB* db);
virtual ~DBWithTTLImpl();
TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
- _user_comp_filter_from_factory = nullptr)
- : LayeredCompactionFilterBase(_user_comp_filter,
- std::move(_user_comp_filter_from_factory)),
- ttl_(ttl),
- clock_(clock) {}
+ _user_comp_filter_from_factory = nullptr);
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
- std::string* new_val, bool* value_changed) const
- override {
- if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
- return true;
- }
- if (user_comp_filter() == nullptr) {
- return false;
- }
- assert(old_val.size() >= DBWithTTLImpl::kTSLength);
- Slice old_val_without_ts(old_val.data(),
- old_val.size() - DBWithTTLImpl::kTSLength);
- if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
- value_changed)) {
+ std::string* new_val, bool* value_changed) const override;
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "TtlCompactionFilter"; }
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "Delete By TTL") {
return true;
+ } else {
+ return LayeredCompactionFilterBase::IsInstanceOf(name);
}
- if (*value_changed) {
- new_val->append(
- old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
- DBWithTTLImpl::kTSLength);
- }
- return false;
}
- virtual const char* Name() const override { return "Delete By TTL"; }
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
private:
int32_t ttl_;
public:
TtlCompactionFilterFactory(
int32_t ttl, SystemClock* clock,
- std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
- : ttl_(ttl),
- clock_(clock),
- user_comp_filter_factory_(comp_filter_factory) {}
-
- virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& context) override {
- std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
- nullptr;
- if (user_comp_filter_factory_) {
- user_comp_filter_from_factory =
- user_comp_filter_factory_->CreateCompactionFilter(context);
- }
-
- return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
- ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
- }
+ std::shared_ptr<CompactionFilterFactory> comp_filter_factory);
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
void SetTtl(int32_t ttl) {
ttl_ = ttl;
}
- virtual const char* Name() const override {
- return "TtlCompactionFilterFactory";
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "TtlCompactionFilterFactory"; }
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
+ const Customizable* Inner() const override {
+ return user_comp_filter_factory_.get();
}
private:
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
- SystemClock* clock)
- : user_merge_op_(merge_op), clock_(clock) {
- assert(merge_op);
- assert(clock);
- }
+ SystemClock* clock);
- virtual bool FullMergeV2(const MergeOperationInput& merge_in,
- MergeOperationOutput* merge_out) const override {
- const uint32_t ts_len = DBWithTTLImpl::kTSLength;
- if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
- ROCKS_LOG_ERROR(merge_in.logger,
- "Error: Could not remove timestamp from existing value.");
- return false;
- }
+ bool FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const override;
- // Extract time-stamp from each operand to be passed to user_merge_op_
- std::vector<Slice> operands_without_ts;
- for (const auto& operand : merge_in.operand_list) {
- if (operand.size() < ts_len) {
- ROCKS_LOG_ERROR(
- merge_in.logger,
- "Error: Could not remove timestamp from operand value.");
- return false;
- }
- operands_without_ts.push_back(operand);
- operands_without_ts.back().remove_suffix(ts_len);
- }
+ bool PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value, Logger* logger) const override;
- // Apply the user merge operator (store result in *new_value)
- bool good = true;
- MergeOperationOutput user_merge_out(merge_out->new_value,
- merge_out->existing_operand);
- if (merge_in.existing_value) {
- Slice existing_value_without_ts(merge_in.existing_value->data(),
- merge_in.existing_value->size() - ts_len);
- good = user_merge_op_->FullMergeV2(
- MergeOperationInput(merge_in.key, &existing_value_without_ts,
- operands_without_ts, merge_in.logger),
- &user_merge_out);
- } else {
- good = user_merge_op_->FullMergeV2(
- MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
- merge_in.logger),
- &user_merge_out);
- }
+ static const char* kClassName() { return "TtlMergeOperator"; }
- // Return false if the user merge operator returned false
- if (!good) {
- return false;
- }
-
- if (merge_out->existing_operand.data()) {
- merge_out->new_value.assign(merge_out->existing_operand.data(),
- merge_out->existing_operand.size());
- merge_out->existing_operand = Slice(nullptr, 0);
- }
-
- // Augment the *new_value with the ttl time-stamp
- int64_t curtime;
- if (!clock_->GetCurrentTime(&curtime).ok()) {
- ROCKS_LOG_ERROR(
- merge_in.logger,
- "Error: Could not get current time to be attached internally "
- "to the new value.");
- return false;
- } else {
- char ts_string[ts_len];
- EncodeFixed32(ts_string, (int32_t)curtime);
- merge_out->new_value.append(ts_string, ts_len);
+ const char* Name() const override { return kClassName(); }
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "Merge By TTL") {
return true;
- }
- }
-
- virtual bool PartialMergeMulti(const Slice& key,
- const std::deque<Slice>& operand_list,
- std::string* new_value, Logger* logger) const
- override {
- const uint32_t ts_len = DBWithTTLImpl::kTSLength;
- std::deque<Slice> operands_without_ts;
-
- for (const auto& operand : operand_list) {
- if (operand.size() < ts_len) {
- ROCKS_LOG_ERROR(logger,
- "Error: Could not remove timestamp from value.");
- return false;
- }
-
- operands_without_ts.push_back(
- Slice(operand.data(), operand.size() - ts_len));
- }
-
- // Apply the user partial-merge operator (store result in *new_value)
- assert(new_value);
- if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
- logger)) {
- return false;
- }
-
- // Augment the *new_value with the ttl time-stamp
- int64_t curtime;
- if (!clock_->GetCurrentTime(&curtime).ok()) {
- ROCKS_LOG_ERROR(
- logger,
- "Error: Could not get current time to be attached internally "
- "to the new value.");
- return false;
} else {
- char ts_string[ts_len];
- EncodeFixed32(ts_string, (int32_t)curtime);
- new_value->append(ts_string, ts_len);
- return true;
+ return MergeOperator::IsInstanceOf(name);
}
}
- virtual const char* Name() const override { return "Merge By TTL"; }
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
+ const Customizable* Inner() const override { return user_merge_op_.get(); }
private:
std::shared_ptr<MergeOperator> user_merge_op_;
SystemClock* clock_;
};
+extern "C" {
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/);
+} // extern "C"
+
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
#include <map>
#include <memory>
+
#include "rocksdb/compaction_filter.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
#include "test_util/testharness.h"
#include "util/string_util.h"
+#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/ttl/db_ttl_impl.h"
#ifndef OS_WIN
#include <unistd.h>
#endif
CloseTtl();
}
+class DummyFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return false;
+ }
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "DummyFilter"; }
+};
+
+class DummyFilterFactory : public CompactionFilterFactory {
+ public:
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "DummyFilterFactory"; }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context&) override {
+ std::unique_ptr<CompactionFilter> f(new DummyFilter());
+ return f;
+ }
+};
+
+static int RegisterTestObjects(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.Register<CompactionFilter>(
+ "DummyFilter", [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ static DummyFilter dummy;
+ return &dummy;
+ });
+ library.Register<CompactionFilterFactory>(
+ "DummyFilterFactory", [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new DummyFilterFactory());
+ return guard->get();
+ });
+ return 2;
+}
+
+class TtlOptionsTest : public testing::Test {
+ public:
+ TtlOptionsTest() {
+ config_options_.registry->AddLibrary("RegisterTtlObjects",
+ RegisterTtlObjects, "");
+ config_options_.registry->AddLibrary("RegisterTtlTestObjects",
+ RegisterTestObjects, "");
+ }
+ ConfigOptions config_options_;
+};
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilter) {
+ const CompactionFilter* filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_, TtlCompactionFilter::kClassName(), &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), TtlCompactionFilter::kClassName());
+ auto ttl = filter->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 0);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ delete filter;
+ filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_, "id=TtlCompactionFilter; ttl=123", &filter));
+ ASSERT_NE(filter, nullptr);
+ ttl = filter->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 123);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ delete filter;
+ filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_,
+ "id=TtlCompactionFilter; ttl=456; user_filter=DummyFilter;", &filter));
+ ASSERT_NE(filter, nullptr);
+ auto inner = filter->CheckedCast<DummyFilter>();
+ ASSERT_NE(inner, nullptr);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ std::string mismatch;
+ std::string opts_str = filter->ToString(config_options_);
+ const CompactionFilter* copy = nullptr;
+ ASSERT_OK(
+ CompactionFilter::CreateFromString(config_options_, opts_str, ©));
+ ASSERT_TRUE(filter->AreEquivalent(config_options_, copy, &mismatch));
+ delete filter;
+ delete copy;
+}
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilterFactory) {
+ std::shared_ptr<CompactionFilterFactory> cff;
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_, TtlCompactionFilterFactory::kClassName(), &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+ auto ttl = cff->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 0);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_, "id=TtlCompactionFilterFactory; ttl=123", &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+ ttl = cff->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 123);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_,
+ "id=TtlCompactionFilterFactory; ttl=456; "
+ "user_filter_factory=DummyFilterFactory;",
+ &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ auto filter = cff->CreateCompactionFilter(CompactionFilter::Context());
+ ASSERT_NE(filter.get(), nullptr);
+ auto ttlf = filter->CheckedCast<TtlCompactionFilter>();
+ ASSERT_EQ(filter.get(), ttlf);
+ auto user = filter->CheckedCast<DummyFilter>();
+ ASSERT_NE(user, nullptr);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ std::string opts_str = cff->ToString(config_options_);
+ std::string mismatch;
+ std::shared_ptr<CompactionFilterFactory> copy;
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, opts_str,
+ ©));
+ ASSERT_TRUE(cff->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
+
+TEST_F(TtlOptionsTest, LoadTtlMergeOperator) {
+ std::shared_ptr<MergeOperator> mo;
+
+ config_options_.invoke_prepare_options = false;
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options_, TtlMergeOperator::kClassName(), &mo));
+ ASSERT_NE(mo.get(), nullptr);
+ ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+ ASSERT_NOK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ config_options_.invoke_prepare_options = true;
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options_, "id=TtlMergeOperator; user_operator=bytesxor", &mo));
+ ASSERT_NE(mo.get(), nullptr);
+ ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+ ASSERT_OK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ auto ttl_mo = mo->CheckedCast<TtlMergeOperator>();
+ ASSERT_EQ(mo.get(), ttl_mo);
+ auto user = ttl_mo->CheckedCast<BytesXOROperator>();
+ ASSERT_NE(user, nullptr);
+
+ std::string mismatch;
+ std::string opts_str = mo->ToString(config_options_);
+ std::shared_ptr<MergeOperator> copy;
+ ASSERT_OK(MergeOperator::CreateFromString(config_options_, opts_str, ©));
+ ASSERT_TRUE(mo->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
} // namespace ROCKSDB_NAMESPACE
// A black-box test for the ttl wrapper around rocksdb