rocksdb::WriteBatch* bat = nullptr;
+ //check for injected unittest commands
+ const char* unittest_str = getenv("RocksDBStore::reshard::unittest");
+ size_t unittest_command = unittest_str ? atoi(unittest_str) : 0;
+
auto flush_batch = [&]() {
dout(10) << "flushing batch" << dendl;
rocksdb::WriteOptions woptions;
auto process_column = [&](rocksdb::ColumnFamilyHandle* handle,
const std::string& fixed_prefix)
{
+ int r = 0;
dout(10) << " column=" << (void*)handle << " prefix=" << fixed_prefix << dendl;
rocksdb::Iterator* it;
it = db->NewIterator(rocksdb::ReadOptions(), handle);
rocksdb::Slice raw_key = it->key();
dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
//check if need to refresh iterator
- if (bytes_per_iterator > 10000000 ||
- keys_per_iterator > 10000) {
+ if (bytes_per_iterator >= 10000000 ||
+ keys_per_iterator >= 10000) {
dout(10) << "refreshing iterator" << dendl;
bytes_per_iterator = 0;
keys_per_iterator = 0;
keys_per_iterator ++;
//check if need to write batch
- if (bytes_in_batch > 1000000 ||
- keys_in_batch > 1000) {
+ if (bytes_in_batch >= 1000000 ||
+ keys_in_batch >= 1000) {
flush_batch();
+ if (unittest_command & 1) {
+ r = -1000;
+ goto out;
+ }
}
}
flush_batch();
+ out:
delete it;
delete bat;
+ return r;
};
r = prepare_for_reshard(new_sharding, to_process_columns, to_process_handles);
" handle=" << to_process_handles[idx] << dendl;
if (to_process_columns[idx] == rocksdb::kDefaultColumnFamilyName) {
ceph_assert(to_process_handles[idx] == default_cf);
- process_column(default_cf, std::string());
+ r = process_column(default_cf, std::string());
} else {
std::string fixed_prefix = to_process_columns[idx].substr(0, to_process_columns[idx].find('-'));
dout(10) << "Prefix: " << fixed_prefix << dendl;
- process_column(to_process_handles[idx], fixed_prefix);
+ r = process_column(to_process_handles[idx], fixed_prefix);
+ }
+ if (r != 0) {
+ derr << "Error processing column " << to_process_columns[idx] << dendl;
+ goto cleanup;
+ }
+ if (unittest_command & 2) {
+ r = -1001;
+ goto cleanup;
}
}
dout(5) << "failed to cleanup after reshard" << dendl;
goto cleanup;
}
+
+ if (unittest_command & 4) {
+ r = -1002;
+ goto cleanup;
+ }
env->CreateDir(sharding_def_dir);
status = rocksdb::WriteStringToFile(env, new_sharding,
sharding_def_file, true);