dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
// verify that column is empty
- rocksdb::Iterator* it;
- it = db->NewIterator(rocksdb::ReadOptions(), current_handles[i]);
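+ // own the verification iterator via unique_ptr so it is released on every exit path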
+ std::unique_ptr<rocksdb::Iterator> it{
+ db->NewIterator(rocksdb::ReadOptions(), current_handles[i])};
ceph_assert(it);
it->SeekToFirst();
ceph_assert(!it->Valid());
- delete it;
- rocksdb::Status status;
- status = db->DropColumnFamily(current_handles[i]);
- if (!status.ok()) {
- derr << __func__ << " Failed to delete column: "
+
+ if (rocksdb::Status status = db->DropColumnFamily(current_handles[i]);
+ !status.ok()) {
+ derr << __func__ << " Failed to drop column: "
<< current_columns[i] << dendl;
return -EINVAL;
}
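+ // progress counters, reported every 10000 processed keys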
size_t keys_processed = 0;
size_t keys_moved = 0;
- rocksdb::WriteBatch* bat = nullptr;
-
- auto flush_batch = [&]() {
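+ // synchronously write the accumulated batch, reset the counters, and clear it for reuse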
+ auto flush_batch = [&](rocksdb::WriteBatch* batch) {
dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
<< bytes_in_batch << " bytes" << dendl;
rocksdb::WriteOptions woptions;
woptions.sync = true;
- rocksdb::Status s = db->Write(woptions, bat);
+ rocksdb::Status s = db->Write(woptions, batch);
ceph_assert(s.ok());
bytes_in_batch = 0;
keys_in_batch = 0;
- delete bat;
- bat = new rocksdb::WriteBatch();
- ceph_assert(bat);
+ batch->Clear();
};
auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
const std::string& fixed_prefix)
{
- int r = 0;
dout(5) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
- rocksdb::Iterator* it;
- it = db->NewIterator(rocksdb::ReadOptions(), handle);
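+ // unique_ptr ownership removes the manual delete and the goto-based cleanup below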
+ std::unique_ptr<rocksdb::Iterator> it{
+ db->NewIterator(rocksdb::ReadOptions(), handle)};
ceph_assert(it);
- bat = new rocksdb::WriteBatch();
- ceph_assert(bat);
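+ // the batch lives on the stack; flush_batch() clears it so it can be refilled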
+ rocksdb::WriteBatch bat;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
rocksdb::Slice raw_key = it->key();
dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
//refresh the iterator periodically so it does not keep old SST files pinned
if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
keys_per_iterator >= ctrl.keys_per_iterator) {
bytes_per_iterator = 0;
keys_per_iterator = 0;
std::string raw_key_str = raw_key.ToString();
- delete it;
- it = db->NewIterator(rocksdb::ReadOptions(), handle);
+ it.reset(db->NewIterator(rocksdb::ReadOptions(), handle));
ceph_assert(it);
it->Seek(raw_key_str);
ceph_assert(it->Valid());
raw_key = it->key();
}
rocksdb::Slice value = it->value();
std::string prefix, key;
if (fixed_prefix.size() == 0) {
split_key(raw_key, &prefix, &key);
} else {
prefix = fixed_prefix;
key = raw_key.ToString();
}
keys_processed++;
if ((keys_processed % 10000) == 0) {
dout(10) << "processed " << keys_processed << " keys, moved " << keys_moved << dendl;
}
- std::string new_raw_key;
rocksdb::ColumnFamilyHandle* new_handle = get_cf_handle(prefix, key);
if (new_handle == nullptr) {
new_handle = default_cf;
}
if (handle == new_handle) {
continue;
}
+ std::string new_raw_key;
if (new_handle == default_cf) {
new_raw_key = combine_strings(prefix, key);
} else {
new_raw_key = key;
}
- bat->Delete(handle, raw_key);
- bat->Put(new_handle, new_raw_key, value);
+ bat.Delete(handle, raw_key);
+ bat.Put(new_handle, new_raw_key, value);
dout(25) << "moving " << (void*)handle << "/" << pretty_binary_string(raw_key.ToString()) <<
" to " << (void*)new_handle << "/" << pretty_binary_string(new_raw_key) <<
" size " << value.size() << dendl;
keys_moved++;
keys_in_batch++;
bytes_in_batch += new_raw_key.size() + value.size();
keys_per_iterator++;
bytes_per_iterator += new_raw_key.size() + value.size();
//check if need to write batch
if (bytes_in_batch >= ctrl.bytes_per_batch ||
keys_in_batch >= ctrl.keys_per_batch) {
- flush_batch();
+ flush_batch(&bat);
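+ // test hook: optionally abort right after the first flushed batch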
if (ctrl.unittest_fail_after_first_batch) {
- r = -1000;
- goto out;
+ return -1000;
}
}
}
- flush_batch();
- out:
- delete it;
- delete bat;
- return r;
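+ // flush whatever is still pending once iteration completes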
+ if (bat.Count() > 0) {
+ flush_batch(&bat);
+ }
+ return 0;
};
r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);