]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: ms_dispatch() use ConnectionRef
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 30 Nov 2020 07:53:46 +0000 (15:53 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Tue, 1 Dec 2020 01:34:42 +0000 (09:34 +0800)
The future returned by ms_dispatch() is only for throttling, not for
Connection lifecycle management. And Messenger may not hold the
connection reference once it is closed.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
21 files changed:
src/crimson/admin/admin_socket.cc
src/crimson/admin/admin_socket.h
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Connection.h
src/crimson/net/Dispatcher.h
src/crimson/net/ProtocolV1.cc
src/crimson/net/ProtocolV2.cc
src/crimson/net/chained_dispatchers.cc
src/crimson/net/chained_dispatchers.h
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/test/crimson/test_messenger.cc
src/tools/crimson/perf_crimson_msgr.cc

index b141023e93c85d8d034b1b8ca3dd6572412fe18e..75cb80a46609264a09f82e865a234e00bc9589e5 100644 (file)
@@ -123,7 +123,7 @@ seastar::future<> AdminSocket::finalize_response(
 }
 
 
-seastar::future<> AdminSocket::handle_command(crimson::net::Connection* conn,
+seastar::future<> AdminSocket::handle_command(crimson::net::ConnectionRef conn,
                                              boost::intrusive_ptr<MCommand> m)
 {
   return execute_command(m->cmd, std::move(m->get_data())).then(
index 0beee751c0babc5a418770e3c4df7bbd86d7d74f..a842b62a2d1a03ee0baa2ff54cdb4b21b65d2f7b 100644 (file)
@@ -126,7 +126,7 @@ class AdminSocket : public seastar::enable_lw_shared_from_this<AdminSocket> {
    * \param conn connection over which the incoming command message is received
    * \param m message carrying the command vector and optional input buffer
    */
-  seastar::future<> handle_command(crimson::net::Connection* conn,
+  seastar::future<> handle_command(crimson::net::ConnectionRef conn,
                                   boost::intrusive_ptr<MCommand> m);
 
 private:
index a98480ac8b519e7a881b80a9137e447f7739e1a3..c00370cf3b31769ccafb4aedc30afaca329f59dd 100644 (file)
@@ -48,7 +48,7 @@ seastar::future<> Client::stop()
 }
 
 std::tuple<bool, seastar::future<>>
-Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
@@ -117,7 +117,7 @@ seastar::future<> Client::reconnect()
   });
 }
 
-seastar::future<> Client::handle_mgr_map(crimson::net::Connection*,
+seastar::future<> Client::handle_mgr_map(crimson::net::ConnectionRef,
                                          Ref<MMgrMap> m)
 {
   mgrmap = m->get_map();
@@ -131,7 +131,7 @@ seastar::future<> Client::handle_mgr_map(crimson::net::Connection*,
   }
 }
 
-seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn,
+seastar::future<> Client::handle_mgr_conf(crimson::net::ConnectionRef,
                                           Ref<MMgrConfigure> m)
 {
   logger().info("{} {}", __func__, *m);
index 25cbfa343254c4fdeae21a8c3a5ba65ac55fdcbc..555e779bfbc057ebfe0402cc28148551b77bea18 100644 (file)
@@ -38,12 +38,12 @@ public:
 
 private:
   std::tuple<bool, seastar::future<>> ms_dispatch(
-      crimson::net::Connection* conn, Ref<Message> m) override;
+      crimson::net::ConnectionRef conn, Ref<Message> m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_connect(crimson::net::ConnectionRef conn) final;
-  seastar::future<> handle_mgr_map(crimson::net::Connection* conn,
+  seastar::future<> handle_mgr_map(crimson::net::ConnectionRef conn,
                                   Ref<MMgrMap> m);
-  seastar::future<> handle_mgr_conf(crimson::net::Connection* conn,
+  seastar::future<> handle_mgr_conf(crimson::net::ConnectionRef conn,
                                    Ref<MMgrConfigure> m);
   seastar::future<> reconnect();
 
index 835bce01cae7c0bebc12916f13b295e99aff3b92..5df743aaeb0a28a48cd1f54b3963ec85911fe66b 100644 (file)
@@ -519,7 +519,7 @@ bool Client::is_hunting() const {
 }
 
 std::tuple<bool, seastar::future<>>
-Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
@@ -785,7 +785,7 @@ int Client::handle_auth_bad_method(crimson::net::ConnectionRef conn,
   }
 }
 
-seastar::future<> Client::handle_monmap(crimson::net::Connection* conn,
+seastar::future<> Client::handle_monmap(crimson::net::ConnectionRef conn,
                                         Ref<MMonMap> m)
 {
   monmap.decode(m->monmapbl);
@@ -815,8 +815,8 @@ seastar::future<> Client::handle_monmap(crimson::net::Connection* conn,
   }
 }
 
-seastar::future<> Client::handle_auth_reply(crimson::net::Connection* conn,
-                                               Ref<MAuthReply> m)
+seastar::future<> Client::handle_auth_reply(crimson::net::ConnectionRef conn,
+                                            Ref<MAuthReply> m)
 {
   logger().info(
     "handle_auth_reply mon {} => {} returns {}: {}",
index bf5daa850ca9cab44a86ae4c8ffaa98a5bd3e6e4..bc8593d60a1e1a5aefc35b63c7eab4e0176a4cd2 100644 (file)
@@ -140,13 +140,13 @@ private:
 private:
   void tick();
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection* conn,
+  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef conn,
                                                   MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
 
-  seastar::future<> handle_monmap(crimson::net::Connection* conn,
+  seastar::future<> handle_monmap(crimson::net::ConnectionRef conn,
                                  Ref<MMonMap> m);
-  seastar::future<> handle_auth_reply(crimson::net::Connection* conn,
+  seastar::future<> handle_auth_reply(crimson::net::ConnectionRef conn,
                                      Ref<MAuthReply> m);
   seastar::future<> handle_subscribe_ack(Ref<MMonSubscribeAck> m);
   seastar::future<> handle_get_version_reply(Ref<MMonGetVersionReply> m);
index 25b3f5af562d43cd81ea1c865aa241ecd43c167e..6af12692e78bce9dd44a2cb884552437d958d702 100644 (file)
@@ -147,10 +147,6 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
   auto get_last_keepalive() const { return last_keepalive; }
   auto get_last_keepalive_ack() const { return last_keepalive_ack; }
 
-  seastar::shared_ptr<Connection> get_shared() {
-    return shared_from_this();
-  }
-
   struct user_private_t {
     virtual ~user_private_t() = default;
   };
index 00cd7d474a2a3f4efdb65073fc2c5fade8f22c15..71be61783b097a5320a6f3e0a9e3894010b81c64 100644 (file)
@@ -29,7 +29,7 @@ class Dispatcher {
   // to prevent other dispatchers from processing it, and returns a future
   // to throttle the connection if it's too busy. Else, it returns false and
   // the second future is ignored.
-  virtual std::tuple<bool, seastar::future<>> ms_dispatch(Connection*, MessageRef) = 0;
+  virtual std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef, MessageRef) = 0;
 
   virtual void ms_handle_accept(ConnectionRef conn) {}
 
index 95afb61c1cc9e0e753756e8e7ff89a7bf48f4e6d..34fb14573e217975b959cf96bec60d5d49a30776 100644 (file)
@@ -848,10 +848,10 @@ seastar::future<> ProtocolV1::read_message()
     }).then([this] (bufferlist bl) {
       auto p = bl.cbegin();
       ::decode(m.footer, p);
-      auto pconn = seastar::static_pointer_cast<SocketConnection>(
+      auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this());
       auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
-                                  m.front, m.middle, m.data, std::move(pconn));
+                                  m.front, m.middle, m.data, conn_ref);
       if (unlikely(!msg)) {
         logger().warn("{} decode message failed", conn);
         throw std::system_error{make_error_code(error::corrupted_message)};
@@ -877,7 +877,7 @@ seastar::future<> ProtocolV1::read_message()
       logger().debug("{} <== #{} === {} ({})",
                      conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type());
       // throttle the reading process by the returned future
-      return dispatchers.ms_dispatch(&conn, std::move(msg_ref));
+      return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
     });
 }
 
index 194b6af486bc2dffe6b076f20153935e167beac5..4d7d06d7a3361aef807c3520ca4414a61d9c572b 100644 (file)
@@ -1883,11 +1883,10 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     ceph_msg_footer footer{init_le32(0), init_le32(0),
                            init_le32(0), init_le64(0), current_header.flags};
 
-    auto pconn = seastar::static_pointer_cast<SocketConnection>(
+    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
         conn.shared_from_this());
     Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(),
-        std::move(pconn));
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
     if (!message) {
       logger().warn("{} decode message failed", conn);
       abort_in_fault();
@@ -1933,7 +1932,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp)
     // TODO: change MessageRef with seastar::shared_ptr
     auto msg_ref = MessageRef{message, false};
     // throttle the reading process by the returned future
-    return dispatchers.ms_dispatch(&conn, std::move(msg_ref));
+    return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
   });
 }
 
index 6aa9b93293135f9c68200556544d6127739a2af7..635794e0db3b9ac3ec842065ba21b95555a4f5e3 100644 (file)
@@ -13,7 +13,7 @@ namespace {
 namespace crimson::net {
 
 seastar::future<>
-ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn,
+ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn,
                                 MessageRef m) {
   try {
     for (auto& dispatcher : dispatchers) {
index 9df7b36f1f77ae60c7110464d663798999a867f4..531fe906369c234591c57a89bca38d46288609a0 100644 (file)
@@ -25,7 +25,7 @@ public:
   bool empty() const {
     return dispatchers.empty();
   }
-  seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m);
+  seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef);
   void ms_handle_accept(crimson::net::ConnectionRef conn);
   void ms_handle_connect(crimson::net::ConnectionRef conn);
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace);
index 4c64e1573ac6847b5d64cfaa2eceb0985b78bb0c..23014cf7f937cb265cba1e0e5e3f4b8be8f22cc7 100644 (file)
@@ -204,7 +204,7 @@ void Heartbeat::remove_peer(osd_id_t peer)
 }
 
 std::tuple<bool, seastar::future<>>
-Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   bool dispatched = true;
   gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] {
@@ -258,7 +258,7 @@ void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
   }
 }
 
-seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_osd_ping(crimson::net::ConnectionRef conn,
                                              Ref<MOSDPing> m)
 {
   switch (m->op) {
@@ -273,7 +273,7 @@ seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
   }
 }
 
-seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_ping(crimson::net::ConnectionRef conn,
                                          Ref<MOSDPing> m)
 {
   auto min_message = static_cast<uint32_t>(
@@ -291,7 +291,7 @@ seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn,
   return conn->send(reply);
 }
 
-seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn,
+seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn,
                                           Ref<MOSDPing> m)
 {
   const osd_id_t from = m->get_source().num();
@@ -373,9 +373,9 @@ Heartbeat::Connection::~Connection()
   }
 }
 
-bool Heartbeat::Connection::matches(crimson::net::Connection* _conn) const
+bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const
 {
-  return (conn && conn.get() == _conn);
+  return (conn && conn == _conn);
 }
 
 void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
@@ -551,7 +551,7 @@ void Heartbeat::Peer::send_heartbeat(
 }
 
 seastar::future<> Heartbeat::Peer::handle_reply(
-    crimson::net::Connection* conn, Ref<MOSDPing> m)
+    crimson::net::ConnectionRef conn, Ref<MOSDPing> m)
 {
   if (!session.is_started()) {
     // we haven't sent any ping yet
index 36e1c8c4183f73586deb7df4b9eb7c4ce7c0e117..46d12463c3db87cbceb18e2f8d25a484b2994490 100644 (file)
@@ -49,18 +49,18 @@ public:
 
   // Dispatcher methods
   std::tuple<bool, seastar::future<>> ms_dispatch(
-      crimson::net::Connection* conn, MessageRef m) override;
+      crimson::net::ConnectionRef conn, MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
   void ms_handle_connect(crimson::net::ConnectionRef conn) override;
   void ms_handle_accept(crimson::net::ConnectionRef conn) override;
 
   void print(std::ostream&) const;
 private:
-  seastar::future<> handle_osd_ping(crimson::net::Connection* conn,
+  seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn,
                                    Ref<MOSDPing> m);
-  seastar::future<> handle_ping(crimson::net::Connection* conn,
+  seastar::future<> handle_ping(crimson::net::ConnectionRef conn,
                                Ref<MOSDPing> m);
-  seastar::future<> handle_reply(crimson::net::Connection* conn,
+  seastar::future<> handle_reply(crimson::net::ConnectionRef conn,
                                 Ref<MOSDPing> m);
   seastar::future<> handle_you_died();
 
@@ -182,10 +182,7 @@ class Heartbeat::Connection {
 
   ~Connection();
 
-  bool matches(crimson::net::Connection* _conn) const;
-  bool matches(crimson::net::ConnectionRef conn) const {
-    return matches(conn.get());
-  }
+  bool matches(crimson::net::ConnectionRef _conn) const;
   void connected() {
     set_connected();
   }
@@ -410,7 +407,7 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
   }
   void send_heartbeat(
       clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
-  seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
+  seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref<MOSDPing>);
   void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
     for_each_conn([&] (auto& _conn) {
       if (_conn.matches(conn)) {
index 264932cfb02c464b792cb19a9c6722b48f9d814e..e8f5143ed28fec2267fd1e45b4086705c98d1ce3 100644 (file)
@@ -420,7 +420,7 @@ seastar::future<> OSD::_add_me_to_crush()
   });
 }
 
-seastar::future<> OSD::handle_command(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn,
                                      Ref<MCommand> m)
 {
   return asok->handle_command(conn, std::move(m));
@@ -618,7 +618,7 @@ seastar::future<Ref<PG>> OSD::load_pg(spg_t pgid)
 }
 
 std::tuple<bool, seastar::future<>>
-OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
+OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m)
 {
   if (state.is_stopping()) {
     return {false, seastar::now()};
@@ -633,7 +633,7 @@ OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m)
     case MSG_OSD_PG_CREATE2:
       shard_services.start_operation<CompoundPeeringRequest>(
        *this,
-       conn->get_shared(),
+       conn,
        m);
       return seastar::now();
     case MSG_COMMAND:
@@ -952,7 +952,7 @@ seastar::future<Ref<PG>> OSD::handle_pg_create_info(
   });
 }
 
-seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn,
                                       Ref<MOSDMap> m)
 {
   logger().info("handle_osd_map {}", *m);
@@ -1089,17 +1089,17 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
   });
 }
 
-seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn,
                                      Ref<MOSDOp> m)
 {
   (void) shard_services.start_operation<ClientRequest>(
     *this,
-    conn->get_shared(),
+    conn,
     std::move(m));
   return seastar::now();
 }
 
-seastar::future<> OSD::send_incremental_map(crimson::net::Connection* conn,
+seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn,
                                            epoch_t first)
 {
   if (first >= superblock.oldest_map) {
@@ -1125,18 +1125,18 @@ seastar::future<> OSD::send_incremental_map(crimson::net::Connection* conn,
   }
 }
 
-seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn,
                                     Ref<MOSDRepOp> m)
 {
   m->finish_decode();
   (void) shard_services.start_operation<RepRequest>(
     *this,
-    conn->get_shared(),
+    std::move(conn),
     std::move(m));
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                                           Ref<MOSDRepOpReply> m)
 {
   const auto& pgs = pg_map.get_pgs();
@@ -1149,7 +1149,7 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn,
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn,
                                    Ref<MOSDScrub2> m)
 {
   if (m->fsid != superblock.cluster_fsid) {
@@ -1157,7 +1157,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn,
     return seastar::now();
   }
   return seastar::parallel_for_each(std::move(m->scrub_pgs),
-    [m, conn=conn->get_shared(), this](spg_t pgid) {
+    [m, conn, this](spg_t pgid) {
     pg_shard_t from_shard{static_cast<int>(m->get_source().num()),
                           pgid.shard};
     PeeringState::RequestScrub scrub_request{m->deep, m->repair};
@@ -1171,7 +1171,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn,
   });
 }
 
-seastar::future<> OSD::handle_mark_me_down(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn,
                                           Ref<MOSDMarkMeDown> m)
 {
   if (state.is_prestop()) {
@@ -1180,12 +1180,12 @@ seastar::future<> OSD::handle_mark_me_down(crimson::net::Connection* conn,
   return seastar::now();
 }
 
-seastar::future<> OSD::handle_recovery_subreq(crimson::net::Connection* conn,
+seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn,
                                   Ref<MOSDFastDispatchOp> m)
 {
   (void) shard_services.start_operation<RecoverySubRequest>(
     *this,
-    conn->get_shared(),
+    conn,
     std::move(m));
   return seastar::now();
 }
@@ -1269,7 +1269,7 @@ void OSD::update_heartbeat_peers()
 }
 
 seastar::future<> OSD::handle_peering_op(
-  crimson::net::Connection* conn,
+  crimson::net::ConnectionRef conn,
   Ref<MOSDPeeringOp> m)
 {
   const int from = m->get_source().num();
@@ -1277,7 +1277,7 @@ seastar::future<> OSD::handle_peering_op(
   std::unique_ptr<PGPeeringEvent> evt(m->get_event());
   (void) shard_services.start_operation<RemotePeeringEvent>(
     *this,
-    conn->get_shared(),
+    conn,
     shard_services,
     pg_shard_t{from, m->get_spg().shard},
     m->get_spg(),
index 2f25fbe79a7d1a65e2eab7b7e6a5230b9ee8e215..5b6fcc8448c89ae2d49650b3b5f8e85920033ddc 100644 (file)
@@ -96,7 +96,7 @@ class OSD final : public crimson::net::Dispatcher,
   OSDSuperblock superblock;
 
   // Dispatcher methods
-  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::Connection*, MessageRef) final;
+  std::tuple<bool, seastar::future<>> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final;
   void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final;
 
@@ -136,7 +136,7 @@ public:
   void dump_pg_state_history(Formatter*) const;
   void print(std::ostream&) const;
 
-  seastar::future<> send_incremental_map(crimson::net::Connection* conn,
+  seastar::future<> send_incremental_map(crimson::net::ConnectionRef conn,
                                         epoch_t first);
 
   /// @return the seq id of the pg stats being sent
@@ -178,21 +178,21 @@ private:
   seastar::future<Ref<PG>> handle_pg_create_info(
     std::unique_ptr<PGCreateInfo> info);
 
-  seastar::future<> handle_osd_map(crimson::net::Connection* conn,
+  seastar::future<> handle_osd_map(crimson::net::ConnectionRef conn,
                                    Ref<MOSDMap> m);
-  seastar::future<> handle_osd_op(crimson::net::Connection* conn,
+  seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn,
                                  Ref<MOSDOp> m);
-  seastar::future<> handle_rep_op(crimson::net::Connection* conn,
+  seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn,
                                  Ref<MOSDRepOp> m);
-  seastar::future<> handle_rep_op_reply(crimson::net::Connection* conn,
+  seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn,
                                        Ref<MOSDRepOpReply> m);
-  seastar::future<> handle_peering_op(crimson::net::Connection* conn,
+  seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn,
                                      Ref<MOSDPeeringOp> m);
-  seastar::future<> handle_recovery_subreq(crimson::net::Connection* conn,
+  seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn,
                                           Ref<MOSDFastDispatchOp> m);
-  seastar::future<> handle_scrub(crimson::net::Connection* conn,
+  seastar::future<> handle_scrub(crimson::net::ConnectionRef conn,
                                 Ref<MOSDScrub2> m);
-  seastar::future<> handle_mark_me_down(crimson::net::Connection* conn,
+  seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn,
                                        Ref<MOSDMarkMeDown> m);
 
   seastar::future<> committed_osd_maps(version_t first,
@@ -201,7 +201,7 @@ private:
 
   void check_osdmap_features();
 
-  seastar::future<> handle_command(crimson::net::Connection* conn,
+  seastar::future<> handle_command(crimson::net::ConnectionRef conn,
                                   Ref<MCommand> m);
   seastar::future<> start_asok_admin();
 
index f2fa0e2050759c28798342f5081254444ad9539e..97e5f952475996b949e479873c5f3bdfb3cbe3e2 100644 (file)
@@ -70,7 +70,7 @@ seastar::future<> ClientRequest::start()
       }).then([this, opref](Ref<PG> pgref) {
        PG &pg = *pgref;
        if (pg.can_discard_op(*m)) {
-         return osd.send_incremental_map(conn.get(), m->get_map_epoch());
+         return osd.send_incremental_map(conn, m->get_map_epoch());
        }
        return with_blocking_future(
          handle.enter(pp(pg).await_map)
index 7598326379c77cd8dd6a4553a084a32e3c67d9e5..548391cc27dae5674f4939b7a4e5e9ab2cb22bc9 100644 (file)
@@ -946,7 +946,7 @@ seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
     });
 }
 
-void PG::handle_rep_op_reply(crimson::net::Connection* conn,
+void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                             const MOSDRepOpReply& m)
 {
   if (!can_discard_replica_op(m)) {
index bccb74b198a09c9b5255de445ee4ffb1526ed4fa..bd5d43f7db8035c765fb08076973c4b27b41d6e3 100644 (file)
@@ -513,7 +513,7 @@ public:
     with_obc_func_t&& f);
 
   seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
-  void handle_rep_op_reply(crimson::net::Connection* conn,
+  void handle_rep_op_reply(crimson::net::ConnectionRef conn,
                           const MOSDRepOpReply& m);
 
   void print(std::ostream& os) const;
index d5699f97526867b5b846e385ba8e2a58f14c7aad..5afb6f55f3257a698a5a2993f7493d655298d2db 100644 (file)
@@ -58,7 +58,7 @@ static seastar::future<> test_echo(unsigned rounds,
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef c, MessageRef m) override {
         if (verbose) {
           logger().info("server got {}", *m);
         }
@@ -104,15 +104,15 @@ static seastar::future<> test_echo(unsigned rounds,
       unsigned rounds;
       std::bernoulli_distribution keepalive_dist;
       crimson::net::MessengerRef msgr;
-      std::map<crimson::net::Connection*, seastar::promise<>> pending_conns;
-      std::map<crimson::net::Connection*, PingSessionRef> sessions;
+      std::map<crimson::net::ConnectionRef, seastar::promise<>> pending_conns;
+      std::map<crimson::net::ConnectionRef, PingSessionRef> sessions;
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       Client(unsigned rounds, double keepalive_ratio)
         : rounds(rounds),
           keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
 
-      PingSessionRef find_session(crimson::net::Connection* c) {
+      PingSessionRef find_session(crimson::net::ConnectionRef c) {
         auto found = sessions.find(c);
         if (found == sessions.end()) {
           ceph_assert(false);
@@ -122,13 +122,13 @@ static seastar::future<> test_echo(unsigned rounds,
 
       void ms_handle_connect(crimson::net::ConnectionRef conn) override {
         auto session = seastar::make_shared<PingSession>();
-        auto [i, added] = sessions.emplace(conn.get(), session);
+        auto [i, added] = sessions.emplace(conn, session);
         std::ignore = i;
         ceph_assert(added);
         session->connected_time = mono_clock::now();
       }
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef c, MessageRef m) override {
         auto session = find_session(c);
         ++(session->count);
         if (verbose) {
@@ -165,9 +165,9 @@ static seastar::future<> test_echo(unsigned rounds,
         mono_time start_time = mono_clock::now();
         auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD);
         return seastar::futurize_invoke([this, conn] {
-          return do_dispatch_pingpong(conn.get());
+          return do_dispatch_pingpong(conn);
         }).then([this, conn, start_time] {
-          auto session = find_session(conn.get());
+          auto session = find_session(conn);
           std::chrono::duration<double> dur_handshake = session->connected_time - start_time;
           std::chrono::duration<double> dur_pingpong = session->finish_time - session->connected_time;
           logger().info("{}: handshake {}, pingpong {}",
@@ -176,7 +176,7 @@ static seastar::future<> test_echo(unsigned rounds,
       }
 
      private:
-      seastar::future<> do_dispatch_pingpong(crimson::net::Connection* conn) {
+      seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) {
         auto [i, added] = pending_conns.emplace(conn, seastar::promise<>());
         std::ignore = i;
         ceph_assert(added);
@@ -278,7 +278,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef, MessageRef m) override {
         switch (++count) {
         case 1:
           // block on the first request until we reenter with the second
@@ -320,7 +320,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef, MessageRef m) override {
         return {true, seastar::now()};
       }
 
@@ -379,7 +379,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       crimson::auth::DummyAuthClientServer dummy_auth;
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef c, MessageRef m) override {
         std::ignore = c->send(make_message<MPing>());
         return {true, seastar::now()};
       }
@@ -420,7 +420,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) {
       seastar::promise<> stopped_send_promise;
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef, MessageRef m) override {
         return {true, seastar::now()};
       }
 
@@ -813,14 +813,14 @@ class FailoverSuite : public Dispatcher {
   unsigned pending_peer_receive = 0;
   unsigned pending_receive = 0;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
-    auto result = interceptor.find_result(c->shared_from_this());
+  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+    auto result = interceptor.find_result(c);
     if (result == nullptr) {
       logger().error("Untracked ms dispatched connection: {}", *c);
       ceph_abort();
     }
 
-    if (tracked_conn != c->shared_from_this()) {
+    if (tracked_conn != c) {
       logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}",
                      result->index, *c, tracked_index, *tracked_conn);
       ceph_abort();
@@ -1209,7 +1209,7 @@ class FailoverTest : public Dispatcher {
 
   std::unique_ptr<FailoverSuite> test_suite;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     switch (m->get_type()) {
      case CEPH_MSG_PING:
       ceph_assert(recv_pong);
@@ -1407,10 +1407,10 @@ class FailoverSuitePeer : public Dispatcher {
   ConnectionRef tracked_conn;
   unsigned pending_send = 0;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
+  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
     logger().info("[TestPeer] got op from Test");
     ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
-    ceph_assert(tracked_conn == c->shared_from_this());
+    ceph_assert(tracked_conn == c);
     std::ignore = op_callback();
     return {true, seastar::now()};
   }
@@ -1537,8 +1537,8 @@ class FailoverTestPeer : public Dispatcher {
   const entity_addr_t test_peer_addr;
   std::unique_ptr<FailoverSuitePeer> test_suite;
 
-  std::tuple<bool, seastar::future<>> ms_dispatch(Connection* c, MessageRef m) override {
-    ceph_assert(cmd_conn == c->shared_from_this());
+  std::tuple<bool, seastar::future<>> ms_dispatch(ConnectionRef c, MessageRef m) override {
+    ceph_assert(cmd_conn == c);
     switch (m->get_type()) {
      case CEPH_MSG_PING:
       std::ignore = c->send(make_message<MPing>());
index 4acd01e0d32c82de8f5b47ef4d1ab6e328f1ed04..e20a570292f1cedf853db6f6c75b969a8d7c0bed 100644 (file)
@@ -153,7 +153,7 @@ static seastar::future<> run(
       }
 
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef c, MessageRef m) override {
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);
 
         // server replies with MOSDOp to generate server-side write workload
@@ -308,7 +308,7 @@ static seastar::future<> run(
         conn_stats.connected_time = mono_clock::now();
       }
       std::tuple<bool, seastar::future<>> ms_dispatch(
-          crimson::net::Connection* c, MessageRef m) override {
+          crimson::net::ConnectionRef, MessageRef m) override {
         // server replies with MOSDOp to generate server-side write workload
         ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);