From 1b1b236072f6874733f9902aa1fad8e63553f37e Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 14 Aug 2025 23:18:42 +0000 Subject: [PATCH] crimson/.../store-bench: add random write workload Signed-off-by: Samuel Just --- src/crimson/tools/store_bench/store-bench.cc | 194 +++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/src/crimson/tools/store_bench/store-bench.cc b/src/crimson/tools/store_bench/store-bench.cc index 879893afed9..9e062d0bd03 100644 --- a/src/crimson/tools/store_bench/store-bench.cc +++ b/src/crimson/tools/store_bench/store-bench.cc @@ -26,6 +26,7 @@ #include #include +#include #include #include @@ -572,6 +573,198 @@ seastar::future RGWIndexWorkload::run( common.get_duration(), common.num_concurrent_io, rgw_actual_test); }; + +/** + * RandomWriteWorkload + * + * Performs a simple random write workload. + */ +class RandomWriteWorkload : public StoreBenchWorkload { + uint64_t prefill_size = 128<<10; + uint64_t io_size = 4<<10; + uint64_t size_per_shard = 64<<20; + uint64_t size_per_obj = 4<<20; + uint64_t colls_per_shard = 16; + uint64_t io_concurrency_per_shard = 16; + uint64_t get_obj_per_shard() const { + return size_per_shard / size_per_obj; + } + uint64_t get_obj_per_coll() const { + return get_obj_per_shard() / colls_per_shard; + } +public: + po::options_description get_options() final { + po::options_description ret{"RandomWriteWorkload"}; + ret.add_options() + ("prefill-size", po::value(&prefill_size), + "IO size to use when prefilling objets") + ("io-size", po::value(&io_size), + "IO size") + ("size-per-shard", po::value(&size_per_shard), + "Total size per shard") + ("size-per-obj", po::value(&size_per_obj), + "Object Size") + ("colls-per-shard", po::value(&colls_per_shard), + "Collections per shard") + ("io-concurrency-per-shard", po::value(&io_concurrency_per_shard), + "IO Concurrency Per Shard") + ; + return ret; + } + seastar::future run( + const common_options_t &common, + crimson::os::FuturizedStore &global_store) final; + ~RandomWriteWorkload() final {} +}; + +seastar::future generate_random_bp(uint64_t size) +{ + bufferptr bp(ceph::buffer::create_page_aligned(size)); + auto f = co_await seastar::open_file_dma( + "/dev/urandom", seastar::open_flags::ro); + static constexpr uint64_t STRIDE = 256<<10; + for (uint64_t off = 0; off < size; off += STRIDE) { + co_await f.dma_read(off, bp.c_str() + off, STRIDE); + } + co_return bp; +} + + +seastar::future RandomWriteWorkload::run( + const common_options_t &common, + crimson::os::FuturizedStore &global_store) +{ + LOG_PREFIX(random_write); + auto &local_store = global_store.get_sharded_store(); + + auto random_buffer = co_await generate_random_bp(16<<20); + auto get_random_buffer = [&random_buffer](uint64_t size) { + assert((size % CEPH_PAGE_SIZE) == 0); + bufferptr bp( + random_buffer, + std::experimental::randint( + 0, + (random_buffer.length() - size) / CEPH_PAGE_SIZE) * + CEPH_PAGE_SIZE, + size); + assert(bp.is_page_aligned()); + bufferlist bl; + bl.append(bp); + return bl; + }; + + auto create_hobj = [](uint64_t obj_id) { + return ghobject_t( + shard_id_t::NO_SHARD, + 0, // pool id + obj_id, // hash, normally rjenkins of name, but let's just set it to id + "", // namespace, empty here + "", // name, empty here + 0, // snapshot + ghobject_t::NO_GEN); + }; + + std::vector< + std::pair + > coll_refs; + for (uint64_t collidx = 0; collidx < colls_per_shard; ++collidx) { + coll_t cid( + spg_t(pg_t(0, (seastar::this_shard_id() * colls_per_shard) + collidx)) + ); + auto ref = co_await local_store.create_new_collection( + cid); + coll_refs.emplace_back(std::make_pair(cid, std::move(ref))); + } + auto get_coll_id = [&](uint64_t obj_id) { + return coll_refs[obj_id % get_obj_per_coll()].first; + }; + auto get_coll_ref = [&](uint64_t obj_id) { + return coll_refs[obj_id % get_obj_per_coll()].second; + }; + + unsigned running = 0; + std::optional> complete; + + static constexpr unsigned io_concurrency_per_shard = 16; + seastar::semaphore sem{io_concurrency_per_shard}; + results_t results; + auto submit_transaction = [&]( + crimson::os::CollectionRef &col_ref, + ceph::os::Transaction &&t) -> seastar::future<> { + ++running; + co_await sem.wait(1); + std::ignore = local_store.do_transaction( + col_ref, + std::move(t) + ).finally([&, start = ceph::mono_clock::now()] { + --running; + if (running == 0 && complete) { + complete->set_value(); + } + sem.signal(1); + results.ios_completed++; + results.total_latency += ceph::mono_clock::now() - start; + }); + }; + + for (uint64_t obj_id = 0; obj_id < get_obj_per_shard(); ++obj_id) { + auto hobj = create_hobj(obj_id); + auto coll_id = get_coll_id(obj_id); + auto coll_ref = get_coll_ref(obj_id); + + { + ceph::os::Transaction t; + t.create(coll_id, hobj); + co_await submit_transaction(coll_ref, std::move(t)); + } + for (uint64_t off = 0; off < size_per_obj; off += prefill_size) { + ceph::os::Transaction t; + t.write(coll_id, hobj, off, prefill_size, get_random_buffer(prefill_size)); + co_await submit_transaction(coll_ref, std::move(t)); + } + INFO("wrote obj {} of {}", obj_id, get_obj_per_shard()); + } + + INFO("finished populating"); + + auto start = ceph::mono_clock::now(); + uint64_t writes_started = 0; + while (ceph::mono_clock::now() - start < common.get_duration()) { + auto obj_id = std::experimental::randint(0, get_obj_per_shard() - 1); + auto hobj = create_hobj(obj_id); + auto coll_id = get_coll_id(obj_id); + auto coll_ref = get_coll_ref(obj_id); + + auto offset = std::experimental::randint( + 0, + (size_per_obj / io_size) - 1) * io_size; + + ceph::os::Transaction t; + t.write( + coll_id, + hobj, + offset, + io_size, + get_random_buffer(io_size)); + co_await submit_transaction(coll_ref, std::move(t)); + ++writes_started; + } + + INFO("writes_started {}", writes_started); + for (auto &[id, ref]: coll_refs) { + INFO("flushing {}", id); + co_await local_store.flush(ref); + } + + if (running > 0) { + complete = seastar::promise<>(); + co_await complete->get_future(); + } + + results.duration = ceph::mono_clock::now() - start; + co_return results; +} + int main(int argc, char **argv) { LOG_PREFIX(main); po::options_description desc{"Allowed options"}; @@ -607,6 +800,7 @@ int main(int argc, char **argv) { workloads.emplace("pg_log", std::make_unique()); workloads.emplace("rgw_index", std::make_unique()); + workloads.emplace("random_write", std::make_unique()); desc.add(common_options.get_options()); for (auto &[name, workload] : workloads) { -- 2.47.3