}
//
-// One of these per rocksdb instance, implements the merge operator prefix stuff
+// One instance per rocksdb column family (including the default CF);
+// routes each merge to the operator registered for the key prefix or CF name.
//
class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator {
RocksDBStore& store;
+ //name of the column family associated with this merge operator
+ //only for explicit CF, not for the default CF
+ std::string cf_name;
public:
const char *Name() const override {
// Construct a name that rocksDB will validate against. We want to
// construct a name from all of those parts.
store.assoc_name.clear();
map<std::string,std::string> names;
- for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+ if (cf_name.empty()) {
+ //for default column family
+ for (auto& p : store.merge_ops) names[p.first] = p.second->name();
+ for (auto& p : store.cf_handles) names.erase(p.first);
+ } else {
+ //for user created explicit column family
+ for (auto& p : store.merge_ops) {
+ if (p.first.compare(cf_name) == 0) {
+ names[cf_name] = p.second->name();
+ break;
+ }
+ }
+ }
for (auto& p : names) {
store.assoc_name += '.';
store.assoc_name += p.first;
return store.assoc_name.c_str();
}
+ // Default column family: cf_name stays empty, so Merge() routes by key prefix.
MergeOperatorRouter(RocksDBStore &_store) : store(_store) {}
+ // Explicit (user-created) column family: routes by the column family name cf.
+ MergeOperatorRouter(RocksDBStore &_store, const std::string &cf)
+ : store(_store), cf_name(cf) {}
bool Merge(const rocksdb::Slice& key,
const rocksdb::Slice* existing_value,
const rocksdb::Slice& value,
std::string* new_value,
rocksdb::Logger* logger) const override {
- // Check each prefix
- for (auto& p : store.merge_ops) {
- if (p.first.compare(0, p.first.length(),
- key.data(), p.first.length()) == 0 &&
- key.data()[p.first.length()] == 0) {
- if (existing_value) {
- p.second->merge(existing_value->data(), existing_value->size(),
- value.data(), value.size(),
- new_value);
- } else {
- p.second->merge_nonexistent(value.data(), value.size(), new_value);
- }
- break;
+ if (cf_name.empty()) {
+ // for default column family
+ // extract prefix from key and compare against each registered merge op;
+ // even though merge operator for explicit CF is included in merge_ops,
+ // it won't be picked up, since it won't match.
+ for (auto& p : store.merge_ops) {
+ if (p.first.compare(0, p.first.length(),
+ key.data(), p.first.length()) == 0 &&
+ key.data()[p.first.length()] == 0) {
+ if (existing_value) {
+ p.second->merge(existing_value->data(), existing_value->size(),
+ value.data(), value.size(),
+ new_value);
+ } else {
+ p.second->merge_nonexistent(value.data(), value.size(), new_value);
+ }
+ break;
+ }
+ }
+ } else {
+ //for user created explicit column family
+ for (auto& p : store.merge_ops) {
+ if (p.first.compare(cf_name) == 0) {
+ if (existing_value) {
+ p.second->merge(existing_value->data(), existing_value->size(),
+ value.data(), value.size(),
+ new_value);
+ } else {
+ p.second->merge_nonexistent(value.data(), value.size(), new_value);
+ }
+ break;
+ }
}
}
return true; // OK :)
}
-
};
int RocksDBStore::set_merge_operator(
return 0;
}
-int RocksDBStore::create_and_open(ostream &out)
+int RocksDBStore::create_db_dir()
{
if (env) {
unique_ptr<rocksdb::Directory> dir;
return r;
}
}
+ return 0;
+}
+
+// Set the merge operator on the options of column family cf_name.
+//
+// If a merge operator is registered under cf_name, cf_opt gets a fresh
+// MergeOperatorRouter bound to that name; otherwise cf_opt ends up with no
+// merge operator at all. cf_opt must not be null. Always returns 0.
+int RocksDBStore::install_cf_mergeop(const string &cf_name,
+                                     rocksdb::ColumnFamilyOptions *cf_opt)
+{
+  assert(cf_opt != nullptr);
+  // Begin with "no operator"; it is only replaced when a registration for
+  // this column family is found below.
+  cf_opt->merge_operator.reset();
+  for (auto &entry : merge_ops) {
+    if (entry.first == cf_name) {
+      cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name));
+      break;
+    }
+  }
+  return 0;
+}
+
+// Create the database (default column family only) and open it.
+// Returns a negative value on failure, otherwise the result of do_open().
+int RocksDBStore::create_and_open(ostream &out)
+{
+  // Give up early if create_db_dir() reports an error; otherwise open with
+  // create_if_missing set.
+  const int rc = create_db_dir();
+  if (rc < 0) {
+    return rc;
+  }
return do_open(out, true);
}
-int RocksDBStore::do_open(ostream &out, bool create_if_missing)
+// Create the database together with the column families listed in cfs and
+// open it. Returns a negative value if create_db_dir() fails, otherwise the
+// result of do_open().
+int RocksDBStore::create_and_open(ostream &out,
+                                  const vector<ColumnFamily>& cfs)
+{
+  const int rc = create_db_dir();
+  // do_open() takes the column family list by pointer.
+  return rc < 0 ? rc : do_open(out, true, &cfs);
+}
+
+int RocksDBStore::do_open(ostream &out, bool create_if_missing,
+ const vector<ColumnFamily>* cfs) // cfs may be null when no explicit column families are requested
{
rocksdb::Options opt;
rocksdb::Status status;
<< dendl;
opt.merge_operator.reset(new MergeOperatorRouter(*this));
- status = rocksdb::DB::Open(opt, path, &db);
- if (!status.ok()) {
- derr << status.ToString() << dendl;
- return -EINVAL;
+ if (create_if_missing) { // creation path: open the DB first, then create each requested CF
+ status = rocksdb::DB::Open(opt, path, &db);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ // create and open the requested column families
+ if (cfs) {
+ for (auto& p : *cfs) {
+ // start each new CF from a copy of the default CF settings
+ // (block cache, merge operator, ...)
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ // options parsed from the user-supplied string override that base
+ status = rocksdb::GetColumnFamilyOptionsFromString(
+ cf_opt, p.option, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " invalid db column family option string for CF: "
+ << p.name << dendl;
+ return -EINVAL; // NOTE(review): db is already open here; confirm callers clean it up
+ }
+ install_cf_mergeop(p.name, &cf_opt); // per-CF router if an operator is registered under this name, else none
+ rocksdb::ColumnFamilyHandle *cf;
+ status = db->CreateColumnFamily(cf_opt, p.name, &cf);
+ if (!status.ok()) {
+ derr << __func__ << " Failed to create rocksdb column family: "
+ << p.name << dendl;
+ return -EINVAL; // NOTE(review): same as above, db stays open on this error path
+ }
+ // remember the new CF handle under its name
+ add_column_family(p.name, static_cast<void*>(cf));
+ }
+ }
+ } else { // open path: discover which column families already exist
+ std::vector<string> existing_cfs;
+ status = rocksdb::DB::ListColumnFamilies( // NOTE(review): this status is never checked before existing_cfs is used
+ rocksdb::DBOptions(opt),
+ path,
+ &existing_cfs);
+ dout(1) << __func__ << " column families: " << existing_cfs << dendl;
+ if (existing_cfs.empty()) {
+ // no column families listed: plain open without CF descriptors
+ status = rocksdb::DB::Open(opt, path, &db);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ } else {
+ // we cannot change column families for a created database. so, map
+ // what options we are given to whatever cf's already exist.
+ std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
+ for (auto& n : existing_cfs) {
+ // start from a copy of the default CF settings (block cache,
+ // merge operator, ...) for this existing CF
+ rocksdb::ColumnFamilyOptions cf_opt(opt);
+ bool found = false;
+ if (cfs) {
+ for (auto& i : *cfs) {
+ if (i.name == n) {
+ found = true;
+ status = rocksdb::GetColumnFamilyOptionsFromString(
+ cf_opt, i.option, &cf_opt);
+ if (!status.ok()) {
+ derr << __func__ << " invalid db column family options for CF '"
+ << i.name << "': " << i.option << dendl;
+ return -EINVAL;
+ }
+ }
+ }
+ }
+ if (n != rocksdb::kDefaultColumnFamilyName) { // the default CF keeps the prefix router installed on opt above
+ install_cf_mergeop(n, &cf_opt);
+ }
+ column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt));
+ if (!found && n != rocksdb::kDefaultColumnFamilyName) {
+ dout(1) << __func__ << " column family '" << n
+ << "' exists but not expected" << dendl;
+ }
+ }
+ std::vector<rocksdb::ColumnFamilyHandle*> handles;
+ status = rocksdb::DB::Open(rocksdb::DBOptions(opt),
+ path, column_families, &handles, &db);
+ if (!status.ok()) {
+ derr << status.ToString() << dendl;
+ return -EINVAL;
+ }
+ for (unsigned i = 0; i < existing_cfs.size(); ++i) { // relies on handles[i] matching existing_cfs[i]; column_families was built in that order
+ if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) { // NOTE(review): the default CF handle is not recorded; confirm it is released somewhere
+ add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
+ }
+ }
+ }
}
PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
delete logger;
// Ensure db is destroyed before dependent db_cache and filterpolicy
+ for (auto& p : cf_handles) {
+ db->DestroyColumnFamilyHandle(
+ static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
+ p.second = nullptr;
+ }
delete db;
db = nullptr;
db = _db;
}
-static void put_bat(
- rocksdb::WriteBatch& bat,
- const string &key,
+void RocksDBStore::RocksDBTransactionImpl::put_bat( // queue a Put of key -> to_set_bl on column family cf
+ rocksdb::WriteBatch& bat, // batch the Put is appended to
+ rocksdb::ColumnFamilyHandle *cf, // target column family, passed straight through to bat.Put()
+ const string &key, // key exactly as it should be stored (callers add any prefix)
const bufferlist &to_set_bl)
{
// bufferlist::c_str() is non-constant, so we can't call c_str()
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
- bat.Put(rocksdb::Slice(key),
- rocksdb::Slice(to_set_bl.buffers().front().c_str(),
- to_set_bl.length()));
+ bat.Put(cf, // contiguous, non-empty value: one Slice over the first buffer
+ rocksdb::Slice(key),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(),
+ to_set_bl.length()));
} else {
rocksdb::Slice key_slice(key);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
- bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1),
+ bat.Put(cf, // fragmented or empty value: handed over as SliceParts built by prepare_sliceparts()
+ rocksdb::SliceParts(&key_slice, 1),
prepare_sliceparts(to_set_bl, &value_slices));
}
}
const string &k,
const bufferlist &to_set_bl)
{
- string key = combine_strings(prefix, k);
-
- put_bat(bat, key, to_set_bl);
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ put_bat(bat, cf, k, to_set_bl);
+ } else {
+ string key = combine_strings(prefix, k);
+ put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl);
+ }
}
void RocksDBStore::RocksDBTransactionImpl::set(
const char *k, size_t keylen,
const bufferlist &to_set_bl)
{
- string key;
- combine_strings(prefix, k, keylen, &key);
-
- put_bat(bat, key, to_set_bl);
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ string key(k, keylen); // fixme?
+ put_bat(bat, cf, key, to_set_bl);
+ } else {
+ string key;
+ combine_strings(prefix, k, keylen, &key);
+ put_bat(bat, cf, key, to_set_bl);
+ }
}
void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
const string &k)
{
- bat.Delete(combine_strings(prefix, k));
+  // A prefix with its own column family stores the bare key there; any
+  // other prefix uses the combined prefix + key name in the default CF.
+  if (auto cf = db->get_cf_handle(prefix)) {
+    bat.Delete(cf, rocksdb::Slice(k));
+  } else {
+    bat.Delete(combine_strings(prefix, k));
+  }
}
void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix,
const char *k,
size_t keylen)
{
- string key;
- combine_strings(prefix, k, keylen, &key);
- bat.Delete(key);
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ bat.Delete(cf, rocksdb::Slice(k, keylen));
+ } else {
+ string key;
+ combine_strings(prefix, k, keylen, &key);
+ bat.Delete(rocksdb::Slice(key));
+ }
}
void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix,
const string &k)
{
- bat.SingleDelete(combine_strings(prefix, k));
+  // Explicit column family: single-delete the bare key from it.
+  if (auto cf = db->get_cf_handle(prefix)) {
+    bat.SingleDelete(cf, k);
+    return;
+  }
+  // Otherwise single-delete the combined prefix + key name.
+  bat.SingleDelete(combine_strings(prefix, k));
}
void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
- if (db->enable_rmrange) {
- string endprefix = prefix;
- endprefix.push_back('\x01');
- bat.DeleteRange(combine_strings(prefix, string()),
- combine_strings(endprefix, string()));
+ auto cf = db->get_cf_handle(prefix); // non-null when this prefix has its own column family
+ if (cf) {
+ if (db->enable_rmrange) {
+ string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating: keys sorting at or after "\xff\xff\xff\xff" fall outside the deleted range
+ bat.DeleteRange(cf, string(), endprefix);
+ } else {
+ auto it = db->get_iterator(prefix); // no range delete: queue one Delete per key in the CF
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ }
+ }
} else {
- KeyValueDB::Iterator it = db->get_iterator(prefix);
- for (it->seek_to_first();
- it->valid();
- it->next()) {
- bat.Delete(combine_strings(prefix, it->key()));
+ if (db->enable_rmrange) {
+ string endprefix = prefix;
+ endprefix.push_back('\x01'); // presumably bounds every key of this prefix, assuming combine_strings() joins with a NUL; verify
+ bat.DeleteRange(combine_strings(prefix, string()),
+ combine_strings(endprefix, string()));
+ } else {
+ auto it = db->get_iterator(prefix); // no range delete: queue one Delete per key under the prefix
+ for (it->seek_to_first();
+ it->valid();
+ it->next()) {
+ bat.Delete(combine_strings(prefix, it->key()));
+ }
}
}
}
const string &start,
const string &end)
{
- if (db->enable_rmrange) {
- bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ if (db->enable_rmrange) {
+ bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end));
+ } else {
+ auto it = db->get_iterator(prefix);
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ bat.Delete(cf, rocksdb::Slice(it->key()));
+ it->next();
+ }
+ }
} else {
- auto it = db->get_iterator(prefix);
- it->lower_bound(start);
- while (it->valid()) {
- if (it->key() >= end) {
- break;
+ if (db->enable_rmrange) {
+ bat.DeleteRange(
+ db->db->DefaultColumnFamily(),
+ rocksdb::Slice(combine_strings(prefix, start)),
+ rocksdb::Slice(combine_strings(prefix, end)));
+ } else {
+ auto it = db->get_iterator(prefix);
+ it->lower_bound(start);
+ while (it->valid()) {
+ if (it->key() >= end) {
+ break;
+ }
+ bat.Delete(combine_strings(prefix, it->key()));
+ it->next();
}
- bat.Delete(combine_strings(prefix, it->key()));
- it->next();
}
}
}
const string &k,
const bufferlist &to_set_bl)
{
- string key = combine_strings(prefix, k);
-
- // bufferlist::c_str() is non-constant, so we can't call c_str()
- if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
- bat.Merge(rocksdb::Slice(key),
- rocksdb::Slice(to_set_bl.buffers().front().c_str(),
- to_set_bl.length()));
+ auto cf = db->get_cf_handle(prefix);
+ if (cf) {
+ // bufferlist::c_str() is non-constant, so we can't call c_str()
+ if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+ bat.Merge(
+ cf,
+ rocksdb::Slice(k),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+ } else {
+ // make a copy
+ rocksdb::Slice key_slice(k);
+ vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+ bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1),
+ prepare_sliceparts(to_set_bl, &value_slices));
+ }
} else {
- // make a copy
- rocksdb::Slice key_slice(key);
- vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
- bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1),
- prepare_sliceparts(to_set_bl, &value_slices));
+ string key = combine_strings(prefix, k);
+ // bufferlist::c_str() is non-constant, so we can't call c_str()
+ if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
+ bat.Merge(
+ db->db->DefaultColumnFamily(),
+ rocksdb::Slice(key),
+ rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
+ } else {
+ // make a copy
+ rocksdb::Slice key_slice(key);
+ vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
+ bat.Merge(
+ db->db->DefaultColumnFamily(),
+ rocksdb::SliceParts(&key_slice, 1),
+ prepare_sliceparts(to_set_bl, &value_slices));
+ }
}
}
-//gets will bypass RocksDB row cache, since it uses iterator
int RocksDBStore::get(
const string &prefix,
const std::set<string> &keys,
std::map<string, bufferlist> *out)
{
utime_t start = ceph_clock_now();
- for (std::set<string>::const_iterator i = keys.begin();
- i != keys.end(); ++i) {
- std::string value;
- std::string bound = combine_strings(prefix, *i);
- auto status = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(bound), &value);
- if (status.ok()) {
- (*out)[*i].append(value);
- } else if (status.IsIOError()) {
- ceph_abort_msg(cct, status.ToString());
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ for (auto& key : keys) {
+ std::string value;
+ auto status = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key),
+ &value);
+ if (status.ok()) {
+ (*out)[key].append(value);
+ } else if (status.IsIOError()) {
+ ceph_abort_msg(cct, status.ToString());
+ }
+ }
+ } else {
+ for (auto& key : keys) {
+ std::string value;
+ string k = combine_strings(prefix, key);
+ auto status = db->Get(rocksdb::ReadOptions(),
+ db->DefaultColumnFamily(),
+ rocksdb::Slice(k),
+ &value);
+ if (status.ok()) {
+ (*out)[key].append(value);
+ } else if (status.IsIOError()) {
+ ceph_abort_msg(cct, status.ToString());
+ }
}
-
}
utime_t lat = ceph_clock_now() - start;
logger->inc(l_rocksdb_gets);
assert(out && (out->length() == 0));
utime_t start = ceph_clock_now();
int r = 0;
- string value, k;
+ string value;
rocksdb::Status s;
- k = combine_strings(prefix, key);
- s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ s = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key),
+ &value);
+ } else {
+ string k = combine_strings(prefix, key);
+ s = db->Get(rocksdb::ReadOptions(),
+ db->DefaultColumnFamily(),
+ rocksdb::Slice(k),
+ &value);
+ }
if (s.ok()) {
out->append(value);
} else if (s.IsNotFound()) {
assert(out && (out->length() == 0));
utime_t start = ceph_clock_now();
int r = 0;
- string value, k;
- combine_strings(prefix, key, keylen, &k);
+ string value;
rocksdb::Status s;
- s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value);
+ auto cf = get_cf_handle(prefix);
+ if (cf) {
+ s = db->Get(rocksdb::ReadOptions(),
+ cf,
+ rocksdb::Slice(key, keylen),
+ &value);
+ } else {
+ string k;
+ combine_strings(prefix, key, keylen, &k);
+ s = db->Get(rocksdb::ReadOptions(),
+ db->DefaultColumnFamily(),
+ rocksdb::Slice(k),
+ &value);
+ }
if (s.ok()) {
out->append(value);
} else if (s.IsNotFound()) {
rocksdb::Slice cend(end);
db->CompactRange(options, &cstart, &cend);
}
+
RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl()
{
delete dbiter;
db->NewIterator(rocksdb::ReadOptions()));
}
+// Iterator over one explicit column family.
+//
+// Keys in such a column family are stored without the prefix, so key()
+// returns the rocksdb key as-is and raw_key() pairs it with the prefix
+// given at construction. Takes ownership of the rocksdb iterator passed in
+// and deletes it on destruction. Methods returning int yield 0 when the
+// underlying iterator status is ok and -1 otherwise.
+class CFIteratorImpl : public KeyValueDB::IteratorImpl {
+protected:
+  string prefix;
+  rocksdb::Iterator *dbiter;  // owned; deleted in the destructor
+public:
+  explicit CFIteratorImpl(const std::string& p,
+                          rocksdb::Iterator *iter)
+    : prefix(p), dbiter(iter) { }
+  ~CFIteratorImpl() {
+    delete dbiter;
+  }
+  // dbiter must be deleted exactly once, so copies are not allowed.
+  CFIteratorImpl(const CFIteratorImpl&) = delete;
+  CFIteratorImpl& operator=(const CFIteratorImpl&) = delete;
+
+  int seek_to_first() override {
+    dbiter->SeekToFirst();
+    return status();
+  }
+  int seek_to_last() override {
+    dbiter->SeekToLast();
+    return status();
+  }
+  // Position on the first key strictly greater than after.
+  int upper_bound(const string &after) override {
+    lower_bound(after);
+    if (valid() && (key() == after)) {
+      next();
+    }
+    return status();
+  }
+  // Position on the first key greater than or equal to to.
+  int lower_bound(const string &to) override {
+    rocksdb::Slice slice_bound(to);
+    dbiter->Seek(slice_bound);
+    return status();
+  }
+  // Advance only if currently valid; validate is not used.
+  int next(bool validate=true) {
+    if (valid()) {
+      dbiter->Next();
+    }
+    return status();
+  }
+  // Step back only if currently valid; validate is not used.
+  int prev(bool validate=true) {
+    if (valid()) {
+      dbiter->Prev();
+    }
+    return status();
+  }
+  bool valid() override {
+    return dbiter->Valid();
+  }
+  string key() override {
+    return dbiter->key().ToString();
+  }
+  std::pair<std::string, std::string> raw_key() {
+    return make_pair(prefix, key());
+  }
+  bufferlist value() override {
+    return to_bufferlist(dbiter->value());
+  }
+  bufferptr value_as_ptr() override {
+    rocksdb::Slice val = dbiter->value();
+    return bufferptr(val.data(), val.size());
+  }
+  int status() override {
+    return dbiter->status().ok() ? 0 : -1;
+  }
+};
+
+// Return an iterator over the keys stored under prefix.
+// A prefix backed by its own column family gets a CFIteratorImpl over that
+// column family; any other prefix falls back to KeyValueDB::get_iterator().
+KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix)
+{
+  auto *cf = static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix));
+  if (!cf) {
+    return KeyValueDB::get_iterator(prefix);
+  }
+  return std::make_shared<CFIteratorImpl>(
+    prefix,
+    db->NewIterator(rocksdb::ReadOptions(), cf));
+}
KVTest() : db(0) {}
+  // Copy the contents of a bufferlist into a std::string. The argument is
+  // taken by value because c_str() is called on it.
+  string _bl_to_str(bufferlist val) {
+    return string(val.c_str(), val.length());
+  }
+
void rm_r(string path) {
string cmd = string("rm -r ") + path;
cout << "==> " << cmd << std::endl;
const char *ldata, size_t llen,
const char *rdata, size_t rlen,
std::string *new_value) override {
-
*new_value = std::string(ldata, llen) + std::string(rdata, rlen);
}
// We use each operator name and each prefix to construct the
fini();
}
+// Writes one key to the default column family and one to each of two
+// explicit column families, then checks across close/reopen cycles that
+// each key can be read back and that deletes affect only the targeted key.
+TEST_P(KVTest, RocksDBColumnFamilyTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  cfs.push_back(KeyValueDB::ColumnFamily("cf2", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating two column families and opening them" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist value;
+    value.append("value");
+    cout << "write a transaction includes three keys in different CFs" << std::endl;
+    t->set("prefix", "key", value);
+    t->set("cf1", "key", value);
+    t->set("cf2", "key2", value);
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  fini();
+
+  init();
+  ASSERT_EQ(0, db->open(cout, cfs));
+  {
+    bufferlist v1, v2, v3;
+    cout << "reopen db and read those keys" << std::endl;
+    ASSERT_EQ(0, db->get("prefix", "key", &v1));
+    ASSERT_EQ("value", _bl_to_str(v1));
+    ASSERT_EQ(0, db->get("cf1", "key", &v2));
+    ASSERT_EQ("value", _bl_to_str(v2));
+    ASSERT_EQ(0, db->get("cf2", "key2", &v3));
+    // check v3 here: it is the buffer that was just read from cf2
+    ASSERT_EQ("value", _bl_to_str(v3));
+  }
+  {
+    cout << "delete two keys in CFs" << std::endl;
+    KeyValueDB::Transaction t = db->get_transaction();
+    t->rmkey("prefix", "key");
+    t->rmkey("cf2", "key2");
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  fini();
+
+  init();
+  ASSERT_EQ(0, db->open(cout, cfs));
+  {
+    cout << "reopen db and read keys again." << std::endl;
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v1));
+    // the key in cf1 was not deleted and must still be readable
+    ASSERT_EQ(0, db->get("cf1", "key", &v2));
+    ASSERT_EQ("value", _bl_to_str(v2));
+    ASSERT_EQ(-ENOENT, db->get("cf2", "key2", &v3));
+  }
+  fini();
+}
+
+// Stores the same two key/value pairs under a default-CF prefix and in an
+// explicit column family, then walks both with get_iterator() and expects
+// the same bare keys and values from each.
+TEST_P(KVTest, RocksDBIteratorTest) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+  {
+    KeyValueDB::Transaction txn = db->get_transaction();
+    bufferlist hello;
+    hello.append("hello");
+    bufferlist world;
+    world.append("world");
+    cout << "write some kv pairs into default and new CFs" << std::endl;
+    txn->set("prefix", "key1", hello);
+    txn->set("prefix", "key2", world);
+    txn->set("cf1", "key1", hello);
+    txn->set("cf1", "key2", world);
+    ASSERT_EQ(0, db->submit_transaction_sync(txn));
+  }
+  {
+    cout << "iterating the default CF" << std::endl;
+    KeyValueDB::Iterator cursor = db->get_iterator("prefix");
+    cursor->seek_to_first();
+    // first entry
+    ASSERT_TRUE(cursor->valid());
+    ASSERT_EQ("key1", cursor->key());
+    ASSERT_EQ("hello", _bl_to_str(cursor->value()));
+    // second entry
+    ASSERT_EQ(0, cursor->next());
+    ASSERT_TRUE(cursor->valid());
+    ASSERT_EQ("key2", cursor->key());
+    ASSERT_EQ("world", _bl_to_str(cursor->value()));
+  }
+  {
+    cout << "iterating the new CF" << std::endl;
+    KeyValueDB::Iterator cursor = db->get_iterator("cf1");
+    cursor->seek_to_first();
+    // first entry
+    ASSERT_TRUE(cursor->valid());
+    ASSERT_EQ("key1", cursor->key());
+    ASSERT_EQ("hello", _bl_to_str(cursor->value()));
+    // second entry
+    ASSERT_EQ(0, cursor->next());
+    ASSERT_TRUE(cursor->valid());
+    ASSERT_EQ("key2", cursor->key());
+    ASSERT_EQ("world", _bl_to_str(cursor->value()));
+  }
+  fini();
+}
+
+// Registers AppendMOP for the explicit column family "cf1" and checks that
+// merges into that column family go through it, while plain sets in both
+// the default CF and cf1 are stored unchanged.
+TEST_P(KVTest, RocksDBCFMerge) {
+  if(string(GetParam()) != "rocksdb")
+    return;
+
+  shared_ptr<KeyValueDB::MergeOperator> p(new AppendMOP);
+  int r = db->set_merge_operator("cf1",p);
+  if (r < 0)
+    return; // No merge operators for this database type
+  std::vector<KeyValueDB::ColumnFamily> cfs;
+  cfs.push_back(KeyValueDB::ColumnFamily("cf1", ""));
+  ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options));
+  cout << "creating one column family and opening it" << std::endl;
+  ASSERT_EQ(0, db->create_and_open(cout, cfs));
+
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1, v2, v3;
+    v1.append(string("1"));
+    v2.append(string("2"));
+    v3.append(string("3"));
+    t->set("P", "K1", v1);
+    t->set("cf1", "A1", v2);
+    t->rmkey("cf1", "A2");
+    t->merge("cf1", "A2", v3);
+    // a failed commit must fail the test here, not in the reads below
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  {
+    bufferlist v1, v2, v3;
+    ASSERT_EQ(0, db->get("P", "K1", &v1));
+    ASSERT_EQ(tostr(v1), "1");
+    ASSERT_EQ(0, db->get("cf1", "A1", &v2));
+    ASSERT_EQ(tostr(v2), "2");
+    ASSERT_EQ(0, db->get("cf1", "A2", &v3));
+    ASSERT_EQ(tostr(v3), "?3");
+  }
+  {
+    KeyValueDB::Transaction t = db->get_transaction();
+    bufferlist v1;
+    v1.append(string("1"));
+    t->merge("cf1", "A2", v1);
+    ASSERT_EQ(0, db->submit_transaction_sync(t));
+  }
+  {
+    bufferlist v;
+    ASSERT_EQ(0, db->get("cf1", "A2", &v));
+    ASSERT_EQ(tostr(v), "?31");
+  }
+  fini();
+}
INSTANTIATE_TEST_CASE_P(
KeyValueDB,