From 895952f2c2ca798f6db74d5fc2b78aad8985ba92 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 2 Oct 2017 13:35:07 -0500 Subject: [PATCH] kv/RocksDBStore: simplify CF merge operator routing Merge operators always map to a prefix, just like a CF, so we can link directly to the operator without doing any lookup. Signed-off-by: Sage Weil --- src/kv/RocksDBStore.cc | 139 +++++++++++++++++++++-------------------- src/kv/RocksDBStore.h | 6 +- 2 files changed, 75 insertions(+), 70 deletions(-) diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 35d312d9b5f..d4d9c45845b 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -52,16 +52,16 @@ static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, return rocksdb::SliceParts(slices->data(), slices->size()); } + // -// One of these per rocksdb column family(includes the default CF), -// implements the merge operator prefix stuff +// One of these for the default rocksdb column family, routing each prefix +// to the appropriate MergeOperator. // -class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator { +class RocksDBStore::MergeOperatorRouter + : public rocksdb::AssociativeMergeOperator +{ RocksDBStore& store; - //name of the column family associated with this merge operator - //only for explicit CF, not for the default CF - std::string cf_name; - public: +public: const char *Name() const override { // Construct a name that rocksDB will validate against. We want to // do this in a way that doesn't constrain the ordering of calls @@ -69,18 +69,12 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat // construct a name from all of those parts. store.assoc_name.clear(); map names; - if (cf_name.empty()) { - //for default column family - for (auto& p : store.merge_ops) names[p.first] = p.second->name(); - for (auto& p : store.cf_handles) names.erase(p.first); - } else { - //for user created explicit column family - for (auto& p : store.merge_ops) { - if (p.first.compare(cf_name) == 0) { - names[cf_name] = p.second->name(); - break; - } - } + + for (auto& p : store.merge_ops) { + names[p.first] = p.second->name(); + } + for (auto& p : store.cf_handles) { + names.erase(p.first); } for (auto& p : names) { store.assoc_name += '.'; @@ -91,55 +85,67 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat return store.assoc_name.c_str(); } - //for default column family MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} - //for user created explicit CF - MergeOperatorRouter(RocksDBStore &_store, const std::string &cf) - : store(_store), cf_name(cf) {} bool Merge(const rocksdb::Slice& key, - const rocksdb::Slice* existing_value, - const rocksdb::Slice& value, - std::string* new_value, - rocksdb::Logger* logger) const override { - if (cf_name.empty()) { - // for default column family - // extract prefix from key and compare against each registered merge op; - // even though merge operator for explicit CF is included in merge_ops, - // it won't be picked up, since it won't match. - for (auto& p : store.merge_ops) { - if (p.first.compare(0, p.first.length(), - key.data(), p.first.length()) == 0 && - key.data()[p.first.length()] == 0) { - if (existing_value) { - p.second->merge(existing_value->data(), existing_value->size(), - value.data(), value.size(), - new_value); - } else { - p.second->merge_nonexistent(value.data(), value.size(), new_value); - } - break; - } - } - } else { - //for user created explicit column family - for (auto& p : store.merge_ops) { - if (p.first.compare(cf_name) == 0) { - if (existing_value) { - p.second->merge(existing_value->data(), existing_value->size(), - value.data(), value.size(), - new_value); - } else { - p.second->merge_nonexistent(value.data(), value.size(), new_value); - } - break; + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + // for default column family + // extract prefix from key and compare against each registered merge op; + // even though merge operator for explicit CF is included in merge_ops, + // it won't be picked up, since it won't match. + for (auto& p : store.merge_ops) { + if (p.first.compare(0, p.first.length(), + key.data(), p.first.length()) == 0 && + key.data()[p.first.length()] == 0) { + if (existing_value) { + p.second->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + p.second->merge_nonexistent(value.data(), value.size(), new_value); } + break; } } return true; // OK :) } }; +// +// One of these per non-default column family, linked directly to the +// merge operator for that CF/prefix (if any). +// +class RocksDBStore::MergeOperatorLinker + : public rocksdb::AssociativeMergeOperator +{ +private: + std::shared_ptr mop; +public: + MergeOperatorLinker(std::shared_ptr o) : mop(o) {} + + const char *Name() const override { + return mop->name().c_str(); + } + + bool Merge(const rocksdb::Slice& key, + const rocksdb::Slice* existing_value, + const rocksdb::Slice& value, + std::string* new_value, + rocksdb::Logger* logger) const override { + if (existing_value) { + mop->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + mop->merge_nonexistent(value.data(), value.size(), new_value); + } + return true; + } +}; + int RocksDBStore::set_merge_operator( const string& prefix, std::shared_ptr mop) @@ -289,20 +295,17 @@ int RocksDBStore::create_db_dir() return 0; } -int RocksDBStore::install_cf_mergeop(const string &cf_name, - rocksdb::ColumnFamilyOptions *cf_opt) +int RocksDBStore::install_cf_mergeop( + const string &cf_name, + rocksdb::ColumnFamilyOptions *cf_opt) { assert(cf_opt != nullptr); - bool found_mop = false; - for (auto &mop : merge_ops) { - if (mop.first.compare(cf_name) == 0) { - cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name)); - found_mop = true; - break; + cf_opt->merge_operator.reset(); + for (auto& i : merge_ops) { + if (i.first == cf_name) { + cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second)); } } - if (!found_mop) - cf_opt->merge_operator.reset(); return 0; } diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index 22e733e1f90..031f8a6d214 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -394,9 +394,11 @@ public: static string past_prefix(const string &prefix); class MergeOperatorRouter; + class MergeOperatorLinker; friend class MergeOperatorRouter; - int set_merge_operator(const std::string& prefix, - std::shared_ptr mop) override; + int set_merge_operator( + const std::string& prefix, + std::shared_ptr mop) override; string assoc_name; ///< Name of associative operator uint64_t get_estimated_size(map &extra) override { -- 2.39.5