unsigned sent_count = 0u;
crimson::net::ConnectionRef active_conn = nullptr;
+ struct ClientStats {
+ std::string name;
+ unsigned depth = 0;
+ double connect_time_s = 0;
+ unsigned total_msgs = 0;
+ double messaging_time_s = 0;
+ double latency_ms = 0;
+ double iops = 0;
+ double throughput_mbps = 0;
+
+ void report() const {
+ auto str = fmt::format(
+ "{}(depth={}):\n"
+ " connect time: {:08f}s\n"
+ " messages received: {}\n"
+ " messaging time: {:08f}s\n"
+ " latency: {:08f}ms\n"
+ " IOPS: {:08f}\n"
+ " out throughput: {:08f}MB/s",
+ name, depth, connect_time_s,
+ total_msgs, messaging_time_s,
+ latency_ms, iops,
+ throughput_mbps);
+ std::cout << str << std::endl;
+ }
+ };
+
bool stop_send = false;
- seastar::promise<> stopped_send_promise;
+ seastar::promise<ClientStats> stopped_send_promise;
Client(unsigned jobs, unsigned msg_len, unsigned depth)
: sid{seastar::this_shard_id()},
}
seastar::future<> shutdown() {
- return container().invoke_on_all([] (auto& client) {
- if (!client.is_active()) {
- return seastar::now();
- }
+ return seastar::do_with(
+ std::vector<ClientStats>(jobs),
+ [this](auto &all_stats) {
+ return container().invoke_on_all([&all_stats](auto& client) {
+ if (!client.is_active()) {
+ return seastar::now();
+ }
- logger().info("{} shutdown...", client.lname);
- ceph_assert(client.msgr);
- client.msgr->stop();
- return client.msgr->shutdown().then([&client] {
- return client.stop_dispatch_messages();
+ logger().info("{} shutdown...", client.lname);
+ ceph_assert(client.msgr);
+ client.msgr->stop();
+ return seastar::when_all(
+ client.stop_dispatch_messages(
+ ).then([&all_stats, &client](auto stats) {
+ all_stats[client.id] = stats;
+ }),
+ client.msgr->shutdown()
+ ).discard_result();
+ }).then([&all_stats] {
+ auto nr_clients = all_stats.size();
+ ClientStats summary;
+ summary.name = "AllClients" + std::to_string(nr_clients);
+ for (const auto &stats : all_stats) {
+ stats.report();
+ summary.depth += stats.depth;
+ summary.connect_time_s += stats.connect_time_s;
+ summary.total_msgs += stats.total_msgs;
+ summary.messaging_time_s += stats.messaging_time_s;
+ summary.latency_ms += stats.latency_ms;
+ summary.iops += stats.iops;
+ summary.throughput_mbps += stats.throughput_mbps;
+ }
+ summary.connect_time_s /= nr_clients;
+ summary.messaging_time_s /= nr_clients;
+ summary.latency_ms /= nr_clients;
+ summary.report();
});
});
}
class DepthBroken: public std::exception {};
- seastar::future<> stop_dispatch_messages() {
+ seastar::future<ClientStats> stop_dispatch_messages() {
stop_send = true;
depth.broken(DepthBroken());
return stopped_send_promise.get_future();
).handle_exception_type([] (const DepthBroken& e) {
// ok, stopped by stop_dispatch_messages()
}).then([this, conn] {
+ logger().info("{}: stopped sending OSDOPs", *conn);
+
std::chrono::duration<double> dur_conn = conn_stats.connected_time - conn_stats.connecting_time;
std::chrono::duration<double> dur_msg = mono_clock::now() - conn_stats.start_time;
unsigned ops = conn_stats.received_count - conn_stats.start_count;
- logger().info("{}: stopped sending OSDOPs.\n"
- "{}(depth={}):\n"
- " connect time: {}s\n"
- " messages received: {}\n"
- " messaging time: {}s\n"
- " latency: {}ms\n"
- " IOPS: {}\n"
- " throughput: {}MB/s\n",
- *conn,
- lname,
- nr_depth,
- dur_conn.count(),
- ops,
- dur_msg.count(),
- conn_stats.sampled_total_lat_s / conn_stats.sampled_count * 1000,
- ops / dur_msg.count(),
- ops / dur_msg.count() * msg_len / 1048576);
- stopped_send_promise.set_value();
+
+ ClientStats stats;
+ stats.name = lname;
+ stats.depth = nr_depth;
+ stats.connect_time_s = dur_conn.count();
+ stats.total_msgs = ops;
+ stats.messaging_time_s = dur_msg.count();
+ stats.latency_ms =
+ conn_stats.sampled_total_lat_s / conn_stats.sampled_count * 1000;
+ stats.iops = ops / dur_msg.count();
+ stats.throughput_mbps = ops / dur_msg.count() * msg_len / 1048576;
+
+ stopped_send_promise.set_value(stats);
});
}
};