namespace po = boost::program_options;
+using namespace std::literals;
using namespace ceph;
SET_SUBSYS(osd);
+/**
+ * Aggregated outcome of a benchmark run.
+ *
+ * num_operations counts the operations performed and tot_latency_sec is the
+ * sum of the latencies of those operations. What counts as one operation
+ * depends on the workload:
+ *  - pg workload: a write plus its delete increases the count by 1
+ *  - rgw index workload: each write increases the count by 1 and each
+ *    delete increases the count by 1
+ *
+ * duration is the time the workload took to run.
+ */
+struct results_t {
+  int num_operations = 0;
+  std::chrono::duration<double> tot_latency_sec{0.0};
+  std::chrono::duration<double> duration{0.0};
+
+  /// Accumulate the operation count and latency of rhs into this result;
+  /// duration is left untouched.
+  results_t &operator += (const results_t &rhs) {
+    tot_latency_sec += rhs.tot_latency_sec;
+    num_operations += rhs.num_operations;
+    return *this;
+  }
+
+  /// Emit the three fields as a single "results_t" object section of f.
+  void dump(ceph::Formatter *f) const {
+    const double latency_secs = tot_latency_sec.count();
+    const double run_secs = duration.count();
+
+    f->open_object_section("results_t");
+    f->dump_int("number of operations done", num_operations);
+    f->dump_float(
+      "the total latency aka time for these operations", latency_secs);
+    f->dump_float("the time the workload took to run is ", run_secs);
+    f->close_section();
+  }
+};
+
+/**
+ * Command line options shared by every workload.
+ */
+struct common_options_t {
+  /// number of IOs happening simultaneously (--num-concurrent-io);
+  /// stays at 16 when the option is not given
+  unsigned num_concurrent_io = 16;
+
+  /// The configured --duration value as a chrono duration counted in seconds.
+  std::chrono::duration<uint64_t> get_duration() const {
+    return std::chrono::seconds(duration);
+  }
+
+  /// Build the "Common Options" group. Parsed values are written straight
+  /// into the members of this object, so it must outlive option parsing.
+  po::options_description get_options() {
+    po::options_description common_opts{"Common Options"};
+
+    common_opts.add_options()
+      ("num-concurrent-io", po::value<unsigned>(&num_concurrent_io),
+       "number of IOs happening simultaneously");
+
+    // --duration has no default and is mandatory
+    common_opts.add_options()
+      ("duration", po::value<unsigned>(&duration)->required(),
+       "how long in seconds the actual testing loop runs "
+       "for");
+
+    return common_opts;
+  }
+
+private:
+  /// length of the testing loop in seconds, exposed through get_duration()
+  unsigned duration = 0;
+};
+
+/**
+ * Interface implemented by every benchmark workload: each one contributes
+ * its own command line options and knows how to run itself against a store.
+ */
+class StoreBenchWorkload {
+public:
+  virtual ~StoreBenchWorkload() = default;
+
+  /// Command line options specific to this workload.
+  virtual po::options_description get_options() = 0;
+
+  /// Run the workload against global_store using the shared settings in
+  /// common; the returned future resolves when the workload has finished.
+  virtual seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) = 0;
+};
+
+
+/**
+ * PG log workload. Its tunables are the number of logs (one per object),
+ * the number of entries in each log and the size of each entry.
+ */
+class PGLogWorkload : public StoreBenchWorkload {
+  unsigned num_logs = 4;      ///< --num-logs
+  unsigned log_size = 1024;   ///< --log-size
+  unsigned log_length = 256;  ///< --log-length
+
+public:
+  ~PGLogWorkload() final = default;
+
+  /// Build the "PGLogWorkload" option group; parsed values are stored
+  /// directly into the members of this object.
+  po::options_description get_options() final {
+    po::options_description pg_log_opts{"PGLogWorkload"};
+
+    pg_log_opts.add_options()
+      ("num-logs", po::value<unsigned>(&num_logs),
+       "how many different logs we create; aka, we create a log for every "
+       "object,so how many objects we create");
+    pg_log_opts.add_options()
+      ("log-length", po::value<unsigned>(&log_length),
+       "number of entries per log");
+    pg_log_opts.add_options()
+      ("log-size", po::value<unsigned>(&log_size),
+       "size of each log entry");
+
+    return pg_log_opts;
+  }
+
+  /// Defined out of line.
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+};
+
+/**
+ * RGW index workload. Its tunables are the number of indices/buckets, the
+ * key and value sizes in bytes, the target number of keys per bucket, the
+ * tolerance range percentage and the number of objects per collection.
+ */
+class RGWIndexWorkload : public StoreBenchWorkload {
+  unsigned num_indices = 16;                 ///< --num_indices
+  uint64_t key_size = 1024;                  ///< --key_size, bytes
+  uint64_t value_size = 1024;                ///< --value_size, bytes
+  unsigned target_keys_per_bucket = 256;     ///< --target_keys_per_bucket
+  unsigned tolerance_range = 10;             ///< --tolerance_range, percent
+  unsigned num_buckets_per_collection = 16;  ///< --num_buckets_per_collection
+
+public:
+  // NOTE(review): nothing visible in this change reads this member --
+  // get_options() builds and returns its own description. It is kept because
+  // it is public; confirm whether it can be dropped.
+  po::options_description options{"RGWIndexWorkload"};
+
+  /// Build the option group for this workload; parsed values are stored
+  /// directly into the members of this object.
+  po::options_description get_options() final {
+    // The group caption must name this workload: it was previously a
+    // copy-pasted "PGLogWorkload", which put these options under the wrong
+    // heading in the --help output.
+    po::options_description ret{"RGWIndexWorkload"};
+    ret.add_options()
+      ("num_indices", po::value<unsigned>(&num_indices),
+       "number of RGW indices/buckets")
+      ("key_size", po::value<uint64_t>(&key_size),
+       "size of keys in bytes")
+      ("value_size", po::value<uint64_t>(&value_size),
+       "size of values in bytes")
+      ("target_keys_per_bucket",
+       po::value<unsigned>(&target_keys_per_bucket),
+       "target number of keys per bucket")
+      ("tolerance_range",
+       po::value<unsigned>(&tolerance_range),
+       "tolerance range percentage")
+      ("num_buckets_per_collection",
+       po::value<unsigned>(&num_buckets_per_collection),
+       "the number of objects in each collection ")
+      ;
+    return ret;
+  }
+
+  /// Defined out of line.
+  seastar::future<> run(
+    const common_options_t &common,
+    crimson::os::FuturizedStore &global_store) final;
+
+  ~RGWIndexWorkload() final = default;
+};
+
/**
* These are functions that are used in both types of work_loads
*/
return coll_t(spg_t(pg_t(pg_id, 0)));
}
-/**
- * The struct stores the number of operations and the total latency for all
- * these operations For the pg workload type write+delete increases the number
- * of operations by 1 For the rgw index workload, each write increases the
- * num_operations by 1 Each delete increases the number of operations by 1
- */
-struct results_t {
- int num_operations = 0;
- std::chrono::duration<double> tot_latency_sec =
- std::chrono::duration<double>(0.0);
- int duration = 0;
-
- results_t &operator += (const results_t &other_result) {
- num_operations += other_result.num_operations;
- tot_latency_sec += other_result.tot_latency_sec;
- return *this;
- }
-
- void dump(ceph::Formatter *f) const {
- f->open_object_section("results_t");
- f->dump_int("number of operations done", num_operations);
- f->dump_float(
- "the total latency aka time for these operations",
- tot_latency_sec.count());
- f->dump_int("the time the workload took to run is ",
- duration);
- f->close_section();
- }
-};
-
/**
* This function is used to run a workload num_con_io times concurrently
* the parallel_for_each loop launches k co_routines
* added to the vector so all_io_res containes resolved futures
*/
seastar::future<results_t>
-run_concurrent_ios(int duration, int num_concurrent_io,
- std::function<seastar::future<results_t>()> work_load) {
- LOG_PREFIX(run_concurrent_ios);
+run_concurrent_ios(
+ std::chrono::duration<double> duration,
+ int num_concurrent_io,
+ std::function<seastar::future<results_t>()> work_load) {
std::vector<int> container_io;
std::vector<results_t> all_io_res;
for (int i = 0; i < num_concurrent_io; ++i) {
co_return;
})));
- results_t total_result_all_io = {0, std::chrono::duration<double>(0.0),
- duration};
+ results_t total_result_all_io = {
+ 0,
+ std::chrono::duration<double>(0.0),
+ duration
+ };
for (const auto res : all_io_res) {
total_result_all_io += res;
}
* (b) writing and removing logs ,this is considered 1 I/O
* (c) doing N I/o's concurrently on 1 thread
*/
-seastar::future<> pg_log_workload(crimson::os::FuturizedStore &global_store,
- int num_logs, int num_concurrent_io,
- int duration, int log_size, int log_length) {
- LOG_PREFIX(pg_log_workload);
+seastar::future<> PGLogWorkload::run(
+ const common_options_t &common,
+ crimson::os::FuturizedStore &global_store)
+{
auto &local_store = global_store.get_sharded_store();
std::map<int, coll_t> collection_id;
*/
auto pre_fill_logs = [&]() -> seastar::future<> {
- for (int i = 0; i < num_logs; ++i) {
+ for (unsigned i = 0; i < num_logs; ++i) {
auto obj_i = create_hobj(i);
auto coll_id = make_cid(i, 1);
collection_id[i] = coll_id;
auto coll_ref = co_await local_store.create_new_collection(coll_id);
coll_ref_map[i] = coll_ref;
std::map<std::string, bufferlist> data;
- for (int j = 0; j < log_length; ++j) {
+ for (unsigned j = 0; j < log_length; ++j) {
std::string key = std::to_string(j);
bufferlist bl_value;
bl_value.append_zero(log_size);
std::chrono::duration<double>(0.0);
auto start = ceph::mono_clock::now();
- while (ceph::mono_clock::now() - start <=
- (std::chrono::seconds(duration))) { // duration is an int
+ while (ceph::mono_clock::now() - start <= common.get_duration()) {
int obj_num = std::rand() % num_logs;
auto object = create_hobj(obj_num);
tot_latency += time_sec;
num_ops++;
}
- co_return results_t{num_ops, tot_latency, duration};
+ co_return results_t{num_ops, tot_latency, common.get_duration()};
};
co_await pre_fill_logs();
- co_await run_concurrent_ios(duration, num_concurrent_io, add_remove_entry);
+ co_await run_concurrent_ios(
+ common.get_duration(), common.num_concurrent_io, add_remove_entry);
co_return;
}
* based on the chosen bucets size,aka if its within acceptable range (c) doing
* N I/o's concurrently on 1 thread
*/
-seastar::future<> rgw_index_workload(crimson::os::FuturizedStore &global_store,
- int num_indices, int num_concurrent_io,
- int duration, int key_size, int value_size,
- int target_keys_per_bucket,
- int tolerance_range,
- int num_buckets_per_collection) {
-
- LOG_PREFIX(rgw_index_workload);
+seastar::future<> RGWIndexWorkload::run(
+ const common_options_t &common,
+ crimson::os::FuturizedStore &global_store)
+{
auto &local_store = global_store.get_sharded_store();
std::map<int, coll_t> collection_id_for_rgw;
std::map<int, crimson::os::CollectionRef>
*/
auto pre_fill_buckets = [&]() -> seastar::future<> {
- for (int i = 0; i < num_indices; ++i) {
+ for (unsigned i = 0; i < num_indices; ++i) {
auto bucket_i = create_hobj(i);
auto coll_id = make_cid(i, num_buckets_per_collection);
collection_id_for_rgw[i] = coll_id;
std::map<std::string, bufferlist> omap_for_this_bucket;
std::set<std::string> keys_in_this_bucket;
- for (int j = 0; j < target_keys_per_bucket; ++j) {
+ for (unsigned j = 0; j < target_keys_per_bucket; ++j) {
std::string possible_key = generate_random_string(key_size);
while (keys_in_this_bucket.count(possible_key) > 0) {
possible_key = generate_random_string(key_size);
std::chrono::duration<double>(0.0);
auto start = ceph::mono_clock::now();
- while (ceph::mono_clock::now() - start <= std::chrono::seconds(duration)) {
+ while (ceph::mono_clock::now() - start <= common.get_duration()) {
int bucket_num_we_choose = std::rand() % num_indices;
auto bucket = create_hobj(bucket_num_we_choose);
}
};
}
- co_return results_t{num_ops, tot_latency, duration};
+ co_return results_t{num_ops, tot_latency, common.get_duration()};
};
co_await pre_fill_buckets();
- co_await run_concurrent_ios(duration, num_concurrent_io, rgw_actual_test);
+ co_await run_concurrent_ios(
+ common.get_duration(), common.num_concurrent_io, rgw_actual_test);
co_return;
};
bool debug = false;
std::string store_type;
std::string store_path;
+ std::string work_load_type;
+ unsigned smp = 4;
- desc.add_options()("help,h", "show help message")(
- "store-type",
- po::value<std::string>(&store_type)->default_value("seastore"),
- "set store type")
+ desc.add_options()("help,h", "show help message")
+ ("store-type",
+ po::value<std::string>(&store_type)->default_value("seastore"),
+ "set store type")
/* store-path is a path to a directory containing a file 'block'
* block should be a symlink to a real device for actual performance
* testing, but may be a file for testing this utility.
* See build/dev/osd* after starting a vstart cluster for an example
* of what that looks like.
*/
- ("store-path", po::value<std::string>(&store_path),
- "path to store, <store-path>/block should "
- "be a symlink to the target device for bluestore or seastore")(
- "debug", po::value<bool>(&debug)->default_value(false),
- "enable debugging");
+ ("store-path", po::value<std::string>(&store_path)->required(),
+ "path to store, <store-path>/block should "
+ "be a symlink to the target device for bluestore or seastore")(
+ "debug", po::value<bool>(&debug)->default_value(false),
+ "enable debugging")
+ ("work-load-type",
+ po::value<std::string>(&work_load_type)->required(),
+ "work load type: pg_log or rgw_index")
+ ("smp", po::value<unsigned>(&smp),
+ "number of reactors");
+
+ common_options_t common_options;
+ std::map<std::string, std::unique_ptr<StoreBenchWorkload>> workloads;
+
+ workloads.emplace("pg_log", std::make_unique<PGLogWorkload>());
+ workloads.emplace("rgw_index", std::make_unique<RGWIndexWorkload>());
+
+ desc.add(common_options.get_options());
+ for (auto &[name, workload] : workloads) {
+ desc.add(workload->get_options());
+ }
po::variables_map vm;
std::vector<std::string> unrecognized_options;
app_cfg.reactor_opts.blocked_reactor_notify_ms.set_default_value(200);
seastar::app_template app(std::move(app_cfg));
- std::string work_load_type = "pg_log";
- int num_logs = 0;
- int log_length = 0;
- int log_size = 0;
- int num_concurrent_io = 0;
- int duration = 0;
- int key_size = 0;
- int value_size = 0;
- int num_indices = 0;
- int target_keys_per_bucket = 0;
- int tolerance_range = 0;
- int num_buckets_per_collection = 0;
-
- std::vector<char *> av{argv[0]};
- std::transform(std::begin(unrecognized_options),
- std::end(unrecognized_options), std::back_inserter(av),
- [](auto &s) { return const_cast<char *>(s.c_str()); });
- app.add_options()(
- "work_load_type",
- po::value<std::string>(&work_load_type)->default_value("pg_log"),
- "work load type: pg_log or rgw_index")
-
- ("num_logs", po::value<int>(&num_logs),
- "how many different logs we create; aka, we create a log for every "
- "object,so how many objects we create")
-
- ("log_length", po::value<int>(&log_length),
- "number of entries per log")
-
- ("log_size", po::value<int>(&log_size), "size of each log entry")
-
- ("num_concurrent_io", po::value<int>(&num_concurrent_io),
- "number of IOs happening simultaneously")
-
- ("duration", po::value<int>(&duration),
- "how long in seconds the actual testing loop runs "
- "for")
- // for rgw
- ("num_indices", po::value<int>(&num_indices),
- "number of RGW indices/buckets")
-
- ("key_size", po::value<int>(&key_size), "size of keys in bytes")
-
- ("value_size", po::value<int>(&value_size),
- "size of values in bytes")
-
- ("target_keys_per_bucket",
- po::value<int>(&target_keys_per_bucket),
- "target number of keys per bucket")
-
- ("tolerance_range", po::value<int>(&tolerance_range),
- "tolerance range percentage")
-
- ("num_buckets_per_collection",
- po::value<int>(&num_buckets_per_collection),
- "the number of objects in each collection ");
-
+ auto smp_str = std::to_string(smp);
+ const char *av[] = { argv[0], "--smp", smp_str.c_str() };
return app.run(
- av.size(), av.data(),
+ sizeof(av) / sizeof(av[0]), const_cast<char **>(av),
/* crimson-osd uses seastar as its scheduler. We use
* sesastar::app_template::run to start the base task for the
* application -- this lambda. The -> seastar::future<int> here
store_type, store_path)
.c_str()));
std::vector<seastar::future<>> per_shard_futures;
+
+ auto named_lambda = [&, &store_ref = *store]() -> seastar::future<> {
+ DEBUG("running example_io on reactor {}", seastar::this_shard_id());
+ auto iter = workloads.find(work_load_type);
+ if (iter != workloads.end()) {
+ co_await iter->second->run(common_options, store_ref);
+ }
+ co_return;
+ };
for (unsigned i = 0; i < seastar::smp::count; ++i) {
- auto named_lambda = [=, &store_ref = *store]() -> seastar::future<> {
- DEBUG("running example_io on reactor {}", seastar::this_shard_id());
- if (work_load_type == "pg_log") {
- co_await pg_log_workload(store_ref, num_logs, num_concurrent_io,
- duration, log_size, log_length);
- } else if (work_load_type == "rgw_index") {
- co_await rgw_index_workload(
- store_ref, num_indices, num_concurrent_io, duration, key_size,
- value_size, target_keys_per_bucket, tolerance_range,
- num_buckets_per_collection);
- }
- co_return;
- };
per_shard_futures.push_back(
seastar::smp::submit_to(i, std::move(named_lambda)));
}