From: Jianjian Huo Date: Fri, 29 Sep 2017 22:03:05 +0000 (-0500) Subject: kv/RocksDBStore: add CF support X-Git-Tag: v13.0.1~642^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=13360284eddc288ab4e29358dc9755bd7899b382;p=ceph.git kv/RocksDBStore: add CF support - detect and use existing CFs on open Signed-off-by: Jianjian Huo Signed-off-by: Adam C. Emerson Signed-off-by: Sage Weil --- diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index 98e410ffdcf..35d312d9b5f 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -53,10 +53,14 @@ static rocksdb::SliceParts prepare_sliceparts(const bufferlist &bl, } // -// One of these per rocksdb instance, implements the merge operator prefix stuff +// One of these per rocksdb column family(includes the default CF), +// implements the merge operator prefix stuff // class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperator { RocksDBStore& store; + //name of the column family associated with this merge operator + //only for explicit CF, not for the default CF + std::string cf_name; public: const char *Name() const override { // Construct a name that rocksDB will validate against. We want to @@ -65,7 +69,19 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat // construct a name from all of those parts. 
store.assoc_name.clear(); map names; - for (auto& p : store.merge_ops) names[p.first] = p.second->name(); + if (cf_name.empty()) { + //for default column family + for (auto& p : store.merge_ops) names[p.first] = p.second->name(); + for (auto& p : store.cf_handles) names.erase(p.first); + } else { + //for user created explicit column family + for (auto& p : store.merge_ops) { + if (p.first.compare(cf_name) == 0) { + names[cf_name] = p.second->name(); + break; + } + } + } for (auto& p : names) { store.assoc_name += '.'; store.assoc_name += p.first; @@ -75,31 +91,53 @@ class RocksDBStore::MergeOperatorRouter : public rocksdb::AssociativeMergeOperat return store.assoc_name.c_str(); } + //for default column family MergeOperatorRouter(RocksDBStore &_store) : store(_store) {} + //for user created explicit CF + MergeOperatorRouter(RocksDBStore &_store, const std::string &cf) + : store(_store), cf_name(cf) {} bool Merge(const rocksdb::Slice& key, const rocksdb::Slice* existing_value, const rocksdb::Slice& value, std::string* new_value, rocksdb::Logger* logger) const override { - // Check each prefix - for (auto& p : store.merge_ops) { - if (p.first.compare(0, p.first.length(), - key.data(), p.first.length()) == 0 && - key.data()[p.first.length()] == 0) { - if (existing_value) { - p.second->merge(existing_value->data(), existing_value->size(), - value.data(), value.size(), - new_value); - } else { - p.second->merge_nonexistent(value.data(), value.size(), new_value); - } - break; + if (cf_name.empty()) { + // for default column family + // extract prefix from key and compare against each registered merge op; + // even though merge operator for explicit CF is included in merge_ops, + // it won't be picked up, since it won't match. 
+ for (auto& p : store.merge_ops) { + if (p.first.compare(0, p.first.length(), + key.data(), p.first.length()) == 0 && + key.data()[p.first.length()] == 0) { + if (existing_value) { + p.second->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + p.second->merge_nonexistent(value.data(), value.size(), new_value); + } + break; + } + } + } else { + //for user created explicit column family + for (auto& p : store.merge_ops) { + if (p.first.compare(cf_name) == 0) { + if (existing_value) { + p.second->merge(existing_value->data(), existing_value->size(), + value.data(), value.size(), + new_value); + } else { + p.second->merge_nonexistent(value.data(), value.size(), new_value); + } + break; + } } } return true; // OK :) } - }; int RocksDBStore::set_merge_operator( @@ -233,7 +271,7 @@ int RocksDBStore::init(string _options_str) return 0; } -int RocksDBStore::create_and_open(ostream &out) +int RocksDBStore::create_db_dir() { if (env) { unique_ptr dir; @@ -248,10 +286,45 @@ int RocksDBStore::create_and_open(ostream &out) return r; } } + return 0; +} + +int RocksDBStore::install_cf_mergeop(const string &cf_name, + rocksdb::ColumnFamilyOptions *cf_opt) +{ + assert(cf_opt != nullptr); + bool found_mop = false; + for (auto &mop : merge_ops) { + if (mop.first.compare(cf_name) == 0) { + cf_opt->merge_operator.reset(new MergeOperatorRouter(*this, cf_name)); + found_mop = true; + break; + } + } + if (!found_mop) + cf_opt->merge_operator.reset(); + return 0; +} + +int RocksDBStore::create_and_open(ostream &out) +{ + int r = create_db_dir(); + if (r < 0) + return r; return do_open(out, true); } -int RocksDBStore::do_open(ostream &out, bool create_if_missing) +int RocksDBStore::create_and_open(ostream &out, + const vector& cfs) +{ + int r = create_db_dir(); + if (r < 0) + return r; + return do_open(out, true, &cfs); +} + +int RocksDBStore::do_open(ostream &out, bool create_if_missing, + const vector* cfs) { rocksdb::Options 
opt; rocksdb::Status status; @@ -368,10 +441,97 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing) << dendl; opt.merge_operator.reset(new MergeOperatorRouter(*this)); - status = rocksdb::DB::Open(opt, path, &db); - if (!status.ok()) { - derr << status.ToString() << dendl; - return -EINVAL; + if (create_if_missing) { + status = rocksdb::DB::Open(opt, path, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + // create and open column families + if (cfs) { + for (auto& p : *cfs) { + // copy default CF settings, block cache, merge operators as + // the base for new CF + rocksdb::ColumnFamilyOptions cf_opt(opt); + // user input options will override the base options + status = rocksdb::GetColumnFamilyOptionsFromString( + cf_opt, p.option, &cf_opt); + if (!status.ok()) { + derr << __func__ << " invalid db column family option string for CF: " + << p.name << dendl; + return -EINVAL; + } + install_cf_mergeop(p.name, &cf_opt); + rocksdb::ColumnFamilyHandle *cf; + status = db->CreateColumnFamily(cf_opt, p.name, &cf); + if (!status.ok()) { + derr << __func__ << " Failed to create rocksdb column family: " + << p.name << dendl; + return -EINVAL; + } + // store the new CF handle + add_column_family(p.name, static_cast(cf)); + } + } + } else { + std::vector existing_cfs; + status = rocksdb::DB::ListColumnFamilies( + rocksdb::DBOptions(opt), + path, + &existing_cfs); + dout(1) << __func__ << " column families: " << existing_cfs << dendl; + if (existing_cfs.empty()) { + // no column families + status = rocksdb::DB::Open(opt, path, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + } else { + // we cannot change column families for a created database. so, map + // what options we are given to whatever cf's already exist. 
+ std::vector column_families; + for (auto& n : existing_cfs) { + // copy default CF settings, block cache, merge operators as + // the base for new CF + rocksdb::ColumnFamilyOptions cf_opt(opt); + bool found = false; + if (cfs) { + for (auto& i : *cfs) { + if (i.name == n) { + found = true; + status = rocksdb::GetColumnFamilyOptionsFromString( + cf_opt, i.option, &cf_opt); + if (!status.ok()) { + derr << __func__ << " invalid db column family options for CF '" + << i.name << "': " << i.option << dendl; + return -EINVAL; + } + } + } + } + if (n != rocksdb::kDefaultColumnFamilyName) { + install_cf_mergeop(n, &cf_opt); + } + column_families.push_back(rocksdb::ColumnFamilyDescriptor(n, cf_opt)); + if (!found && n != rocksdb::kDefaultColumnFamilyName) { + dout(1) << __func__ << " column family '" << n + << "' exists but not expected" << dendl; + } + } + std::vector handles; + status = rocksdb::DB::Open(rocksdb::DBOptions(opt), + path, column_families, &handles, &db); + if (!status.ok()) { + derr << status.ToString() << dendl; + return -EINVAL; + } + for (unsigned i = 0; i < existing_cfs.size(); ++i) { + if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) { + add_column_family(existing_cfs[i], static_cast(handles[i])); + } + } + } } PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); @@ -418,6 +578,11 @@ RocksDBStore::~RocksDBStore() delete logger; // Ensure db is destroyed before dependent db_cache and filterpolicy + for (auto& p : cf_handles) { + db->DestroyColumnFamilyHandle( + static_cast(p.second)); + p.second = nullptr; + } delete db; db = nullptr; @@ -586,20 +751,23 @@ RocksDBStore::RocksDBTransactionImpl::RocksDBTransactionImpl(RocksDBStore *_db) db = _db; } -static void put_bat( - rocksdb::WriteBatch& bat, - const string &key, +void RocksDBStore::RocksDBTransactionImpl::put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const string &key, const bufferlist &to_set_bl) { // bufferlist::c_str() is 
non-constant, so we can't call c_str() if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { - bat.Put(rocksdb::Slice(key), - rocksdb::Slice(to_set_bl.buffers().front().c_str(), - to_set_bl.length())); + bat.Put(cf, + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), + to_set_bl.length())); } else { rocksdb::Slice key_slice(key); vector value_slices(to_set_bl.buffers().size()); - bat.Put(nullptr, rocksdb::SliceParts(&key_slice, 1), + bat.Put(cf, + rocksdb::SliceParts(&key_slice, 1), prepare_sliceparts(to_set_bl, &value_slices)); } } @@ -609,9 +777,13 @@ void RocksDBStore::RocksDBTransactionImpl::set( const string &k, const bufferlist &to_set_bl) { - string key = combine_strings(prefix, k); - - put_bat(bat, key, to_set_bl); + auto cf = db->get_cf_handle(prefix); + if (cf) { + put_bat(bat, cf, k, to_set_bl); + } else { + string key = combine_strings(prefix, k); + put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl); + } } void RocksDBStore::RocksDBTransactionImpl::set( @@ -619,46 +791,81 @@ void RocksDBStore::RocksDBTransactionImpl::set( const char *k, size_t keylen, const bufferlist &to_set_bl) { - string key; - combine_strings(prefix, k, keylen, &key); - - put_bat(bat, key, to_set_bl); + auto cf = db->get_cf_handle(prefix); + if (cf) { + string key(k, keylen); // fixme? 
+ put_bat(bat, cf, key, to_set_bl); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl); + } } void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, const string &k) { - bat.Delete(combine_strings(prefix, k)); + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k)); + } else { + bat.Delete(combine_strings(prefix, k)); + } } void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, const char *k, size_t keylen) { - string key; - combine_strings(prefix, k, keylen, &key); - bat.Delete(key); + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.Delete(cf, rocksdb::Slice(k, keylen)); + } else { + string key; + combine_strings(prefix, k, keylen, &key); + bat.Delete(rocksdb::Slice(key)); + } } void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, const string &k) { - bat.SingleDelete(combine_strings(prefix, k)); + auto cf = db->get_cf_handle(prefix); + if (cf) { + bat.SingleDelete(cf, k); + } else { + bat.SingleDelete(combine_strings(prefix, k)); + } } void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix) { - if (db->enable_rmrange) { - string endprefix = prefix; - endprefix.push_back('\x01'); - bat.DeleteRange(combine_strings(prefix, string()), - combine_strings(endprefix, string())); + auto cf = db->get_cf_handle(prefix); + if (cf) { + if (db->enable_rmrange) { + string endprefix("\xff\xff\xff\xff"); // FIXME: this is cheating... 
+ bat.DeleteRange(cf, string(), endprefix); + } else { + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(cf, rocksdb::Slice(it->key())); + } + } } else { - KeyValueDB::Iterator it = db->get_iterator(prefix); - for (it->seek_to_first(); - it->valid(); - it->next()) { - bat.Delete(combine_strings(prefix, it->key())); + if (db->enable_rmrange) { + string endprefix = prefix; + endprefix.push_back('\x01'); + bat.DeleteRange(combine_strings(prefix, string()), + combine_strings(endprefix, string())); + } else { + auto it = db->get_iterator(prefix); + for (it->seek_to_first(); + it->valid(); + it->next()) { + bat.Delete(combine_strings(prefix, it->key())); + } } } } @@ -667,17 +874,37 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end) { - if (db->enable_rmrange) { - bat.DeleteRange(combine_strings(prefix, start), combine_strings(prefix, end)); + auto cf = db->get_cf_handle(prefix); + if (cf) { + if (db->enable_rmrange) { + bat.DeleteRange(cf, rocksdb::Slice(start), rocksdb::Slice(end)); + } else { + auto it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(cf, rocksdb::Slice(it->key())); + it->next(); + } + } } else { - auto it = db->get_iterator(prefix); - it->lower_bound(start); - while (it->valid()) { - if (it->key() >= end) { - break; + if (db->enable_rmrange) { + bat.DeleteRange( + db->db->DefaultColumnFamily(), + rocksdb::Slice(combine_strings(prefix, start)), + rocksdb::Slice(combine_strings(prefix, end))); + } else { + auto it = db->get_iterator(prefix); + it->lower_bound(start); + while (it->valid()) { + if (it->key() >= end) { + break; + } + bat.Delete(combine_strings(prefix, it->key())); + it->next(); } - bat.Delete(combine_strings(prefix, it->key())); - it->next(); } } } @@ -687,40 +914,75 @@ void RocksDBStore::RocksDBTransactionImpl::merge( const string 
&k, const bufferlist &to_set_bl) { - string key = combine_strings(prefix, k); - - // bufferlist::c_str() is non-constant, so we can't call c_str() - if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { - bat.Merge(rocksdb::Slice(key), - rocksdb::Slice(to_set_bl.buffers().front().c_str(), - to_set_bl.length())); + auto cf = db->get_cf_handle(prefix); + if (cf) { + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + cf, + rocksdb::Slice(k), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(k); + vector value_slices(to_set_bl.buffers().size()); + bat.Merge(cf, rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } } else { - // make a copy - rocksdb::Slice key_slice(key); - vector value_slices(to_set_bl.buffers().size()); - bat.Merge(nullptr, rocksdb::SliceParts(&key_slice, 1), - prepare_sliceparts(to_set_bl, &value_slices)); + string key = combine_strings(prefix, k); + // bufferlist::c_str() is non-constant, so we can't call c_str() + if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { + bat.Merge( + db->db->DefaultColumnFamily(), + rocksdb::Slice(key), + rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); + } else { + // make a copy + rocksdb::Slice key_slice(key); + vector value_slices(to_set_bl.buffers().size()); + bat.Merge( + db->db->DefaultColumnFamily(), + rocksdb::SliceParts(&key_slice, 1), + prepare_sliceparts(to_set_bl, &value_slices)); + } } } -//gets will bypass RocksDB row cache, since it uses iterator int RocksDBStore::get( const string &prefix, const std::set &keys, std::map *out) { utime_t start = ceph_clock_now(); - for (std::set::const_iterator i = keys.begin(); - i != keys.end(); ++i) { - std::string value; - std::string bound = combine_strings(prefix, *i); - auto status = db->Get(rocksdb::ReadOptions(), 
rocksdb::Slice(bound), &value); - if (status.ok()) { - (*out)[*i].append(value); - } else if (status.IsIOError()) { - ceph_abort_msg(cct, status.ToString()); + auto cf = get_cf_handle(prefix); + if (cf) { + for (auto& key : keys) { + std::string value; + auto status = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key), + &value); + if (status.ok()) { + (*out)[key].append(value); + } else if (status.IsIOError()) { + ceph_abort_msg(cct, status.ToString()); + } + } + } else { + for (auto& key : keys) { + std::string value; + string k = combine_strings(prefix, key); + auto status = db->Get(rocksdb::ReadOptions(), + db->DefaultColumnFamily(), + rocksdb::Slice(k), + &value); + if (status.ok()) { + (*out)[key].append(value); + } else if (status.IsIOError()) { + ceph_abort_msg(cct, status.ToString()); + } } - } utime_t lat = ceph_clock_now() - start; logger->inc(l_rocksdb_gets); @@ -736,10 +998,21 @@ int RocksDBStore::get( assert(out && (out->length() == 0)); utime_t start = ceph_clock_now(); int r = 0; - string value, k; + string value; rocksdb::Status s; - k = combine_strings(prefix, key); - s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); + auto cf = get_cf_handle(prefix); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key), + &value); + } else { + string k = combine_strings(prefix, key); + s = db->Get(rocksdb::ReadOptions(), + db->DefaultColumnFamily(), + rocksdb::Slice(k), + &value); + } if (s.ok()) { out->append(value); } else if (s.IsNotFound()) { @@ -762,10 +1035,22 @@ int RocksDBStore::get( assert(out && (out->length() == 0)); utime_t start = ceph_clock_now(); int r = 0; - string value, k; - combine_strings(prefix, key, keylen, &k); + string value; rocksdb::Status s; - s = db->Get(rocksdb::ReadOptions(), rocksdb::Slice(k), &value); + auto cf = get_cf_handle(prefix); + if (cf) { + s = db->Get(rocksdb::ReadOptions(), + cf, + rocksdb::Slice(key, keylen), + &value); + } else { + string k; + combine_strings(prefix, key, 
keylen, &k); + s = db->Get(rocksdb::ReadOptions(), + db->DefaultColumnFamily(), + rocksdb::Slice(k), + &value); + } if (s.ok()) { out->append(value); } else if (s.IsNotFound()) { @@ -882,6 +1167,7 @@ void RocksDBStore::compact_range(const string& start, const string& end) rocksdb::Slice cend(end); db->CompactRange(options, &cstart, &cend); } + RocksDBStore::RocksDBWholeSpaceIteratorImpl::~RocksDBWholeSpaceIteratorImpl() { delete dbiter; @@ -1017,3 +1303,80 @@ RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() db->NewIterator(rocksdb::ReadOptions())); } +class CFIteratorImpl : public KeyValueDB::IteratorImpl { +protected: + string prefix; + rocksdb::Iterator *dbiter; +public: + explicit CFIteratorImpl(const std::string& p, + rocksdb::Iterator *iter) + : prefix(p), dbiter(iter) { } + ~CFIteratorImpl() { + delete dbiter; + } + + int seek_to_first() override { + dbiter->SeekToFirst(); + return dbiter->status().ok() ? 0 : -1; + } + int seek_to_last() override { + dbiter->SeekToLast(); + return dbiter->status().ok() ? 0 : -1; + } + int upper_bound(const string &after) override { + lower_bound(after); + if (valid() && (key() == after)) { + next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int lower_bound(const string &to) override { + rocksdb::Slice slice_bound(to); + dbiter->Seek(slice_bound); + return dbiter->status().ok() ? 0 : -1; + } + int next(bool validate=true) { + if (valid()) { + dbiter->Next(); + } + return dbiter->status().ok() ? 0 : -1; + } + int prev(bool validate=true) { + if (valid()) { + dbiter->Prev(); + } + return dbiter->status().ok() ? 
0 : -1; + } + bool valid() override { + return dbiter->Valid(); + } + string key() override { + return dbiter->key().ToString(); + } + std::pair<string,string> raw_key() { + return make_pair(prefix, key()); + } + bufferlist value() override { + return to_bufferlist(dbiter->value()); + } + bufferptr value_as_ptr() override { + rocksdb::Slice val = dbiter->value(); + return bufferptr(val.data(), val.size()); + } + int status() override { + return dbiter->status().ok() ? 0 : -1; + } +}; + +KeyValueDB::Iterator RocksDBStore::get_iterator(const std::string& prefix) +{ + rocksdb::ColumnFamilyHandle *cf_handle = + static_cast<rocksdb::ColumnFamilyHandle*>(get_cf_handle(prefix)); + if (cf_handle) { + return std::make_shared<CFIteratorImpl>( + prefix, + db->NewIterator(rocksdb::ReadOptions(), cf_handle)); + } else { + return KeyValueDB::get_iterator(prefix); + } +} diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index e0b25229d5c..22e733e1f90 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -55,8 +55,11 @@ namespace rocksdb{ class WriteBatch; class Iterator; class Logger; + class ColumnFamilyHandle; struct Options; struct BlockBasedTableOptions; + struct DBOptions; + struct ColumnFamilyOptions; } extern rocksdb::Logger *create_rocksdb_ceph_logger(); @@ -79,7 +82,10 @@ class RocksDBStore : public KeyValueDB { bool set_cache_flag = false; int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t); - int do_open(ostream &out, bool create_if_missing); + int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt); + int create_db_dir(); + int do_open(ostream &out, bool create_if_missing, + const vector<KeyValueDB::ColumnFamily>* cfs = nullptr); // manage async compactions Mutex compact_queue_lock; @@ -151,11 +157,24 @@ public: int open(ostream &out) override { return do_open(out, false); } + int open(ostream &out, const vector<KeyValueDB::ColumnFamily>& cfs) override { + return do_open(out, false, &cfs); + } /// Creates underlying db if missing and opens it int create_and_open(ostream &out) override; + int 
create_and_open(ostream &out, + const vector& cfs) override; void close() override; + rocksdb::ColumnFamilyHandle *get_cf_handle(const std::string& cf_name) { + auto iter = cf_handles.find(cf_name); + if (iter == cf_handles.end()) + return nullptr; + else + return static_cast(iter->second); + } + void split_stats(const std::string &s, char delim, std::vector &elems); void get_statistics(Formatter *f) override; @@ -255,13 +274,19 @@ public: }; - class RocksDBTransactionImpl : public KeyValueDB::TransactionImpl { public: rocksdb::WriteBatch bat; RocksDBStore *db; explicit RocksDBTransactionImpl(RocksDBStore *_db); + private: + void put_bat( + rocksdb::WriteBatch& bat, + rocksdb::ColumnFamilyHandle *cf, + const string &k, + const bufferlist &to_set_bl); + public: void set( const string &prefix, const string &k, @@ -346,6 +371,8 @@ public: size_t value_size() override; }; + Iterator get_iterator(const std::string& prefix) override; + /// Utility static string combine_strings(const string &prefix, const string &value) { string out = prefix; diff --git a/src/test/objectstore/test_kv.cc b/src/test/objectstore/test_kv.cc index 46a8a8be32d..1d47b273753 100644 --- a/src/test/objectstore/test_kv.cc +++ b/src/test/objectstore/test_kv.cc @@ -35,6 +35,11 @@ public: KVTest() : db(0) {} + string _bl_to_str(bufferlist val) { + string str(val.c_str(), val.length()); + return str; + } + void rm_r(string path) { string cmd = string("rm -r ") + path; cout << "==> " << cmd << std::endl; @@ -199,7 +204,6 @@ struct AppendMOP : public KeyValueDB::MergeOperator { const char *ldata, size_t llen, const char *rdata, size_t rlen, std::string *new_value) override { - *new_value = std::string(ldata, llen) + std::string(rdata, rlen); } // We use each operator name and each prefix to construct the @@ -308,6 +312,160 @@ TEST_P(KVTest, RMRange) { fini(); } +TEST_P(KVTest, RocksDBColumnFamilyTest) { + if(string(GetParam()) != "rocksdb") + return; + + std::vector cfs; + 
cfs.push_back(KeyValueDB::ColumnFamily("cf1", "")); + cfs.push_back(KeyValueDB::ColumnFamily("cf2", "")); + ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options)); + cout << "creating two column families and opening them" << std::endl; + ASSERT_EQ(0, db->create_and_open(cout, cfs)); + { + KeyValueDB::Transaction t = db->get_transaction(); + bufferlist value; + value.append("value"); + cout << "write a transaction includes three keys in different CFs" << std::endl; + t->set("prefix", "key", value); + t->set("cf1", "key", value); + t->set("cf2", "key2", value); + ASSERT_EQ(0, db->submit_transaction_sync(t)); + } + fini(); + + init(); + ASSERT_EQ(0, db->open(cout, cfs)); + { + bufferlist v1, v2, v3; + cout << "reopen db and read those keys" << std::endl; + ASSERT_EQ(0, db->get("prefix", "key", &v1)); + ASSERT_EQ(0, _bl_to_str(v1) != "value"); + ASSERT_EQ(0, db->get("cf1", "key", &v2)); + ASSERT_EQ(0, _bl_to_str(v2) != "value"); + ASSERT_EQ(0, db->get("cf2", "key2", &v3)); + ASSERT_EQ(0, _bl_to_str(v3) != "value"); + } + { + cout << "delete two keys in CFs" << std::endl; + KeyValueDB::Transaction t = db->get_transaction(); + t->rmkey("prefix", "key"); + t->rmkey("cf2", "key2"); + ASSERT_EQ(0, db->submit_transaction_sync(t)); + } + fini(); + + init(); + ASSERT_EQ(0, db->open(cout, cfs)); + { + cout << "reopen db and read keys again." 
<< std::endl; + bufferlist v1, v2, v3; + ASSERT_EQ(-ENOENT, db->get("prefix", "key", &v1)); + ASSERT_EQ(0, db->get("cf1", "key", &v2)); + ASSERT_EQ(0, _bl_to_str(v2) != "value"); + ASSERT_EQ(-ENOENT, db->get("cf2", "key2", &v3)); + } + fini(); +} + +TEST_P(KVTest, RocksDBIteratorTest) { + if(string(GetParam()) != "rocksdb") + return; + + std::vector cfs; + cfs.push_back(KeyValueDB::ColumnFamily("cf1", "")); + ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options)); + cout << "creating one column family and opening it" << std::endl; + ASSERT_EQ(0, db->create_and_open(cout, cfs)); + { + KeyValueDB::Transaction t = db->get_transaction(); + bufferlist bl1; + bl1.append("hello"); + bufferlist bl2; + bl2.append("world"); + cout << "write some kv pairs into default and new CFs" << std::endl; + t->set("prefix", "key1", bl1); + t->set("prefix", "key2", bl2); + t->set("cf1", "key1", bl1); + t->set("cf1", "key2", bl2); + ASSERT_EQ(0, db->submit_transaction_sync(t)); + } + { + cout << "iterating the default CF" << std::endl; + KeyValueDB::Iterator iter = db->get_iterator("prefix"); + iter->seek_to_first(); + ASSERT_EQ(1, iter->valid()); + ASSERT_EQ("key1", iter->key()); + ASSERT_EQ("hello", _bl_to_str(iter->value())); + ASSERT_EQ(0, iter->next()); + ASSERT_EQ(1, iter->valid()); + ASSERT_EQ("key2", iter->key()); + ASSERT_EQ("world", _bl_to_str(iter->value())); + } + { + cout << "iterating the new CF" << std::endl; + KeyValueDB::Iterator iter = db->get_iterator("cf1"); + iter->seek_to_first(); + ASSERT_EQ(1, iter->valid()); + ASSERT_EQ("key1", iter->key()); + ASSERT_EQ("hello", _bl_to_str(iter->value())); + ASSERT_EQ(0, iter->next()); + ASSERT_EQ(1, iter->valid()); + ASSERT_EQ("key2", iter->key()); + ASSERT_EQ("world", _bl_to_str(iter->value())); + } + fini(); +} + +TEST_P(KVTest, RocksDBCFMerge) { + if(string(GetParam()) != "rocksdb") + return; + + shared_ptr p(new AppendMOP); + int r = db->set_merge_operator("cf1",p); + if (r < 0) + return; // No merge operators for this 
database type + std::vector cfs; + cfs.push_back(KeyValueDB::ColumnFamily("cf1", "")); + ASSERT_EQ(0, db->init(g_conf->bluestore_rocksdb_options)); + cout << "creating one column family and opening it" << std::endl; + ASSERT_EQ(0, db->create_and_open(cout, cfs)); + + { + KeyValueDB::Transaction t = db->get_transaction(); + bufferlist v1, v2, v3; + v1.append(string("1")); + v2.append(string("2")); + v3.append(string("3")); + t->set("P", "K1", v1); + t->set("cf1", "A1", v2); + t->rmkey("cf1", "A2"); + t->merge("cf1", "A2", v3); + db->submit_transaction_sync(t); + } + { + bufferlist v1, v2, v3; + ASSERT_EQ(0, db->get("P", "K1", &v1)); + ASSERT_EQ(tostr(v1), "1"); + ASSERT_EQ(0, db->get("cf1", "A1", &v2)); + ASSERT_EQ(tostr(v2), "2"); + ASSERT_EQ(0, db->get("cf1", "A2", &v3)); + ASSERT_EQ(tostr(v3), "?3"); + } + { + KeyValueDB::Transaction t = db->get_transaction(); + bufferlist v1; + v1.append(string("1")); + t->merge("cf1", "A2", v1); + db->submit_transaction_sync(t); + } + { + bufferlist v; + ASSERT_EQ(0, db->get("cf1", "A2", &v)); + ASSERT_EQ(tostr(v), "?31"); + } + fini(); +} INSTANTIATE_TEST_CASE_P( KeyValueDB,