add_column_family(p.name, static_cast<void*>(cf));
}
}
+ default_cf = db->DefaultColumnFamily();
} else {
std::vector<string> existing_cfs;
status = rocksdb::DB::ListColumnFamilies(
derr << status.ToString() << dendl;
return -EINVAL;
}
+ default_cf = db->DefaultColumnFamily();
} else {
// we cannot change column families for a created database. so, map
// what options we are given to whatever cf's already exist.
return -EINVAL;
}
for (unsigned i = 0; i < existing_cfs.size(); ++i) {
- if (existing_cfs[i] != rocksdb::kDefaultColumnFamilyName) {
+ if (existing_cfs[i] == rocksdb::kDefaultColumnFamilyName) {
+ default_cf = handles[i];
+ must_close_default_cf = true;
+ } else {
add_column_family(existing_cfs[i], static_cast<void*>(handles[i]));
}
}
}
}
+ assert(default_cf != nullptr);
PerfCountersBuilder plb(g_ceph_context, "rocksdb", l_rocksdb_first, l_rocksdb_last);
plb.add_u64_counter(l_rocksdb_gets, "get", "Gets");
static_cast<rocksdb::ColumnFamilyHandle*>(p.second));
p.second = nullptr;
}
+ if (must_close_default_cf) {
+ db->DestroyColumnFamilyHandle(default_cf);
+ must_close_default_cf = false;
+ }
+ default_cf = nullptr;
delete db;
db = nullptr;
put_bat(bat, cf, k, to_set_bl);
} else {
string key = combine_strings(prefix, k);
- put_bat(bat, db->db->DefaultColumnFamily(), key, to_set_bl);
+ put_bat(bat, db->default_cf, key, to_set_bl);
}
}
if (cf) {
bat.Delete(cf, rocksdb::Slice(k));
} else {
- bat.Delete(combine_strings(prefix, k));
+ bat.Delete(db->default_cf, combine_strings(prefix, k));
}
}
} else {
string key;
combine_strings(prefix, k, keylen, &key);
- bat.Delete(rocksdb::Slice(key));
+ bat.Delete(db->default_cf, rocksdb::Slice(key));
}
}
if (cf) {
bat.SingleDelete(cf, k);
} else {
- bat.SingleDelete(combine_strings(prefix, k));
+ bat.SingleDelete(db->default_cf, combine_strings(prefix, k));
}
}
if (db->enable_rmrange) {
string endprefix = prefix;
endprefix.push_back('\x01');
- bat.DeleteRange(combine_strings(prefix, string()),
+ bat.DeleteRange(db->default_cf,
+ combine_strings(prefix, string()),
combine_strings(endprefix, string()));
} else {
auto it = db->get_iterator(prefix);
for (it->seek_to_first();
it->valid();
it->next()) {
- bat.Delete(combine_strings(prefix, it->key()));
+ bat.Delete(db->default_cf, combine_strings(prefix, it->key()));
}
}
}
} else {
if (db->enable_rmrange) {
bat.DeleteRange(
- db->db->DefaultColumnFamily(),
+ db->default_cf,
rocksdb::Slice(combine_strings(prefix, start)),
rocksdb::Slice(combine_strings(prefix, end)));
} else {
if (it->key() >= end) {
break;
}
- bat.Delete(combine_strings(prefix, it->key()));
+ bat.Delete(db->default_cf,
+ combine_strings(prefix, it->key()));
it->next();
}
}
// bufferlist::c_str() is non-constant, so we can't call c_str()
if (to_set_bl.is_contiguous() && to_set_bl.length() > 0) {
bat.Merge(
- db->db->DefaultColumnFamily(),
+ db->default_cf,
rocksdb::Slice(key),
rocksdb::Slice(to_set_bl.buffers().front().c_str(), to_set_bl.length()));
} else {
rocksdb::Slice key_slice(key);
vector<rocksdb::Slice> value_slices(to_set_bl.buffers().size());
bat.Merge(
- db->db->DefaultColumnFamily(),
+ db->default_cf,
rocksdb::SliceParts(&key_slice, 1),
prepare_sliceparts(to_set_bl, &value_slices));
}
std::string value;
string k = combine_strings(prefix, key);
auto status = db->Get(rocksdb::ReadOptions(),
- db->DefaultColumnFamily(),
+ default_cf,
rocksdb::Slice(k),
&value);
if (status.ok()) {
} else {
string k = combine_strings(prefix, key);
s = db->Get(rocksdb::ReadOptions(),
- db->DefaultColumnFamily(),
+ default_cf,
rocksdb::Slice(k),
&value);
}
string k;
combine_strings(prefix, key, keylen, &k);
s = db->Get(rocksdb::ReadOptions(),
- db->DefaultColumnFamily(),
+ default_cf,
rocksdb::Slice(k),
&value);
}
{
logger->inc(l_rocksdb_compact);
rocksdb::CompactRangeOptions options;
- db->CompactRange(options, nullptr, nullptr);
+ db->CompactRange(options, default_cf, nullptr, nullptr);
+ for (auto cf : cf_handles) {
+ db->CompactRange(
+ options,
+ static_cast<rocksdb::ColumnFamilyHandle*>(cf.second),
+ nullptr, nullptr);
+ }
}
RocksDBStore::WholeSpaceIterator RocksDBStore::get_wholespace_iterator()
{
return std::make_shared<RocksDBWholeSpaceIteratorImpl>(
- db->NewIterator(rocksdb::ReadOptions()));
+ db->NewIterator(rocksdb::ReadOptions(), default_cf));
}
class CFIteratorImpl : public KeyValueDB::IteratorImpl {