git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/tools: Added PG log and rgw_index workloads
author Shreya Subramanian <Shreya.Subramanian@ibm.com>
Thu, 24 Jul 2025 18:22:14 +0000 (18:22 +0000)
committer Samuel Just <sjust@redhat.com>
Thu, 7 Aug 2025 21:24:23 +0000 (14:24 -0700)
This commit adds 2 workloads to crimson-store-bench:
(a) PG_log workload with sequential omap writes and deletes
(b) RGW_index workload with randomized omap writes and deletes

The output, per reactor, is the number of operations, the total latency in
seconds, and the duration of the workload in seconds.
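
Example invocations (the numeric values below are arbitrary, not tuned
defaults; store_bench_dir must contain a suitable 'block' file or symlink):

  ./bin/crimson-store-bench --store-path store_bench_dir --smp 2 \
      --work_load_type pg_log --num_logs 8 --log_length 3000 \
      --log_size 200 --num_concurrent_io 16 --duration 60

  ./bin/crimson-store-bench --store-path store_bench_dir --smp 2 \
      --work_load_type rgw_index --num_indices 8 --key_size 40 \
      --value_size 250 --target_keys_per_bucket 10000 --tolerance_range 10 \
      --num_buckets_per_collection 4 --num_concurrent_io 16 --duration 60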

Signed-off-by: Shreya Subramanian <Shreya.Subramanian@ibm.com>
src/crimson/tools/store_bench/store-bench.cc

index fe1053b4f5d97e51b3f9544fac744d8073cc9313..6b5d57e2bd201ac78cc7b3fb83a7aec9572de6c1 100644 (file)
  * size if performance isn't relevant (testing or development of this utility,
  * for instance).
  *
- * One might want to add something like the following to one's .bashrc to quickly
- * run this utility during development from build/:
- *
- * function run_store_bench {
- *   rm -rf store_bench_dir
- *   mkdir store_bench_dir
- *   touch store_bench_dir/block
- *   truncate -s 10G store_bench_dir/block
- *   ./bin/crimson-store-bench --store-path store_bench_dir $@
- * }
+ * One might want to add something like the following to one's .bashrc to
+ * quickly run this utility during development:
+ *
+ * function run_store_bench {
+ *   rm -rf store_bench_dir
+ *   mkdir store_bench_dir
+ *   touch store_bench_dir/block
+ *   truncate -s 10G store_bench_dir/block
+ *   ./build/bin/crimson-store-bench --store-path store_bench_dir --smp 4 "$@"
+ * }
  */
 
 #include <random>
+#include <vector>
 
-#include <boost/program_options/variables_map.hpp>
 #include <boost/program_options/parsers.hpp>
+#include <boost/program_options/variables_map.hpp>
 
 #include <seastar/apps/lib/stop_signal.hh>
 #include <seastar/core/app-template.hh>
@@ -42,9 +40,8 @@
 #include <seastar/core/thread.hh>
 #include <seastar/util/defer.hh>
 
-#include "crimson/common/coroutine.h"
-#include "crimson/common/log.h"
 #include "crimson/common/config_proxy.h"
+#include "crimson/common/coroutine.h"
 #include "crimson/common/log.h"
 
 #include "crimson/os/futurized_collection.h"
@@ -57,150 +54,470 @@ using namespace ceph;
 SET_SUBSYS(osd);
 
 /**
- * example_io
- *
- * Performs some simple operations against store.
- * The FuturizedStore interface can be found at
- * crimson/os/futurized_store.h
+ * Helper functions used by both workload types.
  */
-seastar::future<> example_io(crimson::os::FuturizedStore &global_store)
-{
-  LOG_PREFIX(example_io);
-  /* crimson-osd's architecture partitions most resources per seastar
-   * reactor.  This allows us to (mostly) avoid locking and other forms
-   * of contention.  This call gets us the FuturizedStore::Shard
-   * local to the reactor we are executing on. */
-  auto &local_store = global_store.get_sharded_store();
 
-  /* Objects in FuturizedStore instances are stored in collections.
-   * crimson-osd (and ceph-osd) use one collection per pg, so
-   * collections are designated by spg_t via coll_t
-   * (see osd/osd_types.h).
-   */
-  coll_t cid(
-    spg_t(
-      // Map shard to pool for this test
-      pg_t(seastar::this_shard_id(), 0)
-    )
-  );
-  auto coll_ref = co_await local_store.create_new_collection(cid);
-
-  /* Objects in FuturizedStore are named by ghobject_t --
-   * see common/hobject.h.  Let's create a convenience function
-   * to generate instances which differ by one paramater.
-   *
-   * Note that objects must have globally unique names(ghobject_t), even if
-   * in different collections.
-   */
-  auto create_hobj = [](unsigned id) {
-    return ghobject_t(
-      shard_id_t::NO_SHARD,
-      seastar::this_shard_id(),
+/**
+ * Creates ghobject_t object identifier from integer identifier.
+ */
+ghobject_t create_hobj(unsigned id) {
+  return ghobject_t(
+      shard_id_t::NO_SHARD, seastar::this_shard_id(),
       id, // hash, normally rjenkins of name, but let's just set it to id
       "", // namespace, empty here
       "", // name, empty here
       0,  // snapshot
       ghobject_t::NO_GEN);
-  };
+}
 
-  /* ceph buffers are represented using bufferlist.
-   * See include/buffer.h
+/**
+ * Builds the coll_t for an object from its integer id; called whenever we
+ * create an object.  pg_id is computed as the integer division
+ * obj_id / num_objects_per_collection, so num_objects_per_collection controls
+ * how many objects land in each collection.  The pg_log workload passes 1, so
+ * every object gets its own collection; the rgw_index workload passes
+ * num_buckets_per_collection, so that many consecutive buckets share one
+ * collection.
+ */
+coll_t make_cid(int obj_id, int num_objects_per_collection) {
+  int pg_id = obj_id / num_objects_per_collection;
+  return coll_t(spg_t(pg_t(pg_id, 0)));
+}
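+
+// For illustration (arbitrary numbers): with num_objects_per_collection = 4,
+// objects 0-3 map to pg 0 and objects 4-7 to pg 1; with
+// num_objects_per_collection = 1 every object gets its own pg/collection.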
+
+/**
+ * Stores the number of operations performed and the total latency across all
+ * of them.  For the pg_log workload, each write+delete transaction counts as
+ * one operation.  For the rgw_index workload, each write and each delete
+ * counts as one operation.
+ */
+struct results_t {
+  int num_operations = 0;
+  std::chrono::duration<double> tot_latency_sec =
+      std::chrono::duration<double>(0.0);
+  int duration = 0;
+
+  results_t &operator += (const results_t &other_result) {
+    num_operations += other_result.num_operations;
+    tot_latency_sec += other_result.tot_latency_sec;
+    return *this;
+  }
+
+  void dump(ceph::Formatter *f) const {
+    f->open_object_section("results_t");
+    f->dump_int("number of operations done", num_operations);
+    f->dump_float(
+        "the total latency aka time for these operations",
+        tot_latency_sec.count());
+    f->dump_int("the time the workload took to run is ",
+                                   duration);
+    f->close_section();
+  }
+};
+
+/**
+ * Runs a workload num_concurrent_io times concurrently on the local reactor.
+ * parallel_for_each launches num_concurrent_io coroutines; each coroutine
+ * runs the work_load function, which returns a future<results_t>.  Each
+ * future is co_awaited, and its result is pushed into all_io_res once it
+ * resolves, so all_io_res only ever contains resolved results.
+ */
+seastar::future<results_t>
+run_concurrent_ios(int duration, int num_concurrent_io,
+                   std::function<seastar::future<results_t>()> work_load) {
+  LOG_PREFIX(run_concurrent_ios);
+  std::vector<int> container_io;
+  std::vector<results_t> all_io_res;
+  for (int i = 0; i < num_concurrent_io; ++i) {
+    container_io.push_back(i);
+  }
+  co_await seastar::parallel_for_each(
+      container_io, (seastar::coroutine::lambda([&](int) -> seastar::future<> {
+        auto res = co_await work_load();
+        all_io_res.push_back(std::move(res));
+        co_return;
+      })));
+
+  results_t total_result_all_io = {0, std::chrono::duration<double>(0.0),
+                                   duration};
+  for (const auto &res : all_io_res) {
+    total_result_all_io += res;
+  }
+  ceph::JSONFormatter jf;
+  total_result_all_io.dump(&jf);
+  jf.flush(std::cout);
+  std::cout << std::endl;
+  co_return total_result_all_io;
+}
+
+/**
+ * Adds and removes log entries on a set of log objects.  The number of
+ * operations, total latency, and duration are printed per reactor by
+ * run_concurrent_ios.
+ * The workload has 3 steps:
+ * (a) pre-filling the logs to a steady state
+ * (b) writing one entry and removing one entry, which counts as 1 I/O
+ * (c) running num_concurrent_io such I/Os concurrently on one reactor
+ */
+seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
+                                  int num_logs, int num_concurrent_io,
+                                  int duration, int log_size, int log_length) {
+  LOG_PREFIX(pg_log_workload);
+  auto &local_store = global_store.get_sharded_store();
+
+  std::map<int, coll_t> collection_id;
+  std::map<int, crimson::os::CollectionRef> coll_ref_map;
+
+  /**
+   * Returns a future that resolves once the logs are pre-filled.
+   * It creates num_logs log objects, each with log_length entries.
+   * Each entry is a key/value pair whose value is log_size bytes.
    */
-  auto make_bl = [](std::string_view sv) {
-    bufferlist bl;
-    bl.append(bufferptr(sv.data(), sv.size()));
-    return bl;
-  };
-  auto bl_to_str = [](bufferlist bl) {
-    return std::string(bl.c_str(), bl.length());
+
+  auto pre_fill_logs = [&]() -> seastar::future<> {
+    for (int i = 0; i < num_logs; ++i) {
+      auto obj_i = create_hobj(i);
+      auto coll_id = make_cid(i, 1);
+      collection_id[i] = coll_id;
+      auto coll_ref = co_await local_store.create_new_collection(coll_id);
+      coll_ref_map[i] = coll_ref;
+      std::map<std::string, bufferlist> data;
+      for (int j = 0; j < log_length; ++j) {
+        std::string key = std::to_string(j);
+        bufferlist bl_value;
+        bl_value.append_zero(log_size);
+        data[key] = bl_value;
+      }
+      ceph::os::Transaction txn;
+      txn.create(coll_id, obj_i);
+      txn.omap_setkeys(coll_id, obj_i, data);
+      co_await local_store.do_transaction(coll_ref, std::move(txn));
+    }
+    co_return;
   };
 
-  /* Writes to FuturizedStore instances are performed via
-   * the ceph::os::Transaction class.  All operations within
-   * the transaction will be applied atomically -- after a
-   * failure either the whole transaction will have happened
-   * or none of it will.
-   *
-   * ceph::os::Transaction is shared with classic because it
-   * is part of the primary->replica replication protocol.
-   * See os/Transaction.h for details.
+  // oldest key present in each log object (next to be removed)
+  std::vector<int> first_key_per_log(num_logs, 0);
+  // next key to be appended to each log object
+  std::vector<int> last_key_per_log(num_logs, log_length);
+
+  /**
+   * Returns a future<results_t>.
+   * Each iteration picks a random log object and, in a single transaction,
+   * appends one key at the end and removes one key from the front (keys are
+   * sequential).  The total latency per I/O is the sum of the time taken by
+   * each of these transactions.
    */
-  auto obj0 = create_hobj(0);
-  std::string key("foo");
-  std::string val("bar");
-  {
-    ceph::os::Transaction t;
-    // create object
-    t.create(cid, obj0);
-    // set omap key "foo" to "bar"
-    std::map<std::string, bufferlist> attrs;
-    attrs[key] = make_bl(val);
-    t.omap_setkeys(
-      cid,
-      obj0,
-      attrs);
-    // actually submit the transaction and await commit
-    co_await local_store.do_transaction(
-      coll_ref,
-      std::move(t));
-    ERROR("created object {} in collection {} with omap {}->{}",
-          obj0, cid, key, val);
-  }
+  auto add_remove_entry = [&]() -> seastar::future<results_t> {
+    int num_ops = 0;
+    std::chrono::duration<double> tot_latency =
+        std::chrono::duration<double>(0.0);
+    auto start = ceph::mono_clock::now();
+
+    while (ceph::mono_clock::now() - start <=
+           (std::chrono::seconds(duration))) { // duration is an int
+      int obj_num = std::rand() % num_logs;
+
+      auto object = create_hobj(obj_num);
+      auto coll_id = collection_id[obj_num];
+      auto coll_ref = coll_ref_map[obj_num];
+
+      std::string key_to_write = std::to_string(last_key_per_log[obj_num]);
+      last_key_per_log[obj_num] += 1;
 
-  {
-    std::set<std::string> keys;
-    keys.insert(key);
-    auto result = co_await local_store.omap_get_values(
-      coll_ref,
-      obj0,
-      keys
-    ).handle_error(
-      crimson::ct_error::assert_all("error reading object")
-    );
-    auto iter = result.find(key);
-    if (iter == result.end()) {
-      ERROR("Failed to find key {} on obj {}", key, obj0);
-    } else if (bl_to_str(iter->second) != val) {
-      ERROR("key {} on obj {} does not match", key, obj0);
-    } else {
-      ERROR("read key {} on obj {}, matches", key, obj0);
+      std::string key_to_remove = std::to_string(first_key_per_log[obj_num]);
+      first_key_per_log[obj_num] += 1;
+
+      bufferlist val;
+      val.append_zero(log_size);
+      std::map<std::string, bufferlist> key_val;
+      key_val[key_to_write] = val;
+
+      ceph::os::Transaction one_write_delete;
+      one_write_delete.omap_setkeys(coll_id, object, key_val);
+      one_write_delete.omap_rmkey(coll_id, object, key_to_remove);
+
+      auto latency_start = ceph::mono_clock::now();
+      co_await local_store.do_transaction(coll_ref,
+                                          std::move(one_write_delete));
+      auto latency_end = ceph::mono_clock::now();
+
+      auto time_nanosec = (latency_end - latency_start);
+      std::chrono::duration<double> time_sec =
+          std::chrono::duration<double>(time_nanosec);
+      tot_latency += time_sec;
+      num_ops++;
     }
+    co_return results_t{num_ops, tot_latency, duration};
+  };
+  co_await pre_fill_logs();
+  co_await run_concurrent_ios(duration, num_concurrent_io, add_remove_entry);
+  co_return;
+}
+
+// rgw start
+
+/**
+ * Helper specific to the rgw workload.  Since this workload inserts and
+ * deletes random keys, this generates a random lowercase key of size
+ * key_size.
+ */
+std::string generate_random_string(int key_size) {
+  std::string res = "";
+  for (int i = 0; i < key_size; ++i) {
+    char letter = char('a' + std::rand() % 26);
+    res += letter;
+  }
+  return res;
+}
+
+// helper functions for rgw_workload specifically
+
+/**
+ * Writes one key to a given bucket.
+ * Given a bucket and the set of keys that already exist in it, this function
+ * writes a randomly generated unique key to that bucket.  The key is inserted
+ * into the bucket's key set only after the write transaction has been
+ * submitted; the bucket size is updated by the caller.  It returns a
+ * future<results_t> with num_operations set to 1 and the latency set to the
+ * time the write took, so each call adds one operation and one write's
+ * latency to the caller's totals.
+ */
+
+seastar::future<results_t>
+write_unique_key(crimson::os::FuturizedStore::Shard &shard_ref, coll_t coll_id,
+                 crimson::os::CollectionRef coll_ref, ghobject_t bucket,
+                 std::set<std::string> &existing_keys, int key_size,
+                 int value_size) {
+  std::string new_key = generate_random_string(key_size);
+  while (existing_keys.count(new_key) > 0) {
+    new_key = generate_random_string(key_size);
   }
+  bufferlist value;
+  value.append_zero(value_size);
+
+  std::map<std::string, bufferlist> data_entry;
+  data_entry[new_key] = value;
+
+  ceph::os::Transaction one_write;
+  one_write.omap_setkeys(coll_id, bucket, data_entry);
+
+  auto latency_start = ceph::mono_clock::now();
+  co_await shard_ref.do_transaction(coll_ref, std::move(one_write));
+  auto latency_end = ceph::mono_clock::now();
+  auto time_per_write = (latency_end - latency_start);
+  std::chrono::duration<double> time_per_write_sec = time_per_write;
+
+  existing_keys.insert(new_key);
+  co_return results_t{1, time_per_write_sec};
 }
 
-int main(int argc, char** argv)
-{
+/**
+ * Deletes one key from a given bucket.
+ * Given a bucket and the set of keys that already exist in it, this function
+ * deletes a randomly selected key from that bucket.  The key is removed from
+ * the bucket's key set before the delete transaction is submitted; the bucket
+ * size is updated by the caller.  It returns a future<results_t> with
+ * num_operations set to 1 and the latency set to the time the delete took, so
+ * each call adds one operation and one delete's latency to the caller's
+ * totals.
+ */
+
+seastar::future<results_t>
+delete_random_key(crimson::os::FuturizedStore::Shard &shard_ref,
+                  coll_t &coll_id, crimson::os::CollectionRef coll_ref,
+                  ghobject_t &bucket, std::set<std::string> &existing_keys) {
+  int index = std::rand() % existing_keys.size();
+  auto it = existing_keys.begin();
+  std::advance(it, index);
+  std::string key_to_delete = *it;
+  existing_keys.erase(it);
+
+  ceph::os::Transaction one_delete;
+  one_delete.omap_rmkey(coll_id, bucket, key_to_delete);
+  auto latency_start = ceph::mono_clock::now();
+  co_await shard_ref.do_transaction(coll_ref, std::move(one_delete));
+  auto latency_end = ceph::mono_clock::now();
+  auto time_per_delete = (latency_end - latency_start);
+  std::chrono::duration<double> time_per_delete_sec = time_per_delete;
+  co_return results_t{1, time_per_delete_sec};
+}
+
+/**
+ * Randomly adds and removes keys in randomly chosen rgw buckets.  The number
+ * of operations, total latency, and duration are printed per reactor by
+ * run_concurrent_ios.
+ * The workload has 3 steps:
+ * (a) pre-filling each bucket to a steady state of target_keys_per_bucket
+ *     keys
+ * (b) randomly choosing to write or remove a key, forcing one or the other
+ *     when the chosen bucket's size is outside the tolerance range
+ * (c) running num_concurrent_io such I/Os concurrently on one reactor
+ */
+seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
+                                     int num_indices, int num_concurrent_io,
+                                     int duration, int key_size, int value_size,
+                                     int target_keys_per_bucket,
+                                     int tolerance_range,
+                                     int num_buckets_per_collection) {
+
+  LOG_PREFIX(rgw_index_workload);
+  auto &local_store = global_store.get_sharded_store();
+  std::map<int, coll_t> collection_id_for_rgw;
+  // map from bucket number to its collection reference
+  std::map<int, crimson::os::CollectionRef> coll_ref_map_rgw;
+  // keys_per_bucket[i] is the set of keys currently present in bucket i
+  std::vector<std::set<std::string>> keys_per_bucket(num_indices);
+
+  /**
+   * Returns a future that resolves once the buckets are pre-filled.
+   * It creates num_indices buckets, each pre-filled with
+   * target_keys_per_bucket entries.  Each entry is a key/value pair with a
+   * unique key of size key_size and a value of size value_size.
+   */
+
+  auto pre_fill_buckets = [&]() -> seastar::future<> {
+    for (int i = 0; i < num_indices; ++i) {
+      auto bucket_i = create_hobj(i);
+      auto coll_id = make_cid(i, num_buckets_per_collection);
+      collection_id_for_rgw[i] = coll_id;
+      auto coll_ref = co_await local_store.create_new_collection(coll_id);
+      coll_ref_map_rgw[i] = coll_ref;
+
+      std::map<std::string, bufferlist> omap_for_this_bucket;
+      std::set<std::string> keys_in_this_bucket;
+
+      for (int j = 0; j < target_keys_per_bucket; ++j) {
+        std::string possible_key = generate_random_string(key_size);
+        while (keys_in_this_bucket.count(possible_key) > 0) {
+          possible_key = generate_random_string(key_size);
+        }
+        keys_in_this_bucket.insert(possible_key);
+        bufferlist val_for_poss_key;
+        val_for_poss_key.append_zero(value_size);
+        omap_for_this_bucket[possible_key] = val_for_poss_key;
+      }
+      keys_per_bucket[i] = keys_in_this_bucket;
+      ceph::os::Transaction txn_write_omap_for_bucket;
+      txn_write_omap_for_bucket.create(coll_id, bucket_i);
+      txn_write_omap_for_bucket.omap_setkeys(coll_id, bucket_i,
+                                             omap_for_this_bucket);
+      co_await local_store.do_transaction(coll_ref,
+                                          std::move(txn_write_omap_for_bucket));
+    }
+    co_return;
+  };
+
+  // current size of each bucket; initially target_keys_per_bucket because all
+  // buckets are pre-filled
+  std::vector<int> size_per_bucket(num_indices, target_keys_per_bucket);
+
+  // min_size and max_size bound the allowable bucket size
+  int min_size =
+      std::floor(target_keys_per_bucket * (1 - tolerance_range / 100.0));
+  int max_size =
+      std::ceil(target_keys_per_bucket * (1 + tolerance_range / 100.0));
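+  // For example (arbitrary numbers): target_keys_per_bucket = 1000 with
+  // tolerance_range = 10 gives min_size = 900 and max_size = 1100.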
+
+  /**
+   * Returns a future<results_t>.
+   * Each iteration picks a random bucket and, based on its current size,
+   * either forces a write, forces a delete, or randomly picks between
+   * writing and deleting a key.
+   */
+  auto rgw_actual_test = [&]() -> seastar::future<results_t> {
+    int num_ops = 0;
+    std::chrono::duration<double> tot_latency =
+        std::chrono::duration<double>(0.0);
+    auto start = ceph::mono_clock::now();
+
+    while (ceph::mono_clock::now() - start <= std::chrono::seconds(duration)) {
+
+      int bucket_num_we_choose = std::rand() % num_indices;
+      auto bucket = create_hobj(bucket_num_we_choose);
+      auto coll_id = collection_id_for_rgw[bucket_num_we_choose];
+      auto coll_ref = coll_ref_map_rgw[bucket_num_we_choose];
+
+      int size_bucket_we_choose = size_per_bucket[bucket_num_we_choose];
+      auto &keys_in_that_bucket = keys_per_bucket[bucket_num_we_choose];
+
+      // if the bucket is at or below min_size, force a write so it cannot
+      // shrink further
+      if (size_bucket_we_choose <= min_size) {
+        results_t result = co_await write_unique_key(
+            local_store, coll_id, coll_ref, bucket, keys_in_that_bucket,
+            key_size, value_size);
+        size_per_bucket[bucket_num_we_choose] += 1;
+        tot_latency += result.tot_latency_sec;
+        num_ops += result.num_operations;
+
+      } else if (size_bucket_we_choose >= max_size) {
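+        // at or above max_size, force a delete so the bucket cannot grow
+        // further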
+        results_t result = co_await delete_random_key(
+            local_store, coll_id, coll_ref, bucket, keys_in_that_bucket);
+        size_per_bucket[bucket_num_we_choose] -= 1;
+        tot_latency += result.tot_latency_sec;
+        num_ops += result.num_operations;
+      } else {
+        int choice = std::rand() % 2;
+        // choice 0 is write, choice 1 is delete
+        if (choice == 0) {
+          results_t result = co_await write_unique_key(
+              local_store, coll_id, coll_ref, bucket, keys_in_that_bucket,
+              key_size, value_size);
+          size_per_bucket[bucket_num_we_choose] += 1;
+          tot_latency += result.tot_latency_sec;
+          num_ops += result.num_operations;
+        } else {
+          results_t result = co_await delete_random_key(
+              local_store, coll_id, coll_ref, bucket, keys_in_that_bucket);
+          size_per_bucket[bucket_num_we_choose] -= 1;
+          tot_latency += result.tot_latency_sec;
+          num_ops += result.num_operations;
+        }
+      }
+    }
+    co_return results_t{num_ops, tot_latency, duration};
+  };
+
+  co_await pre_fill_buckets();
+  co_await run_concurrent_ios(duration, num_concurrent_io, rgw_actual_test);
+  co_return;
+}
+
+int main(int argc, char **argv) {
   LOG_PREFIX(main);
   po::options_description desc{"Allowed options"};
   bool debug = false;
   std::string store_type;
   std::string store_path;
-  std::string io_pattern;
-
-  desc.add_options()
-    ("help,h", "show help message")
-    ("store-type",
-     po::value<std::string>(&store_type)->default_value("seastore"),
-     "set store type")
-    /* store-path is a path to a directory containing a file 'block'
-     * block should be a symlink to a real device for actual performance
-     * testing, but may be a file for testing this utility.
-     * See build/dev/osd* after starting a vstart cluster for an example
-     * of what that looks like.
-     */
-    ("store-path", po::value<std::string>(&store_path),
-     "path to store, <store-path>/block should "
-     "be a symlink to the target device for bluestore or seastore")
-    ("debug", po::value<bool>(&debug)->default_value(false),
-     "enable debugging");
+
+  desc.add_options()("help,h", "show help message")(
+      "store-type",
+      po::value<std::string>(&store_type)->default_value("seastore"),
+      "set store type")
+      /* store-path is a path to a directory containing a file 'block'
+       * block should be a symlink to a real device for actual performance
+       * testing, but may be a file for testing this utility.
+       * See build/dev/osd* after starting a vstart cluster for an example
+       * of what that looks like.
+       */
+      ("store-path", po::value<std::string>(&store_path),
+       "path to store, <store-path>/block should "
+       "be a symlink to the target device for bluestore or seastore")(
+          "debug", po::value<bool>(&debug)->default_value(false),
+          "enable debugging");
 
   po::variables_map vm;
   std::vector<std::string> unrecognized_options;
   try {
     auto parsed = po::command_line_parser(argc, argv)
-      .options(desc)
-      .allow_unregistered()
-      .run();
+                      .options(desc)
+                      .allow_unregistered()
+                      .run();
     po::store(parsed, vm);
     if (vm.count("help")) {
       std::cout << desc << std::endl;
@@ -209,8 +526,8 @@ int main(int argc, char** argv)
 
     po::notify(vm);
     unrecognized_options =
-      po::collect_unrecognized(parsed.options, po::include_positional);
-  } catch(const po::error& e) {
+        po::collect_unrecognized(parsed.options, po::include_positional);
+  } catch (const po::error &e) {
     std::cerr << "error: " << e.what() << std::endl;
     return 1;
   }
@@ -220,121 +537,168 @@ int main(int argc, char** argv)
   app_cfg.auto_handle_sigint_sigterm = false;
   seastar::app_template app(std::move(app_cfg));
 
-  std::vector<char*> av{argv[0]};
-  std::transform(
-    std::begin(unrecognized_options),
-    std::end(unrecognized_options),
-    std::back_inserter(av),
-    [](auto& s) {
-      return const_cast<char*>(s.c_str());
-    });
+  std::string work_load_type = "pg_log";
+  int num_logs = 0;
+  int log_length = 0;
+  int log_size = 0;
+  int num_concurrent_io = 0;
+  int duration = 0;
+  int key_size = 0;
+  int value_size = 0;
+  int num_indices = 0;
+  int target_keys_per_bucket = 0;
+  int tolerance_range = 0;
+  int num_buckets_per_collection = 0;
+
+  std::vector<char *> av{argv[0]};
+  std::transform(std::begin(unrecognized_options),
+                 std::end(unrecognized_options), std::back_inserter(av),
+                 [](auto &s) { return const_cast<char *>(s.c_str()); });
+  app.add_options()(
+      "work_load_type",
+      po::value<std::string>(&work_load_type)->default_value("pg_log"),
+      "work load type: pg_log or rgw_index")
+
+      ("num_logs", po::value<int>(&num_logs),
+       "how many different logs we create; aka, we create a log for every "
+       "object,so how many objects we create")
+
+      ("log_length", po::value<int>(&log_length),
+        "number of entries per log")
+
+      ("log_size", po::value<int>(&log_size), "size of each log entry")
+
+      ("num_concurrent_io", po::value<int>(&num_concurrent_io),
+        "number of IOs happening simultaneously")
+
+      ("duration", po::value<int>(&duration),
+        "how long in seconds the actual testing loop runs "
+        "for")
+      // for rgw
+      ("num_indices", po::value<int>(&num_indices),
+       "number of RGW indices/buckets")
+
+      ("key_size", po::value<int>(&key_size), "size of keys in bytes")
+
+      ("value_size", po::value<int>(&value_size),
+        "size of values in bytes")
+
+      ("target_keys_per_bucket",
+        po::value<int>(&target_keys_per_bucket),
+        "target number of keys per bucket")
+
+      ("tolerance_range", po::value<int>(&tolerance_range),
+        "tolerance range percentage")
+
+      ("num_buckets_per_collection",
+        po::value<int>(&num_buckets_per_collection),
+        "the number of objects in each collection ");
+
   return app.run(
-    av.size(), av.data(),
-    /* crimson-osd uses seastar as its scheduler.  We use
-     * sesastar::app_template::run to start the base task for the
-     * application -- this lambda.  The -> seastar::future<int> here
-     * explicitely states the return type of the lambda, a future
-     * which resolves to an int.  We need to do this because the
-     * co_return at the end is insufficient to express the type.
-     *
-     * The lambda internally uses co_await/co_return and is therefore
-     * a coroutine.  co_await <future> suspends execution until <future>
-     * resolves.  The whole co_await expression then evaluates to the
-     * contents of the future -- int for seastar::future<int>.
-     *
-     * What's a bit confusing is that a coroutine generally *returns*
-     * at the first suspension point yielding it's return type, a
-     * seastar::future<int> in this case.  This is tricky for
-     * lambda-coroutines because it means that the lambda could go out
-     * of scope before the coroutine actually completes, resulting in
-     * captured variables (references to everything in the parent frame
-     * in this case -- [&]) being free'd.  Resuming the coroutine would
-     * then hit a use-after-free as soon as it tries to access any
-     * of those variables.  seastar::coroutine::lambda avoids this.
-     * I suggest having a look at src/seastar/include/seastar/core/coroutine.hh
-     * for the implementation.
-     * Note, the language guarrantees that *arguments* (whether to
-     * a lambda or not) have their lifetimes extended for the duration
-     * of the coroutine, so this isn't a problem for non-lambda
-     * coroutines.
-     */
-    seastar::coroutine::lambda([&]() -> seastar::future<int> {
-    if (debug) {
-      seastar::global_logger_registry().set_all_loggers_level(
-        seastar::log_level::debug
-      );
-    } else {
-      seastar::global_logger_registry().set_all_loggers_level(
-        seastar::log_level::error
-      );
-    }
+      av.size(), av.data(),
+      /* crimson-osd uses seastar as its scheduler.  We use
+       * seastar::app_template::run to start the base task for the
+       * application -- this lambda.  The -> seastar::future<int> here
+       * explicitly states the return type of the lambda, a future
+       * which resolves to an int.  We need to do this because the
+       * co_return at the end is insufficient to express the type.
+       *
+       * The lambda internally uses co_await/co_return and is therefore
+       * a coroutine.  co_await <future> suspends execution until <future>
+       * resolves.  The whole co_await expression then evaluates to the
+       * contents of the future -- int for seastar::future<int>.
+       *
+       * What's a bit confusing is that a coroutine generally *returns*
+       * at the first suspension point yielding its return type, a
+       * seastar::future<int> in this case.  This is tricky for
+       * lambda-coroutines because it means that the lambda could go out
+       * of scope before the coroutine actually completes, resulting in
+       * captured variables (references to everything in the parent frame
+       * in this case -- [&]) being free'd.  Resuming the coroutine would
+       * then hit a use-after-free as soon as it tries to access any
+       * of those variables.  seastar::coroutine::lambda avoids this.
+       * I suggest having a look at
+       * src/seastar/include/seastar/core/coroutine.hh for the implementation.
+       * Note, the language guarantees that *arguments* (whether to
+       * a lambda or not) have their lifetimes extended for the duration
+       * of the coroutine, so this isn't a problem for non-lambda
+       * coroutines.
+       */
+      seastar::coroutine::lambda([&]() -> seastar::future<int> {
+        if (debug) {
+          seastar::global_logger_registry().set_all_loggers_level(
+              seastar::log_level::debug);
+        } else {
+          seastar::global_logger_registry().set_all_loggers_level(
+              seastar::log_level::error);
+        }
 
-    co_await crimson::common::sharded_conf().start(
-      EntityName{}, std::string_view{"ceph"});
-    co_await crimson::common::local_conf().start();
-
-    {
-      std::vector<const char*> cav;
-      std::transform(
-        std::begin(unrecognized_options),
-        std::end(unrecognized_options),
-        std::back_inserter(cav),
-        [](auto& s) {
-          return s.c_str();
-        });
-      co_await crimson::common::local_conf().parse_argv(
-        cav);
-    }
+        co_await crimson::common::sharded_conf().start(
+            EntityName{}, std::string_view{"ceph"});
+        co_await crimson::common::local_conf().start();
 
-    auto store = crimson::os::FuturizedStore::create(
-      store_type,
-      store_path,
-      crimson::common::local_conf().get_config_values()
-    );
-
-    uuid_d uuid;
-    uuid.generate_random();
-
-    co_await store->start();
-    /* FuturizedStore interfaces use errorated-futures rather than bare
-     * seastar futures in order to encode possible errors in the type.
-     * However, this utility doesn't really need to do anything clever
-     * with a failure to execute mkfs other than tell the user what
-     * happened, so we simply respond uniformly to all error cases
-     * using the handle_error handler.  See FuturizedStore::mkfs for
-     * the actual return type and crimson/common/errorator.h for the
-     * implementation of errorators.
-     */
-    co_await store->mkfs(uuid).handle_error(
-      crimson::stateful_ec::assert_failure(
-        std::format(
-          "error creating empty object store type {} in {}",
-          store_type,
-          store_path).c_str()));
-    co_await store->stop();
-
-    co_await store->start();
-    co_await store->mount().handle_error(
-      crimson::stateful_ec::assert_failure(
-        std::format(
-          "error mounting object store type {} in {}",
-          store_type,
-          store_path).c_str()));
-
-    for (unsigned i = 0; i < seastar::smp::count; ++i) {
-      co_await seastar::smp::submit_to(
-        i,
-        seastar::coroutine::lambda(
-          [FNAME, &store_ref=*store]() -> seastar::future<> {
-            ERROR("running example_io on reactor {}", seastar::this_shard_id());
-            co_await example_io(store_ref);
-          })
-      );
-    }
+        {
+          std::vector<const char *> cav;
+          std::transform(
+              std::begin(unrecognized_options), std::end(unrecognized_options),
+              std::back_inserter(cav), [](auto &s) { return s.c_str(); });
+          co_await crimson::common::local_conf().parse_argv(cav);
+        }
+
+        auto store = crimson::os::FuturizedStore::create(
+            store_type, store_path,
+            crimson::common::local_conf().get_config_values());
+
+        uuid_d uuid;
+        uuid.generate_random();
+
+        co_await store->start();
+        /* FuturizedStore interfaces use errorated-futures rather than bare
+         * seastar futures in order to encode possible errors in the type.
+         * However, this utility doesn't really need to do anything clever
+         * with a failure to execute mkfs other than tell the user what
+         * happened, so we simply respond uniformly to all error cases
+         * using the handle_error handler.  See FuturizedStore::mkfs for
+         * the actual return type and crimson/common/errorator.h for the
+         * implementation of errorators.
+         */
+        co_await store->mkfs(uuid).handle_error(
+            crimson::stateful_ec::assert_failure(
+                std::format("error creating empty object store type {} in {}",
+                            store_type, store_path)
+                    .c_str()));
+        co_await store->stop();
+
+        co_await store->start();
+        co_await store->mount().handle_error(
+            crimson::stateful_ec::assert_failure(
+                std::format("error mounting object store type {} in {}",
+                            store_type, store_path)
+                    .c_str()));
+        std::vector<seastar::future<>> per_shard_futures;
+        for (unsigned i = 0; i < seastar::smp::count; ++i) {
+          auto named_lambda = [=, &store_ref = *store]() -> seastar::future<> {
+            DEBUG("running example_io on reactor {}", seastar::this_shard_id());
+            if (work_load_type == "pg_log") {
+              co_await pg_log_workload(store_ref, num_logs, num_concurrent_io,
+                                       duration, log_size, log_length);
+            } else if (work_load_type == "rgw_index") {
+              co_await rgw_index_workload(
+                  store_ref, num_indices, num_concurrent_io, duration, key_size,
+                  value_size, target_keys_per_bucket, tolerance_range,
+                  num_buckets_per_collection);
+            }
+            co_return;
+          };
+          per_shard_futures.push_back(
+              seastar::smp::submit_to(i, std::move(named_lambda)));
+        }
 
-    co_await store->umount();
-    co_await store->stop();
-    co_await crimson::common::sharded_conf().stop();
-    co_return 0;
-  }));
+        co_await seastar::when_all(per_shard_futures.begin(),
+                                   per_shard_futures.end());
+        co_await store->umount();
+        co_await store->stop();
+        co_await crimson::common::sharded_conf().stop();
+        co_return 0;
+      }));
 }