#include <random>
#include <vector>
+#include <experimental/random>
#include <boost/program_options/parsers.hpp>
#include <boost/program_options/variables_map.hpp>
common.get_duration(), common.num_concurrent_io, rgw_actual_test);
};
+
+/**
+ * RandomWriteWorkload
+ *
+ * Performs a simple random write workload: prefills size_per_shard bytes of
+ * size_per_obj objects per shard across colls_per_shard collections, then
+ * issues random io_size writes against them for the configured duration.
+ */
+class RandomWriteWorkload : public StoreBenchWorkload {
+ uint64_t prefill_size = 128<<10;
+ uint64_t io_size = 4<<10;
+ uint64_t size_per_shard = 64<<20;
+ uint64_t size_per_obj = 4<<20;
+ uint64_t colls_per_shard = 16;
+ uint64_t io_concurrency_per_shard = 16;
+ uint64_t get_obj_per_shard() const {
+ return size_per_shard / size_per_obj;
+ }
+ uint64_t get_obj_per_coll() const {
+ return get_obj_per_shard() / colls_per_shard;
+ }
+public:
+ po::options_description get_options() final {
+ po::options_description ret{"RandomWriteWorkload"};
+ ret.add_options()
+ ("prefill-size", po::value<uint64_t>(&prefill_size),
+ "IO size to use when prefilling objets")
+ ("io-size", po::value<uint64_t>(&io_size),
+ "IO size")
+ ("size-per-shard", po::value<uint64_t>(&size_per_shard),
+ "Total size per shard")
+ ("size-per-obj", po::value<uint64_t>(&size_per_obj),
+ "Object Size")
+ ("colls-per-shard", po::value<uint64_t>(&colls_per_shard),
+ "Collections per shard")
+ ("io-concurrency-per-shard", po::value<uint64_t>(&io_concurrency_per_shard),
+ "IO Concurrency Per Shard")
+ ;
+ return ret;
+ }
+ seastar::future<results_t> run(
+ const common_options_t &common,
+ crimson::os::FuturizedStore &global_store) final;
+ ~RandomWriteWorkload() final {}
+};
+
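+/**
+ * generate_random_bp
+ *
+ * Fills a page-aligned buffer of the requested size with bytes read from
+ * /dev/urandom in fixed-size strides; size is assumed to be a multiple of
+ * the stride.
+ */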
+seastar::future<bufferptr> generate_random_bp(uint64_t size)
+{
+ bufferptr bp(ceph::buffer::create_page_aligned(size));
+ auto f = co_await seastar::open_file_dma(
+ "/dev/urandom", seastar::open_flags::ro);
+ static constexpr uint64_t STRIDE = 256<<10;
+ for (uint64_t off = 0; off < size; off += STRIDE) {
+ co_await f.dma_read(off, bp.c_str() + off, STRIDE);
+ }
+ co_await f.close();
+ co_return bp;
+}
+
+seastar::future<results_t> RandomWriteWorkload::run(
+ const common_options_t &common,
+ crimson::os::FuturizedStore &global_store)
+{
+ LOG_PREFIX(random_write);
+ auto &local_store = global_store.get_sharded_store();
+
+ auto random_buffer = co_await generate_random_bp(16<<20);
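+ // Hand out slices at random page-aligned offsets of the shared random
+ // buffer rather than generating fresh random data for every IO.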
+ auto get_random_buffer = [&random_buffer](uint64_t size) {
+ assert((size % CEPH_PAGE_SIZE) == 0);
+ bufferptr bp(
+ random_buffer,
+ std::experimental::randint<uint64_t>(
+ 0,
+ (random_buffer.length() - size) / CEPH_PAGE_SIZE) *
+ CEPH_PAGE_SIZE,
+ size);
+ assert(bp.is_page_aligned());
+ bufferlist bl;
+ bl.append(bp);
+ return bl;
+ };
+
+ auto create_hobj = [](uint64_t obj_id) {
+ return ghobject_t(
+ shard_id_t::NO_SHARD,
+ 0, // pool id
+ obj_id, // hash, normally rjenkins of name, but let's just set it to id
+ "", // namespace, empty here
+ "", // name, empty here
+ 0, // snapshot
+ ghobject_t::NO_GEN);
+ };
+
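+ // Create this shard's collections, one per pg, numbered so that different
+ // shards use disjoint ids.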
+ std::vector<
+ std::pair<coll_t, crimson::os::CollectionRef>
+ > coll_refs;
+ for (uint64_t collidx = 0; collidx < colls_per_shard; ++collidx) {
+ coll_t cid(
+ spg_t(pg_t(0, (seastar::this_shard_id() * colls_per_shard) + collidx))
+ );
+ auto ref = co_await local_store.create_new_collection(
+ cid);
+ coll_refs.emplace_back(cid, std::move(ref));
+ }
+ auto get_coll_id = [&](uint64_t obj_id) {
+ return coll_refs[obj_id % get_obj_per_coll()].first;
+ };
+ auto get_coll_ref = [&](uint64_t obj_id) {
+ return coll_refs[obj_id % get_obj_per_coll()].second;
+ };
+
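+ // Track in-flight transactions: the semaphore caps concurrency at
+ // io_concurrency_per_shard, and `complete` is signalled once the last
+ // outstanding transaction finishes.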
+ unsigned running = 0;
+ std::optional<seastar::promise<>> complete;
+
+ seastar::semaphore sem{io_concurrency_per_shard};
+ results_t results;
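+ // Queues a transaction without waiting for it to apply; completion is
+ // observed in finally(), which updates the stats and releases the
+ // semaphore unit.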
+ auto submit_transaction = [&](
+ crimson::os::CollectionRef &col_ref,
+ ceph::os::Transaction &&t) -> seastar::future<> {
+ ++running;
+ co_await sem.wait(1);
+ std::ignore = local_store.do_transaction(
+ col_ref,
+ std::move(t)
+ ).finally([&, start = ceph::mono_clock::now()] {
+ --running;
+ if (running == 0 && complete) {
+ complete->set_value();
+ }
+ sem.signal(1);
+ results.ios_completed++;
+ results.total_latency += ceph::mono_clock::now() - start;
+ });
+ };
+
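+ // Prefill phase: create each object and fill it up to size_per_obj in
+ // prefill_size chunks.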
+ for (uint64_t obj_id = 0; obj_id < get_obj_per_shard(); ++obj_id) {
+ auto hobj = create_hobj(obj_id);
+ auto coll_id = get_coll_id(obj_id);
+ auto coll_ref = get_coll_ref(obj_id);
+
+ {
+ ceph::os::Transaction t;
+ t.create(coll_id, hobj);
+ co_await submit_transaction(coll_ref, std::move(t));
+ }
+ for (uint64_t off = 0; off < size_per_obj; off += prefill_size) {
+ ceph::os::Transaction t;
+ t.write(coll_id, hobj, off, prefill_size, get_random_buffer(prefill_size));
+ co_await submit_transaction(coll_ref, std::move(t));
+ }
+ INFO("wrote obj {} of {}", obj_id, get_obj_per_shard());
+ }
+
+ INFO("finished populating");
+
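+ // Timed phase: issue io_size writes at random aligned offsets within
+ // randomly chosen objects until the configured duration elapses.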
+ auto start = ceph::mono_clock::now();
+ uint64_t writes_started = 0;
+ while (ceph::mono_clock::now() - start < common.get_duration()) {
+ auto obj_id = std::experimental::randint<uint64_t>(0, get_obj_per_shard() - 1);
+ auto hobj = create_hobj(obj_id);
+ auto coll_id = get_coll_id(obj_id);
+ auto coll_ref = get_coll_ref(obj_id);
+
+ auto offset = std::experimental::randint<uint64_t>(
+ 0,
+ (size_per_obj / io_size) - 1) * io_size;
+
+ ceph::os::Transaction t;
+ t.write(
+ coll_id,
+ hobj,
+ offset,
+ io_size,
+ get_random_buffer(io_size));
+ co_await submit_transaction(coll_ref, std::move(t));
+ ++writes_started;
+ }
+
+ INFO("writes_started {}", writes_started);
+ for (auto &[id, ref]: coll_refs) {
+ INFO("flushing {}", id);
+ co_await local_store.flush(ref);
+ }
+
+ if (running > 0) {
+ complete = seastar::promise<>();
+ co_await complete->get_future();
+ }
+
+ results.duration = ceph::mono_clock::now() - start;
+ co_return results;
+}
+
int main(int argc, char **argv) {
LOG_PREFIX(main);
po::options_description desc{"Allowed options"};
workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());
+ workloads.emplace("random_write", std::make_unique<RandomWriteWorkload>());
desc.add(common_options.get_options());
for (auto &[name, workload] : workloads) {