Added possibility to control batch size and iterator refresh time for resharding process.
Replaced getenv() with new control for resharding unittests.
Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
| **ceph-bluestore-tool** bluefs-bdev-new-db --path *osd path* --dev-target *new-device*
| **ceph-bluestore-tool** bluefs-bdev-migrate --path *osd path* --dev-target *new-device* --devs-source *device1* [--devs-source *device2*]
| **ceph-bluestore-tool** free-dump|free-score --path *osd path* [ --allocator block/bluefs-wal/bluefs-db/bluefs-slow ]
+| **ceph-bluestore-tool** reshard --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ]
Description
Give a [0-1] number that represents quality of fragmentation in allocator.
0 represents case when all free space is in one chunk. 1 represents worst possible fragmentation.
+:command:`reshard` --path *osd path* --sharding *new sharding* [ --resharding-ctrl *control string* ]
+
+ Changes sharding of BlueStore's RocksDB. Sharding is built on top of RocksDB column families.
+ This command allows testing the performance of *new sharding* without the need to redeploy the OSD.
+ Resharding is usually a long process, which involves walking through the entire RocksDB key space
+ and moving some keys to different column families.
+ Option --resharding-ctrl provides performance control over the resharding process.
+ An interrupted resharding will prevent the OSD from running.
+ An interrupted resharding does not corrupt data. It is always possible to continue the previous resharding,
+ or to select any other sharding scheme, including reverting to the original one.
+
Options
=======
Useful for *free-dump* and *free-score* actions. Selects allocator(s).
+.. option:: --resharding-ctrl *control string*
+
+ Provides control over the resharding process. Specifies how often to refresh the RocksDB iterator,
+ and how large the commit batch should be before it is committed to RocksDB. Option format is:
+ <iterator_refresh_bytes>/<iterator_refresh_keys>/<batch_commit_bytes>/<batch_commit_keys>
+ Default: 10000000/10000/1000000/1000
+
Device labels
=============
return 0;
}
-int RocksDBStore::reshard(const std::string& new_sharding)
+int RocksDBStore::reshard(const std::string& new_sharding, const RocksDBStore::resharding_ctrl* ctrl_in)
{
rocksdb::Status status;
int r;
std::vector<std::string> to_process_columns;
std::vector<rocksdb::ColumnFamilyHandle*> to_process_handles;
+ resharding_ctrl ctrl = ctrl_in ? *ctrl_in : resharding_ctrl();
size_t bytes_in_batch = 0;
size_t keys_in_batch = 0;
size_t bytes_per_iterator = 0;
rocksdb::WriteBatch* bat = nullptr;
- //check for injected unittest commands
- const char* unittest_str = getenv("RocksDBStore::reshard::unittest");
- size_t unittest_command = unittest_str ? atoi(unittest_str) : 0;
-
auto flush_batch = [&]() {
dout(10) << "flushing batch, " << keys_in_batch << " keys, for "
<< bytes_in_batch << " bytes" << dendl;
rocksdb::Slice raw_key = it->key();
dout(30) << "key=" << pretty_binary_string(raw_key.ToString()) << dendl;
//check if need to refresh iterator
- if (bytes_per_iterator >= 10000000 ||
- keys_per_iterator >= 10000) {
+ if (bytes_per_iterator >= ctrl.bytes_per_iterator ||
+ keys_per_iterator >= ctrl.keys_per_iterator) {
dout(8) << "refreshing iterator" << dendl;
bytes_per_iterator = 0;
keys_per_iterator = 0;
keys_per_iterator++;
//check if need to write batch
- if (bytes_in_batch >= 1000000 ||
- keys_in_batch >= 1000) {
+ if (bytes_in_batch >= ctrl.bytes_per_batch ||
+ keys_in_batch >= ctrl.keys_per_batch) {
flush_batch();
- if (unittest_command & 1) {
+ if (ctrl.unittest_fail_after_first_batch) {
r = -1000;
goto out;
}
derr << "Error processing column " << to_process_columns[idx] << dendl;
goto cleanup;
}
- if (unittest_command & 2) {
+ if (ctrl.unittest_fail_after_processing_column) {
r = -1001;
goto cleanup;
}
goto cleanup;
}
- if (unittest_command & 4) {
+ if (ctrl.unittest_fail_after_successful_processing) {
r = -1002;
goto cleanup;
}
int reshard_cleanup(const std::vector<std::string>& current_columns,
const std::vector<rocksdb::ColumnFamilyHandle*>& current_handles);
public:
- int reshard(const std::string& new_sharding);
+ struct resharding_ctrl { /// tuning knobs for reshard(); defaults equal the limits that were previously hard-coded there
+ size_t bytes_per_iterator = 10000000; /// amount of data to process before refreshing iterator
+ size_t keys_per_iterator = 10000; /// number of keys to process before refreshing iterator
+ size_t bytes_per_batch = 1000000; /// amount of data before submitting batch
+ size_t keys_per_batch = 1000; /// number of keys before submitting batch
+ bool unittest_fail_after_first_batch = false; /// test hook: reshard() returns -1000 right after a batch flush
+ bool unittest_fail_after_processing_column = false; /// test hook: reshard() returns -1001
+ bool unittest_fail_after_successful_processing = false; /// test hook: reshard() returns -1002
+ };
+ int reshard(const std::string& new_sharding, const resharding_ctrl* ctrl = nullptr);
};
#endif
vector<string> allocs_name;
string empty_sharding(1, '\0');
string new_sharding = empty_sharding;
+ string resharding_ctrl;
int log_level = 30;
bool fsck_deep = false;
po::options_description po_options("Options");
("value,v", po::value<string>(&value), "label metadata value")
("allocator", po::value<vector<string>>(&allocs_name), "allocator to inspect: 'block'/'bluefs-wal'/'bluefs-db'/'bluefs-slow'")
("sharding", po::value<string>(&new_sharding), "new sharding to apply")
+ ("resharding-ctrl", po::value<string>(&resharding_ctrl), "gives control over resharding procedure details")
;
po::options_description po_positional("Positional options");
po_positional.add_options()
cout << std::string(out.c_str(), out.length()) << std::endl;
bluestore.cold_close();
} else if (action == "reshard") {
+ auto get_ctrl = [&](size_t& val) { // consumes the next '/'-separated field of resharding_ctrl and stores it in val
+ if (!resharding_ctrl.empty()) { // nothing left to parse: val keeps whatever value it already had
+ size_t pos;
+ std::string token;
+ pos = resharding_ctrl.find('/');
+ token = resharding_ctrl.substr(0, pos); // when no '/' is found this takes the whole remaining string
+ if (pos != std::string::npos)
+ resharding_ctrl.erase(0, pos + 1); // drop the token together with its '/' separator
+ else
+ resharding_ctrl.erase(); // last field consumed: leave the string empty
+ char* endptr; // NOTE(review): a negative or empty token passes the check below - confirm this is intended
+ val = strtoll(token.c_str(), &endptr, 0); // base 0: decimal, 0x-prefixed hex and 0-prefixed octal are accepted
+ if (*endptr != '\0') { // parsing stopped before the end of the token
+ cerr << "invalid --resharding-ctrl. '" << token << "' is not a number" << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ }
+ };
BlueStore bluestore(cct.get(), path);
KeyValueDB *db_ptr;
+ RocksDBStore::resharding_ctrl ctrl;
+ if (!resharding_ctrl.empty()) {
+ get_ctrl(ctrl.bytes_per_iterator);
+ get_ctrl(ctrl.keys_per_iterator);
+ get_ctrl(ctrl.bytes_per_batch);
+ get_ctrl(ctrl.keys_per_batch);
+ if (!resharding_ctrl.empty()) {
+ cerr << "extra chars in --resharding-ctrl" << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ }
int r = bluestore.open_db_environment(&db_ptr);
if (r < 0) {
cerr << "error preparing db environment: " << cpp_strerror(r) << std::endl;
RocksDBStore* rocks_db = dynamic_cast<RocksDBStore*>(db_ptr);
ceph_assert(db_ptr);
ceph_assert(rocks_db);
- r = rocks_db->reshard(new_sharding);
+ r = rocks_db->reshard(new_sharding, &ctrl);
if (r < 0) {
cerr << "error resharding: " << cpp_strerror(r) << std::endl;
} else {
data_to_db();
check_db();
db->close();
- setenv("RocksDBStore::reshard::unittest", "1", 1);
- ASSERT_EQ(db->reshard("Evade(4)"), -1000);
- unsetenv("RocksDBStore::reshard::unittest");
+ RocksDBStore::resharding_ctrl ctrl;
+ ctrl.unittest_fail_after_first_batch = true;
+ ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1000);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
data_to_db();
check_db();
db->close();
- setenv("RocksDBStore::reshard::unittest", "2", 1);
- ASSERT_EQ(db->reshard("Evade(4)"), -1001);
- unsetenv("RocksDBStore::reshard::unittest");
+ RocksDBStore::resharding_ctrl ctrl;
+ ctrl.unittest_fail_after_processing_column = true;
+ ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1001);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
generate_data();
data_to_db();
check_db();
- // ASSERT_EQ(check_db_expect_difference(), true);
db->close();
- setenv("RocksDBStore::reshard::unittest", "4", 1);
- ASSERT_EQ(db->reshard("Evade(4)"), -1002);
- unsetenv("RocksDBStore::reshard::unittest");
+ RocksDBStore::resharding_ctrl ctrl;
+ ctrl.unittest_fail_after_successful_processing = true;
+ ASSERT_EQ(db->reshard("Evade(4)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4)"), 0);
ASSERT_EQ(db->open(cout), 0);
data_to_db();
check_db();
db->close();
- setenv("RocksDBStore::reshard::unittest", "4", 1);
- ASSERT_EQ(db->reshard("Evade(4,0-8)"), -1002);
- unsetenv("RocksDBStore::reshard::unittest");
+ RocksDBStore::resharding_ctrl ctrl;
+ ctrl.unittest_fail_after_successful_processing = true;
+ ASSERT_EQ(db->reshard("Evade(4,0-8)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
ASSERT_EQ(db->reshard("Evade(4,0-8)"), 0);
ASSERT_EQ(db->open(cout), 0);
data_to_db();
check_db();
db->close();
- setenv("RocksDBStore::reshard::unittest", "1", 1);
- ASSERT_EQ(db->reshard("C(5) D(3)"), -1000);
+ RocksDBStore::resharding_ctrl ctrl;
+ ctrl.unittest_fail_after_first_batch = true;
+ ASSERT_EQ(db->reshard("C(5) D(3)", &ctrl), -1000);
ASSERT_NE(db->open(cout), 0);
- setenv("RocksDBStore::reshard::unittest", "2", 1);
- ASSERT_EQ(db->reshard("C(5) Evade(2)"), -1001);
+ ctrl.unittest_fail_after_first_batch = false;
+ ctrl.unittest_fail_after_processing_column = true;
+ ASSERT_EQ(db->reshard("C(5) Evade(2)", &ctrl), -1001);
ASSERT_NE(db->open(cout), 0);
- setenv("RocksDBStore::reshard::unittest", "4", 1);
- ASSERT_EQ(db->reshard("Evade(2) D(3)"), -1002);
+ ctrl.unittest_fail_after_processing_column = false;
+ ctrl.unittest_fail_after_successful_processing = true;
+ ASSERT_EQ(db->reshard("Evade(2) D(3)", &ctrl), -1002);
ASSERT_NE(db->open(cout), 0);
- unsetenv("RocksDBStore::reshard::unittest");
ASSERT_EQ(db->reshard("Ad(1) Evade(5)"), 0);
ASSERT_EQ(db->open(cout), 0);
check_db();