#include "crimson/common/config_proxy.h"
#include "crimson/common/coroutine.h"
#include "crimson/common/log.h"
+#include "crimson/common/metrics_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
}
void dump(ceph::Formatter *f) const {
- f->open_object_section("results_t");
f->dump_int("ios_completed", ios_completed);
f->dump_float(
  "total_latency_s",
  total_latency.count());
f->dump_float(
  "total_duration_s",
  duration.count());
- f->close_section();
}
};
class StoreBenchWorkload {
public:
virtual po::options_description get_options() = 0;
- virtual seastar::future<> run(
+ virtual seastar::future<results_t> run(
const common_options_t &common,
crimson::os::FuturizedStore &global_store) = 0;
virtual ~StoreBenchWorkload() {}
;
return ret;
}
- seastar::future<> run(
+ seastar::future<results_t> run(
const common_options_t &common,
crimson::os::FuturizedStore &global_store) final;
~PGLogWorkload() final {}
;
return ret;
}
- seastar::future<> run(
+ seastar::future<results_t> run(
const common_options_t &common,
crimson::os::FuturizedStore &global_store) final;
~RGWIndexWorkload() final {}
for (const auto res : all_io_res) {
total_result_all_io += res;
}
- ceph::JSONFormatter jf;
- total_result_all_io.dump(&jf);
- jf.flush(std::cout);
- std::cout << std::endl;
co_return total_result_all_io;
};
* (b) writing and removing logs; this is considered 1 I/O
* (c) doing N I/Os concurrently on 1 thread
*/
-seastar::future<> PGLogWorkload::run(
+seastar::future<results_t> PGLogWorkload::run(
const common_options_t &common,
crimson::os::FuturizedStore &global_store)
{
co_return results_t{num_ops, tot_latency, common.get_duration()};
};
co_await pre_fill_logs();
- co_await run_concurrent_ios(
+ co_return co_await run_concurrent_ios(
common.get_duration(), common.num_concurrent_io, add_remove_entry);
- co_return;
}
// rgw start
* based on the chosen bucket size, i.e. whether it is within the acceptable
* range (c) doing N I/Os concurrently on 1 thread
*/
-seastar::future<> RGWIndexWorkload::run(
+seastar::future<results_t> RGWIndexWorkload::run(
const common_options_t &common,
crimson::os::FuturizedStore &global_store)
{
};
co_await pre_fill_buckets();
- co_await run_concurrent_ios(
+ co_return co_await run_concurrent_ios(
common.get_duration(), common.num_concurrent_io, rgw_actual_test);
- co_return;
};
int main(int argc, char **argv) {
std::format("error mounting object store type {} in {}",
store_type, store_path)
.c_str()));
- std::vector<seastar::future<>> per_shard_futures;
+ std::vector<seastar::future<results_t>> per_shard_futures;
- auto named_lambda = [&, &store_ref = *store]() -> seastar::future<> {
+ auto named_lambda = [&, &store_ref = *store]()
+ -> seastar::future<results_t> {
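+ // run the selected workload on this shard and return its results_t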
DEBUG("running example_io on reactor {}", seastar::this_shard_id());
auto iter = workloads.find(work_load_type);
if (iter != workloads.end()) {
- co_await iter->second->run(common_options, store_ref);
+ co_return co_await iter->second->run(common_options, store_ref);
+ } else {
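+ // no workload registered under this name; return empty results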
+ co_return results_t{};
}
- co_return;
};
for (unsigned i = 0; i < seastar::smp::count; ++i) {
per_shard_futures.push_back(
seastar::smp::submit_to(i, std::move(named_lambda)));
}
- co_await seastar::when_all(per_shard_futures.begin(),
- per_shard_futures.end());
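+ // aggregate the per-shard results into a single pretty-printed JSON report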
+ ceph::JSONFormatter f(true /* pretty */);
+ f.open_object_section("store-bench");
+ {
+ f.dump_float("duration_s", common_options.get_duration().count());
+ f.open_array_section("results");
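+ // await each shard's future in submission order and emit one object per shard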
+ for (unsigned i = 0; i < per_shard_futures.size(); ++i) {
+ auto results = co_await std::move(per_shard_futures[i]);
+ f.open_object_section("result");
+ results.dump(&f);
+ f.dump_unsigned("shard", i);
+ f.close_section();
+ }
+ f.close_section();
+ }
+ f.close_section();
+ f.flush(std::cout);
+
co_await store->umount();
co_await store->stop();
co_await crimson::common::sharded_conf().stop();