static client_config load(bpo::variables_map& options) {
client_config conf;
entity_addr_t addr;
- ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
ceph_assert_always(addr.is_msgr2());
conf.server_addr = addr;
- conf.block_size = options["cbs"].as<unsigned>();
+ conf.block_size = options["client-bs"].as<unsigned>();
conf.ramptime = options["ramptime"].as<unsigned>();
conf.msgtime = options["msgtime"].as<unsigned>();
- conf.jobs = options["jobs"].as<unsigned>();
+ conf.jobs = options["client-jobs"].as<unsigned>();
conf.depth = options["depth"].as<unsigned>();
ceph_assert(conf.depth % conf.jobs == 0);
return conf;
static server_config load(bpo::variables_map& options) {
server_config conf;
entity_addr_t addr;
- ceph_assert(addr.parse(options["addr"].as<std::string>().c_str(), nullptr));
+ ceph_assert(addr.parse(options["server-addr"].as<std::string>().c_str(), nullptr));
ceph_assert_always(addr.is_msgr2());
conf.addr = addr;
- conf.block_size = options["sbs"].as<unsigned>();
- conf.core = options["core"].as<unsigned>();
+ conf.block_size = options["server-bs"].as<unsigned>();
+ conf.core = options["server-core"].as<unsigned>();
return conf;
}
};
});
}
- static seastar::future<ServerFRef> create(seastar::shard_id msgr_sid, unsigned msg_len) {
- return seastar::smp::submit_to(msgr_sid, [msg_len] {
- return seastar::make_foreign(std::make_unique<Server>(msg_len));
+ static seastar::future<ServerFRef> create(
+ seastar::shard_id msgr_sid,
+ unsigned msg_len) {
+ return seastar::smp::submit_to(
+ msgr_sid, [msg_len] {
+ return seastar::make_foreign(
+ std::make_unique<Server>(msg_len));
});
}
};
unsigned start_count = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
// for reporting only
mono_time finish_time = mono_clock::zero();
- void start() {
+ void start_connecting() {
+ connecting_time = mono_clock::now();
+ }
+
+ void finish_connecting() {
+ connected_time = mono_clock::now();
+ }
+
+ void start_collect() {
start_time = mono_clock::now();
start_count = received_count;
sampled_count = 0u;
- total_lat_s = 0.0;
+ sampled_total_lat_s = 0.0;
finish_time = mono_clock::zero();
}
+
+ void prepare_summary(const ConnStats ¤t) {
+ *this = current;
+ finish_time = mono_clock::now();
+ }
};
ConnStats conn_stats;
mono_time start_time = mono_clock::zero();
unsigned start_count = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
// for reporting only
mono_time finish_time = mono_clock::zero();
unsigned finish_count = 0u;
unsigned depth = 0u;
- void reset(unsigned received_count, PeriodStats* snap = nullptr) {
- if (snap) {
- snap->start_time = start_time;
- snap->start_count = start_count;
- snap->sampled_count = sampled_count;
- snap->total_lat_s = total_lat_s;
- snap->finish_time = mono_clock::now();
- snap->finish_count = received_count;
- }
+ void start_collect(unsigned received_count) {
start_time = mono_clock::now();
start_count = received_count;
sampled_count = 0u;
- total_lat_s = 0.0;
+ sampled_total_lat_s = 0.0;
+ }
+
+ void reset_period(
+ unsigned received_count, unsigned _depth, PeriodStats &snapshot) {
+ snapshot.start_time = start_time;
+ snapshot.start_count = start_count;
+ snapshot.sampled_count = sampled_count;
+ snapshot.sampled_total_lat_s = sampled_total_lat_s;
+ snapshot.finish_time = mono_clock::now();
+ snapshot.finish_count = received_count;
+ snapshot.depth = _depth;
+
+ start_collect(received_count);
}
};
PeriodStats period_stats;
crimson::net::ConnectionRef conn,
seastar::shard_id new_shard) override {
ceph_assert_always(new_shard == seastar::this_shard_id());
- conn_stats.connected_time = mono_clock::now();
+ conn_stats.finish_connecting();
}
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef, MessageRef m) override {
+ assert(is_active());
// server replies with MOSDOp to generate server-side write workload
ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
auto index = msg_id % time_msgs_sent.size();
ceph_assert(time_msgs_sent[index] != mono_clock::zero());
std::chrono::duration<double> cur_latency = mono_clock::now() - time_msgs_sent[index];
- conn_stats.total_lat_s += cur_latency.count();
+ conn_stats.sampled_total_lat_s += cur_latency.count();
++(conn_stats.sampled_count);
- period_stats.total_lat_s += cur_latency.count();
+ period_stats.sampled_total_lat_s += cur_latency.count();
++(period_stats.sampled_count);
time_msgs_sent[index] = mono_clock::zero();
}
seastar::future<> shutdown() {
return container().invoke_on_all([] (auto& client) {
- if (client.is_active()) {
- logger().info("{} shutdown...", client.lname);
- ceph_assert(client.msgr);
- client.msgr->stop();
- return client.msgr->shutdown().then([&client] {
- return client.stop_dispatch_messages();
- });
+ if (!client.is_active()) {
+ return seastar::now();
}
- return seastar::now();
+
+ logger().info("{} shutdown...", client.lname);
+ ceph_assert(client.msgr);
+ client.msgr->stop();
+ return client.msgr->shutdown().then([&client] {
+ return client.stop_dispatch_messages();
+ });
});
}
return container().invoke_on_all([peer_addr] (auto& client) {
// start clients in active cores (#1 ~ #jobs)
if (client.is_active()) {
- mono_time start_time = mono_clock::now();
+ client.conn_stats.start_connecting();
client.active_conn = client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
// make sure handshake won't hurt the performance
- return seastar::sleep(1s).then([&client, start_time] {
+ return seastar::sleep(1s).then([&client] {
if (client.conn_stats.connected_time == mono_clock::zero()) {
logger().error("\n{} not connected after 1s!\n", client.lname);
ceph_assert(false);
}
- client.conn_stats.connecting_time = start_time;
});
}
return seastar::now();
const unsigned bytes_of_block;
unsigned elapsed = 0u;
- std::vector<mono_time> start_times;
std::vector<PeriodStats> snaps;
std::vector<ConnStats> summaries;
: jobs{jobs},
msgtime{msgtime},
bytes_of_block{bs},
- start_times{jobs, mono_clock::zero()},
snaps{jobs},
summaries{jobs} {}
});
}
- void report_header() {
+ void report_header() const {
std::ostringstream sout;
sout << std::setfill(' ')
<< std::setw(7) << "sec"
}
void report_period() {
- if (elapsed == 1) {
- // init this->start_times at the first period
- for (unsigned i=0; i<jobs; ++i) {
- start_times[i] = snaps[i].start_time;
- }
- }
std::chrono::duration<double> elapsed_d = 0s;
unsigned depth = 0u;
unsigned ops = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
for (const auto& snap: snaps) {
elapsed_d += (snap.finish_time - snap.start_time);
depth += snap.depth;
ops += (snap.finish_count - snap.start_count);
sampled_count += snap.sampled_count;
- total_lat_s += snap.total_lat_s;
+ sampled_total_lat_s += snap.sampled_total_lat_s;
}
double elapsed_s = elapsed_d.count() / jobs;
double iops = ops/elapsed_s;
<< std::setw(6) << depth
<< std::setw(8) << iops
<< std::setw(8) << iops * bytes_of_block / 1048576
- << std::setw(8) << (total_lat_s / sampled_count * 1000);
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000);
std::cout << sout.str() << std::endl;
}
std::chrono::duration<double> elapsed_d = 0s;
unsigned ops = 0u;
unsigned sampled_count = 0u;
- double total_lat_s = 0.0;
+ double sampled_total_lat_s = 0.0;
for (const auto& summary: summaries) {
elapsed_d += (summary.finish_time - summary.start_time);
ops += (summary.received_count - summary.start_count);
sampled_count += summary.sampled_count;
- total_lat_s += summary.total_lat_s;
+ sampled_total_lat_s += summary.sampled_total_lat_s;
}
double elapsed_s = elapsed_d.count() / jobs;
double iops = ops / elapsed_s;
<< std::setw(6) << "-"
<< std::setw(8) << iops
<< std::setw(8) << iops * bytes_of_block / 1048576
- << std::setw(8) << (total_lat_s / sampled_count * 1000)
+ << std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
<< "\n";
std::cout << sout.str() << std::endl;
}
return container().invoke_on_all([&report] (auto& client) {
if (client.is_active()) {
PeriodStats& snap = report.get_snap_by_job(client.sid);
- client.period_stats.reset(client.conn_stats.received_count,
- &snap);
- snap.depth = client.get_current_depth();
+ client.period_stats.reset_period(
+ client.conn_stats.received_count,
+ client.get_current_depth(),
+ snap);
}
}).then([&report] {
report.report_period();
return container().invoke_on_all([&report] (auto& client) {
if (client.is_active()) {
ConnStats& summary = report.get_summary_by_job(client.sid);
- summary = client.conn_stats;
- summary.finish_time = mono_clock::now();
+ summary.prepare_summary(client.conn_stats);
}
}).then([&report] {
report.report_summary();
}).then([this] {
return container().invoke_on_all([] (auto& client) {
if (client.is_active()) {
- client.conn_stats.start();
- client.period_stats.reset(client.conn_stats.received_count);
+ client.conn_stats.start_collect();
+ client.period_stats.start_collect(client.conn_stats.received_count);
}
});
}).then([this, msgtime] {
m->set_tid(sent_count);
// sample message latency
- if (sent_count % SAMPLE_RATE == 0) {
+ if (unlikely(sent_count % SAMPLE_RATE == 0)) {
auto index = sent_count % time_msgs_sent.size();
ceph_assert(time_msgs_sent[index] == mono_clock::zero());
time_msgs_sent[index] = mono_clock::now();
dur_conn.count(),
ops,
dur_msg.count(),
- conn_stats.total_lat_s / conn_stats.sampled_count * 1000,
+ conn_stats.sampled_total_lat_s / conn_stats.sampled_count * 1000,
ops / dur_msg.count(),
ops / dur_msg.count() * msg_len / 1048576);
stopped_send_promise.set_value();
};
return seastar::when_all(
- test_state::Server::create(server_conf.core, server_conf.block_size),
- create_sharded<test_state::Client>(client_conf.jobs, client_conf.block_size, client_conf.depth),
- crimson::common::sharded_conf().start(EntityName{}, std::string_view{"ceph"}).then([] {
+ test_state::Server::create(
+ server_conf.core,
+ server_conf.block_size),
+ create_sharded<test_state::Client>(
+ client_conf.jobs,
+ client_conf.block_size,
+ client_conf.depth),
+ crimson::common::sharded_conf().start(
+ EntityName{}, std::string_view{"ceph"}
+ ).then([] {
return crimson::common::local_conf().start();
}).then([crc_enabled] {
return crimson::common::local_conf().set_val(
app.add_options()
("mode", bpo::value<unsigned>()->default_value(0),
"0: both, 1:client, 2:server")
- ("addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
+ ("server-addr", bpo::value<std::string>()->default_value("v2:127.0.0.1:9010"),
"server address(only support msgr v2 protocol)")
("ramptime", bpo::value<unsigned>()->default_value(5),
"seconds of client ramp-up time")
("msgtime", bpo::value<unsigned>()->default_value(15),
"seconds of client messaging time")
- ("jobs", bpo::value<unsigned>()->default_value(1),
+ ("client-jobs", bpo::value<unsigned>()->default_value(1),
"number of client jobs (messengers)")
- ("cbs", bpo::value<unsigned>()->default_value(4096),
+ ("client-bs", bpo::value<unsigned>()->default_value(4096),
"client block size")
("depth", bpo::value<unsigned>()->default_value(512),
"client io depth")
- ("core", bpo::value<unsigned>()->default_value(0),
+ ("server-core", bpo::value<unsigned>()->default_value(0),
"server running core")
- ("sbs", bpo::value<unsigned>()->default_value(0),
+ ("server-bs", bpo::value<unsigned>()->default_value(0),
"server block size")
("crc-enabled", bpo::value<bool>()->default_value(false),
"enable CRC checks");