return rocksdb::SliceParts(slices->data(), slices->size());
}
+
//
-// One of these per rocksdb column family(includes the default CF),
-// implements the merge operator prefix stuff
+// One of these for the default rocksdb column family, routing each prefix
+// to the appropriate MergeOperator.
//
-class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+class RocksDBStore::MergeOperatorRouter
+ : public rocksdb::AssociativeMergeOperator
+{
RocksDBStore& store;
- //name of the column family associated with this merge operator
- //only for explicit CF, not for the default CF
- std::string cf_name;
- public:
+public:
const char *Name() const override {
// Construct a name that rocksDB will validate against. We want to
// do this in a way that doesn't constrain the ordering of calls
// construct a name from all of those parts.
store.assoc_name.clear();
map<std::string,std::string> names;
- if (cf_name.empty()) {
- //for default column family
- for (auto& p : store.merge_ops) names[p.first] = p.second->name();
- for (auto& p : store.cf_handles) names.erase(p.first);
- } else {
- //for user created explicit column family
- for (auto& p : store.merge_ops) {
- if (p.first.compare(cf_name) == 0) {
- names[cf_name] = p.second->name();
- break;
- }
- }
+
+ for (auto& p : store.merge_ops) {
+ names[p.first] = p.second->name();
+ }
+ for (auto& p : store.cf_handles) {
+ names.erase(p.first);
}
for (auto& p : names) {
store.assoc_name += '.';
return store.assoc_name.c_str();
}
- //for default column family
MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
- //for user created explicit CF
- MergeOperatorRouter(RocksDBStore &_store, const std::string &cf)
- : store(_store), cf_name(cf) {}
bool Merge(const rocksdb::Slice& key,
- const rocksdb::Slice* existing_value,
- const rocksdb::Slice& value,
- std::string* new_value,
- rocksdb::Logger* logger) const override {
- if (cf_name.empty()) {
- // for default column family
- // extract prefix from key and compare against each registered merge op;
- // even though merge operator for explicit CF is included in merge_ops,
- // it won't be picked up, since it won't match.
- for (auto& p : store.merge_ops) {
- if (p.first.compare(0, p.first.length(),
- key.data(), p.first.length()) == 0 &&
- key.data()[p.first.length()] == 0) {
- if (existing_value) {
- p.second->merge(existing_value->data(), existing_value->size(),
- value.data(), value.size(),
- new_value);
- } else {
- p.second->merge_nonexistent(value.data(), value.size(), new_value);
- }
- break;
- }
- }
- } else {
- //for user created explicit column family
- for (auto& p : store.merge_ops) {
- if (p.first.compare(cf_name) == 0) {
- if (existing_value) {
- p.second->merge(existing_value->data(), existing_value->size(),
- value.data(), value.size(),
- new_value);
- } else {
- p.second->merge_nonexistent(value.data(), value.size(), new_value);
- }
- break;
+ const rocksdb::Slice* existing_value,
+ const rocksdb::Slice& value,
+ std::string* new_value,
+ rocksdb::Logger* logger) const override {
+ // for default column family
+ // extract prefix from key and compare against each registered merge op;
+ // even though merge operator for explicit CF is included in merge_ops,
+ // it won't be picked up, since it won't match.
+ for (auto& p : store.merge_ops) {
+ if (p.first.compare(0, p.first.length(),
+ key.data(), p.first.length()) == 0 &&
+ key.data()[p.first.length()] == 0) {
+ if (existing_value) {
+ p.second->merge(existing_value->data(), existing_value->size(),
+ value.data(), value.size(),
+ new_value);
+ } else {
+ p.second->merge_nonexistent(value.data(), value.size(), new_value);
}
+ break;
}
}
return true; // OK :)
}
};
+//
+// One of these per non-default column family, linked directly to the
+// merge operator for that CF/prefix (if any).
+//
+class RocksDBStore::MergeOperatorLinker
+ : public rocksdb::AssociativeMergeOperator
+{
+private:
+ std::shared_ptr<KeyValueDB::MergeOperator> mop;
+public:
+ MergeOperatorLinker(std::shared_ptr<KeyValueDB::MergeOperator> o) : mop(o) {}
+
+ const char *Name() const override {
+ return mop->name().c_str();
+ }
+
+ bool Merge(const rocksdb::Slice& key,
+ const rocksdb::Slice* existing_value,
+ const rocksdb::Slice& value,
+ std::string* new_value,
+ rocksdb::Logger* logger) const override {
+ if (existing_value) {
+ mop->merge(existing_value->data(), existing_value->size(),
+ value.data(), value.size(),
+ new_value);
+ } else {
+ mop->merge_nonexistent(value.data(), value.size(), new_value);
+ }
+ return true;
+ }
+};
+
int RocksDBStore::set_merge_operator(
const string& prefix,
std::shared_ptr<KeyValueDB::MergeOperator> mop)
return 0;
}
-int RocksDBStore::install_cf_mergeop(const string &cf_name,
- rocksdb::ColumnFamilyOptions *cf_opt)
+int RocksDBStore::install_cf_mergeop(
+ const string &cf_name,
+ rocksdb::ColumnFamilyOptions *cf_opt)
{
assert(cf_opt != nullptr);
- bool found_mop = false;
- for (auto &mop : merge_ops) {
- if (mop.first.compare(cf_name) == 0) {
- cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name));
- found_mop = true;
- break;
+ cf_opt->merge_operator.reset();
+ for (auto& i : merge_ops) {
+ if (i.first == cf_name) {
+ cf_opt->merge_operator.reset(new MergeOperatorLinker(i.second));
}
}
- if (!found_mop)
- cf_opt->merge_operator.reset();
return 0;
}