}
int RocksDBStore::prepare_for_reshard(const std::string& new_sharding,
- std::vector<std::string>& to_process_columns,
- std::vector<rocksdb::ColumnFamilyHandle*>& to_process_handles)
+ RocksDBStore::columns_t& to_process_columns)
{
//0. lock db from opening
//1. list existing columns
}
}
}
+ to_process_columns.emplace(full_name, cf);
}
//8. check if all cf_handles are filled
}
}
}
- to_process_columns = existing_columns;
- to_process_handles = handles;
return 0;
}
-int RocksDBStore::reshard_cleanup(const std::vector<std::string>& current_columns,
- const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles)
+int RocksDBStore::reshard_cleanup(const RocksDBStore::columns_t& current_columns)
{
std::vector<std::string> new_sharding_columns;
for (const auto& [name, handle] : cf_handles) {
}
}
- for (size_t i = 0; i < current_columns.size(); i++) {
- bool found = false;
- for (size_t j = 0; j < new_sharding_columns.size(); j++) {
- if (current_columns[i] == new_sharding_columns[j]) {
- found = true;
- break;
- }
- }
- if (found || current_columns[i] == rocksdb::kDefaultColumnFamilyName) {
- dout(5) << "Column " << current_columns[i] << " is part of new sharding." << dendl;
+ for (auto& [name, handle] : current_columns) {
+ auto found = std::find(new_sharding_columns.begin(),
+ new_sharding_columns.end(),
+ name) != new_sharding_columns.end();
+ if (found || name == rocksdb::kDefaultColumnFamilyName) {
+ dout(5) << "Column " << name << " is part of new sharding." << dendl;
continue;
}
- dout(5) << "Column " << current_columns[i] << " not part of new sharding. Deleting." << dendl;
+ dout(5) << "Column " << name << " not part of new sharding. Deleting." << dendl;
// verify that column is empty
std::unique_ptr<rocksdb::Iterator> it{
- db->NewIterator(rocksdb::ReadOptions(), current_handles[i])};
+ db->NewIterator(rocksdb::ReadOptions(), handle)};
ceph_assert(it);
it->SeekToFirst();
ceph_assert(!it->Valid());
- if (rocksdb::Status status = db->DropColumnFamily(current_handles[i]);
- !status.ok()) {
- derr << __func__ << " Failed to drop column: "
- << current_columns[i] << dendl;
+ if (rocksdb::Status status = db->DropColumnFamily(handle); !status.ok()) {
+ derr << __func__ << " Failed to drop column: " << name << dendl;
return -EINVAL;
}
}
{
rocksdb::Status status;
int r;
- std::vector<std::string> to_process_columns;
- std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
size_t bytes_in_batch = 0;
return 0;
};
- r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
+ columns_t to_process_columns;
+ r = prepare_for_reshard(new_sharding, to_process_columns);
if (r != 0) {
dout(1) << "failed to prepare db for reshard" << dendl;
goto cleanup;
}
- ceph_assert(to_process_columns.size() == to_process_handles.size());
- for (size_t idx = 0; idx < to_process_columns.size(); idx++) {
- dout(5) << "Processing column=" << to_process_columns[idx] <<
- " handle=" << to_process_handles[idx] << dendl;
- if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
- ceph_assert(to_process_handles[idx] == default_cf);
+ for (auto& [name, handle] : to_process_columns) {
+ dout(5) << "Processing column=" << name
+ << " handle=" << handle << dendl;
+ if (name == rocksdb::kDefaultColumnFamilyName) {
+ ceph_assert(handle == default_cf);
r = process_column(default_cf, std::string());
} else {
- std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
+ std::string fixed_prefix = name.substr(0, name.find('-'));
dout(10) << "Prefix: " << fixed_prefix << dendl;
- r = process_column(to_process_handles[idx], fixed_prefix);
+ r = process_column(handle, fixed_prefix);
}
if (r != 0) {
- derr << "Error processing column " << to_process_columns[idx] << dendl;
+ derr << "Error processing column " << name << dendl;
goto cleanup;
}
if (ctrl.unittest_fail_after_processing_column) {
}
}
- r = reshard_cleanup(to_process_columns, to_process_handles);
+ r = reshard_cleanup(to_process_columns);
if (r != 0) {
dout(5) << "failed to cleanup after reshard" << dendl;
goto cleanup;