From: Allen Samuels
Date: Fri, 22 Apr 2016 20:41:16 +0000 (-0400)
Subject: kv/RocksDBStore: implement merge operator
X-Git-Tag: v11.0.0~845^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F8707%2Fhead;p=ceph.git

kv/RocksDBStore: implement merge operator

Signed-off-by: Allen Samuels
---

diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc
index d8751cc4435b..bd0acd6ad40f 100644
--- a/src/kv/RocksDBStore.cc
+++ b/src/kv/RocksDBStore.cc
@@ -18,6 +18,7 @@
 #include "rocksdb/cache.h"
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/utilities/convenience.h"
+#include "rocksdb/merge_operator.h"
 using std::string;
 #include "common/perf_counters.h"
 #include "common/debug.h"
@@ -32,6 +33,66 @@ using std::string;
 #undef dout_prefix
 #define dout_prefix *_dout << "rocksdb: "
+//
+// Per-store rocksdb merge operator: routes each merge to the operator
+// registered through set_merge_operator() for the key's prefix.
+class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+  RocksDBStore& store;
+ public:
+  const char *Name() const override {
+    // Build the name that rocksDB will validate against. The prefixes go
+    // through a sorted map so the order of the set_merge_operator calls
+    // does not change the name; the text is kept in store.assoc_name so
+    // the returned pointer outlives this call.
+    store.assoc_name.clear();
+    map<string,string> names;
+    for (const auto& p : store.merge_ops) names[p.first] = p.second->name();
+    for (const auto& p : names) {
+      store.assoc_name += '.';
+      store.assoc_name += p.first;
+      store.assoc_name += ':';
+      store.assoc_name += p.second;
+    }
+    return store.assoc_name.c_str();
+  }
+
+  explicit MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+
+  bool Merge(const rocksdb::Slice& key,
+             const rocksdb::Slice* existing_value,
+             const rocksdb::Slice& value,
+             std::string* new_value,
+             rocksdb::Logger* logger) const override {
+    // Dispatch on "<prefix>\0"; the size check stops reads past short keys.
+    for (const auto& p : store.merge_ops) {
+      if (key.size() > p.first.length() &&
+          key.starts_with(p.first) &&
+          key[p.first.length()] == 0) {
+        if (existing_value) {
+          p.second->merge(existing_value->data(), existing_value->size(),
+                          value.data(), value.size(),
+                          new_value);
+        } else {
+          p.second->merge_nonexistant(value.data(), value.size(), new_value);
+        }
+        break;
+      }
+    }
+    return true; // also when no prefix matched: new_value is left as passed in
+  }
+
+};
+
+int RocksDBStore::set_merge_operator(
+  const string& prefix,
+  std::shared_ptr<KeyValueDB::MergeOperator> mop)
+{
+  // Operators must be registered before the DB is opened; asserting here means it was already open
+  assert(db == nullptr);
+  merge_ops.push_back(std::make_pair(prefix,mop));
+  return 0;
+}
+
 class CephRocksdbLogger : public rocksdb::Logger {
   CephContext *cct;
 public:
@@ -226,6 +287,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing)
   dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
            << " cache size to " << g_conf->rocksdb_cache_size
            << dendl;
+  opt.merge_operator.reset(new MergeOperatorRouter(*this));
   status = rocksdb::DB::Open(opt, path, &db);
   if (!status.ok()) {
     derr << status.ToString() << dendl;
@@ -260,6 +322,7 @@ int RocksDBStore::_test_init(const string& dir)
   rocksdb::DB *db;
   rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
   delete db;
+  db = nullptr;
   return status.ok() ? 0 : -EIO;
 }
@@ -270,6 +333,7 @@ RocksDBStore::~RocksDBStore()
   // Ensure db is destroyed before dependent db_cache and filterpolicy
   delete db;
+  db = nullptr;
   if (priv) {
     delete static_cast(priv);
@@ -383,6 +447,26 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix
   }
 }
+void RocksDBStore::RocksDBTransactionImpl::merge(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  string key = combine_strings(prefix, k);
+
+  // bufferlist::c_str() is non-const: when contiguous, read the front buffer
+  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+                              to_set_bl.length()));
+  } else {
+    // otherwise call c_str() on a mutable copy
+    bufferlist val = to_set_bl;
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(val.c_str(), val.length()));
+  }
+}
+
 int RocksDBStore::get(
     const string &prefix,
     const std::set &keys,
@@ -534,6 +618,7 @@ bool RocksDBStore::check_omap_dir(string &omap_dir)
   rocksdb::DB *db;
   rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
   delete db;
+  db = nullptr;
   return status.ok();
 }
 void RocksDBStore::compact_range(const string& start, const string& end)
diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h
index 41508ba6cd10..79897248c4dd 100644
--- a/src/kv/RocksDBStore.h
+++ b/src/kv/RocksDBStore.h
@@ -154,6 +154,10 @@ public:
     void rmkeys_by_prefix(
       const string &prefix
       );
+    void merge(
+      const string& prefix,
+      const string& k,
+      const bufferlist &bl);
   };
   KeyValueDB::Transaction get_transaction() {
@@ -217,6 +221,12 @@ public:
   static bufferlist to_bufferlist(rocksdb::Slice in);
   static string past_prefix(const string &prefix);
+  class MergeOperatorRouter;
+  friend class MergeOperatorRouter;
+  virtual int set_merge_operator(const std::string& prefix,
+                                 std::shared_ptr<KeyValueDB::MergeOperator> mop);
+  string assoc_name; ///< Name of associative operator
+
   virtual uint64_t get_estimated_size(map &extra) {
     DIR *store_dir = opendir(path.c_str());
     if (!store_dir) {
@@ -291,4 +301,6 @@ protected:
 };
+
+
 #endif