bool is_stopped = false;
std::optional<seastar::future<>> fut_report;
+ unsigned conn_count = 0;
+ unsigned msg_count = 0;
+ MessageRef last_msg;
+
// available in all shards
unsigned msg_len;
bufferlist msg_data;
}
}
+ void ms_handle_connect(
+ crimson::net::ConnectionRef,
+ seastar::shard_id) override {
+ ceph_abort("impossible, server won't connect");
+ }
+
+ void ms_handle_accept(
+ crimson::net::ConnectionRef,
+ seastar::shard_id new_shard,
+ bool is_replace) override {
+ ceph_assert_always(new_shard == seastar::this_shard_id());
+ auto &server = container().local();
+ ++server.conn_count;
+ }
+
+ void ms_handle_reset(
+ crimson::net::ConnectionRef,
+ bool) override {
+ auto &server = container().local();
+ --server.conn_count;
+ }
+
std::optional<seastar::future<>> ms_dispatch(
crimson::net::ConnectionRef c, MessageRef m) override {
assert(c->get_shard_id() == seastar::this_shard_id());
bufferlist data(server.msg_data);
rep->write(0, server.msg_len, data);
rep->set_tid(m->get_tid());
+ ++server.msg_count;
std::ignore = c->send(std::move(rep));
+
+ if (server.msg_count % 16 == 0) {
+ server.last_msg = std::move(m);
+ }
return {seastar::now()};
}
}
private:
+ struct ShardReport {
+ unsigned msg_count = 0;
+
+ // per-interval metrics
+ double reactor_utilization;
+ unsigned conn_count = 0;
+ int msg_size = 0;
+ unsigned msg_count_interval = 0;
+ };
+
+ // should not be called frequently to impact performance
+ void get_report(ShardReport& last) {
+ unsigned last_msg_count = last.msg_count;
+ int msg_size = -1;
+ if (last_msg) {
+ auto msg = boost::static_pointer_cast<MOSDOp>(last_msg);
+ msg->finish_decode();
+ ceph_assert_always(msg->ops.size() == 1);
+ msg_size = msg->ops[0].op.extent.length;
+ last_msg.reset();
+ }
+
+ last.msg_count = msg_count;
+ last.reactor_utilization = get_reactor_utilization();
+ last.conn_count = conn_count;
+ last.msg_size = msg_size;
+ last.msg_count_interval = msg_count - last_msg_count;
+ }
+
struct TimerReport {
unsigned elapsed = 0u;
+ mono_time start_time = mono_clock::zero();
+ std::vector<ShardReport> reports;
- seastar::future<> ticktock(bool is_fixed_cpu) {
- return seastar::sleep(1s
- ).then([this, is_fixed_cpu] {
- ++elapsed;
- if (is_fixed_cpu) {
- std::ostringstream sout;
- sout << elapsed
- << "s -- server reactor utilization: "
- << get_reactor_utilization();
- std::cout << sout.str() << std::endl;
- return seastar::now();
- } else {
- return seastar::do_with(
- std::vector<double>(seastar::smp::count),
- [this](auto &rus) {
- return seastar::smp::invoke_on_all([&rus] {
- rus[seastar::this_shard_id()] = get_reactor_utilization();
- }).then([this, &rus] {
- std::ostringstream sout;
- sout << elapsed
- << "s -- server reactor utilization: ";
- for (double ru : rus) {
- sout << ru << ",";
- }
- std::cout << sout.str() << std::endl;
- });
- });
- }
- });
- }
+ TimerReport(unsigned shards) : reports(shards) {}
};
void start_report() {
seastar::promise<> pr_report;
fut_report = pr_report.get_future();
seastar::do_with(
- TimerReport(),
+ TimerReport(seastar::smp::count),
[this](auto &report) {
return seastar::do_until(
[this] { return is_stopped; },
[&report, this] {
- return report.ticktock(is_fixed_cpu);
+ return seastar::sleep(2s
+ ).then([&report, this] {
+ report.elapsed += 2;
+ if (is_fixed_cpu) {
+ return seastar::smp::submit_to(msgr_sid,
+ [&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ auto &ireport = report.reports[msgr_sid];
+ double iops = ireport.msg_count_interval / duration;
+ double throughput_MB = -1;
+ if (ireport.msg_size >= 0) {
+ throughput_MB = iops * ireport.msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s "
+ << ireport.reactor_utilization
+ << "(" << ireport.conn_count << ")";
+ std::cout << sout.str() << std::endl;
+ });
+ } else {
+ return seastar::smp::invoke_on_all([&report, this] {
+ auto &server = container().local();
+ server.get_report(report.reports[seastar::this_shard_id()]);
+ }).then([&report, this] {
+ auto now = mono_clock::now();
+ auto prv = report.start_time;
+ report.start_time = now;
+ if (prv == mono_clock::zero()) {
+ // cannot compute duration
+ return;
+ }
+ std::chrono::duration<double> duration_d = now - prv;
+ double duration = duration_d.count();
+ unsigned num_msgs = 0;
+ // -1 means unavailable, -2 means mismatch
+ int msg_size = -1;
+ for (auto &i : report.reports) {
+ if (i.msg_size >= 0) {
+ if (msg_size == -2) {
+ // pass
+ } else if (msg_size == -1) {
+ msg_size = i.msg_size;
+ } else {
+ if (msg_size != i.msg_size) {
+ msg_size = -2;
+ }
+ }
+ }
+ num_msgs += i.msg_count_interval;
+ }
+ double iops = num_msgs / duration;
+ double throughput_MB = msg_size;
+ if (msg_size >= 0) {
+ throughput_MB = iops * msg_size / 1048576;
+ }
+ std::ostringstream sout;
+ sout << setfill(' ')
+ << report.elapsed
+ << "(" << std::setw(5) << duration << ") "
+ << std::setw(9) << iops << "IOPS "
+ << std::setw(8) << throughput_MB << "MiB/s ";
+ for (auto &i : report.reports) {
+ sout << i.reactor_utilization
+ << "(" << i.conn_count << ") ";
+ }
+ std::cout << sout.str() << std::endl;
+ });
+ }
+ });
}
);
}).then([this] {
void report_header() const {
std::ostringstream sout;
sout << std::setfill(' ')
- << std::setw(7) << "sec"
- << std::setw(6) << "depth"
- << std::setw(8) << "IOPS"
- << std::setw(8) << "MB/s"
- << std::setw(8) << "lat(ms)";
+ << std::setw(6) << "sec"
+ << std::setw(7) << "depth"
+ << std::setw(10) << "IOPS"
+ << std::setw(9) << "MB/s"
+ << std::setw(9) << "lat(ms)";
std::cout << sout.str() << std::endl;
}
double iops = ops/elapsed_s;
std::ostringstream sout;
sout << setfill(' ')
- << std::setw(7) << elapsed_s
+ << std::setw(5) << elapsed_s
+ << " "
<< std::setw(6) << depth
- << std::setw(8) << iops
+ << " "
+ << std::setw(9) << iops
+ << " "
<< std::setw(8) << iops * bytes_of_block / 1048576
+ << " "
<< std::setw(8) << (sampled_total_lat_s / sampled_count * 1000)
<< " -- ";
if (server_reactor_utilization.has_value()) {