git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/.../store-bench: refactor arguments and workloads
author Samuel Just <sjust@redhat.com>
Tue, 12 Aug 2025 00:36:16 +0000 (17:36 -0700)
committer Samuel Just <sjust@redhat.com>
Thu, 21 Aug 2025 15:59:39 +0000 (08:59 -0700)
- Adds a workload abstraction grouping arguments with their associated
  workload.
- Reworks argument parsing to occur prior to the seastar app, allowing
  unparsed arguments to be passed through to ceph (see the sketch below).
- Refactors time usage as necessary to use std::chrono types.
- Removes LOG_PREFIX usages that don't currently have log lines.
- Other minor cleanups not worth separating out.
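
For context, a minimal sketch of the parsing flow the second bullet
describes, assuming boost::program_options; the names below are
illustrative, not the tool's exact code:

    #include <boost/program_options.hpp>
    namespace po = boost::program_options;

    // Parse known options first, letting unknown tokens pass through.
    auto parsed = po::command_line_parser(argc, argv)
        .options(desc)
        .allow_unregistered()
        .run();
    po::variables_map vm;
    po::store(parsed, vm);
    po::notify(vm);  // enforces ->required() options

    // Anything not registered here can be forwarded to ceph later.
    auto unrecognized_options =
        po::collect_unrecognized(parsed.options, po::include_positional);

collect_unrecognized gathers every token the parser skipped, which is
what lets ceph-specific arguments survive the pre-seastar pass.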

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/tools/store_bench/store-bench.cc

index e41b3877203b7cf31fe9e31772556670d1d15280..346e54f44d18675a9424413ae4d0d8acdf6c17ba 100644 (file)
 
 namespace po = boost::program_options;
 
+using namespace std::literals;
 using namespace ceph;
 
 SET_SUBSYS(osd);
 
+/**
+ * Stores the number of operations completed and their total latency.
+ * For the pg_log workload, each write+delete pair counts as one
+ * operation.  For the rgw_index workload, each write counts as one
+ * operation and each delete counts as one operation.
+ */
+struct results_t {
+  int num_operations = 0;
+  std::chrono::duration<double> tot_latency_sec = 0s;
+  std::chrono::duration<double> duration = 0s;
+
+  results_t &operator += (const results_t &other_result) {
+    num_operations += other_result.num_operations;
+    tot_latency_sec += other_result.tot_latency_sec;
+    return *this;
+  }
+
+  void dump(ceph::Formatter *f) const {
+    f->open_object_section("results_t");
+    f->dump_int("num_operations", num_operations);
+    f->dump_float("total_latency_sec", tot_latency_sec.count());
+    f->dump_float("duration_sec", duration.count());
+    f->close_section();
+  }
+};
+
+struct common_options_t {
+private:
+  unsigned duration = 0;
+public:
+  unsigned num_concurrent_io = 16;
+  std::chrono::duration<uint64_t> get_duration() const {
+    return std::chrono::seconds(duration);
+  }
+
+  po::options_description get_options() {
+    po::options_description ret{"Common Options"};
+    ret.add_options()
+      ("num-concurrent-io", po::value<unsigned>(&num_concurrent_io),
+       "number of IOs happening simultaneously")
+      ("duration", po::value<unsigned>(&duration)->required(),
+       "how long in seconds the actual testing loop runs "
+       "for")
+      ;
+    return ret;
+  }
+};
+
+class StoreBenchWorkload {
+public:
+  virtual po::options_description get_options() = 0;
+  virtual seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) = 0;
+  virtual ~StoreBenchWorkload() {}
+};
+
+
+class PGLogWorkload : public StoreBenchWorkload {
+  unsigned num_logs = 4;
+  unsigned log_size = 1024;
+  unsigned log_length = 256;
+
+public:
+  po::options_description get_options() final {
+    po::options_description ret{"PGLogWorkload"};
+    ret.add_options()
+      ("num-logs", po::value<unsigned>(&num_logs),
+       "how many different logs we create; aka, we create a log for every "
+       "object,so how many objects we create")
+      ("log-length", po::value<unsigned>(&log_length),
+       "number of entries per log")
+      ("log-size", po::value<unsigned>(&log_size),
+       "size of each log entry")
+      ;
+    return ret;
+  }
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+  ~PGLogWorkload() final {}
+};
+
+class RGWIndexWorkload : public StoreBenchWorkload {
+  unsigned num_indices = 16;
+  uint64_t key_size = 1024;
+  uint64_t value_size = 1024;
+  unsigned target_keys_per_bucket = 256;
+  unsigned tolerance_range = 10;
+  unsigned num_buckets_per_collection = 16;
+
+public:
+
+  po::options_description get_options() final {
+    po::options_description ret{"PGLogWorkload"};
+    ret.add_options()
+      ("num_indices", po::value<unsigned>(&num_indices),
+       "number of RGW indices/buckets")
+      ("key_size", po::value<uint64_t>(&key_size),
+       "size of keys in bytes")
+      ("value_size", po::value<uint64_t>(&value_size),
+        "size of values in bytes")
+      ("target_keys_per_bucket",
+       po::value<unsigned>(&target_keys_per_bucket),
+       "target number of keys per bucket")
+      ("tolerance_range",
+       po::value<unsigned>(&tolerance_range),
+       "tolerance range percentage")
+      ("num_buckets_per_collection",
+       po::value<unsigned>(&num_buckets_per_collection),
+       "the number of objects in each collection ")
+    ;
+    return ret;
+  }
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+  ~RGWIndexWorkload() final {}
+};
+
 /**
  * Helper functions used by both workload types.
  */
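
Taken together, the new types above compose roughly as follows; this is
a sketch of the intended wiring, not the tool's exact main() (fragments
assuming a coroutine context, with work_load_type and store_ref in
scope):

    // Register each workload under the name used on the command line.
    std::map<std::string, std::unique_ptr<StoreBenchWorkload>> workloads;
    workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
    workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());

    // Merge the common options with every workload's option group.
    common_options_t common;
    po::options_description desc{"store-bench"};
    desc.add(common.get_options());
    for (auto &[name, workload] : workloads) {
      desc.add(workload->get_options());
    }

    // ...parse argv into desc as usual, then dispatch by name:
    if (auto it = workloads.find(work_load_type); it != workloads.end()) {
      co_await it->second->run(common, store_ref);
    }

Because get_options() binds each option to a member via
po::value(&member), parsing fills the workload objects in place, and
run() needs nothing beyond the common options and the store.
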
@@ -85,36 +210,6 @@ coll_t make_cid(int obj_id, int num_objects_per_collection) {
   return coll_t(spg_t(pg_t(pg_id, 0)));
 }
 
-/**
- * The struct stores the number of operations and the total latency for all
- * these operations For the pg workload type write+delete increases the number
- * of operations by 1 For the rgw index workload, each write increases the
- * num_operations by 1 Each delete increases the number of operations by 1
- */
-struct results_t {
-  int num_operations = 0;
-  std::chrono::duration<double> tot_latency_sec =
-      std::chrono::duration<double>(0.0);
-  int duration = 0;
-
-  results_t &operator += (const results_t &other_result) {
-    num_operations += other_result.num_operations;
-    tot_latency_sec += other_result.tot_latency_sec;
-    return *this;
-  }
-
-  void dump(ceph::Formatter *f) const {
-    f->open_object_section("results_t");
-    f->dump_int("number of operations done", num_operations);
-    f->dump_float(
-        "the total latency aka time for these operations",
-        tot_latency_sec.count());
-    f->dump_int("the time the workload took to run is ",
-                                   duration);
-    f->close_section();
-  }
-};
-
 /**
  * This function runs a workload num_concurrent_io times concurrently;
  * the parallel_for_each loop launches k coroutines
@@ -123,9 +218,10 @@ struct results_t {
  * added to the vector, so all_io_res contains resolved futures
  */
 seastar::future<results_t>
-run_concurrent_ios(int duration, int num_concurrent_io,
-                   std::function<seastar::future<results_t>()> work_load) {
-  LOG_PREFIX(run_concurrent_ios);
+run_concurrent_ios(
+  std::chrono::duration<double> duration,
+  int num_concurrent_io,
+  std::function<seastar::future<results_t>()> work_load) {
   std::vector<int> container_io;
   std::vector<results_t> all_io_res;
   for (int i = 0; i < num_concurrent_io; ++i) {
@@ -138,8 +234,11 @@ run_concurrent_ios(int duration, int num_concurrent_io,
         co_return;
       })));
 
-  results_t total_result_all_io = {0, std::chrono::duration<double>(0.0),
-                                   duration};
+  results_t total_result_all_io = {
+    0,
+    std::chrono::duration<double>(0.0),
+    duration
+  };
   for (const auto res : all_io_res) {
     total_result_all_io += res;
   }
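
The fan-out the comment above describes reduces to seastar's
parallel_for_each; a sketch of the pattern, assuming
<seastar/core/loop.hh> and a work_load callable as in the signature
above:

    #include <numeric>
    #include <seastar/core/coroutine.hh>
    #include <seastar/core/loop.hh>

    std::vector<int> container_io(num_concurrent_io);
    std::iota(container_io.begin(), container_io.end(), 0);
    std::vector<results_t> all_io_res(num_concurrent_io);

    // One coroutine per concurrent I/O slot; parallel_for_each only
    // resolves once every launched coroutine has finished.
    co_await seastar::parallel_for_each(
        container_io, [&](int i) -> seastar::future<> {
          all_io_res[i] = co_await work_load();
        });

By the time parallel_for_each resolves, all_io_res holds only completed
results, ready to be summed with results_t::operator+=.
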
@@ -159,10 +258,10 @@ run_concurrent_ios(int duration, int num_concurrent_io,
  * (b) writing and removing logs; this is considered one I/O
  * (c) doing N I/Os concurrently on one thread
  */
-seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
-                                  int num_logs, int num_concurrent_io,
-                                  int duration, int log_size, int log_length) {
-  LOG_PREFIX(pg_log_workload);
+seastar::future<> PGLogWorkload::run(
+  const common_options_t &common,
+  crimson::os::FuturizedStore &global_store)
+{
   auto &local_store = global_store.get_sharded_store();
 
   std::map<int, coll_t> collection_id;
@@ -176,14 +275,14 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
    */
 
   auto pre_fill_logs = [&]() -> seastar::future<> {
-    for (int i = 0; i < num_logs; ++i) {
+    for (unsigned i = 0; i < num_logs; ++i) {
       auto obj_i = create_hobj(i);
       auto coll_id = make_cid(i, 1);
       collection_id[i] = coll_id;
       auto coll_ref = co_await local_store.create_new_collection(coll_id);
       coll_ref_map[i] = coll_ref;
       std::map<std::string, bufferlist> data;
-      for (int j = 0; j < log_length; ++j) {
+      for (unsigned j = 0; j < log_length; ++j) {
         std::string key = std::to_string(j);
         bufferlist bl_value;
         bl_value.append_zero(log_size);
@@ -215,8 +314,7 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
         std::chrono::duration<double>(0.0);
     auto start = ceph::mono_clock::now();
 
-    while (ceph::mono_clock::now() - start <=
-           (std::chrono::seconds(duration))) { // duration is an int
+    while (ceph::mono_clock::now() - start <= common.get_duration()) {
       int obj_num = std::rand() % num_logs;
 
       auto object = create_hobj(obj_num);
@@ -249,10 +347,11 @@ seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
       tot_latency += time_sec;
       num_ops++;
     }
-    co_return results_t{num_ops, tot_latency, duration};
+    co_return results_t{num_ops, tot_latency, common.get_duration()};
   };
   co_await pre_fill_logs();
-  co_await run_concurrent_ios(duration, num_concurrent_io, add_remove_entry);
+  co_await run_concurrent_ios(
+    common.get_duration(), common.num_concurrent_io, add_remove_entry);
   co_return;
 }
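
The add_remove_entry loop abbreviated above follows the standard
timed-benchmark shape; a reduced sketch, with do_one_write_delete as a
hypothetical stand-in for the transaction the real loop builds:

    int num_ops = 0;
    std::chrono::duration<double> tot_latency = 0s;
    auto start = ceph::mono_clock::now();

    while (ceph::mono_clock::now() - start <= common.get_duration()) {
      auto t0 = ceph::mono_clock::now();
      co_await do_one_write_delete();  // one write+delete pair == one op
      tot_latency +=
          std::chrono::duration<double>(ceph::mono_clock::now() - t0);
      ++num_ops;
    }
    co_return results_t{num_ops, tot_latency, common.get_duration()};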
 
@@ -356,14 +455,10 @@ delete_random_key(crimson::os::FuturizedStore::Shard &shard_ref,
  * based on the chosen bucket's size, i.e. whether it is within the
  * acceptable range (c) doing N I/Os concurrently on one thread
  */
-seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
-                                     int num_indices, int num_concurrent_io,
-                                     int duration, int key_size, int value_size,
-                                     int target_keys_per_bucket,
-                                     int tolerance_range,
-                                     int num_buckets_per_collection) {
-
-  LOG_PREFIX(rgw_index_workload);
+seastar::future<> RGWIndexWorkload::run(
+  const common_options_t &common,
+  crimson::os::FuturizedStore &global_store)
+{
   auto &local_store = global_store.get_sharded_store();
   std::map<int, coll_t> collection_id_for_rgw;
   std::map<int, crimson::os::CollectionRef>
@@ -382,7 +477,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
    */
 
   auto pre_fill_buckets = [&]() -> seastar::future<> {
-    for (int i = 0; i < num_indices; ++i) {
+    for (unsigned i = 0; i < num_indices; ++i) {
       auto bucket_i = create_hobj(i);
       auto coll_id = make_cid(i, num_buckets_per_collection);
       collection_id_for_rgw[i] = coll_id;
@@ -392,7 +487,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
       std::map<std::string, bufferlist> omap_for_this_bucket;
       std::set<std::string> keys_in_this_bucket;
 
-      for (int j = 0; j < target_keys_per_bucket; ++j) {
+      for (unsigned j = 0; j < target_keys_per_bucket; ++j) {
         std::string possible_key = generate_random_string(key_size);
         while (keys_in_this_bucket.count(possible_key) > 0) {
           possible_key = generate_random_string(key_size);
@@ -435,7 +530,7 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
         std::chrono::duration<double>(0.0);
     auto start = ceph::mono_clock::now();
 
-    while (ceph::mono_clock::now() - start <= std::chrono::seconds(duration)) {
+    while (ceph::mono_clock::now() - start <= common.get_duration()) {
 
       int bucket_num_we_choose = std::rand() % num_indices;
       auto bucket = create_hobj(bucket_num_we_choose);
@@ -480,11 +575,12 @@ seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
         }
       };
     }
-    co_return results_t{num_ops, tot_latency, duration};
+    co_return results_t{num_ops, tot_latency, common.get_duration()};
   };
 
   co_await pre_fill_buckets();
-  co_await run_concurrent_ios(duration, num_concurrent_io, rgw_actual_test);
+  co_await run_concurrent_ios(
+    common.get_duration(), common.num_concurrent_io, rgw_actual_test);
   co_return;
 };
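
The rebalancing the comment above alludes to keeps each bucket's key
count inside a percentage band around the target; a sketch of that
decision, with cur_keys and insert_random_key as hypothetical stand-ins
alongside the tool's delete_random_key:

    // Stay within +/- tolerance_range percent of target_keys_per_bucket.
    unsigned lo = target_keys_per_bucket * (100 - tolerance_range) / 100;
    unsigned hi = target_keys_per_bucket * (100 + tolerance_range) / 100;

    if (cur_keys < lo) {
      co_await insert_random_key(shard_ref, bucket);  // grow toward target
    } else if (cur_keys > hi) {
      co_await delete_random_key(shard_ref, bucket);  // shrink toward target
    } else if (std::rand() % 2) {                     // in band: coin flip
      co_await insert_random_key(shard_ref, bucket);
    } else {
      co_await delete_random_key(shard_ref, bucket);
    }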
 
@@ -494,22 +590,40 @@ int main(int argc, char **argv) {
   bool debug = false;
   std::string store_type;
   std::string store_path;
+  std::string work_load_type;
+  unsigned smp = 4;
 
-  desc.add_options()("help,h", "show help message")(
-      "store-type",
-      po::value<std::string>(&store_type)->default_value("seastore"),
-      "set store type")
+  desc.add_options()("help,h", "show help message")
+    ("store-type",
+     po::value<std::string>(&store_type)->default_value("seastore"),
+     "set store type")
       /* store-path is a path to a directory containing a file 'block'
        * block should be a symlink to a real device for actual performance
        * testing, but may be a file for testing this utility.
        * See build/dev/osd* after starting a vstart cluster for an example
        * of what that looks like.
        */
-      ("store-path", po::value<std::string>(&store_path),
-       "path to store, <store-path>/block should "
-       "be a symlink to the target device for bluestore or seastore")(
-          "debug", po::value<bool>(&debug)->default_value(false),
-          "enable debugging");
+    ("store-path", po::value<std::string>(&store_path)->required(),
+     "path to store, <store-path>/block should "
+     "be a symlink to the target device for bluestore or seastore")(
+       "debug", po::value<bool>(&debug)->default_value(false),
+       "enable debugging")
+    ("work-load-type",
+     po::value<std::string>(&work_load_type)->required(),
+     "work load type: pg_log or rgw_index")
+    ("smp", po::value<unsigned>(&smp),
+     "number of reactors");
+  
+  common_options_t common_options;
+  std::map<std::string, std::unique_ptr<StoreBenchWorkload>> workloads;
+    
+  workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
+  workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());
+  
+  desc.add(common_options.get_options());
+  for (auto &[name, workload] : workloads) {
+    desc.add(workload->get_options());
+  }
 
   po::variables_map vm;
   std::vector<std::string> unrecognized_options;
@@ -539,65 +653,10 @@ int main(int argc, char **argv) {
   app_cfg.reactor_opts.blocked_reactor_notify_ms.set_default_value(200);
   seastar::app_template app(std::move(app_cfg));
 
-  std::string work_load_type = "pg_log";
-  int num_logs = 0;
-  int log_length = 0;
-  int log_size = 0;
-  int num_concurrent_io = 0;
-  int duration = 0;
-  int key_size = 0;
-  int value_size = 0;
-  int num_indices = 0;
-  int target_keys_per_bucket = 0;
-  int tolerance_range = 0;
-  int num_buckets_per_collection = 0;
-
-  std::vector<char *> av{argv[0]};
-  std::transform(std::begin(unrecognized_options),
-                 std::end(unrecognized_options), std::back_inserter(av),
-                 [](auto &s) { return const_cast<char *>(s.c_str()); });
-  app.add_options()(
-      "work_load_type",
-      po::value<std::string>(&work_load_type)->default_value("pg_log"),
-      "work load type: pg_log or rgw_index")
-
-      ("num_logs", po::value<int>(&num_logs),
-       "how many different logs we create; aka, we create a log for every "
-       "object,so how many objects we create")
-
-      ("log_length", po::value<int>(&log_length),
-        "number of entries per log")
-
-      ("log_size", po::value<int>(&log_size), "size of each log entry")
-
-      ("num_concurrent_io", po::value<int>(&num_concurrent_io),
-        "number of IOs happening simultaneously")
-
-      ("duration", po::value<int>(&duration),
-        "how long in seconds the actual testing loop runs "
-        "for")
-      // for rgw
-      ("num_indices", po::value<int>(&num_indices),
-       "number of RGW indices/buckets")
-
-      ("key_size", po::value<int>(&key_size), "size of keys in bytes")
-
-      ("value_size", po::value<int>(&value_size),
-        "size of values in bytes")
-
-      ("target_keys_per_bucket",
-        po::value<int>(&target_keys_per_bucket),
-        "target number of keys per bucket")
-
-      ("tolerance_range", po::value<int>(&tolerance_range),
-        "tolerance range percentage")
-
-      ("num_buckets_per_collection",
-        po::value<int>(&num_buckets_per_collection),
-        "the number of objects in each collection ");
-
+  auto smp_str = std::to_string(smp);
+  const char *av[] = { argv[0], "--smp", smp_str.c_str() };
   return app.run(
-      av.size(), av.data(),
+      sizeof(av) / sizeof(av[0]), const_cast<char **>(av),
       /* crimson-osd uses seastar as its scheduler.  We use
        * seastar::app_template::run to start the base task for the
        * application -- this lambda.  The -> seastar::future<int> here
@@ -678,20 +737,16 @@ int main(int argc, char **argv) {
                             store_type, store_path)
                     .c_str()));
         std::vector<seastar::future<>> per_shard_futures;
+
+        auto named_lambda = [&, &store_ref = *store]() -> seastar::future<> {
+          DEBUG("running example_io on reactor {}", seastar::this_shard_id());
+          auto iter = workloads.find(work_load_type);
+          if (iter != workloads.end()) {
+            co_await iter->second->run(common_options, store_ref);
+          }
+          co_return;
+        };
         for (unsigned i = 0; i < seastar::smp::count; ++i) {
-          auto named_lambda = [=, &store_ref = *store]() -> seastar::future<> {
-            DEBUG("running example_io on reactor {}", seastar::this_shard_id());
-            if (work_load_type == "pg_log") {
-              co_await pg_log_workload(store_ref, num_logs, num_concurrent_io,
-                                       duration, log_size, log_length);
-            } else if (work_load_type == "rgw_index") {
-              co_await rgw_index_workload(
-                  store_ref, num_indices, num_concurrent_io, duration, key_size,
-                  value_size, target_keys_per_bucket, tolerance_range,
-                  num_buckets_per_collection);
-            }
-            co_return;
-          };
           per_shard_futures.push_back(
               seastar::smp::submit_to(i, std::move(named_lambda)));
         }
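
The per-shard dispatch at the end pairs smp::submit_to with a join that
this hunk elides; the usual shape, assuming seastar::when_all_succeed's
iterator-range overload:

    std::vector<seastar::future<>> per_shard_futures;
    for (unsigned i = 0; i < seastar::smp::count; ++i) {
      // Run one copy of the workload on each reactor.
      per_shard_futures.push_back(
          seastar::smp::submit_to(i, named_lambda));
    }
    co_await seastar::when_all_succeed(
        per_shard_futures.begin(), per_shard_futures.end());

The sketch copies named_lambda per shard rather than moving it inside
the loop as the context lines show; that move-in-a-loop works here only
because a reference-capturing closure stays usable after a move.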