## Unreleased
### Bug Fixes
* If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file.
-
+### New Features
+* Made the EventListener extend the Customizable class.
+* EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
## 6.23.0 (2021-07-16)
### Behavior Changes
* Obsolete keys in the bottommost level that were preserved for a snapshot will now be cleaned upon snapshot release in all cases. This form of compaction (snapshot release triggered compaction) previously had an artificial limitation that multiple tombstones needed to be present.
class OnFileDeletionListener : public EventListener {
public:
OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "OnFileDeletionListener"; }
void SetExpectedFileName(const std::string file_name) {
expected_file_name_ = file_name;
class FlushCounterListener : public EventListener {
public:
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "FlushCounterListener"; }
std::atomic<int> count{0};
std::atomic<FlushReason> expected_flush_reason{FlushReason::kOthers};
#include "db/event_helpers.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/utilities/customizable_util.h"
+
namespace ROCKSDB_NAMESPACE {
+#ifndef ROCKSDB_LITE
+Status EventListener::CreateFromString(const ConfigOptions& config_options,
+ const std::string& id,
+ std::shared_ptr<EventListener>* result) {
+ return LoadSharedObject<EventListener>(config_options, id, nullptr, result);
+}
+#endif // ROCKSDB_LITE
namespace {
template <class T>
db_paths_(db_paths),
column_families_(column_families),
num_pending_file_creations_(0) {}
+
#ifndef ROCKSDB_LITE
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "DBStressListener"; }
+
~DbStressListener() override { assert(num_pending_file_creations_ == 0); }
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
assert(IsValidColumnFamilyName(info.cf_name));
ROCKSDB_NAMESPACE::DB* db;
ROCKSDB_NAMESPACE::Options options;
options.create_if_missing = true;
- ROCKSDB_NAMESPACE::Status status = ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db);
+ ROCKSDB_NAMESPACE::Status status =
+ ROCKSDB_NAMESPACE::DB::Open(options, db_path, &db);
if (!status.ok()) {
return 0;
}
break;
}
case kIterator: {
- ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
+ ROCKSDB_NAMESPACE::Iterator* it =
+ db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
delete it;
case kColumn: {
ROCKSDB_NAMESPACE::ColumnFamilyHandle* cf;
ROCKSDB_NAMESPACE::Status s;
- s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(), "new_cf",
- &cf);
+ s = db->CreateColumnFamily(ROCKSDB_NAMESPACE::ColumnFamilyOptions(),
+ "new_cf", &cf);
s = db->DestroyColumnFamilyHandle(cf);
db->Close();
delete db;
std::vector<ROCKSDB_NAMESPACE::ColumnFamilyDescriptor> column_families;
// have to open default column family
column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor(
- ROCKSDB_NAMESPACE::kDefaultColumnFamilyName, ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
+ ROCKSDB_NAMESPACE::kDefaultColumnFamilyName,
+ ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
// open the new one, too
column_families.push_back(ROCKSDB_NAMESPACE::ColumnFamilyDescriptor(
"new_cf", ROCKSDB_NAMESPACE::ColumnFamilyOptions()));
std::vector<ROCKSDB_NAMESPACE::ColumnFamilyHandle*> handles;
- s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path, column_families,
- &handles, &db);
+ s = ROCKSDB_NAMESPACE::DB::Open(ROCKSDB_NAMESPACE::DBOptions(), db_path,
+ column_families, &handles, &db);
if (s.ok()) {
std::string key1 = fuzzed_data.ConsumeRandomLengthString();
std::string val1 = fuzzed_data.ConsumeRandomLengthString();
std::string key2 = fuzzed_data.ConsumeRandomLengthString();
- s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1, val1);
+ s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), handles[1], key1,
+ val1);
std::string value;
- s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2, &value);
+ s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), handles[1], key2,
+ &value);
s = db->DropColumnFamily(handles[1]);
for (auto handle : handles) {
s = db->DestroyColumnFamilyHandle(handle);
protobuf_mutator::libfuzzer::PostProcessorRegistration<DBOperations> reg = {
[](DBOperations* input, unsigned int /* seed */) {
- const ROCKSDB_NAMESPACE::Comparator* comparator = ROCKSDB_NAMESPACE::BytewiseComparator();
+ const ROCKSDB_NAMESPACE::Comparator* comparator =
+ ROCKSDB_NAMESPACE::BytewiseComparator();
auto ops = input->mutable_operations();
// Make sure begin <= end for DELETE_RANGE.
for (DBOperation& op : *ops) {
const std::string kDbPath = "/tmp/db_map_fuzzer_test";
auto fs = ROCKSDB_NAMESPACE::FileSystem::Default();
- if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr).ok()) {
+ if (fs->FileExists(kDbPath, ROCKSDB_NAMESPACE::IOOptions(), /*dbg=*/nullptr)
+ .ok()) {
std::cerr << "db path " << kDbPath << " already exists" << std::endl;
abort();
}
for (const DBOperation& op : input.operations()) {
switch (op.type()) {
case OpType::PUT: {
- CHECK_OK(db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value()));
+ CHECK_OK(
+ db->Put(ROCKSDB_NAMESPACE::WriteOptions(), op.key(), op.value()));
kv[op.key()] = op.value();
break;
}
CHECK_OK(ROCKSDB_NAMESPACE::DB::Open(options, kDbPath, &db));
auto kv_it = kv.begin();
- ROCKSDB_NAMESPACE::Iterator* it = db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
+ ROCKSDB_NAMESPACE::Iterator* it =
+ db->NewIterator(ROCKSDB_NAMESPACE::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next(), kv_it++) {
CHECK_TRUE(kv_it != kv.end());
CHECK_EQ(it->key().ToString(), kv_it->first);
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/compression_type.h"
+#include "rocksdb/customizable.h"
#include "rocksdb/status.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
// the current thread holding any DB mutex. This is to prevent potential
// deadlock and performance issue when using EventListener callback
// in a complex way.
-class EventListener {
+class EventListener : public Customizable {
public:
+ static const char* Type() { return "EventListener"; }
+ static Status CreateFromString(const ConfigOptions& options,
+ const std::string& id,
+ std::shared_ptr<EventListener>* result);
+ const char* Name() const override {
+ // Since EventListeners did not have a name previously, we will assume
+ // an empty name. Instances should override this method.
+ return "";
+ }
// A callback function to RocksDB which will be called whenever a
// registered RocksDB flushes a file. The default implementation is
// no-op.
const std::string& prefix) const {
std::string result;
std::string parent;
- if (!config_options.IsShallow()) {
+ std::string id = GetId();
+ if (!config_options.IsShallow() && !id.empty()) {
parent = Configurable::SerializeOptions(config_options, "");
}
if (parent.empty()) {
- result = GetId();
+ result = id;
} else {
- result.append(prefix + ConfigurableHelper::kIdPropName + "=" + GetId() +
- config_options.delimiter);
+ result.append(prefix);
+ result.append(ConfigurableHelper::kIdPropName).append("=");
+ result.append(id).append(config_options.delimiter);
result.append(parent);
}
return result;
#include <cstring>
#include <unordered_map>
-#include "options/configurable_helper.h"
+#include "db/db_test_util.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "rocksdb/convenience.h"
guard->reset(new mock::MockTableFactory());
return guard->get();
});
+ library.Register<EventListener>(
+ OnFileDeletionListener::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<EventListener>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new OnFileDeletionListener());
+ return guard->get();
+ });
+ library.Register<EventListener>(
+ FlushCounterListener::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<EventListener>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new FlushCounterListener());
+ return guard->get();
+ });
library.Register<const Comparator>(
test::SimpleSuffixReverseComparator::kClassName(),
[](const std::string& /*uri*/,
}
#ifndef ROCKSDB_LITE
+TEST_F(LoadCustomizableTest, LoadEventListenerTest) {
+ std::shared_ptr<EventListener> result;
+
+ ASSERT_NOK(EventListener::CreateFromString(
+ config_options_, OnFileDeletionListener::kClassName(), &result));
+ ASSERT_NOK(EventListener::CreateFromString(
+ config_options_, FlushCounterListener::kClassName(), &result));
+ if (RegisterTests("Test")) {
+ ASSERT_OK(EventListener::CreateFromString(
+ config_options_, OnFileDeletionListener::kClassName(), &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), OnFileDeletionListener::kClassName());
+ ASSERT_OK(EventListener::CreateFromString(
+ config_options_, FlushCounterListener::kClassName(), &result));
+ ASSERT_NE(result, nullptr);
+ ASSERT_STREQ(result->Name(), FlushCounterListener::kClassName());
+ }
+}
+
TEST_F(LoadCustomizableTest, LoadEncryptionProviderTest) {
std::shared_ptr<EncryptionProvider> result;
ASSERT_NOK(
#include "rocksdb/configurable.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
+#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/system_clock.h"
std::shared_ptr<RateLimiter> rate_limiter;
std::shared_ptr<Statistics> statistics;
std::vector<DbPath> db_paths;
- std::vector<std::shared_ptr<EventListener>> listeners;
FileTypeSet checksum_handoff_file_types;
*/
{"advise_random_on_open",
{offsetof(struct ImmutableDBOptions, allow_data_in_errors),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
+ // Allow EventListeners that have a non-empty Name() to be read/written
+ // as options Each listener will either be
+ // - A simple name (e.g. "MyEventListener")
+ // - A name with properties (e.g. "{id=MyListener1; timeout=60}"
+ // Multiple listeners will be separated by a ":":
+ // - "MyListener0;{id=MyListener1; timeout=60}
+ {"listeners",
+ {offsetof(struct ImmutableDBOptions, listeners), OptionType::kVector,
+ OptionVerificationType::kByNameAllowNull,
+ OptionTypeFlags::kCompareNever,
+ [](const ConfigOptions& opts, const std::string& /*name*/,
+ const std::string& value, void* addr) {
+ ConfigOptions embedded = opts;
+ embedded.ignore_unsupported_options = true;
+ std::vector<std::shared_ptr<EventListener>> listeners;
+ Status s;
+ for (size_t start = 0, end = 0;
+ s.ok() && start < value.size() && end != std::string::npos;
+ start = end + 1) {
+ std::string token;
+ s = OptionTypeInfo::NextToken(value, ':', start, &end, &token);
+ if (s.ok() && !token.empty()) {
+ std::shared_ptr<EventListener> listener;
+ s = EventListener::CreateFromString(embedded, token, &listener);
+ if (s.ok() && listener != nullptr) {
+ listeners.push_back(listener);
+ }
+ }
+ }
+ if (s.ok()) { // It worked
+ *(static_cast<std::vector<std::shared_ptr<EventListener>>*>(
+ addr)) = listeners;
+ }
+ return s;
+ },
+ [](const ConfigOptions& opts, const std::string& /*name*/,
+ const void* addr, std::string* value) {
+ const auto listeners =
+ static_cast<const std::vector<std::shared_ptr<EventListener>>*>(
+ addr);
+ ConfigOptions embedded = opts;
+ embedded.delimiter = ";";
+ int printed = 0;
+ for (const auto& listener : *listeners) {
+ auto id = listener->GetId();
+ if (!id.empty()) {
+ std::string elem_str = listener->ToString(embedded, "");
+ if (printed++ == 0) {
+ value->append("{");
+ } else {
+ value->append(":");
+ }
+ value->append(elem_str);
+ }
+ }
+ if (printed > 0) {
+ value->append("}");
+ }
+ return Status::OK();
+ },
+ nullptr}},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";
leveldb_opt.block_restart_interval);
ASSERT_EQ(table_opt->filter_policy.get(), leveldb_opt.filter_policy);
}
+#ifndef ROCKSDB_LITE
+class TestEventListener : public EventListener {
+ private:
+ std::string id_;
+
+ public:
+ explicit TestEventListener(const std::string& id) : id_("Test" + id) {}
+ const char* Name() const override { return id_.c_str(); }
+};
+
+static std::unordered_map<std::string, OptionTypeInfo>
+ test_listener_option_info = {
+ {"s",
+ {0, OptionType::kString, OptionVerificationType::kNormal,
+ OptionTypeFlags::kNone}},
+
+};
+
+class TestConfigEventListener : public TestEventListener {
+ private:
+ std::string s_;
+
+ public:
+ explicit TestConfigEventListener(const std::string& id)
+ : TestEventListener("Config" + id) {
+ s_ = id;
+ RegisterOptions("Test", &s_, &test_listener_option_info);
+ }
+};
+
+static int RegisterTestEventListener(ObjectLibrary& library,
+ const std::string& arg) {
+ library.Register<EventListener>(
+ "Test" + arg,
+ [](const std::string& name, std::unique_ptr<EventListener>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TestEventListener(name.substr(4)));
+ return guard->get();
+ });
+ library.Register<EventListener>(
+ "TestConfig" + arg,
+ [](const std::string& name, std::unique_ptr<EventListener>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TestConfigEventListener(name.substr(10)));
+ return guard->get();
+ });
+ return 1;
+}
+TEST_F(OptionsTest, OptionsListenerTest) {
+ DBOptions orig, copy;
+ orig.listeners.push_back(std::make_shared<TestEventListener>("1"));
+ orig.listeners.push_back(std::make_shared<TestEventListener>("2"));
+ orig.listeners.push_back(std::make_shared<TestEventListener>(""));
+ orig.listeners.push_back(std::make_shared<TestConfigEventListener>("1"));
+ orig.listeners.push_back(std::make_shared<TestConfigEventListener>("2"));
+ orig.listeners.push_back(std::make_shared<TestConfigEventListener>(""));
+ ConfigOptions config_opts(orig);
+ config_opts.registry->AddLibrary("listener", RegisterTestEventListener, "1");
+ std::string opts_str;
+ ASSERT_OK(GetStringFromDBOptions(config_opts, orig, &opts_str));
+ ASSERT_OK(GetDBOptionsFromString(config_opts, orig, opts_str, ©));
+ ASSERT_OK(GetStringFromDBOptions(config_opts, copy, &opts_str));
+ ASSERT_EQ(
+ copy.listeners.size(),
+ 2); // The Test{Config}1 Listeners could be loaded but not the others
+ ASSERT_OK(RocksDBOptionsParser::VerifyDBOptions(config_opts, orig, copy));
+}
+#endif // ROCKSDB_LITE
#ifndef ROCKSDB_LITE
const static std::string kCustomEnvName = "Custom";
~ErrorHandlerListener() override {}
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "ErrorHandlerListener"; }
+
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/,
bool* auto_recovery) override {
blob_db_impl_->UpdateLiveSSTSize();
}
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "BlobDBListener"; }
+
protected:
BlobDBImpl* blob_db_impl_;
};
explicit BlobDBListenerGC(BlobDBImpl* blob_db_impl)
: BlobDBListener(blob_db_impl) {}
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "BlobDBListenerGC"; }
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
BlobDBListener::OnFlushCompleted(db, info);
/*
- A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided condition and
- mutex that provides toku_pthread_*-like interface. The functions are named
+ A wrapper around ROCKSDB_NAMESPACE::TransactionDBMutexFactory-provided
+ condition and mutex that provides toku_pthread_*-like interface. The functions
+ are named
toku_external_{mutex|cond}_XXX