]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: pass `Connection*` to Dispatch::ms_dispatch() 27690/head
authorKefu Chai <kchai@redhat.com>
Fri, 19 Apr 2019 08:50:25 +0000 (16:50 +0800)
committerKefu Chai <kchai@redhat.com>
Sat, 20 Apr 2019 05:58:32 +0000 (13:58 +0800)
currently, we use a `with_gate()` in `ProtocolV2::read_message()` for
ensuring that `this` (or `Connection` holding this protocol instance)
will outlive the continuation of `dispatcher.ms_dispatch()` which
references `this->dispatcher`. but we also pass a strong reference of
connection to dispatcher. in short, we have *two* safeguards for the
same purpose.

in this change, one of these safeguards is removed -- to pass the raw
pointer of `Connection` to `Dispatch::ms_dispatch()`. the reason why
the `with_gate()` is kept is that, if we have removed `with_gate()` in
Protocol, we need to

1. let `Dispatcher::ms_dispatch()` return `void`, as it should not block
any succeeding calls.
2. add a `with_gate()` in `Dispatcher::ms_dispatch()` to ensure that
`this` is alive during the lifecycle of the continuation(s) in
`Dispatcher::ms_dispatch()`.

Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
14 files changed:
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Dispatcher.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/osd/chained_dispatchers.cc
src/crimson/osd/chained_dispatchers.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/test/crimson/perf_crimson_msgr.cc
src/test/crimson/test_alien_echo.cc
src/test/crimson/test_messenger.cc

index bdce3f14fa7345b67b85dfb70e4ebc1b2fc125e5..fd617543423b8675537a8c8428b6fb65b31fe200 100644 (file)
@@ -49,7 +49,7 @@ seastar::future<> Client::stop()
   });
 }
 
-seastar::future<> Client::ms_dispatch(ceph::net::ConnectionRef conn,
+seastar::future<> Client::ms_dispatch(ceph::net::Connection* conn,
                                       MessageRef m)
 {
   switch(m->get_type()) {
@@ -90,7 +90,7 @@ seastar::future<> Client::reconnect()
   });
 }
 
-seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef,
+seastar::future<> Client::handle_mgr_map(ceph::net::Connection*,
                                          Ref<MMgrMap> m)
 {
   mgrmap = m->get_map();
@@ -104,7 +104,7 @@ seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef,
   }
 }
 
