]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/.../store-bench: add random write workload
authorSamuel Just <sjust@redhat.com>
Thu, 14 Aug 2025 23:18:42 +0000 (23:18 +0000)
committerSamuel Just <sjust@redhat.com>
Mon, 25 Aug 2025 15:36:17 +0000 (15:36 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/tools/store_bench/store-bench.cc

index 879893afed90d34f02f12a9b7a8f6a93374edf08..9e062d0bd0335872597f075fe6e02063f821ea8d 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <random>
 #include <vector>
+#include <experimental/random>
 
 #include <boost/program_options/parsers.hpp>
 #include <boost/program_options/variables_map.hpp>
@@ -572,6 +573,198 @@ seastar::future<results_t> RGWIndexWorkload::run(
     common.get_duration(), common.num_concurrent_io, rgw_actual_test);
 };
 
+
+/**
+ * RandomWriteWorkload 
+ *
+ * Performs a simple random write workload.
+ */
+class RandomWriteWorkload : public StoreBenchWorkload {
+  uint64_t prefill_size = 128<<10;
+  uint64_t io_size = 4<<10;
+  uint64_t size_per_shard = 64<<20;
+  uint64_t size_per_obj = 4<<20;
+  uint64_t colls_per_shard = 16;
+  uint64_t io_concurrency_per_shard = 16;
+  uint64_t get_obj_per_shard() const {
+    return size_per_shard / size_per_obj;
+  }
+  uint64_t get_obj_per_coll() const {
+    return get_obj_per_shard() / colls_per_shard;
+  }
+public:
+  po::options_description get_options() final {
+    po::options_description ret{"RandomWriteWorkload"};
+    ret.add_options()
+      ("prefill-size", po::value<uint64_t>(&prefill_size),
+       "IO size to use when prefilling objets")
+      ("io-size", po::value<uint64_t>(&io_size),
+       "IO size")
+      ("size-per-shard", po::value<uint64_t>(&size_per_shard),
+       "Total size per shard")
+      ("size-per-obj", po::value<uint64_t>(&size_per_obj),
+       "Object Size")
+      ("colls-per-shard", po::value<uint64_t>(&colls_per_shard),
+       "Collections per shard")
+      ("io-concurrency-per-shard", po::value<uint64_t>(&io_concurrency_per_shard),
+       "IO Concurrency Per Shard")
+      ;
+    return ret;
+  }
+  seastar::future<results_t> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+  ~RandomWriteWorkload() final {}
+};
+
+seastar::future<bufferptr> generate_random_bp(uint64_t size)
+{
+  bufferptr bp(ceph::buffer::create_page_aligned(size));
+  auto f = co_await seastar::open_file_dma(
+    "/dev/urandom", seastar::open_flags::ro);
+  static constexpr uint64_t STRIDE = 256<<10;
+  for (uint64_t off = 0; off < size; off += STRIDE) {
+    co_await f.dma_read(off, bp.c_str() + off, STRIDE);
+  }
+  co_return bp;
+}
+
+
+seastar::future<results_t> RandomWriteWorkload::run(
+  const common_options_t &common,
+  crimson::os::FuturizedStore &global_store)
+{
+  LOG_PREFIX(random_write);
+  auto &local_store = global_store.get_sharded_store();
+
+  auto random_buffer = co_await generate_random_bp(16<<20);
+  auto get_random_buffer = [&random_buffer](uint64_t size) {
+    assert((size % CEPH_PAGE_SIZE) == 0);
+    bufferptr bp(
+      random_buffer,
+      std::experimental::randint<uint64_t>(
+        0,
+        (random_buffer.length() - size) / CEPH_PAGE_SIZE) *
+      CEPH_PAGE_SIZE,
+      size);
+    assert(bp.is_page_aligned());
+    bufferlist bl;
+    bl.append(bp);
+    return bl;
+  };
+
+  auto create_hobj = [](uint64_t obj_id) {
+    return ghobject_t(
+      shard_id_t::NO_SHARD,
+      0,     // pool id
+      obj_id, // hash, normally rjenkins of name, but let's just set it to id
+      "",    // namespace, empty here
+      "",    // name, empty here
+      0,     // snapshot
+      ghobject_t::NO_GEN);
+  };
+
+  std::vector<
+    std::pair<coll_t, crimson::os::CollectionRef>
+    > coll_refs;
+  for (uint64_t collidx = 0; collidx < colls_per_shard; ++collidx) {
+   coll_t cid(
+     spg_t(pg_t(0, (seastar::this_shard_id() * colls_per_shard) + collidx))
+   );
+   auto ref = co_await local_store.create_new_collection(
+     cid);
+   coll_refs.emplace_back(std::make_pair(cid, std::move(ref)));
+  }
+  auto get_coll_id = [&](uint64_t obj_id) {
+    return coll_refs[obj_id % get_obj_per_coll()].first;
+  };
+  auto get_coll_ref = [&](uint64_t obj_id) {
+    return coll_refs[obj_id % get_obj_per_coll()].second;
+  };
+
+  unsigned running = 0;
+  std::optional<seastar::promise<>> complete;
+
+  static constexpr unsigned io_concurrency_per_shard = 16;
+  seastar::semaphore sem{io_concurrency_per_shard};
+  results_t results;
+  auto submit_transaction = [&](
+    crimson::os::CollectionRef &col_ref,
+    ceph::os::Transaction &&t) -> seastar::future<> {
+    ++running;
+    co_await sem.wait(1);
+    std::ignore = local_store.do_transaction(
+      col_ref,
+      std::move(t)
+    ).finally([&, start = ceph::mono_clock::now()] {
+      --running;
+      if (running == 0 && complete) {
+        complete->set_value();
+      }
+      sem.signal(1);
+      results.ios_completed++;
+      results.total_latency += ceph::mono_clock::now() - start;
+    });
+  };
+
+  for (uint64_t obj_id = 0; obj_id < get_obj_per_shard(); ++obj_id) {
+    auto hobj = create_hobj(obj_id);
+    auto coll_id = get_coll_id(obj_id);
+    auto coll_ref = get_coll_ref(obj_id);
+    
+    {
+      ceph::os::Transaction t;
+      t.create(coll_id, hobj);
+      co_await submit_transaction(coll_ref, std::move(t));
+    }
+    for (uint64_t off = 0; off < size_per_obj; off += prefill_size) {
+      ceph::os::Transaction t;
+      t.write(coll_id, hobj, off, prefill_size, get_random_buffer(prefill_size));
+      co_await submit_transaction(coll_ref, std::move(t));
+    }
+    INFO("wrote obj {} of {}", obj_id, get_obj_per_shard());
+  }
+
+  INFO("finished populating");
+
+  auto start = ceph::mono_clock::now();
+  uint64_t writes_started = 0;
+  while (ceph::mono_clock::now() - start < common.get_duration()) {
+    auto obj_id = std::experimental::randint<uint64_t>(0, get_obj_per_shard() - 1);
+    auto hobj = create_hobj(obj_id);
+    auto coll_id = get_coll_id(obj_id);
+    auto coll_ref = get_coll_ref(obj_id);
+
+    auto offset = std::experimental::randint<uint64_t>(
+      0,
+      (size_per_obj / io_size) - 1) * io_size;
+    
+    ceph::os::Transaction t;
+    t.write(
+      coll_id,
+      hobj,
+      offset,
+      io_size,
+      get_random_buffer(io_size));
+    co_await submit_transaction(coll_ref, std::move(t));
+    ++writes_started;
+  }
+
+  INFO("writes_started {}", writes_started);
+  for (auto &[id, ref]: coll_refs) {
+    INFO("flushing {}", id);
+    co_await local_store.flush(ref);
+  }
+
+  if (running > 0) {
+    complete = seastar::promise<>();
+    co_await complete->get_future();
+  }
+
+  results.duration = ceph::mono_clock::now() - start;
+  co_return results;
+}
+
 int main(int argc, char **argv) {
   LOG_PREFIX(main);
   po::options_description desc{"Allowed options"};
@@ -607,6 +800,7 @@ int main(int argc, char **argv) {
     
   workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
   workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());
+  workloads.emplace("random_write", std::make_unique<RandomWriteWorkload>());
   
   desc.add(common_options.get_options());
   for (auto &[name, workload] : workloads) {