From: Sage Weil Date: Thu, 5 Oct 2017 13:31:04 +0000 (-0500) Subject: kv/RocksDBStore: fix default CF handling X-Git-Tag: v13.0.1~642^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F18049%2Fhead;p=ceph.git kv/RocksDBStore: fix default CF handling We need to close the default CF handle *only* if we explicitly opened it. Also, take care to use that handle wherever we can. This is mostly paranoia--I'm not sure if rocksdb cares if you mix the handle you opened with DefaultColumnFamily(), which is different. Signed-off-by: Sage Weil --- diff --git a/src/kv/RocksDBStore.cc b/src/kv/RocksDBStore.cc index d4d9c45845bd..49f903dbbda2 100644 --- a/src/kv/RocksDBStore.cc +++ b/src/kv/RocksDBStore.cc @@ -476,6 +476,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing, add_column_family(p.name, static_cast(cf)); } } + default_cf = db->DefaultColumnFamily(); } else { std::vector existing_cfs; status = rocksdb::DB::ListColumnFamilies( @@ -490,6 +491,7 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing, derr << status.ToString() << dendl; return -EINVAL; } + default_cf = db->DefaultColumnFamily(); } else { // we cannot change column families for a created database. so, map // what options we are given to whatever cf's already exist. @@ -530,12 +532,16 @@ int RocksDBStore::do_open(ostream &out, bool create_if_missing, return -EINVAL; } for (unsigned i = 0; i < existing_cfs.size(); ++i) { - if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) { + if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) { + default_cf = handles[i]; + must_close_default_cf = true; + } else { add_column_family(existing_cfs[i], static_cast(handles[i])); } } } } + assert(default_cf != nullptr); PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last); plb.add_u64_counter(l_rocksdb_gets, "get", "Gets"); @@ -586,6 +592,11 @@ RocksDBStore::~RocksDBStore() static_cast(p.second)); p.second = nullptr; } + if (must_close_default_cf) { + db->DestroyColumnFamilyHandle(default_cf); + must_close_default_cf = false; + } + default_cf = nullptr; delete db; db = nullptr; @@ -785,7 +796,7 @@ void RocksDBStore::RocksDBTransactionImpl::set( put_bat(bat, cf, k, to_set_bl); } else { string key = combine_strings(prefix, k); - put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl); + put_bat(bat, db->default_cf, key, to_set_bl); } } @@ -812,7 +823,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, if (cf) { bat.Delete(cf, rocksdb::Slice(k)); } else { - bat.Delete(combine_strings(prefix, k)); + bat.Delete(db->default_cf, combine_strings(prefix, k)); } } @@ -826,7 +837,7 @@ void RocksDBStore::RocksDBTransactionImpl::rmkey(const string &prefix, } else { string key; combine_strings(prefix, k, keylen, &key); - bat.Delete(rocksdb::Slice(key)); + bat.Delete(db->default_cf, rocksdb::Slice(key)); } } @@ -837,7 +848,7 @@ void RocksDBStore::RocksDBTransactionImpl::rm_single_key(const string &prefix, if (cf) { bat.SingleDelete(cf, k); } else { - bat.SingleDelete(combine_strings(prefix, k)); + bat.SingleDelete(db->default_cf, combine_strings(prefix, k)); } } @@ -860,14 +871,15 @@ void RocksDBStore::RocksDBTransactionImpl::rmkeys_by_prefix(const string &prefix if (db->enable_rmrange) { string endprefix = prefix; endprefix.push_back('\x01'); - bat.DeleteRange(combine_strings(prefix, string()), + bat.DeleteRange(db->default_cf, + combine_strings(prefix, string()), combine_strings(endprefix, string())); } else { auto it = db->get_iterator(prefix); for (it->seek_to_first(); it->valid(); it->next()) { - bat.Delete(combine_strings(prefix, it->key())); + bat.Delete(db->default_cf, combine_strings(prefix, it->key())); } } } @@ -895,7 +907,7 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, } else { if (db->enable_rmrange) { bat.DeleteRange( - db->db->DefaultColumnFamily(), + db->default_cf, rocksdb::Slice(combine_strings(prefix, start)), rocksdb::Slice(combine_strings(prefix, end))); } else { @@ -905,7 +917,8 @@ void RocksDBStore::RocksDBTransactionImpl::rm_range_keys(const string &prefix, if (it->key() >= end) { break; } - bat.Delete(combine_strings(prefix, it->key())); + bat.Delete(db->default_cf, + combine_strings(prefix, it->key())); it->next(); } } @@ -937,7 +950,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge( // bufferlist::c_str() is non-constant, so we can't call c_str() if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) { bat.Merge( - db->db->DefaultColumnFamily(), + db->default_cf, rocksdb::Slice(key), rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length())); } else { @@ -945,7 +958,7 @@ void RocksDBStore::RocksDBTransactionImpl::merge( rocksdb::Slice key_slice(key); vector value_slices(to_set_bl.buffers().size()); bat.Merge( - db->db->DefaultColumnFamily(), + db->default_cf, rocksdb::SliceParts(&key_slice, 1), prepare_sliceparts(to_set_bl, &value_slices)); } @@ -977,7 +990,7 @@ int RocksDBStore::get( std::string value; string k = combine_strings(prefix, key); auto status = db->Get(rocksdb::ReadOptions(), - db->DefaultColumnFamily(), + default_cf, rocksdb::Slice(k), &value); if (status.ok()) { @@ -1012,7 +1025,7 @@ int RocksDBStore::get( } else { string k = combine_strings(prefix, key); s = db->Get(rocksdb::ReadOptions(), - db->DefaultColumnFamily(), + default_cf, rocksdb::Slice(k), &value); } @@ -1050,7 +1063,7 @@ int RocksDBStore::get( string k; combine_strings(prefix, key, keylen, &k); s = db->Get(rocksdb::ReadOptions(), - db->DefaultColumnFamily(), + default_cf, rocksdb::Slice(k), &value); } @@ -1091,7 +1104,13 @@ void RocksDBStore::compact() { logger->inc(l_rocksdb_compact); rocksdb::CompactRangeOptions options; - db->CompactRange(options, nullptr, nullptr); + db->CompactRange(options, default_cf, nullptr, nullptr); + for (auto cf : cf_handles) { + db->CompactRange( + options, + static_cast(cf.second), + nullptr, nullptr); + } } @@ -1303,7 +1322,7 @@ string RocksDBStore::past_prefix(const string &prefix) RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator() { return std::make_shared( - db->NewIterator(rocksdb::ReadOptions())); + db->NewIterator(rocksdb::ReadOptions(), default_cf)); } class CFIteratorImpl : public KeyValueDB::IteratorImpl { diff --git a/src/kv/RocksDBStore.h b/src/kv/RocksDBStore.h index 031f8a6d2147..a552fef01781 100644 --- a/src/kv/RocksDBStore.h +++ b/src/kv/RocksDBStore.h @@ -81,6 +81,9 @@ class RocksDBStore : public KeyValueDB { uint64_t cache_size = 0; bool set_cache_flag = false; + bool must_close_default_cf = false; + rocksdb::ColumnFamilyHandle *default_cf = nullptr; + int submit_common(rocksdb::WriteOptions& woptions, KeyValueDB::Transaction t); int install_cf_mergeop(const string &cf_name, rocksdb::ColumnFamilyOptions *cf_opt); int create_db_dir();