From 771419c34c59a0cf3fe7159b0cffd892238eb4d0 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 29 Jan 2019 20:08:47 +0800 Subject: [PATCH] crimson/net: improve UT performance and added timing Signed-off-by: Yingxin Cheng --- src/test/crimson/test_messenger.cc | 128 +++++++++++++++++------------ 1 file changed, 75 insertions(+), 53 deletions(-) diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index d13dc9f6782..6ec23f6d117 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -1,3 +1,4 @@ +#include "common/ceph_time.h" #include "messages/MPing.h" #include "crimson/common/log.h" #include "crimson/net/Connection.h" @@ -33,6 +34,7 @@ static seastar::future<> test_echo(unsigned rounds, : public ceph::net::Dispatcher, public seastar::peering_sharded_service { ceph::net::Messenger *msgr = nullptr; + MessageRef msg_pong{new MPing(), false}; Dispatcher* get_local_shard() override { return &(container().local()); @@ -46,7 +48,7 @@ static seastar::future<> test_echo(unsigned rounds, logger().info("server got {}", *m); } // reply with a pong - return c->send(MessageRef{new MPing(), false}); + return c->send(msg_pong); } seastar::future<> init(const entity_name_t& name, @@ -76,6 +78,8 @@ static seastar::future<> test_echo(unsigned rounds, struct PingSession : public seastar::enable_shared_from_this { unsigned count = 0u; + mono_time connected_time; + mono_time finish_time; }; using PingSessionRef = seastar::shared_ptr; @@ -84,10 +88,20 @@ static seastar::future<> test_echo(unsigned rounds, ceph::net::Messenger *msgr = nullptr; std::map> pending_conns; std::map sessions; + MessageRef msg_ping{new MPing(), false}; Client(unsigned rounds, double keepalive_ratio) : rounds(rounds), keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} + + PingSessionRef find_session(ceph::net::ConnectionRef c) { + auto found = sessions.find(c); + if (found == sessions.end()) { + ceph_assert(false); + } + return found->second; + } + Dispatcher* get_local_shard() override { return &(container().local()); } @@ -100,19 +114,12 @@ static seastar::future<> test_echo(unsigned rounds, auto [i, added] = sessions.emplace(conn, session); std::ignore = i; ceph_assert(added); - return container().invoke_on_all([conn = conn.get()](auto& client) { - auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); - std::ignore = i; - ceph_assert(added); - }); + session->connected_time = mono_clock::now(); + return seastar::now(); } seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { - auto found = sessions.find(c); - if (found == sessions.end()) { - ceph_assert(false); - } - auto session = found->second; + auto session = find_session(c); ++(session->count); if (verbose) { logger().info("client ms_dispatch {}", session->count); @@ -120,6 +127,7 @@ static seastar::future<> test_echo(unsigned rounds, if (session->count == rounds) { logger().info("{}: finished receiving {} pongs", *c.get(), session->count); + session->finish_time = mono_clock::now(); return container().invoke_on_all([conn = c.get()](auto &client) { auto found = client.pending_conns.find(conn); ceph_assert(found != client.pending_conns.end()); @@ -149,56 +157,70 @@ static seastar::future<> test_echo(unsigned rounds, } seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { + mono_time start_time = mono_clock::now(); return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) - .then([this, foreign_dispatch](auto conn) { - if (foreign_dispatch) { - return do_dispatch_pingpong(&**conn) - .finally([this, conn] {}); - } else { - // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). - return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { - return client.do_dispatch_pingpong(conn); - }).finally([this, conn] {}); - } + .then([this, foreign_dispatch, start_time](auto conn) { + return seastar::futurize_apply([this, conn, foreign_dispatch] { + if (foreign_dispatch) { + return do_dispatch_pingpong(&**conn); + } else { + // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). + return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { + return client.do_dispatch_pingpong(conn); + }); + } + }).finally([this, conn, start_time] { + return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { + auto session = client.find_session((*conn)->shared_from_this()); + std::chrono::duration dur_handshake = session->connected_time - start_time; + std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; + logger().info("{}: handshake {}, pingpong {}", + **conn, dur_handshake.count(), dur_pingpong.count()); + }); + }); }); } private: seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) { - return seastar::do_with(0u, 0u, - [this, conn](auto &count_ping, auto &count_keepalive) { - return seastar::do_until( - [this, conn, &count_ping, &count_keepalive] { - bool stop = (count_ping == rounds); - if (stop) { - logger().info("{}: finished sending {} pings with {} keepalives", - *conn, count_ping, count_keepalive); - } - return stop; - }, - [this, conn, &count_ping, &count_keepalive] { - return seastar::repeat([this, conn, &count_ping, &count_keepalive] { - if (keepalive_dist(rng)) { - count_keepalive += 1; - return conn->keepalive() - .then([&count_keepalive] { - return seastar::make_ready_future( - seastar::stop_iteration::no); - }); - } else { - count_ping += 1; - return conn->send(MessageRef{new MPing(), false}) - .then([] { - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }); + return container().invoke_on_all([conn](auto& client) { + auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + }).then([this, conn] { + return seastar::do_with(0u, 0u, + [this, conn](auto &count_ping, auto &count_keepalive) { + return seastar::do_until( + [this, conn, &count_ping, &count_keepalive] { + bool stop = (count_ping == rounds); + if (stop) { + logger().info("{}: finished sending {} pings with {} keepalives", + *conn, count_ping, count_keepalive); } + return stop; + }, + [this, conn, &count_ping, &count_keepalive] { + return seastar::repeat([this, conn, &count_ping, &count_keepalive] { + if (keepalive_dist(rng)) { + return conn->keepalive() + .then([&count_keepalive] { + count_keepalive += 1; + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + } else { + return conn->send(msg_ping) + .then([&count_ping] { + count_ping += 1; + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + return found->second.get_future(); }); - }).then([this, conn] { - auto found = pending_conns.find(conn); - if (found == pending_conns.end()) - throw std::runtime_error{"Not connected."}; - return found->second.get_future(); }); }); } -- 2.39.5