]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: improve UT performance and added timing
authorYingxin Cheng <yingxincheng@gmail.com>
Tue, 29 Jan 2019 12:08:47 +0000 (20:08 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Tue, 12 Feb 2019 11:18:06 +0000 (19:18 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/test/crimson/test_messenger.cc

index d13dc9f6782fc390e23b8072ebeaed8845d44179..6ec23f6d117651d80ca096cf3b2c0423908f51e6 100644 (file)
@@ -1,3 +1,4 @@
+#include "common/ceph_time.h"
 #include "messages/MPing.h"
 #include "crimson/common/log.h"
 #include "crimson/net/Connection.h"
@@ -33,6 +34,7 @@ static seastar::future<> test_echo(unsigned rounds,
         : public ceph::net::Dispatcher,
           public seastar::peering_sharded_service<Server> {
       ceph::net::Messenger *msgr = nullptr;
+      MessageRef msg_pong{new MPing(), false};
 
       Dispatcher* get_local_shard() override {
         return &(container().local());
@@ -46,7 +48,7 @@ static seastar::future<> test_echo(unsigned rounds,
           logger().info("server got {}", *m);
         }
         // reply with a pong
-        return c->send(MessageRef{new MPing(), false});
+        return c->send(msg_pong);
       }
 
       seastar::future<> init(const entity_name_t& name,
@@ -76,6 +78,8 @@ static seastar::future<> test_echo(unsigned rounds,
 
       struct PingSession : public seastar::enable_shared_from_this<PingSession> {
         unsigned count = 0u;
+        mono_time connected_time;
+        mono_time finish_time;
       };
       using PingSessionRef = seastar::shared_ptr<PingSession>;
 
@@ -84,10 +88,20 @@ static seastar::future<> test_echo(unsigned rounds,
       ceph::net::Messenger *msgr = nullptr;
       std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
       std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+      MessageRef msg_ping{new MPing(), false};
 
       Client(unsigned rounds, double keepalive_ratio)
         : rounds(rounds),
           keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+
+      PingSessionRef find_session(ceph::net::ConnectionRef c) {
+        auto found = sessions.find(c);
+        if (found == sessions.end()) {
+          ceph_assert(false);
+        }
+        return found->second;
+      }
+
       Dispatcher* get_local_shard() override {
         return &(container().local());
       }
@@ -100,19 +114,12 @@ static seastar::future<> test_echo(unsigned rounds,
         auto [i, added] = sessions.emplace(conn, session);
         std::ignore = i;
         ceph_assert(added);
-        return container().invoke_on_all([conn = conn.get()](auto& client) {
-            auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
-            std::ignore = i;
-            ceph_assert(added);
-          });
+        session->connected_time = mono_clock::now();
+        return seastar::now();
       }
       seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
                                     MessageRef m) override {
-        auto found = sessions.find(c);
-        if (found == sessions.end()) {
-          ceph_assert(false);
-        }
-        auto session = found->second;
+        auto session = find_session(c);
         ++(session->count);
         if (verbose) {
           logger().info("client ms_dispatch {}", session->count);
@@ -120,6 +127,7 @@ static seastar::future<> test_echo(unsigned rounds,
 
         if (session->count == rounds) {
           logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+          session->finish_time = mono_clock::now();
           return container().invoke_on_all([conn = c.get()](auto &client) {
               auto found = client.pending_conns.find(conn);
               ceph_assert(found != client.pending_conns.end());
@@ -149,56 +157,70 @@ static seastar::future<> test_echo(unsigned rounds,
       }
 
       seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+        mono_time start_time = mono_clock::now();
         return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
-          .then([this, foreign_dispatch](auto conn) {
-            if (foreign_dispatch) {
-              return do_dispatch_pingpong(&**conn)
-                .finally([this, conn] {});
-            } else {
-              // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
-              return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
-                  return client.do_dispatch_pingpong(conn);
-                }).finally([this, conn] {});
-            }
+          .then([this, foreign_dispatch, start_time](auto conn) {
+            return seastar::futurize_apply([this, conn, foreign_dispatch] {
+                if (foreign_dispatch) {
+                  return do_dispatch_pingpong(&**conn);
+                } else {
+                  // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+                  return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+                      return client.do_dispatch_pingpong(conn);
+                    });
+                }
+              }).finally([this, conn, start_time] {
+                return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
+                    auto session = client.find_session((*conn)->shared_from_this());
+                    std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
+                    std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
+                    logger().info("{}: handshake {}, pingpong {}",
+                                  **conn, dur_handshake.count(), dur_pingpong.count());
+                  });
+              });
           });
       }
 
      private:
       seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
-        return seastar::do_with(0u, 0u,
-                                [this, conn](auto &count_ping, auto &count_keepalive) {
-            return seastar::do_until(
-              [this, conn, &count_ping, &count_keepalive] {
-                bool stop = (count_ping == rounds);
-                if (stop) {
-                  logger().info("{}: finished sending {} pings with {} keepalives",
-                                *conn, count_ping, count_keepalive);
-                }
-                return stop;
-              },
-              [this, conn, &count_ping, &count_keepalive] {
-                return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
-                    if (keepalive_dist(rng)) {
-                      count_keepalive += 1;
-                      return conn->keepalive()
-                        .then([&count_keepalive] {
-                          return seastar::make_ready_future<seastar::stop_iteration>(
-                            seastar::stop_iteration::no);
-                        });
-                    } else {
-                      count_ping += 1;
-                      return conn->send(MessageRef{new MPing(), false})
-                        .then([] {
-                          return seastar::make_ready_future<seastar::stop_iteration>(
-                            seastar::stop_iteration::yes);
-                        });
+        return container().invoke_on_all([conn](auto& client) {
+            auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+            std::ignore = i;
+            ceph_assert(added);
+          }).then([this, conn] {
+            return seastar::do_with(0u, 0u,
+                                    [this, conn](auto &count_ping, auto &count_keepalive) {
+                return seastar::do_until(
+                  [this, conn, &count_ping, &count_keepalive] {
+                    bool stop = (count_ping == rounds);
+                    if (stop) {
+                      logger().info("{}: finished sending {} pings with {} keepalives",
+                                    *conn, count_ping, count_keepalive);
                     }
+                    return stop;
+                  },
+                  [this, conn, &count_ping, &count_keepalive] {
+                    return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+                        if (keepalive_dist(rng)) {
+                          return conn->keepalive()
+                            .then([&count_keepalive] {
+                              count_keepalive += 1;
+                              return seastar::make_ready_future<seastar::stop_iteration>(
+                                seastar::stop_iteration::no);
+                            });
+                        } else {
+                          return conn->send(msg_ping)
+                            .then([&count_ping] {
+                              count_ping += 1;
+                              return seastar::make_ready_future<seastar::stop_iteration>(
+                                seastar::stop_iteration::yes);
+                            });
+                        }
+                      });
+                  }).then([this, conn] {
+                    auto found = pending_conns.find(conn);
+                    return found->second.get_future();
                   });
-              }).then([this, conn] {
-                auto found = pending_conns.find(conn);
-                if (found == pending_conns.end())
-                  throw std::runtime_error{"Not connected."};
-                return found->second.get_future();
               });
           });
       }