From: Samuel Just <sjust@redhat.com>
Date: Tue, 12 Aug 2025 00:36:16 +0000 (-0700)
Subject: crimson/.../store-bench: refactor arguments and workloads
X-Git-Tag: testing/wip-vshankar-testing-20250825.134853-debug~27^2
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d4eac2723c8109a33ae5e0f1bbbc1eba398520c3;p=ceph-ci.git

crimson/.../store-bench: refactor arguments and workloads

- Adds a workload abstraction grouping arguments with the associated
  workload.
- Reworks argument parsing to occur prior to the seastar app, which
  allows passing unparsed arguments to ceph.
- Refactors time usages as necessary to use std::chrono types.
- Removes LOG_PREFIX usages that don't currently have log lines.
- Other minor cleanups not worth separating out.
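Example invocation under the new argument structure (the binary name and
store path here are illustrative, not part of this change; the flags are
the ones defined by this patch, shown for the pg_log workload):

    store-bench --store-type seastore --store-path /tmp/store-bench \
        --work-load-type pg_log --duration 30 --num-concurrent-io 16 \
        --num-logs 4 --log-length 256 --log-size 1024 --smp 4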
Signed-off-by: Samuel Just <sjust@redhat.com>
---

diff --git a/src/crimson/tools/store_bench/store-bench.cc b/src/crimson/tools/store_bench/store-bench.cc
index e41b3877203..346e54f44d1 100644
--- a/src/crimson/tools/store_bench/store-bench.cc
+++ b/src/crimson/tools/store_bench/store-bench.cc
@@ -49,10 +49,135 @@
 namespace po = boost::program_options;
 
+using namespace std::literals;
 using namespace ceph;
 
 SET_SUBSYS(osd);
 
+/**
+ * Stores the number of operations and the total latency across all of
+ * these operations. For the pg_log workload, each write+delete pair
+ * increases num_operations by 1. For the rgw_index workload, each write
+ * increases num_operations by 1 and each delete increases it by 1.
+ */
+struct results_t {
+  int num_operations = 0;
+  std::chrono::duration<double> tot_latency_sec = 0s;
+  std::chrono::duration<double> duration = 0s;
+
+  results_t &operator += (const results_t &other_result) {
+    num_operations += other_result.num_operations;
+    tot_latency_sec += other_result.tot_latency_sec;
+    return *this;
+  }
+
+  void dump(ceph::Formatter *f) const {
+    f->open_object_section("results_t");
+    f->dump_int("number of operations done", num_operations);
+    f->dump_float(
+      "total latency for these operations",
+      tot_latency_sec.count());
+    f->dump_float("the time the workload took to run",
+                  duration.count());
+    f->close_section();
+  }
+};
+
+struct common_options_t {
+private:
+  unsigned duration = 0;
+public:
+  unsigned num_concurrent_io = 16;
+  std::chrono::duration<double> get_duration() const {
+    return std::chrono::seconds(duration);
+  }
+
+  po::options_description get_options() {
+    po::options_description ret{"Common Options"};
+    ret.add_options()
+      ("num-concurrent-io", po::value<unsigned>(&num_concurrent_io),
+       "number of IOs happening simultaneously")
+      ("duration", po::value<unsigned>(&duration)->required(),
+       "how long in seconds the actual testing loop runs "
+       "for")
+      ;
+    return ret;
+  }
+};
+
+class StoreBenchWorkload {
+public:
+  virtual po::options_description get_options() = 0;
+  virtual seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) = 0;
+  virtual ~StoreBenchWorkload() {}
+};
+
+
+class PGLogWorkload : public StoreBenchWorkload {
+  unsigned num_logs = 4;
+  unsigned log_size = 1024;
+  unsigned log_length = 256;
+
+public:
+  po::options_description get_options() final {
+    po::options_description ret{"PGLogWorkload"};
+    ret.add_options()
+      ("num-logs", po::value<unsigned>(&num_logs),
+       "how many different logs we create; aka, we create a log for every "
+       "object, so how many objects we create")
+      ("log-length", po::value<unsigned>(&log_length),
+       "number of entries per log")
+      ("log-size", po::value<unsigned>(&log_size),
+       "size of each log entry")
+      ;
+    return ret;
+  }
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+  ~PGLogWorkload() final {}
+};
+
+class RGWIndexWorkload : public StoreBenchWorkload {
+  unsigned num_indices = 16;
+  uint64_t key_size = 1024;
+  uint64_t value_size = 1024;
+  unsigned target_keys_per_bucket = 256;
+  unsigned tolerance_range = 10;
+  unsigned num_buckets_per_collection = 16;
+
+public:
+  po::options_description options{"RGWIndexWorkload"};
+
+  po::options_description get_options() final {
+    po::options_description ret{"RGWIndexWorkload"};
+    ret.add_options()
+      ("num_indices", po::value<unsigned>(&num_indices),
+       "number of RGW indices/buckets")
+      ("key_size", po::value<uint64_t>(&key_size),
+       "size of keys in bytes")
+      ("value_size", po::value<uint64_t>(&value_size),
+       "size of values in bytes")
+      ("target_keys_per_bucket",
+       po::value<unsigned>(&target_keys_per_bucket),
+       "target number of keys per bucket")
+      ("tolerance_range",
+       po::value<unsigned>(&tolerance_range),
+       "tolerance range percentage")
+      ("num_buckets_per_collection",
+       po::value<unsigned>(&num_buckets_per_collection),
+       "the number of objects in each collection ")
+      ;
+    return ret;
+  }
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+  ~RGWIndexWorkload() final {}
+};
+
 /**
  * These are functions that are used in both types of work_loads
  */
@@ -85,36 +210,6 @@ coll_t make_cid(int obj_id, int num_objects_per_collection) {
   return coll_t(spg_t(pg_t(pg_id, 0)));
 }
 
-/**
- * The struct stores the number of operations and the total latency for all
- * these operations For the pg workload type write+delete increases the number
- * of operations by 1 For the rgw index workload, each write increases the
- * num_operations by 1 Each delete increases the number of operations by 1
- */
-struct results_t {
-  int num_operations = 0;
-  std::chrono::duration<double> tot_latency_sec =
-      std::chrono::duration<double>(0.0);
-  int duration = 0;
-
-  results_t &operator += (const results_t &other_result) {
-    num_operations += other_result.num_operations;
-    tot_latency_sec += other_result.tot_latency_sec;
-    return *this;
-  }
-
-  void dump(ceph::Formatter *f) const {
-    f->open_object_section("results_t");
-    f->dump_int("number of operations done", num_operations);
-    f->dump_float(
-        "the total latency aka time for these operations",
-        tot_latency_sec.count());
-    f->dump_int("the time the workload took to run is ",
-                duration);
-    f->close_section();
-  }
-};
-
 /**
  * This function is used to run a workload num_con_io times concurrently
  * the parallel_for_each loop launches k co_routines
@@ -123,9 +218,10 @@ struct results_t {
  * added to the vector so all_io_res contains resolved futures
  */
 seastar::future<results_t>
-run_concurrent_ios(int duration, int num_concurrent_io,
-                   std::function<seastar::future<results_t>()> work_load) {
-  LOG_PREFIX(run_concurrent_ios);
+run_concurrent_ios(
+  std::chrono::duration<double> duration,
+  int num_concurrent_io,
+  std::function<seastar::future<results_t>()> work_load) {
   std::vector<int> container_io;
   std::vector<results_t> all_io_res;
   for (int i = 0; i < num_concurrent_io; ++i) {
@@ -138,8 +234,11 @@ run_concurrent_ios(int duration, int num_concurrent_io,
       co_return;
     })));
 
-  results_t total_result_all_io = {0, std::chrono::duration<double>(0.0),
-                                   duration};
+  results_t total_result_all_io = {
+    0,
+    std::chrono::duration<double>(0.0),
+    duration
+  };
   for (const auto res : all_io_res) {
     total_result_all_io += res;
   }
@@ -159,10 +258,10 @@
  * (b) writing and removing logs, this is considered 1 I/O
  * (c) doing N I/o's concurrently on 1 thread
  */
-seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
-                                  int num_logs, int num_concurrent_io,
-                                  int duration, int log_size, int log_length) {
-  LOG_PREFIX(pg_log_workload);
+seastar::future<> PGLogWorkload::run(
+  const common_options_t &common,
+  crimson::os::FuturizedStore &global_store)
+{
   auto &local_store = global_store.get_sharded_store();
 
   std::map<int, coll_t> collection_id;
@@ -176,14 +275,14 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
    */
 
   auto pre_fill_logs = [&]() -> seastar::future<> {
-    for (int i = 0; i < num_logs; ++i) {
+    for (unsigned i = 0; i < num_logs; ++i) {
       auto obj_i = create_hobj(i);
       auto coll_id = make_cid(i, 1);
       collection_id[i] = coll_id;
       auto coll_ref = co_await local_store.create_new_collection(coll_id);
       coll_ref_map[i] = coll_ref;
       std::map<std::string, bufferlist> data;
-      for (int j = 0; j < log_length; ++j) {
+      for (unsigned j = 0; j < log_length; ++j) {
         std::string key = std::to_string(j);
         bufferlist bl_value;
         bl_value.append_zero(log_size);
@@ -215,8 +314,7 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
         std::chrono::duration<double>(0.0);
     auto start = ceph::mono_clock::now();
 
-    while (ceph::mono_clock::now() - start <=
-           (std::chrono::seconds(duration))) { // duration is an int
+    while (ceph::mono_clock::now() - start <= common.get_duration()) {
 
       int obj_num = std::rand() % num_logs;
       auto object = create_hobj(obj_num);
@@ -249,10 +347,11 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
       tot_latency += time_sec;
       num_ops++;
     }
-    co_return results_t{num_ops, tot_latency, duration};
+    co_return results_t{num_ops, tot_latency, common.get_duration()};
   };
 
   co_await pre_fill_logs();
-  co_await run_concurrent_ios(duration, num_concurrent_io, add_remove_entry);
+  co_await run_concurrent_ios(
+    common.get_duration(), common.num_concurrent_io, add_remove_entry);
   co_return;
 }
 
@@ -356,14 +455,10 @@ delete_random_key(crimson::os::FuturizedStore::Shard &shard_ref,
  * based on the chosen bucket's size, aka if it's within acceptable range (c) doing
  * N I/o's concurrently on 1 thread
  */
-seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
-                                     int num_indices, int num_concurrent_io,
-                                     int duration, int key_size, int value_size,
-                                     int target_keys_per_bucket,
-                                     int tolerance_range,
-                                     int num_buckets_per_collection) {
-
-  LOG_PREFIX(rgw_index_workload);
+seastar::future<> RGWIndexWorkload::run(
+  const common_options_t &common,
+  crimson::os::FuturizedStore &global_store)
+{
   auto &local_store = global_store.get_sharded_store();
   std::map<int, coll_t> collection_id_for_rgw;
   std::map<int, crimson::os::FuturizedStore::Shard::CollectionRef>
@@ -382,7 +477,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
    */
 
   auto pre_fill_buckets = [&]() -> seastar::future<> {
-    for (int i = 0; i < num_indices; ++i) {
+    for (unsigned i = 0; i < num_indices; ++i) {
       auto bucket_i = create_hobj(i);
       auto coll_id = make_cid(i, num_buckets_per_collection);
       collection_id_for_rgw[i] = coll_id;
@@ -392,7 +487,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
 
       std::map<std::string, bufferlist> omap_for_this_bucket;
       std::set<std::string> keys_in_this_bucket;
-      for (int j = 0; j < target_keys_per_bucket; ++j) {
+      for (unsigned j = 0; j < target_keys_per_bucket; ++j) {
        std::string possible_key = generate_random_string(key_size);
        while (keys_in_this_bucket.count(possible_key) > 0) {
          possible_key = generate_random_string(key_size);
@@ -435,7 +530,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
         std::chrono::duration<double>(0.0);
     auto start = ceph::mono_clock::now();
 
-    while (ceph::mono_clock::now() - start <= std::chrono::seconds(duration)) {
+    while (ceph::mono_clock::now() - start <= common.get_duration()) {
 
       int bucket_num_we_choose = std::rand() % num_indices;
       auto bucket = create_hobj(bucket_num_we_choose);
@@ -480,11 +575,12 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
         }
       };
     }
-    co_return results_t{num_ops, tot_latency, duration};
+    co_return results_t{num_ops, tot_latency, common.get_duration()};
   };
 
   co_await pre_fill_buckets();
-  co_await run_concurrent_ios(duration, num_concurrent_io, rgw_actual_test);
+  co_await run_concurrent_ios(
+    common.get_duration(), common.num_concurrent_io, rgw_actual_test);
   co_return;
 };
 
@@ -494,22 +590,40 @@ int main(int argc, char **argv) {
   bool debug = false;
   std::string store_type;
   std::string store_path;
+  std::string work_load_type;
+  unsigned smp = 4;
 
-  desc.add_options()("help,h", "show help message")(
-      "store-type",
-      po::value<std::string>(&store_type)->default_value("seastore"),
-      "set store type")
+  desc.add_options()("help,h", "show help message")
+    ("store-type",
+     po::value<std::string>(&store_type)->default_value("seastore"),
+     "set store type")
       /* store-path is a path to a directory containing a file 'block'
       * block should be a symlink to a real device for actual performance
       * testing, but may be a file for testing this utility.
       * See build/dev/osd* after starting a vstart cluster for an example
       * of what that looks like. */
-      ("store-path", po::value<std::string>(&store_path),
-       "path to store, <store-path>/block should "
-       "be a symlink to the target device for bluestore or seastore")(
-      "debug", po::value<bool>(&debug)->default_value(false),
-      "enable debugging");
+    ("store-path", po::value<std::string>(&store_path)->required(),
+     "path to store, <store-path>/block should "
+     "be a symlink to the target device for bluestore or seastore")(
+     "debug", po::value<bool>(&debug)->default_value(false),
+     "enable debugging")
+    ("work-load-type",
+     po::value<std::string>(&work_load_type)->required(),
+     "work load type: pg_log or rgw_index")
+    ("smp", po::value<unsigned>(&smp),
+     "number of reactors");
+
+  common_options_t common_options;
+  std::map<std::string, std::unique_ptr<StoreBenchWorkload>> workloads;
+
+  workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
+  workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());
+
+  desc.add(common_options.get_options());
+  for (auto &[name, workload] : workloads) {
+    desc.add(workload->get_options());
+  }
 
   po::variables_map vm;
   std::vector<std::string> unrecognized_options;
@@ -539,65 +653,10 @@ int main(int argc, char **argv) {
   app_cfg.reactor_opts.blocked_reactor_notify_ms.set_default_value(200);
   seastar::app_template app(std::move(app_cfg));
 
-  std::string work_load_type = "pg_log";
-  int num_logs = 0;
-  int log_length = 0;
-  int log_size = 0;
-  int num_concurrent_io = 0;
-  int duration = 0;
-  int key_size = 0;
-  int value_size = 0;
-  int num_indices = 0;
-  int target_keys_per_bucket = 0;
-  int tolerance_range = 0;
-  int num_buckets_per_collection = 0;
-
-  std::vector<char *> av{argv[0]};
-  std::transform(std::begin(unrecognized_options),
-                 std::end(unrecognized_options), std::back_inserter(av),
-                 [](auto &s) { return const_cast<char *>(s.c_str()); });
-  app.add_options()(
-      "work_load_type",
-      po::value<std::string>(&work_load_type)->default_value("pg_log"),
-      "work load type: pg_log or rgw_index")
-
-      ("num_logs", po::value<int>(&num_logs),
-       "how many different logs we create; aka, we create a log for every "
-       "object,so how many objects we create")
-
-      ("log_length", po::value<int>(&log_length),
-       "number of entries per log")
-
-      ("log_size", po::value<int>(&log_size), "size of each log entry")
-
-      ("num_concurrent_io", po::value<int>(&num_concurrent_io),
-       "number of IOs happening simultaneously")
-
-      ("duration", po::value<int>(&duration),
-       "how long in seconds the actual testing loop runs "
-       "for")
-      // for rgw
-      ("num_indices", po::value<int>(&num_indices),
-       "number of RGW indices/buckets")
-
-      ("key_size", po::value<int>(&key_size), "size of keys in bytes")
-
-      ("value_size", po::value<int>(&value_size),
-       "size of values in bytes")
-
-      ("target_keys_per_bucket",
-       po::value<int>(&target_keys_per_bucket),
-       "target number of keys per bucket")
-
-      ("tolerance_range", po::value<int>(&tolerance_range),
-       "tolerance range percentage")
-
-      ("num_buckets_per_collection",
-       po::value<int>(&num_buckets_per_collection),
-       "the number of objects in each collection ");
-
+  auto smp_str = std::to_string(smp);
+  const char *av[] = { argv[0], "--smp", smp_str.c_str() };
   return app.run(
-      av.size(), av.data(),
+      sizeof(av) / sizeof(av[0]), const_cast<char **>(av),
      /* crimson-osd uses seastar as its scheduler.  We use
       * seastar::app_template::run to start the base task for the
       * application -- this lambda.  The -> seastar::future here
@@ -678,20 +737,16 @@ int main(int argc, char **argv) {
                        store_type, store_path)
                        .c_str()));
           std::vector<seastar::future<>> per_shard_futures;
+
+          auto named_lambda = [&, &store_ref = *store]() -> seastar::future<> {
+            DEBUG("running example_io on reactor {}", seastar::this_shard_id());
+            auto iter = workloads.find(work_load_type);
+            if (iter != workloads.end()) {
+              co_await iter->second->run(common_options, store_ref);
+            }
+            co_return;
+          };
           for (unsigned i = 0; i < seastar::smp::count; ++i) {
-            auto named_lambda = [=, &store_ref = *store]() -> seastar::future<> {
-              DEBUG("running example_io on reactor {}", seastar::this_shard_id());
-              if (work_load_type == "pg_log") {
-                co_await pg_log_workload(store_ref, num_logs, num_concurrent_io,
-                                         duration, log_size, log_length);
-              } else if (work_load_type == "rgw_index") {
-                co_await rgw_index_workload(
-                    store_ref, num_indices, num_concurrent_io, duration, key_size,
-                    value_size, target_keys_per_bucket, tolerance_range,
-                    num_buckets_per_collection);
-              }
-              co_return;
-            };
             per_shard_futures.push_back(
                 seastar::smp::submit_to(i, std::move(named_lambda)));
           }