-seastar::future<> Client::handle_mgr_conf(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_mgr_conf(ceph::net::Connection* conn,
                                           Ref<MMgrConfigure> m)
 {
   logger().info("{} {}", __func__, *m);
index 016044bebbb9c5e5f51b5d9222ceba4d5dcf3f85..1a63b60fb68157542e080b7a63fbd972ca01da70 100644 (file)
@@ -37,12 +37,12 @@ public:
   seastar::future<> start();
   seastar::future<> stop();
 private:
-  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+  seastar::future<> ms_dispatch(ceph::net::Connection* conn,
                                Ref<Message> m) override;
   seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
-  seastar::future<> handle_mgr_map(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_mgr_map(ceph::net::Connection* conn,
                                   Ref<MMgrMap> m);
-  seastar::future<> handle_mgr_conf(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_mgr_conf(ceph::net::Connection* conn,
                                    Ref<MMgrConfigure> m);
   seastar::future<> reconnect();
   void report();
index 6676a26883a0a455ee90403a329a96e3fe4cb032..4a527b574525b38f4acfedc45cd194939625a501 100644 (file)
@@ -433,7 +433,7 @@ bool Client::is_hunting() const {
 }
 
 seastar::future<>
-Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
+Client::ms_dispatch(ceph::net::Connection* conn, MessageRef m)
 {
   // we only care about these message types
   switch (m->get_type()) {
@@ -689,7 +689,7 @@ int Client::handle_auth_bad_method(ceph::net::ConnectionRef conn,
   }
 }
 
-seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_monmap(ceph::net::Connection* conn,
                                         Ref<MMonMap> m)
 {
   monmap.decode(m->monmapbl);
@@ -707,7 +707,7 @@ seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn,
   }
 }
 
-seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn,
+seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn,
                                                Ref<MAuthReply> m)
 {
   logger().info("mon {} => {} returns {}: {}",
index 940574eaf53509a407ee729adfb120e97530359d..852494db3984b5df740373db6cf98631f28b0d8f 100644 (file)
@@ -143,14 +143,14 @@ private:
 private:
   void tick();
 
-  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+  seastar::future<> ms_dispatch(ceph::net::Connection* conn,
                                MessageRef m) override;
   seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
   AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override;
 
-  seastar::future<> handle_monmap(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_monmap(ceph::net::Connection* conn,
                                  Ref<MMonMap> m);
-  seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_auth_reply(ceph::net::Connection* conn,
                                      Ref<MAuthReply> m);
   seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
   seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
index 8bcc47c917cf9c529067daa9afc12742821dc3cf..7a725e2fccff245fbcc29c0386a6a57f90808725 100644 (file)
@@ -27,7 +27,7 @@ class Dispatcher {
  public:
   virtual ~Dispatcher() {}
 
-  virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) {
+  virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) {
     return seastar::make_ready_future<>();
   }
 
index 3730e236013635e86fde50280d73d183e100f942..bc88913aaa0ad7848a017c7d77d227ca6742a94a 100644 (file)
@@ -733,10 +733,7 @@ seastar::future<> ProtocolV1::read_message()
       seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
           logger().debug("{} <= {}@{} === {}", messenger,
                 msg->get_source(), conn.peer_addr, *msg);
-          return dispatcher.ms_dispatch(
-              seastar::static_pointer_cast<SocketConnection>(
-                conn.shared_from_this()),
-              std::move(msg))
+          return dispatcher.ms_dispatch(&conn, std::move(msg))
             .handle_exception([this] (std::exception_ptr eptr) {
               logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
               ceph_assert(false);
index 086914da95d6d0896bf90c42c696fe719429df16..6fdc7163e04919927d19b16bdc0e50bb1691a4e3 100644 (file)
@@ -1466,11 +1466,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
     seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
-      return dispatcher.ms_dispatch(
-          seastar::static_pointer_cast<SocketConnection>(
-            conn.shared_from_this()),
-          std::move(msg))
-      .handle_exception([this] (std::exception_ptr eptr) {
+      return dispatcher.ms_dispatch(&conn, std::move(msg))
+       .handle_exception([this] (std::exception_ptr eptr) {
         logger().error("{} ms_dispatch caught exception: {}", conn, eptr);
         ceph_assert(false);
       });
index dfca51e8cbad0a95a15af7f126d8dd6112d8f350..0473c8b00fe4b0d43946c299284c207db4a2cd5c 100644 (file)
@@ -3,7 +3,7 @@
 
 
 seastar::future<>
-ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn,
+ChainedDispatchers::ms_dispatch(ceph::net::Connection* conn,
                                 MessageRef m) {
   return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) {
     return dispatcher->ms_dispatch(conn, m);
index 8368021c86998cb9c2a035e26680e15bacc2c78d..1871e7416d04d3305784068ca4578a827d1c5114 100644 (file)
@@ -22,7 +22,7 @@ public:
   void push_back(Dispatcher* dispatcher) {
     dispatchers.push_back(dispatcher);
   }
-  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override;
+  seastar::future<> ms_dispatch(ceph::net::Connection* conn, MessageRef m) override;
   seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override;
   seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override;
   seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
index a44fd7e79a8f9904f39404fbce2fe63638978102..c3f86a3fea738fc9a05069210abdb4ce3fc98abe 100644 (file)
@@ -201,7 +201,7 @@ seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
     });
 }
 
-seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::ms_dispatch(ceph::net::Connection* conn,
                                          MessageRef m)
 {
   switch (m->get_type()) {
@@ -229,7 +229,7 @@ seastar::future<> Heartbeat::ms_handle_reset(ceph::net::ConnectionRef conn)
   });
 }
 
-seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_osd_ping(ceph::net::Connection* conn,
                                              Ref<MOSDPing> m)
 {
   switch (m->op) {
@@ -244,7 +244,7 @@ seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
   }
 }
 
-seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_ping(ceph::net::Connection* conn,
                                          Ref<MOSDPing> m)
 {
   auto min_message = static_cast<uint32_t>(
@@ -258,7 +258,7 @@ seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
   return conn->send(reply);
 }
 
-seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn,
+seastar::future<> Heartbeat::handle_reply(ceph::net::Connection* conn,
                                           Ref<MOSDPing> m)
 {
   const osd_id_t from = m->get_source().num();
index 081c06445ea8f6b1da7481c7586d587a93806378..4046b208a9e927fa1126ff4f5d1141875253e74e 100644 (file)
@@ -42,17 +42,17 @@ public:
   const entity_addrvec_t& get_back_addrs() const;
 
   // Dispatcher methods
-  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+  seastar::future<> ms_dispatch(ceph::net::Connection* conn,
                                MessageRef m) override;
   seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
   AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override;
 
 private:
-  seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_osd_ping(ceph::net::Connection* conn,
                                    Ref<MOSDPing> m);
-  seastar::future<> handle_ping(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_ping(ceph::net::Connection* conn,
                                Ref<MOSDPing> m);
-  seastar::future<> handle_reply(ceph::net::ConnectionRef conn,
+  seastar::future<> handle_reply(ceph::net::Connection* conn,
                                 Ref<MOSDPing> m);
   seastar::future<> handle_you_died();
 
index 07f67210a91150773480fa5f687e6137ec5f1637..ee678ccedf0d3d601d3b2379e16f955ec2fc7541 100644 (file)
@@ -128,7 +128,7 @@ static seastar::future<> run(
       seastar::future<> stop() {
         return seastar::make_ready_future<>();
       }
-      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                     MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
@@ -224,7 +224,7 @@ static seastar::future<> run(
         active_session->connected_time = mono_clock::now();
         return seastar::now();
       }
-      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                     MessageRef m) override {
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
index 8e3272cab81ca8936b2f05ab988e72a8554fcba4..ff7c2df6d441ed2cf9ad8e3f1659635c9b5b69ac 100644 (file)
@@ -42,7 +42,7 @@ struct Server {
   struct ServerDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
-    seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+    seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                   MessageRef m) override {
       std::cout << "server got ping " << *m << std::endl;
       // reply with a pong
@@ -79,7 +79,7 @@ struct Client {
   struct ClientDispatcher : ceph::net::Dispatcher {
     unsigned count = 0;
     seastar::condition_variable on_reply;
-    seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+    seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                   MessageRef m) override {
       std::cout << "client got pong " << *m << std::endl;
       ++count;
index de80376b8630a58fd542275371729126af50dafe..ea942618ce40ece67008506105d5a308c1bf4f8d 100644 (file)
@@ -45,7 +45,7 @@ static seastar::future<> test_echo(unsigned rounds,
       seastar::future<> stop() {
         return seastar::make_ready_future<>();
       }
-      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                     MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
@@ -93,7 +93,7 @@ static seastar::future<> test_echo(unsigned rounds,
       std::bernoulli_distribution keepalive_dist;
       ceph::net::Messenger *msgr = nullptr;
       std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
-      std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+      std::map<ceph::net::Connection*, PingSessionRef> sessions;
       MessageRef msg_ping{new MPing(), false};
       ceph::auth::DummyAuthClientServer dummy_auth;
 
@@ -101,7 +101,7 @@ static seastar::future<> test_echo(unsigned rounds,
         : rounds(rounds),
           keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
 
-      PingSessionRef find_session(ceph::net::ConnectionRef c) {
+      PingSessionRef find_session(ceph::net::Connection* c) {
         auto found = sessions.find(c);
         if (found == sessions.end()) {
           ceph_assert(false);
@@ -118,13 +118,13 @@ static seastar::future<> test_echo(unsigned rounds,
       seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
         logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
         auto session = seastar::make_shared<PingSession>();
-        auto [i, added] = sessions.emplace(conn, session);
+        auto [i, added] = sessions.emplace(conn.get(), session);
         std::ignore = i;
         ceph_assert(added);
         session->connected_time = mono_clock::now();
         return seastar::now();
       }
-      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                     MessageRef m) override {
         auto session = find_session(c);
         ++(session->count);
@@ -133,10 +133,10 @@ static seastar::future<> test_echo(unsigned rounds,
         }
 
         if (session->count == rounds) {
-          logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+          logger().info("{}: finished receiving {} pongs", *c, session->count);
           session->finish_time = mono_clock::now();
-          return container().invoke_on_all([conn = c.get()](auto &client) {
-              auto found = client.pending_conns.find(conn);
+          return container().invoke_on_all([c](auto &client) {
+              auto found = client.pending_conns.find(c);
               ceph_assert(found != client.pending_conns.end());
               found->second.set_value();
             });
@@ -181,7 +181,7 @@ static seastar::future<> test_echo(unsigned rounds,
                 }
               }).finally([this, conn, start_time] {
                 return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) {
-                    auto session = client.find_session((*conn)->shared_from_this());
+                    auto session = client.find_session(&**conn);
                     std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
                     std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
                     logger().info("{}: handshake {}, pingpong {}",
@@ -305,7 +305,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       seastar::promise<> on_done; // satisfied when first dispatch unblocks
       ceph::auth::DummyAuthClientServer dummy_auth;
 
-      seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+      seastar::future<> ms_dispatch(ceph::net::Connection* c,
                                     MessageRef m) override {
         switch (++count) {
         case 1: