#include "rocksdb/cache.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/utilities/convenience.h"
+#include "rocksdb/merge_operator.h"
using std::string;
#include "common/perf_counters.h"
#include "common/debug.h"
#undef dout_prefix
#define dout_prefix *_dout << "rocksdb: "
+//
+// One of these per rocksdb instance; routes Merge() calls to the
+// per-prefix merge operators registered with set_merge_operator().
+//
+class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
+  RocksDBStore& store;
+ public:
+  const char *Name() const {
+    // Construct a name that RocksDB will validate against. We want to
+    // do this in a way that doesn't constrain the ordering of calls
+    // to set_merge_operator, so sort the merge operators and then
+    // construct a name from all of those parts.
+    store.assoc_name.clear();
+    std::map<std::string,std::string> names;
+    for (auto& p : store.merge_ops)
+      names[p.first] = p.second->name();
+    for (auto& p : names) {
+      store.assoc_name += '.';
+      store.assoc_name += p.first;
+      store.assoc_name += ':';
+      store.assoc_name += p.second;
+    }
+    return store.assoc_name.c_str();
+  }
+
+  MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+
+  virtual bool Merge(const rocksdb::Slice& key,
+                     const rocksdb::Slice* existing_value,
+                     const rocksdb::Slice& value,
+                     std::string* new_value,
+                     rocksdb::Logger* logger) const {
+    // Check each prefix: combine_strings() joins prefix and key with a
+    // 0 byte, so match the prefix bytes and then expect the separator.
+    for (auto& p : store.merge_ops) {
+      if (key.size() > p.first.length() &&
+          p.first.compare(0, p.first.length(),
+                          key.data(), p.first.length()) == 0 &&
+          key.data()[p.first.length()] == 0) {
+        if (existing_value) {
+          p.second->merge(existing_value->data(), existing_value->size(),
+                          value.data(), value.size(),
+                          new_value);
+        } else {
+          p.second->merge_nonexistant(value.data(), value.size(), new_value);
+        }
+        break;
+      }
+    }
+    return true; // OK :)
+  }
+
+};
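// Illustrative sketch (not part of this change): a per-prefix merge operator
// of the kind MergeOperatorRouter dispatches to. The member signatures are
// inferred from the call sites in Merge() above (the real interface is
// KeyValueDB::MergeOperator); concatenation is an arbitrary example, chosen
// because it is associative as rocksdb::AssociativeMergeOperator requires.
struct ExampleConcatMergeOperator : public KeyValueDB::MergeOperator {
  virtual void merge_nonexistant(const char *rdata, size_t rlen,
                                 std::string *new_value) {
    // No existing value: the operand becomes the stored value.
    new_value->assign(rdata, rlen);
  }
  virtual void merge(const char *ldata, size_t llen,
                     const char *rdata, size_t rlen,
                     std::string *new_value) {
    // Existing value present: append the operand to it.
    new_value->assign(ldata, llen);
    new_value->append(rdata, rlen);
  }
  virtual string name() const {
    return "ConcatExample";  // contributes to MergeOperatorRouter::Name()
  }
};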
+
+int RocksDBStore::set_merge_operator(
+  const string& prefix,
+  std::shared_ptr<KeyValueDB::MergeOperator> mop)
+{
+  // If you fail here, it's because you can't do this on an open database
+  assert(db == nullptr);
+  merge_ops.push_back(std::make_pair(prefix, mop));
+  return 0;
+}
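// Illustrative usage sketch (not part of this change): merge operators have
// to be registered before the store is opened, or the assert above fires.
// ExampleConcatMergeOperator is the sketch above; create_and_open() and
// std::cerr are assumed here purely for the example.
static int example_register_merge_op(RocksDBStore *store)
{
  int r = store->set_merge_operator(
    "C", std::make_shared<ExampleConcatMergeOperator>());
  if (r < 0)
    return r;
  // MergeOperatorRouter::Name() will now report ".C:ConcatExample".
  return store->create_and_open(std::cerr);
}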
+
class CephRocksdbLogger : public rocksdb::Logger {
CephContext *cct;
public:
dout(10) << __func__ << " set block size to " << g_conf->rocksdb_block_size
<< " cache size to " << g_conf->rocksdb_cache_size << dendl;
+ opt.merge_operator.reset(new MergeOperatorRouter(*this));
status = rocksdb::DB::Open(opt, path, &db);
if (!status.ok()) {
derr << status.ToString() << dendl;
rocksdb::DB *db;
rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
delete db;
+ db = nullptr;
return status.ok() ? 0 : -EIO;
}
// Ensure db is destroyed before dependent db_cache and filterpolicy
delete db;
+ db = nullptr;
if (priv) {
delete static_cast<rocksdb::Env*>(priv);
}
}
+void RocksDBStore::RocksDBTransactionImpl::merge(
+  const string &prefix,
+  const string &k,
+  const bufferlist &to_set_bl)
+{
+  string key = combine_strings(prefix, k);
+
+  // bufferlist::c_str() is non-constant, so we can't call c_str()
+  if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+                              to_set_bl.length()));
+  } else {
+    // make a copy
+    bufferlist val = to_set_bl;
+    bat->Merge(rocksdb::Slice(key),
+               rocksdb::Slice(val.c_str(), val.length()));
+  }
+}
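// Illustrative usage sketch (not part of this change): issuing a merge
// through the generic transaction interface, assuming the matching merge()
// declaration on KeyValueDB::TransactionImpl that this series adds. The
// "C" prefix and payload are placeholders.
static void example_issue_merge(KeyValueDB *db)
{
  bufferlist delta;
  delta.append("suffix", 6);
  KeyValueDB::Transaction t = db->get_transaction();
  t->merge("C", "some_key", delta);  // routed by MergeOperatorRouter
  db->submit_transaction(t);
}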
+
int RocksDBStore::get(
const string &prefix,
const std::set<string> &keys,
rocksdb::DB *db;
rocksdb::Status status = rocksdb::DB::Open(options, omap_dir, &db);
delete db;
+ db = nullptr;
return status.ok();
}
void RocksDBStore::compact_range(const string& start, const string& end)