From: Yingxin Cheng Date: Fri, 14 Jun 2019 09:25:48 +0000 (+0800) Subject: test/crimson: improved perf_crimson_msgr with timer X-Git-Tag: v15.1.0~2317^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5062b74fb03b2355998c7ae036b2f8d616c2aed6;p=ceph.git test/crimson: improved perf_crimson_msgr with timer Added timer to support: * Ramp up before collecting perf reports; * Shutdown based on running seconds instead of rounds; * Report latency and throughput per second; * Report summary every 10 seconds and before shutdown; * Report summary of each job before shutdown; Signed-off-by: Yingxin Cheng --- diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index 2fffd9b2080e..dacfc0aa81ef 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -41,7 +41,8 @@ enum class perf_mode_t { struct client_config { entity_addr_t server_addr; unsigned block_size; - unsigned rounds; + unsigned ramptime; + unsigned msgtime; unsigned jobs; unsigned depth; @@ -49,7 +50,8 @@ struct client_config { std::ostringstream out; out << "client[>> " << server_addr << "](bs=" << block_size - << ", rounds=" << rounds + << ", ramptime=" << ramptime + << ", msgtime=" << msgtime << ", jobs=" << jobs << ", depth=" << depth << ")"; @@ -63,9 +65,11 @@ struct client_config { conf.server_addr = addr; conf.block_size = options["cbs"].as(); - conf.rounds = options["rounds"].as(); + conf.ramptime = options["ramptime"].as(); + conf.msgtime = options["msgtime"].as(); conf.jobs = options["jobs"].as(); conf.depth = options["depth"].as(); + ceph_assert(conf.depth % conf.jobs == 0); return conf; } }; @@ -96,6 +100,8 @@ struct server_config { } }; +const unsigned SAMPLE_RATE = 7; + static seastar::future<> run( perf_mode_t mode, const client_config& client_conf, @@ -165,7 +171,7 @@ static seastar::future<> run( }); } seastar::future<> shutdown() { - logger().info("\n{} shutdown...", lname); + logger().info("{} shutdown...", lname); return container().invoke_on(msgr_sid, [] (auto& server) { ceph_assert(server.msgr); return server.msgr->shutdown(); @@ -177,31 +183,66 @@ static seastar::future<> run( : public ceph::net::Dispatcher, public seastar::peering_sharded_service { - struct ConnSession { + struct ConnStats { mono_time connecting_time = mono_clock::zero(); mono_time connected_time = mono_clock::zero(); + unsigned received_count = 0u; mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; + + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + + // for reporting only mono_time finish_time = mono_clock::zero(); - unsigned received_count = 0u; + void start() { + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + finish_time = mono_clock::zero(); + } + }; + ConnStats conn_stats; - const unsigned SAMPLE_RATE = 7; + struct PeriodStats { + mono_time start_time = mono_clock::zero(); + unsigned start_count = 0u; unsigned sampled_count = 0u; double total_lat_s = 0.0; - seastar::promise<> done; + // for reporting only + mono_time finish_time = mono_clock::zero(); + unsigned finish_count = 0u; + unsigned depth = 0u; + + void reset(unsigned received_count, PeriodStats* snap = nullptr) { + if (snap) { + snap->start_time = start_time; + snap->start_count = start_count; + snap->sampled_count = sampled_count; + snap->total_lat_s = total_lat_s; + snap->finish_time = mono_clock::now(); + snap->finish_count = received_count; + } + start_time = mono_clock::now(); + start_count = received_count; + sampled_count = 0u; + total_lat_s = 0.0; + } }; - ConnSession conn_session; + PeriodStats period_stats; const seastar::shard_id sid; std::string lname; const unsigned jobs; - const unsigned rounds; ceph::net::Messenger *msgr = nullptr; const unsigned msg_len; bufferlist msg_data; + const unsigned nr_depth; seastar::semaphore depth; std::vector time_msgs_sent; ceph::auth::DummyAuthClientServer dummy_auth; @@ -209,18 +250,26 @@ static seastar::future<> run( unsigned sent_count = 0u; ceph::net::ConnectionRef active_conn = nullptr; - Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth) + bool stop_send = false; + seastar::promise<> stopped_send_promise; + + Client(unsigned jobs, unsigned msg_len, unsigned depth) : sid{seastar::engine().cpu_id()}, jobs{jobs}, - rounds{rounds/jobs}, msg_len{msg_len}, - depth{depth}, - time_msgs_sent{depth, mono_clock::zero()} { + nr_depth{depth/jobs}, + depth{nr_depth}, + time_msgs_sent{depth/jobs, mono_clock::zero()} { lname = "client#"; lname += std::to_string(sid); msg_data.append_zero(msg_len); } + unsigned get_current_depth() const { + ceph_assert(depth.available_units() >= 0); + return nr_depth - depth.current(); + } + Dispatcher* get_local_shard() override { return &(container().local()); } @@ -229,7 +278,7 @@ static seastar::future<> run( } seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { logger().info("{}: connected", *conn); - conn_session.connected_time = mono_clock::now(); + conn_stats.connected_time = mono_clock::now(); return seastar::now(); } seastar::future<> ms_dispatch(ceph::net::Connection* c, @@ -238,23 +287,20 @@ static seastar::future<> run( ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); auto msg_id = m->get_tid(); - if (msg_id % conn_session.SAMPLE_RATE == 0) { + if (msg_id % SAMPLE_RATE == 0) { auto index = msg_id % time_msgs_sent.size(); ceph_assert(time_msgs_sent[index] != mono_clock::zero()); std::chrono::duration cur_latency = mono_clock::now() - time_msgs_sent[index]; - conn_session.total_lat_s += cur_latency.count(); - ++(conn_session.sampled_count); + conn_stats.total_lat_s += cur_latency.count(); + ++(conn_stats.sampled_count); + period_stats.total_lat_s += cur_latency.count(); + ++(period_stats.sampled_count); time_msgs_sent[index] = mono_clock::zero(); } - ++(conn_session.received_count); + ++(conn_stats.received_count); depth.signal(1); - if (conn_session.received_count == rounds) { - logger().info("{}: finished receiving {} REPLYs", *c, conn_session.received_count); - conn_session.finish_time = mono_clock::now(); - conn_session.done.set_value(); - } return seastar::now(); } @@ -284,15 +330,17 @@ static seastar::future<> run( seastar::future<> shutdown() { return container().invoke_on_all([] (auto& client) { if (client.is_active()) { - logger().info("\n{} shutdown...", client.lname); + logger().info("{} shutdown...", client.lname); ceph_assert(client.msgr); - return client.msgr->shutdown(); + return client.msgr->shutdown().then([&client] { + return client.stop_dispatch_messages(); + }); } return seastar::now(); }); } - seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) { + seastar::future<> connect_wait_verify(const entity_addr_t& peer_addr) { return container().invoke_on_all([peer_addr] (auto& client) { // start clients in active cores (#1 ~ #jobs) if (client.is_active()) { @@ -303,27 +351,200 @@ static seastar::future<> run( // make sure handshake won't hurt the performance return seastar::sleep(1s); }).then([&client, start_time] { - if (client.conn_session.connected_time == mono_clock::zero()) { + if (client.conn_stats.connected_time == mono_clock::zero()) { logger().error("\n{} not connected after 1s!\n", client.lname); ceph_assert(false); } - client.conn_session.connecting_time = start_time; + client.conn_stats.connecting_time = start_time; }); } return seastar::now(); + }); + } + + private: + class TimerReport { + private: + const unsigned jobs; + const unsigned msgtime; + const unsigned bytes_of_block; + + unsigned elapsed = 0u; + std::vector start_times; + std::vector snaps; + std::vector summaries; + + public: + TimerReport(unsigned jobs, unsigned msgtime, unsigned bs) + : jobs{jobs}, + msgtime{msgtime}, + bytes_of_block{bs}, + start_times{jobs, mono_clock::zero()}, + snaps{jobs}, + summaries{jobs} {} + + unsigned get_elapsed() const { return elapsed; } + + PeriodStats& get_snap_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return snaps[sid - 1]; + } + + ConnStats& get_summary_by_job(seastar::shard_id sid) { + ceph_assert(sid >= 1 && sid <= jobs); + return summaries[sid - 1]; + } + + bool should_stop() const { + return elapsed >= msgtime; + } + + seastar::future<> ticktock() { + return seastar::sleep(1s).then([this] { + ++elapsed; + }); + } + + void report_header() { + std::ostringstream sout; + sout << std::setfill(' ') + << std::setw(7) << "sec" + << std::setw(6) << "depth" + << std::setw(8) << "IOPS" + << std::setw(8) << "MB/s" + << std::setw(8) << "lat(ms)"; + std::cout << sout.str() << std::endl; + } + + void report_period() { + if (elapsed == 1) { + // init this->start_times at the first period + for (unsigned i=0; i elapsed_d = 0s; + unsigned depth = 0u; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& snap: snaps) { + elapsed_d += (snap.finish_time - snap.start_time); + depth += snap.depth; + ops += (snap.finish_count - snap.start_count); + sampled_count += snap.sampled_count; + total_lat_s += snap.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops/elapsed_s; + std::ostringstream sout; + sout << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << depth + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000); + std::cout << sout.str() << std::endl; + } + + void report_summary() const { + std::chrono::duration elapsed_d = 0s; + unsigned ops = 0u; + unsigned sampled_count = 0u; + double total_lat_s = 0.0; + for (const auto& summary: summaries) { + elapsed_d += (summary.finish_time - summary.start_time); + ops += (summary.received_count - summary.start_count); + sampled_count += summary.sampled_count; + total_lat_s += summary.total_lat_s; + } + double elapsed_s = elapsed_d.count() / jobs; + double iops = ops / elapsed_s; + std::ostringstream sout; + sout << "--------------" + << " summary " + << "--------------\n" + << setfill(' ') + << std::setw(7) << elapsed_s + << std::setw(6) << "-" + << std::setw(8) << iops + << std::setw(8) << iops * bytes_of_block / 1048576 + << std::setw(8) << (total_lat_s / sampled_count * 1000) + << "\n"; + std::cout << sout.str() << std::endl; + } + }; + + seastar::future<> report_period(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + PeriodStats& snap = report.get_snap_by_job(client.sid); + client.period_stats.reset(client.conn_stats.received_count, + &snap); + snap.depth = client.get_current_depth(); + } + }).then([&report] { + report.report_period(); + }); + } + + seastar::future<> report_summary(TimerReport& report) { + return container().invoke_on_all([&report] (auto& client) { + if (client.is_active()) { + ConnStats& summary = report.get_summary_by_job(client.sid); + summary = client.conn_stats; + summary.finish_time = mono_clock::now(); + } + }).then([&report] { + report.report_summary(); + }); + } + + public: + seastar::future<> dispatch_with_timer(unsigned ramptime, unsigned msgtime) { + logger().info("[all clients]: start sending MOSDOps from {} clients", jobs); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + client.do_dispatch_messages(client.active_conn.get()); + } + }).then([this, ramptime] { + logger().info("[all clients]: ramping up {} seconds...", ramptime); + return seastar::sleep(std::chrono::seconds(ramptime)); }).then([this] { - logger().info("\nstart sending {} MOSDOps from {} clients", - jobs * rounds, jobs); - mono_time start_time = mono_clock::now(); return container().invoke_on_all([] (auto& client) { if (client.is_active()) { - return client.do_dispatch_messages(client.active_conn.get()); + client.conn_stats.start(); + client.period_stats.reset(client.conn_stats.received_count); } - return seastar::now(); - }).then([this, start_time] { - std::chrono::duration dur_messaging = mono_clock::now() - start_time; - logger().info("\nSummary:\n clients: {}\n MOSDOps: {}\n total time: {}s\n", - jobs, jobs * rounds, dur_messaging.count()); + }); + }).then([this, msgtime] { + logger().info("[all clients]: reporting {} seconds...\n", msgtime); + return seastar::do_with( + TimerReport(jobs, msgtime, msg_len), [this] (auto& report) { + report.report_header(); + return seastar::do_until( + [&report] { return report.should_stop(); }, + [&report, this] { + return report.ticktock().then([&report, this] { + // report period every 1s + return report_period(report); + }).then([&report, this] { + // report summary every 10s + if (report.get_elapsed() % 10 == 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); + } + ).then([&report, this] { + // report the final summary + if (report.get_elapsed() % 10 != 0) { + return report_summary(report); + } else { + return seastar::now(); + } + }); }); }); } @@ -346,7 +567,7 @@ static seastar::future<> run( m->set_tid(sent_count); // sample message latency - if (sent_count % conn_session.SAMPLE_RATE == 0) { + if (sent_count % SAMPLE_RATE == 0) { auto index = sent_count % time_msgs_sent.size(); ceph_assert(time_msgs_sent[index] == mono_clock::zero()); time_msgs_sent[index] = mono_clock::now(); @@ -356,40 +577,48 @@ static seastar::future<> run( }); } - seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) { + class DepthBroken: public std::exception {}; + + seastar::future<> stop_dispatch_messages() { + stop_send = true; + depth.broken(DepthBroken()); + return stopped_send_promise.get_future(); + } + + void do_dispatch_messages(ceph::net::Connection* conn) { ceph_assert(seastar::engine().cpu_id() == sid); ceph_assert(sent_count == 0); - conn_session.start_time = mono_clock::now(); - return seastar::do_until( - [this, conn] { - bool stop = (sent_count == rounds); - if (stop) { - logger().info("{}: finished sending {} OSDOPs", - *conn, sent_count); - } - return stop; - }, + conn_stats.start_time = mono_clock::now(); + seastar::do_until( + [this] { return stop_send; }, [this, conn] { sent_count += 1; return send_msg(conn); } - ).then([this] { - return conn_session.done.get_future(); - }).then([this] { - std::chrono::duration dur_conn = conn_session.connected_time - conn_session.connecting_time; - std::chrono::duration dur_msg = mono_clock::now() - conn_session.start_time; - logger().info("\n{}:\n" - " messages: {}\n" + ).handle_exception_type([] (const DepthBroken& e) { + // ok, stopped by stop_dispatch_messages() + }).finally([this, conn] { + std::chrono::duration dur_conn = conn_stats.connected_time - conn_stats.connecting_time; + std::chrono::duration dur_msg = mono_clock::now() - conn_stats.start_time; + unsigned ops = conn_stats.received_count - conn_stats.start_count; + logger().info("{}: stopped sending OSDOPs.\n" + "{}(depth={}):\n" " connect time: {}s\n" + " messages received: {}\n" " messaging time: {}s\n" " latency: {}ms\n" " IOPS: {}\n" " throughput: {}MB/s\n", - lname, conn_session.received_count, - dur_conn.count(), dur_msg.count(), - conn_session.total_lat_s / conn_session.sampled_count * 1000, - conn_session.received_count / dur_msg.count(), - conn_session.received_count / dur_msg.count() * msg_len / 1048576); + *conn, + lname, + nr_depth, + dur_conn.count(), + ops, + dur_msg.count(), + conn_stats.total_lat_s / conn_stats.sampled_count * 1000, + ops / dur_msg.count(), + ops / dur_msg.count() * msg_len / 1048576); + stopped_send_promise.set_value(); }); } }; @@ -397,7 +626,7 @@ static seastar::future<> run( return seastar::when_all_succeed( ceph::net::create_sharded(server_conf.core, server_conf.block_size), - ceph::net::create_sharded(client_conf.jobs, client_conf.rounds, + ceph::net::create_sharded(client_conf.jobs, client_conf.block_size, client_conf.depth)) .then([=](test_state::Server *server, test_state::Client *client) { @@ -411,10 +640,11 @@ static seastar::future<> run( return seastar::when_all_succeed( server->init(server_conf.addr), client->init()) - // dispatch ops .then([client, addr = client_conf.server_addr] { - return client->dispatch_messages(addr); - // shutdown + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); }).finally([client] { return client->shutdown(); }).finally([server] { @@ -425,10 +655,11 @@ static seastar::future<> run( ceph_assert(seastar::smp::count >= 1+client_conf.jobs); ceph_assert(client_conf.jobs > 0); return client->init() - // dispatch ops .then([client, addr = client_conf.server_addr] { - return client->dispatch_messages(addr); - // shutdown + return client->connect_wait_verify(addr); + }).then([client, ramptime = client_conf.ramptime, + msgtime = client_conf.msgtime] { + return client->dispatch_with_timer(ramptime, msgtime); }).finally([client] { return client->shutdown(); }); @@ -457,8 +688,10 @@ int main(int argc, char** argv) "0: both, 1:client, 2:server") ("addr", bpo::value()->default_value("v1:0.0.0.0:9010"), "server address") - ("rounds", bpo::value()->default_value(65536), - "number of client messaging rounds") + ("ramptime", bpo::value()->default_value(5), + "seconds of client ramp-up time") + ("msgtime", bpo::value()->default_value(15), + "seconds of client messaging time") ("jobs", bpo::value()->default_value(1), "number of client jobs (messengers)") ("cbs", bpo::value()->default_value(4096),