From 41430cb4c954b8e3ff2b92a8eb7b1efebfb7478e Mon Sep 17 00:00:00 2001
From: Shreya Subramanian
Date: Thu, 24 Jul 2025 18:22:14 +0000
Subject: [PATCH] crimson/tools: Add PG log and RGW index workloads

This commit adds two workloads to crimson-store-bench:
(a) a PG log workload with sequential omap writes and deletes
(b) an RGW index workload with randomised omap writes and deletes
Each reactor reports the number of operations, the total latency of those
operations in seconds, and the duration of the workload in seconds.

Signed-off-by: Shreya Subramanian
---
 src/crimson/tools/store_bench/store-bench.cc | 854 +++++++++++++------
 1 file changed, 609 insertions(+), 245 deletions(-)

diff --git a/src/crimson/tools/store_bench/store-bench.cc b/src/crimson/tools/store_bench/store-bench.cc
index fe1053b4f5d97..6b5d57e2bd201 100644
--- a/src/crimson/tools/store_bench/store-bench.cc
+++ b/src/crimson/tools/store_bench/store-bench.cc
@@ -15,22 +15,20 @@
  * size if performance isn't relevant (testing or developement of this utility,
  * for instance).
  *
- * One might want to add something like the following to one's .bashrc to quickly
- * run this utility during development from build/:
- *
- * function run_store_bench {
- *   rm -rf store_bench_dir
- *   mkdir store_bench_dir
- *   touch store_bench_dir/block
- *   truncate -s 10G store_bench_dir/block
- *   ./bin/crimson-store-bench --store-path store_bench_dir $@
- * }
+ * One might want to add something like the following to one's .bashrc to
+ * quickly run this utility during development from build/:
+ *   rm -rf store_bench_dir
+ *   mkdir store_bench_dir
+ *   touch store_bench_dir/block
+ *   truncate -s 10G store_bench_dir/block
+ *   ./build/bin/crimson-store-bench --store-path store_bench_dir --smp 4 "$@"
  */
 #include
+#include
-#include
 #include
+#include
 #include
 #include
@@ -42,9 +40,8 @@
 #include
 #include
-#include "crimson/common/coroutine.h"
-#include "crimson/common/log.h"
 #include "crimson/common/config_proxy.h"
+#include "crimson/common/coroutine.h"
 #include "crimson/common/log.h"

 #include "crimson/os/futurized_collection.h"
@@ -57,150 +54,470 @@ using namespace ceph;

 SET_SUBSYS(osd);

 /**
- * example_io
- *
- * Performs some simple operations against store.
- * The FuturizedStore interface can be found at
- * crimson/os/futurized_store.h
+ * Helper functions shared by both workload types.
  */
-seastar::future<> example_io(crimson::os::FuturizedStore &global_store)
-{
-  LOG_PREFIX(example_io);
-  /* crimson-osd's architecture partitions most resources per seastar
-   * reactor. This allows us to (mostly) avoid locking and other forms
-   * of contention. This call gets us the FuturizedStore::Shard
-   * local to the reactor we are executing on. */
-  auto &local_store = global_store.get_sharded_store();
-  /* Objects in FuturizedStore instances are stored in collections.
-   * crimson-osd (and ceph-osd) use one collection per pg, so
-   * collections are designated by spg_t via coll_t
-   * (see osd/osd_types.h).
-   */
-  coll_t cid(
-    spg_t(
-      // Map shard to pool for this test
-      pg_t(seastar::this_shard_id(), 0)
-    )
-  );
-  auto coll_ref = co_await local_store.create_new_collection(cid);
-
-  /* Objects in FuturizedStore are named by ghobject_t --
-   * see common/hobject.h. Let's create a convenience function
-   * to generate instances which differ by one paramater.
-   *
-   * Note that objects must have globally unique names(ghobject_t), even if
-   * in different collections.
- */
-  auto create_hobj = [](unsigned id) {
-    return ghobject_t(
-      shard_id_t::NO_SHARD,
-      seastar::this_shard_id(),
+/**
+ * Creates a ghobject_t object identifier from an integer identifier.
+ */
+ghobject_t create_hobj(unsigned id) {
+  return ghobject_t(
+      shard_id_t::NO_SHARD, seastar::this_shard_id(),
       id, // hash, normally rjenkins of name, but let's just set it to id
       "", // namespace, empty here
       "", // name, empty here
       0, // snapshot
       ghobject_t::NO_GEN);
-  };
+}

-  /* ceph buffers are represented using bufferlist.
-   * See include/buffer.h
+/**
+ * Called whenever we create an object: builds the collection id (pg_id,
+ * shard 0) for that object, where pg_id is the integer division
+ * obj_id / num_objects_per_collection. The number of objects that share a
+ * collection is therefore controlled by num_objects_per_collection: the
+ * pg_log workload passes 1, so every log object lands in its own pg, while
+ * the rgw_index workload passes x, so the objects with ids 0..x-1 all land
+ * in one collection.
+ */
+coll_t make_cid(int obj_id, int num_objects_per_collection) {
+  int pg_id = obj_id / num_objects_per_collection;
+  return coll_t(spg_t(pg_t(pg_id, 0)));
+}
+
+/**
+ * Accumulates the number of operations and the total latency across those
+ * operations. For the pg_log workload one write+delete transaction counts as
+ * one operation; for the rgw_index workload each write and each delete
+ * counts as one operation.
+ */
+struct results_t {
+  int num_operations = 0;
+  std::chrono::duration<double> tot_latency_sec =
+      std::chrono::duration<double>(0.0);
+  int duration = 0;
+
+  results_t &operator+=(const results_t &other_result) {
+    num_operations += other_result.num_operations;
+    tot_latency_sec += other_result.tot_latency_sec;
+    return *this;
+  }
+
+  void dump(ceph::Formatter *f) const {
+    f->open_object_section("results_t");
+    f->dump_int("num_operations", num_operations);
+    f->dump_float("total_latency_sec", tot_latency_sec.count());
+    f->dump_int("duration_sec", duration);
+    f->close_section();
+  }
+};
+
+/**
+ * Runs the given workload num_concurrent_io times concurrently.
+ * parallel_for_each launches num_concurrent_io coroutines; each coroutine
+ * awaits one invocation of the workload and pushes the resolved result into
+ * all_io_res, so once parallel_for_each completes all_io_res holds one
+ * results_t per concurrent I/O stream.
+ */
+seastar::future<results_t>
+run_concurrent_ios(int duration, int num_concurrent_io,
+                   std::function<seastar::future<results_t>()> work_load) {
+  LOG_PREFIX(run_concurrent_ios);
+  std::vector<int> container_io;
+  std::vector<results_t> all_io_res;
+  for (int i = 0; i < num_concurrent_io; ++i) {
+    container_io.push_back(i);
+  }
+  co_await seastar::parallel_for_each(
+      container_io, (seastar::coroutine::lambda([&](int) -> seastar::future<> {
+        auto res = co_await work_load();
+        all_io_res.push_back(std::move(res));
+        co_return;
+      })));
+
+  results_t total_result_all_io = {0, std::chrono::duration<double>(0.0),
+                                   duration};
+  for (const auto &res : all_io_res) {
+    total_result_all_io += res;
+  }
+  ceph::JSONFormatter jf;
+  total_result_all_io.dump(&jf);
+  jf.flush(std::cout);
+  std::cout << std::endl;
+  co_return total_result_all_io;
+}
+
+/**
+ * Adds and removes log entries on the log objects.
+ * Reports the number of operations performed and their total latency in
+ * seconds, from which throughput and average latency per operation can be
+ * derived.
+ * This function is
split into 3 steps: + * (a)pre filling the logs to steady state + * (b) writing and removing logs ,this is considered 1 I/O + * (c) doing N I/o's concurrently on 1 thread + */ +seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store, + int num_logs, int num_concurrent_io, + int duration, int log_size, int log_length) { + LOG_PREFIX(pg_log_workload); + auto &local_store = global_store.get_sharded_store(); + + std::map collection_id; + std::map coll_ref_map; + + /** + * This method returns a future with pre filled logs + * It creates num_log number of log objects + * Each log object has log_length number of entries + * Each entry is a key value pair, with the value having size log_size */ - auto make_bl = [](std::string_view sv) { - bufferlist bl; - bl.append(bufferptr(sv.data(), sv.size())); - return bl; - }; - auto bl_to_str = [](bufferlist bl) { - return std::string(bl.c_str(), bl.length()); + + auto pre_fill_logs = [&]() -> seastar::future<> { + for (int i = 0; i < num_logs; ++i) { + auto obj_i = create_hobj(i); + auto coll_id = make_cid(i, 1); + collection_id[i] = coll_id; + auto coll_ref = co_await local_store.create_new_collection(coll_id); + coll_ref_map[i] = coll_ref; + std::map data; + for (int j = 0; j < log_length; ++j) { + std::string key = std::to_string(j); + bufferlist bl_value; + bl_value.append_zero(log_size); + data[key] = bl_value; + } + ceph::os::Transaction txn; + txn.create(coll_id, obj_i); + txn.omap_setkeys(coll_id, obj_i, data); + co_await local_store.do_transaction(coll_ref, std::move(txn)); + } + co_return; }; - /* Writes to FuturizedStore instances are performed via - * the ceph::os::Transaction class. All operations within - * the transaction will be applied atomically -- after a - * failure either the whole transaction will have happened - * or none of it will. - * - * ceph::os::Transaction is shared with classic because it - * is part of the primary->replica replication protocol. - * See os/Transaction.h for details. 
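For intuition, here is a minimal, store-free sketch (not part of the patch) of the steady-state window that the add_remove_entry lambda below maintains for each log object: every iteration appends one entry at the tail cursor and removes one at the head cursor inside a single transaction, so the omap stays at log_length entries. A plain std::map stands in for the object's omap; the helper and the constants are illustrative only.

#include <cassert>
#include <map>
#include <string>

int main() {
  const int log_length = 4;   // entries per log at steady state
  const int log_size = 16;    // value size in bytes
  std::map<std::string, std::string> omap;  // stand-in for the object's omap

  int first_key = 0;              // head cursor, oldest entry
  int last_key = log_length;      // tail cursor, next entry to append

  // pre-fill to steady state, like pre_fill_logs above
  for (int j = 0; j < log_length; ++j)
    omap[std::to_string(j)] = std::string(log_size, '\0');

  // each "I/O" = one append + one trim, batched in one Transaction by the patch
  for (int io = 0; io < 10; ++io) {
    omap[std::to_string(last_key++)] = std::string(log_size, '\0');
    omap.erase(std::to_string(first_key++));
    assert(omap.size() == static_cast<std::size_t>(log_length));  // window stays constant
  }
}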
+ std::vector first_key_per_log(num_logs, + 0); // first key in each log object + std::vector last_key_per_log(num_logs, + log_length); // last key in each log object + + /** + * This method returns a future of type struct results_t + * In this function we choose a random log object to write to and remove keys + * from We add and remove keys sequentially, add keys to the end, remove from + * the front Total latency per io is calculated as the sum of the time it + * takes to add and remove each key */ - auto obj0 = create_hobj(0); - std::string key("foo"); - std::string val("bar"); - { - ceph::os::Transaction t; - // create object - t.create(cid, obj0); - // set omap key "foo" to "bar" - std::map attrs; - attrs[key] = make_bl(val); - t.omap_setkeys( - cid, - obj0, - attrs); - // actually submit the transaction and await commit - co_await local_store.do_transaction( - coll_ref, - std::move(t)); - ERROR("created object {} in collection {} with omap {}->{}", - obj0, cid, key, val); - } + auto add_remove_entry = [&]() -> seastar::future { + int num_ops = 0; + std::chrono::duration tot_latency = + std::chrono::duration(0.0); + auto start = ceph::mono_clock::now(); + + while (ceph::mono_clock::now() - start <= + (std::chrono::seconds(duration))) { // duration is an int + int obj_num = std::rand() % num_logs; + + auto object = create_hobj(obj_num); + auto coll_id = collection_id[obj_num]; + auto coll_ref = coll_ref_map[obj_num]; + + std::string key_to_write = std::to_string(last_key_per_log[obj_num]); + last_key_per_log[obj_num] += 1; - { - std::set keys; - keys.insert(key); - auto result = co_await local_store.omap_get_values( - coll_ref, - obj0, - keys - ).handle_error( - crimson::ct_error::assert_all("error reading object") - ); - auto iter = result.find(key); - if (iter == result.end()) { - ERROR("Failed to find key {} on obj {}", key, obj0); - } else if (bl_to_str(iter->second) != val) { - ERROR("key {} on obj {} does not match", key, obj0); - } else { - ERROR("read key {} on obj {}, matches", key, obj0); + std::string key_to_remove = std::to_string(first_key_per_log[obj_num]); + first_key_per_log[obj_num] += 1; + + bufferlist val; + val.append_zero(log_size); + std::map key_val; + key_val[key_to_write] = val; + + ceph::os::Transaction one_write_delete; + one_write_delete.omap_setkeys(coll_id, object, key_val); + one_write_delete.omap_rmkey(coll_id, object, key_to_remove); + + auto latency_start = ceph::mono_clock::now(); + co_await local_store.do_transaction(coll_ref, + std::move(one_write_delete)); + auto latency_end = ceph::mono_clock::now(); + + auto time_nanosec = (latency_end - latency_start); + std::chrono::duration time_sec = + std::chrono::duration(time_nanosec); + tot_latency += time_sec; + num_ops++; } + co_return results_t{num_ops, tot_latency, duration}; + }; + co_await pre_fill_logs(); + co_await run_concurrent_ios(duration, num_concurrent_io, add_remove_entry); + co_return; +} + +// rgw start + +/** + * This function is a helper function specfically for the rgw workload + * Since this workload involves insertion and deletion of random keys we have a + * function to generate a random key of size key_size + */ +std::string generate_random_string(int key_size) { + std::string res = ""; + for (int i = 0; i < key_size; ++i) { + char letter = char(std::rand() % 26 + 97); + res += letter; + } + return res; +} + +// helper functions for rgw_workload specifically + +/** + * This function is writing a key to a specific bucket + * given a bucket, a set of keys that aready exist in that 
bucket and the bucket's current size, this function writes a randomly
+ * generated unique key to that bucket. The bucket's key set (and the caller's
+ * size counter) are only updated after the transaction that writes the key
+ * has been submitted. The method returns a future of type results_t with the
+ * number of operations set to 1 and the latency set to the time the write
+ * took, so every call increments the caller's operation count by one and its
+ * total latency by the duration of this write.
+ */
+
+seastar::future<results_t>
+write_unique_key(crimson::os::FuturizedStore::Shard &shard_ref, coll_t coll_id,
+                 crimson::os::CollectionRef coll_ref, ghobject_t bucket,
+                 std::set<std::string> &existing_keys, int key_size,
+                 int value_size) {
+  std::string new_key = generate_random_string(key_size);
+  while (existing_keys.count(new_key) > 0) {
+    new_key = generate_random_string(key_size);
+  }
+  bufferlist value;
+  value.append_zero(value_size);
+
+  std::map<std::string, bufferlist> data_entry;
+  data_entry[new_key] = value;
+
+  ceph::os::Transaction one_write;
+  one_write.omap_setkeys(coll_id, bucket, data_entry);
+
+  auto latency_start = ceph::mono_clock::now();
+  co_await shard_ref.do_transaction(coll_ref, std::move(one_write));
+  auto latency_end = ceph::mono_clock::now();
+  auto time_per_write = (latency_end - latency_start);
+  std::chrono::duration<double> time_per_write_sec = time_per_write;
+
+  existing_keys.insert(new_key);
+  co_return results_t{1, time_per_write_sec};
+}

+/**
+ * Deletes a key from a specific bucket.
+ * Given a bucket, the bucket's set of existing keys and the bucket's current
+ * size, this function deletes a randomly selected key from that bucket. The
+ * bucket's key set (and the caller's size counter) are only updated after the
+ * transaction that removes the key has been submitted. The method returns a
+ * future of type results_t with the number of operations set to 1 and the
+ * latency set to the time the delete took, so every call increments the
+ * caller's operation count by one and its total latency by the duration of
+ * this delete.
+ */
+
+seastar::future<results_t>
+delete_random_key(crimson::os::FuturizedStore::Shard &shard_ref,
+                  coll_t &coll_id, crimson::os::CollectionRef coll_ref,
+                  ghobject_t &bucket, std::set<std::string> &existing_keys) {
+  int index = std::rand() % existing_keys.size();
+  auto it = existing_keys.begin();
+  std::advance(it, index);
+  std::string key_to_delete = *it;
+  existing_keys.erase(it);
+
+  ceph::os::Transaction one_delete;
+  one_delete.omap_rmkey(coll_id, bucket, key_to_delete);
+  auto latency_start = ceph::mono_clock::now();
+  co_await shard_ref.do_transaction(coll_ref, std::move(one_delete));
+  auto latency_end = ceph::mono_clock::now();
+  auto time_per_delete = (latency_end - latency_start);
+  std::chrono::duration<double> time_per_delete_sec = time_per_delete;
+  co_return results_t{1, time_per_delete_sec};
+}
+
+/**
+ * Randomly adds and removes keys in randomly chosen rgw buckets.
+ * Reports the number of operations performed and their total latency in
+ * seconds, from which throughput and average latency per operation can be
+ * derived. This function is split into 3 steps:
+ * (a) pre-filling the buckets to steady state, i.e. to the required number of
+ *     keys in each bucket's key set
+ * (b) randomly choosing to write or remove a key based on the chosen bucket's
+ *     size, i.e. whether it is within the acceptable range
+ * (c) doing N I/Os concurrently on one thread
+ */
+seastar::future<>
rgw_index_workload(crimson::os::FuturizedStore &global_store, + int num_indices, int num_concurrent_io, + int duration, int key_size, int value_size, + int target_keys_per_bucket, + int tolerance_range, + int num_buckets_per_collection) { + + LOG_PREFIX(rgw_index_workload); + auto &local_store = global_store.get_sharded_store(); + std::map collection_id_for_rgw; + std::map + coll_ref_map_rgw; // map of bucket number and coll_ref + std::vector> keys_per_bucket( + num_indices); // vector where each index is the bucket number and the + // value at that index is a set of existing keys in that// + // bucket + + /** + * This method returns a future with pre filled buckets + * It creates num_indices number of buckets + * Each bucket is pre-filled to target_keys_per_bucket + * Each entry is a key value pair, with each key being unique of size key_size + * and each value with size value_size + */ + + auto pre_fill_buckets = [&]() -> seastar::future<> { + for (int i = 0; i < num_indices; ++i) { + auto bucket_i = create_hobj(i); + auto coll_id = make_cid(i, num_buckets_per_collection); + collection_id_for_rgw[i] = coll_id; + auto coll_ref = co_await local_store.create_new_collection(coll_id); + coll_ref_map_rgw[i] = coll_ref; + + std::map omap_for_this_bucket; + std::set keys_in_this_bucket; + + for (int j = 0; j < target_keys_per_bucket; ++j) { + std::string possible_key = generate_random_string(key_size); + while (keys_in_this_bucket.count(possible_key) > 0) { + possible_key = generate_random_string(key_size); + } + keys_in_this_bucket.insert(possible_key); + bufferlist val_for_poss_key; + val_for_poss_key.append_zero(value_size); + omap_for_this_bucket[possible_key] = val_for_poss_key; + } + keys_per_bucket[i] = keys_in_this_bucket; + ceph::os::Transaction txn_write_omap_for_bucket; + txn_write_omap_for_bucket.create(coll_id, bucket_i); + txn_write_omap_for_bucket.omap_setkeys(coll_id, bucket_i, + omap_for_this_bucket); + co_await local_store.do_transaction(coll_ref, + std::move(txn_write_omap_for_bucket)); + } + co_return; + }; + + // size of each bucket, initially each bucket has size + // target_keys_per_bucket,because all buckets are pre filled + std::vector size_per_bucket(num_indices, target_keys_per_bucket); + + // min and max size is the range of allowable bucket size + int min_size = + std::floor(target_keys_per_bucket * (1 - tolerance_range / 100.0)); + int max_size = + std::ceil(target_keys_per_bucket * (1 + tolerance_range / 100.0)); + + /** + * This method returns a future of type struct results_t + * In this function we choose a random bucket to write and remove keys from + * Based on the size of the bucket we chose, we either force write,force + * delete or randomly pick to write or delete a key + */ + auto rgw_actual_test = [&]() -> seastar::future { + int num_ops = 0; + std::chrono::duration tot_latency = + std::chrono::duration(0.0); + auto start = ceph::mono_clock::now(); + + while (ceph::mono_clock::now() - start <= std::chrono::seconds(duration)) { + + int bucket_num_we_choose = std::rand() % num_indices; + auto bucket = create_hobj(bucket_num_we_choose); + auto coll_id = collection_id_for_rgw[bucket_num_we_choose]; + auto coll_ref = coll_ref_map_rgw[bucket_num_we_choose]; + + int size_bucket_we_choose = size_per_bucket[bucket_num_we_choose]; + auto &keys_in_that_bucket = keys_per_bucket[bucket_num_we_choose]; + + // this case happens when the size of the bucket is min size and we choose + // to delete + if (size_bucket_we_choose <= min_size) { + results_t result = co_await 
write_unique_key( + local_store, coll_id, coll_ref, bucket, keys_in_that_bucket, + key_size, value_size); + size_per_bucket[bucket_num_we_choose] += 1; + tot_latency += result.tot_latency_sec; + num_ops += result.num_operations; + + } else if (size_bucket_we_choose >= max_size) { + results_t result = co_await delete_random_key( + local_store, coll_id, coll_ref, bucket, keys_in_that_bucket); + size_per_bucket[bucket_num_we_choose] -= 1; + tot_latency += result.tot_latency_sec; + num_ops += result.num_operations; + } else { + int choice = std::rand() % 2; + // choice 0 is write, choice 1 is delete + if (choice == 0) { + results_t result = co_await write_unique_key( + local_store, coll_id, coll_ref, bucket, keys_in_that_bucket, + key_size, value_size); + size_per_bucket[bucket_num_we_choose] += 1; + tot_latency += result.tot_latency_sec; + num_ops += result.num_operations; + } else { + results_t result = co_await delete_random_key( + local_store, coll_id, coll_ref, bucket, keys_in_that_bucket); + size_per_bucket[bucket_num_we_choose] -= 1; + tot_latency += result.tot_latency_sec; + num_ops += result.num_operations; + } + }; + } + co_return results_t{num_ops, tot_latency, duration}; + }; + + co_await pre_fill_buckets(); + co_await run_concurrent_ios(duration, num_concurrent_io, rgw_actual_test); + co_return; +}; + +int main(int argc, char **argv) { LOG_PREFIX(main); po::options_description desc{"Allowed options"}; bool debug = false; std::string store_type; std::string store_path; - std::string io_pattern; - - desc.add_options() - ("help,h", "show help message") - ("store-type", - po::value(&store_type)->default_value("seastore"), - "set store type") - /* store-path is a path to a directory containing a file 'block' - * block should be a symlink to a real device for actual performance - * testing, but may be a file for testing this utility. - * See build/dev/osd* after starting a vstart cluster for an example - * of what that looks like. - */ - ("store-path", po::value(&store_path), - "path to store, /block should " - "be a symlink to the target device for bluestore or seastore") - ("debug", po::value(&debug)->default_value(false), - "enable debugging"); + + desc.add_options()("help,h", "show help message")( + "store-type", + po::value(&store_type)->default_value("seastore"), + "set store type") + /* store-path is a path to a directory containing a file 'block' + * block should be a symlink to a real device for actual performance + * testing, but may be a file for testing this utility. + * See build/dev/osd* after starting a vstart cluster for an example + * of what that looks like. 
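Not part of the patch, but for intuition: the write/delete decision made by rgw_actual_test above can be modelled by the small standalone sketch below. The min_size/max_size bounds mirror the patch's tolerance_range computation, so the bucket size never leaves the tolerance band around target_keys_per_bucket. The helper name choose_write and the constants are illustrative only.

#include <cassert>
#include <cmath>
#include <cstdlib>

// Mirrors the patch's bounds: target_keys_per_bucket +/- tolerance_range percent.
bool choose_write(int cur_size, int target, int tolerance_pct) {
  int min_size = std::floor(target * (1 - tolerance_pct / 100.0));
  int max_size = std::ceil(target * (1 + tolerance_pct / 100.0));
  if (cur_size <= min_size) return true;   // at or below the floor: force a write
  if (cur_size >= max_size) return false;  // at or above the ceiling: force a delete
  return std::rand() % 2 == 0;             // otherwise pick write or delete at random
}

int main() {
  int size = 1000;  // pre-filled to target_keys_per_bucket, as in pre_fill_buckets
  for (int i = 0; i < 100000; ++i) {
    size += choose_write(size, 1000, 10) ? 1 : -1;
    assert(size >= 900 && size <= 1100);  // stays within 10% of the target
  }
}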
+ */ + ("store-path", po::value(&store_path), + "path to store, /block should " + "be a symlink to the target device for bluestore or seastore")( + "debug", po::value(&debug)->default_value(false), + "enable debugging"); po::variables_map vm; std::vector unrecognized_options; try { auto parsed = po::command_line_parser(argc, argv) - .options(desc) - .allow_unregistered() - .run(); + .options(desc) + .allow_unregistered() + .run(); po::store(parsed, vm); if (vm.count("help")) { std::cout << desc << std::endl; @@ -209,8 +526,8 @@ int main(int argc, char** argv) po::notify(vm); unrecognized_options = - po::collect_unrecognized(parsed.options, po::include_positional); - } catch(const po::error& e) { + po::collect_unrecognized(parsed.options, po::include_positional); + } catch (const po::error &e) { std::cerr << "error: " << e.what() << std::endl; return 1; } @@ -220,121 +537,168 @@ int main(int argc, char** argv) app_cfg.auto_handle_sigint_sigterm = false; seastar::app_template app(std::move(app_cfg)); - std::vector av{argv[0]}; - std::transform( - std::begin(unrecognized_options), - std::end(unrecognized_options), - std::back_inserter(av), - [](auto& s) { - return const_cast(s.c_str()); - }); + std::string work_load_type = "pg_log"; + int num_logs = 0; + int log_length = 0; + int log_size = 0; + int num_concurrent_io = 0; + int duration = 0; + int key_size = 0; + int value_size = 0; + int num_indices = 0; + int target_keys_per_bucket = 0; + int tolerance_range = 0; + int num_buckets_per_collection = 0; + + std::vector av{argv[0]}; + std::transform(std::begin(unrecognized_options), + std::end(unrecognized_options), std::back_inserter(av), + [](auto &s) { return const_cast(s.c_str()); }); + app.add_options()( + "work_load_type", + po::value(&work_load_type)->default_value("pg_log"), + "work load type: pg_log or rgw_index") + + ("num_logs", po::value(&num_logs), + "how many different logs we create; aka, we create a log for every " + "object,so how many objects we create") + + ("log_length", po::value(&log_length), + "number of entries per log") + + ("log_size", po::value(&log_size), "size of each log entry") + + ("num_concurrent_io", po::value(&num_concurrent_io), + "number of IOs happening simultaneously") + + ("duration", po::value(&duration), + "how long in seconds the actual testing loop runs " + "for") + // for rgw + ("num_indices", po::value(&num_indices), + "number of RGW indices/buckets") + + ("key_size", po::value(&key_size), "size of keys in bytes") + + ("value_size", po::value(&value_size), + "size of values in bytes") + + ("target_keys_per_bucket", + po::value(&target_keys_per_bucket), + "target number of keys per bucket") + + ("tolerance_range", po::value(&tolerance_range), + "tolerance range percentage") + + ("num_buckets_per_collection", + po::value(&num_buckets_per_collection), + "the number of objects in each collection "); + return app.run( - av.size(), av.data(), - /* crimson-osd uses seastar as its scheduler. We use - * sesastar::app_template::run to start the base task for the - * application -- this lambda. The -> seastar::future here - * explicitely states the return type of the lambda, a future - * which resolves to an int. We need to do this because the - * co_return at the end is insufficient to express the type. - * - * The lambda internally uses co_await/co_return and is therefore - * a coroutine. co_await suspends execution until - * resolves. The whole co_await expression then evaluates to the - * contents of the future -- int for seastar::future. 
- * - * What's a bit confusing is that a coroutine generally *returns* - * at the first suspension point yielding it's return type, a - * seastar::future in this case. This is tricky for - * lambda-coroutines because it means that the lambda could go out - * of scope before the coroutine actually completes, resulting in - * captured variables (references to everything in the parent frame - * in this case -- [&]) being free'd. Resuming the coroutine would - * then hit a use-after-free as soon as it tries to access any - * of those variables. seastar::coroutine::lambda avoids this. - * I suggest having a look at src/seastar/include/seastar/core/coroutine.hh - * for the implementation. - * Note, the language guarrantees that *arguments* (whether to - * a lambda or not) have their lifetimes extended for the duration - * of the coroutine, so this isn't a problem for non-lambda - * coroutines. - */ - seastar::coroutine::lambda([&]() -> seastar::future { - if (debug) { - seastar::global_logger_registry().set_all_loggers_level( - seastar::log_level::debug - ); - } else { - seastar::global_logger_registry().set_all_loggers_level( - seastar::log_level::error - ); - } + av.size(), av.data(), + /* crimson-osd uses seastar as its scheduler. We use + * sesastar::app_template::run to start the base task for the + * application -- this lambda. The -> seastar::future here + * explicitely states the return type of the lambda, a future + * which resolves to an int. We need to do this because the + * co_return at the end is insufficient to express the type. + * + * The lambda internally uses co_await/co_return and is therefore + * a coroutine. co_await suspends execution until + * resolves. The whole co_await expression then evaluates to the + * contents of the future -- int for seastar::future. + * + * What's a bit confusing is that a coroutine generally *returns* + * at the first suspension point yielding it's return type, a + * seastar::future in this case. This is tricky for + * lambda-coroutines because it means that the lambda could go out + * of scope before the coroutine actually completes, resulting in + * captured variables (references to everything in the parent frame + * in this case -- [&]) being free'd. Resuming the coroutine would + * then hit a use-after-free as soon as it tries to access any + * of those variables. seastar::coroutine::lambda avoids this. + * I suggest having a look at + * src/seastar/include/seastar/core/coroutine.hh for the implementation. + * Note, the language guarrantees that *arguments* (whether to + * a lambda or not) have their lifetimes extended for the duration + * of the coroutine, so this isn't a problem for non-lambda + * coroutines. 
+ */ + seastar::coroutine::lambda([&]() -> seastar::future { + if (debug) { + seastar::global_logger_registry().set_all_loggers_level( + seastar::log_level::debug); + } else { + seastar::global_logger_registry().set_all_loggers_level( + seastar::log_level::error); + } - co_await crimson::common::sharded_conf().start( - EntityName{}, std::string_view{"ceph"}); - co_await crimson::common::local_conf().start(); - - { - std::vector cav; - std::transform( - std::begin(unrecognized_options), - std::end(unrecognized_options), - std::back_inserter(cav), - [](auto& s) { - return s.c_str(); - }); - co_await crimson::common::local_conf().parse_argv( - cav); - } + co_await crimson::common::sharded_conf().start( + EntityName{}, std::string_view{"ceph"}); + co_await crimson::common::local_conf().start(); - auto store = crimson::os::FuturizedStore::create( - store_type, - store_path, - crimson::common::local_conf().get_config_values() - ); - - uuid_d uuid; - uuid.generate_random(); - - co_await store->start(); - /* FuturizedStore interfaces use errorated-futures rather than bare - * seastar futures in order to encode possible errors in the type. - * However, this utility doesn't really need to do anything clever - * with a failure to execute mkfs other than tell the user what - * happened, so we simply respond uniformly to all error cases - * using the handle_error handler. See FuturizedStore::mkfs for - * the actual return type and crimson/common/errorator.h for the - * implementation of errorators. - */ - co_await store->mkfs(uuid).handle_error( - crimson::stateful_ec::assert_failure( - std::format( - "error creating empty object store type {} in {}", - store_type, - store_path).c_str())); - co_await store->stop(); - - co_await store->start(); - co_await store->mount().handle_error( - crimson::stateful_ec::assert_failure( - std::format( - "error mounting object store type {} in {}", - store_type, - store_path).c_str())); - - for (unsigned i = 0; i < seastar::smp::count; ++i) { - co_await seastar::smp::submit_to( - i, - seastar::coroutine::lambda( - [FNAME, &store_ref=*store]() -> seastar::future<> { - ERROR("running example_io on reactor {}", seastar::this_shard_id()); - co_await example_io(store_ref); - }) - ); - } + { + std::vector cav; + std::transform( + std::begin(unrecognized_options), std::end(unrecognized_options), + std::back_inserter(cav), [](auto &s) { return s.c_str(); }); + co_await crimson::common::local_conf().parse_argv(cav); + } + + auto store = crimson::os::FuturizedStore::create( + store_type, store_path, + crimson::common::local_conf().get_config_values()); + + uuid_d uuid; + uuid.generate_random(); + + co_await store->start(); + /* FuturizedStore interfaces use errorated-futures rather than bare + * seastar futures in order to encode possible errors in the type. + * However, this utility doesn't really need to do anything clever + * with a failure to execute mkfs other than tell the user what + * happened, so we simply respond uniformly to all error cases + * using the handle_error handler. See FuturizedStore::mkfs for + * the actual return type and crimson/common/errorator.h for the + * implementation of errorators. 
+ */ + co_await store->mkfs(uuid).handle_error( + crimson::stateful_ec::assert_failure( + std::format("error creating empty object store type {} in {}", + store_type, store_path) + .c_str())); + co_await store->stop(); + + co_await store->start(); + co_await store->mount().handle_error( + crimson::stateful_ec::assert_failure( + std::format("error mounting object store type {} in {}", + store_type, store_path) + .c_str())); + std::vector> per_shard_futures; + for (unsigned i = 0; i < seastar::smp::count; ++i) { + auto named_lambda = [=, &store_ref = *store]() -> seastar::future<> { + DEBUG("running example_io on reactor {}", seastar::this_shard_id()); + if (work_load_type == "pg_log") { + co_await pg_log_workload(store_ref, num_logs, num_concurrent_io, + duration, log_size, log_length); + } else if (work_load_type == "rgw_index") { + co_await rgw_index_workload( + store_ref, num_indices, num_concurrent_io, duration, key_size, + value_size, target_keys_per_bucket, tolerance_range, + num_buckets_per_collection); + } + co_return; + }; + per_shard_futures.push_back( + seastar::smp::submit_to(i, std::move(named_lambda))); + } - co_await store->umount(); - co_await store->stop(); - co_await crimson::common::sharded_conf().stop(); - co_return 0; - })); + co_await seastar::when_all(per_shard_futures.begin(), + per_shard_futures.end()); + co_await store->umount(); + co_await store->stop(); + co_await crimson::common::sharded_conf().stop(); + co_return 0; + })); } -- 2.39.5
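A closing note, not part of the patch: every reactor prints one results_t JSON object, so overall throughput and mean latency have to be derived from the dumped fields. A minimal sketch of that arithmetic, with made-up numbers standing in for one reactor's output:

#include <iostream>

int main() {
  // Values as dumped by one reactor's results_t (illustrative numbers only).
  int num_operations = 120000;      // operations completed during the run
  double total_latency_sec = 45.6;  // summed per-transaction latency
  int duration_sec = 60;            // the configured --duration

  double throughput = static_cast<double>(num_operations) / duration_sec;  // ops/s
  double mean_latency = total_latency_sec / num_operations;                // s/op
  std::cout << "throughput: " << throughput << " ops/s, "
            << "mean latency: " << mean_latency * 1000.0 << " ms\n";
}

Since each shard prints its own object, per-reactor figures have to be summed by hand if a whole-process number is wanted.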