From: Yingxin Cheng Date: Mon, 30 Nov 2020 07:53:46 +0000 (+0800) Subject: crimson/net: ms_dispatch() use ConnectionRef X-Git-Tag: v16.1.0~416^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=659420c64cd379f1cb95e4b0a17113c3ddfea270;p=ceph.git crimson/net: ms_dispatch() use ConnectionRef The future returned by ms_dispatch() is only for throttling, not for Connection lifecycle management. And Messenger may not hold the connection reference once it is closed. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/admin/admin_socket.cc b/src/crimson/admin/admin_socket.cc index b141023e93c8..75cb80a46609 100644 --- a/src/crimson/admin/admin_socket.cc +++ b/src/crimson/admin/admin_socket.cc @@ -123,7 +123,7 @@ seastar::future<> AdminSocket::finalize_response( } -seastar::future<> AdminSocket::handle_command(crimson::net::Connection* conn, +seastar::future<> AdminSocket::handle_command(crimson::net::ConnectionRef conn, boost::intrusive_ptr m) { return execute_command(m->cmd, std::move(m->get_data())).then( diff --git a/src/crimson/admin/admin_socket.h b/src/crimson/admin/admin_socket.h index 0beee751c0ba..a842b62a2d1a 100644 --- a/src/crimson/admin/admin_socket.h +++ b/src/crimson/admin/admin_socket.h @@ -126,7 +126,7 @@ class AdminSocket : public seastar::enable_lw_shared_from_this { * \param conn connection over which the incoming command message is received * \param m message carrying the command vector and optional input buffer */ - seastar::future<> handle_command(crimson::net::Connection* conn, + seastar::future<> handle_command(crimson::net::ConnectionRef conn, boost::intrusive_ptr m); private: diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index a98480ac8b51..c00370cf3b31 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -48,7 +48,7 @@ seastar::future<> Client::stop() } std::tuple> -Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) +Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { @@ -117,7 +117,7 @@ seastar::future<> Client::reconnect() }); } -seastar::future<> Client::handle_mgr_map(crimson::net::Connection*, +seastar::future<> Client::handle_mgr_map(crimson::net::ConnectionRef, Ref m) { mgrmap = m->get_map(); @@ -131,7 +131,7 @@ seastar::future<> Client::handle_mgr_map(crimson::net::Connection*, } } -seastar::future<> Client::handle_mgr_conf(crimson::net::Connection* conn, +seastar::future<> Client::handle_mgr_conf(crimson::net::ConnectionRef, Ref m) { logger().info("{} {}", __func__, *m); diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 25cbfa343254..555e779bfbc0 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -38,12 +38,12 @@ public: private: std::tuple> ms_dispatch( - crimson::net::Connection* conn, Ref m) override; + crimson::net::ConnectionRef conn, Ref m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_connect(crimson::net::ConnectionRef conn) final; - seastar::future<> handle_mgr_map(crimson::net::Connection* conn, + seastar::future<> handle_mgr_map(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_mgr_conf(crimson::net::Connection* conn, + seastar::future<> handle_mgr_conf(crimson::net::ConnectionRef conn, Ref m); seastar::future<> reconnect(); diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 835bce01cae7..5df743aaeb0a 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -519,7 +519,7 @@ bool Client::is_hunting() const { } std::tuple> -Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) +Client::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { @@ -785,7 +785,7 @@ int Client::handle_auth_bad_method(crimson::net::ConnectionRef conn, } } -seastar::future<> Client::handle_monmap(crimson::net::Connection* conn, +seastar::future<> Client::handle_monmap(crimson::net::ConnectionRef conn, Ref m) { monmap.decode(m->monmapbl); @@ -815,8 +815,8 @@ seastar::future<> Client::handle_monmap(crimson::net::Connection* conn, } } -seastar::future<> Client::handle_auth_reply(crimson::net::Connection* conn, - Ref m) +seastar::future<> Client::handle_auth_reply(crimson::net::ConnectionRef conn, + Ref m) { logger().info( "handle_auth_reply mon {} => {} returns {}: {}", diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index bf5daa850ca9..bc8593d60a1e 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -140,13 +140,13 @@ private: private: void tick(); - std::tuple> ms_dispatch(crimson::net::Connection* conn, + std::tuple> ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; - seastar::future<> handle_monmap(crimson::net::Connection* conn, + seastar::future<> handle_monmap(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_auth_reply(crimson::net::Connection* conn, + seastar::future<> handle_auth_reply(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_subscribe_ack(Ref m); seastar::future<> handle_get_version_reply(Ref m); diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 25b3f5af562d..6af12692e78b 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -147,10 +147,6 @@ class Connection : public seastar::enable_shared_from_this { auto get_last_keepalive() const { return last_keepalive; } auto get_last_keepalive_ack() const { return last_keepalive_ack; } - seastar::shared_ptr get_shared() { - return shared_from_this(); - } - struct user_private_t { virtual ~user_private_t() = default; }; diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 00cd7d474a2a..71be61783b09 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -29,7 +29,7 @@ class Dispatcher { // to prevent other dispatchers from processing it, and returns a future // to throttle the connection if it's too busy. Else, it returns false and // the second future is ignored. - virtual std::tuple> ms_dispatch(Connection*, MessageRef) = 0; + virtual std::tuple> ms_dispatch(ConnectionRef, MessageRef) = 0; virtual void ms_handle_accept(ConnectionRef conn) {} diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 95afb61c1cc9..34fb14573e21 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -848,10 +848,10 @@ seastar::future<> ProtocolV1::read_message() }).then([this] (bufferlist bl) { auto p = bl.cbegin(); ::decode(m.footer, p); - auto pconn = seastar::static_pointer_cast( + auto conn_ref = seastar::static_pointer_cast( conn.shared_from_this()); auto msg = ::decode_message(nullptr, 0, m.header, m.footer, - m.front, m.middle, m.data, std::move(pconn)); + m.front, m.middle, m.data, conn_ref); if (unlikely(!msg)) { logger().warn("{} decode message failed", conn); throw std::system_error{make_error_code(error::corrupted_message)}; @@ -877,7 +877,7 @@ seastar::future<> ProtocolV1::read_message() logger().debug("{} <== #{} === {} ({})", conn, msg_ref->get_seq(), *msg_ref, msg_ref->get_type()); // throttle the reading process by the returned future - return dispatchers.ms_dispatch(&conn, std::move(msg_ref)); + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); } diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 194b6af486bc..4d7d06d7a336 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1883,11 +1883,10 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) ceph_msg_footer footer{init_le32(0), init_le32(0), init_le32(0), init_le64(0), current_header.flags}; - auto pconn = seastar::static_pointer_cast( + auto conn_ref = seastar::static_pointer_cast( conn.shared_from_this()); Message *message = decode_message(nullptr, 0, header, footer, - msg_frame.front(), msg_frame.middle(), msg_frame.data(), - std::move(pconn)); + msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); if (!message) { logger().warn("{} decode message failed", conn); abort_in_fault(); @@ -1933,7 +1932,7 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; // throttle the reading process by the returned future - return dispatchers.ms_dispatch(&conn, std::move(msg_ref)); + return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref)); }); } diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index 6aa9b9329313..635794e0db3b 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -13,7 +13,7 @@ namespace { namespace crimson::net { seastar::future<> -ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn, +ChainedDispatchers::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { try { for (auto& dispatcher : dispatchers) { diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index 9df7b36f1f77..531fe906369c 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -25,7 +25,7 @@ public: bool empty() const { return dispatchers.empty(); } - seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m); + seastar::future<> ms_dispatch(crimson::net::ConnectionRef, MessageRef); void ms_handle_accept(crimson::net::ConnectionRef conn); void ms_handle_connect(crimson::net::ConnectionRef conn); void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 4c64e1573ac6..23014cf7f937 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -204,7 +204,7 @@ void Heartbeat::remove_peer(osd_id_t peer) } std::tuple> -Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m) +Heartbeat::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { bool dispatched = true; gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { @@ -258,7 +258,7 @@ void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn) } } -seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn, +seastar::future<> Heartbeat::handle_osd_ping(crimson::net::ConnectionRef conn, Ref m) { switch (m->op) { @@ -273,7 +273,7 @@ seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn, } } -seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn, +seastar::future<> Heartbeat::handle_ping(crimson::net::ConnectionRef conn, Ref m) { auto min_message = static_cast( @@ -291,7 +291,7 @@ seastar::future<> Heartbeat::handle_ping(crimson::net::Connection* conn, return conn->send(reply); } -seastar::future<> Heartbeat::handle_reply(crimson::net::Connection* conn, +seastar::future<> Heartbeat::handle_reply(crimson::net::ConnectionRef conn, Ref m) { const osd_id_t from = m->get_source().num(); @@ -373,9 +373,9 @@ Heartbeat::Connection::~Connection() } } -bool Heartbeat::Connection::matches(crimson::net::Connection* _conn) const +bool Heartbeat::Connection::matches(crimson::net::ConnectionRef _conn) const { - return (conn && conn.get() == _conn); + return (conn && conn == _conn); } void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn) @@ -551,7 +551,7 @@ void Heartbeat::Peer::send_heartbeat( } seastar::future<> Heartbeat::Peer::handle_reply( - crimson::net::Connection* conn, Ref m) + crimson::net::ConnectionRef conn, Ref m) { if (!session.is_started()) { // we haven't sent any ping yet diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 36e1c8c4183f..46d12463c3db 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -49,18 +49,18 @@ public: // Dispatcher methods std::tuple> ms_dispatch( - crimson::net::Connection* conn, MessageRef m) override; + crimson::net::ConnectionRef conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; void ms_handle_connect(crimson::net::ConnectionRef conn) override; void ms_handle_accept(crimson::net::ConnectionRef conn) override; void print(std::ostream&) const; private: - seastar::future<> handle_osd_ping(crimson::net::Connection* conn, + seastar::future<> handle_osd_ping(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_ping(crimson::net::Connection* conn, + seastar::future<> handle_ping(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_reply(crimson::net::Connection* conn, + seastar::future<> handle_reply(crimson::net::ConnectionRef conn, Ref m); seastar::future<> handle_you_died(); @@ -182,10 +182,7 @@ class Heartbeat::Connection { ~Connection(); - bool matches(crimson::net::Connection* _conn) const; - bool matches(crimson::net::ConnectionRef conn) const { - return matches(conn.get()); - } + bool matches(crimson::net::ConnectionRef _conn) const; void connected() { set_connected(); } @@ -410,7 +407,7 @@ class Heartbeat::Peer final : private Heartbeat::ConnectionListener { } void send_heartbeat( clock::time_point, ceph::signedspan, std::vector>&); - seastar::future<> handle_reply(crimson::net::Connection*, Ref); + seastar::future<> handle_reply(crimson::net::ConnectionRef, Ref); void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { for_each_conn([&] (auto& _conn) { if (_conn.matches(conn)) { diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 264932cfb02c..e8f5143ed28f 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -420,7 +420,7 @@ seastar::future<> OSD::_add_me_to_crush() }); } -seastar::future<> OSD::handle_command(crimson::net::Connection* conn, +seastar::future<> OSD::handle_command(crimson::net::ConnectionRef conn, Ref m) { return asok->handle_command(conn, std::move(m)); @@ -618,7 +618,7 @@ seastar::future> OSD::load_pg(spg_t pgid) } std::tuple> -OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) +OSD::ms_dispatch(crimson::net::ConnectionRef conn, MessageRef m) { if (state.is_stopping()) { return {false, seastar::now()}; @@ -633,7 +633,7 @@ OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) case MSG_OSD_PG_CREATE2: shard_services.start_operation( *this, - conn->get_shared(), + conn, m); return seastar::now(); case MSG_COMMAND: @@ -952,7 +952,7 @@ seastar::future> OSD::handle_pg_create_info( }); } -seastar::future<> OSD::handle_osd_map(crimson::net::Connection* conn, +seastar::future<> OSD::handle_osd_map(crimson::net::ConnectionRef conn, Ref m) { logger().info("handle_osd_map {}", *m); @@ -1089,17 +1089,17 @@ seastar::future<> OSD::committed_osd_maps(version_t first, }); } -seastar::future<> OSD::handle_osd_op(crimson::net::Connection* conn, +seastar::future<> OSD::handle_osd_op(crimson::net::ConnectionRef conn, Ref m) { (void) shard_services.start_operation( *this, - conn->get_shared(), + conn, std::move(m)); return seastar::now(); } -seastar::future<> OSD::send_incremental_map(crimson::net::Connection* conn, +seastar::future<> OSD::send_incremental_map(crimson::net::ConnectionRef conn, epoch_t first) { if (first >= superblock.oldest_map) { @@ -1125,18 +1125,18 @@ seastar::future<> OSD::send_incremental_map(crimson::net::Connection* conn, } } -seastar::future<> OSD::handle_rep_op(crimson::net::Connection* conn, +seastar::future<> OSD::handle_rep_op(crimson::net::ConnectionRef conn, Ref m) { m->finish_decode(); (void) shard_services.start_operation( *this, - conn->get_shared(), + std::move(conn), std::move(m)); return seastar::now(); } -seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn, +seastar::future<> OSD::handle_rep_op_reply(crimson::net::ConnectionRef conn, Ref m) { const auto& pgs = pg_map.get_pgs(); @@ -1149,7 +1149,7 @@ seastar::future<> OSD::handle_rep_op_reply(crimson::net::Connection* conn, return seastar::now(); } -seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn, +seastar::future<> OSD::handle_scrub(crimson::net::ConnectionRef conn, Ref m) { if (m->fsid != superblock.cluster_fsid) { @@ -1157,7 +1157,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn, return seastar::now(); } return seastar::parallel_for_each(std::move(m->scrub_pgs), - [m, conn=conn->get_shared(), this](spg_t pgid) { + [m, conn, this](spg_t pgid) { pg_shard_t from_shard{static_cast(m->get_source().num()), pgid.shard}; PeeringState::RequestScrub scrub_request{m->deep, m->repair}; @@ -1171,7 +1171,7 @@ seastar::future<> OSD::handle_scrub(crimson::net::Connection* conn, }); } -seastar::future<> OSD::handle_mark_me_down(crimson::net::Connection* conn, +seastar::future<> OSD::handle_mark_me_down(crimson::net::ConnectionRef conn, Ref m) { if (state.is_prestop()) { @@ -1180,12 +1180,12 @@ seastar::future<> OSD::handle_mark_me_down(crimson::net::Connection* conn, return seastar::now(); } -seastar::future<> OSD::handle_recovery_subreq(crimson::net::Connection* conn, +seastar::future<> OSD::handle_recovery_subreq(crimson::net::ConnectionRef conn, Ref m) { (void) shard_services.start_operation( *this, - conn->get_shared(), + conn, std::move(m)); return seastar::now(); } @@ -1269,7 +1269,7 @@ void OSD::update_heartbeat_peers() } seastar::future<> OSD::handle_peering_op( - crimson::net::Connection* conn, + crimson::net::ConnectionRef conn, Ref m) { const int from = m->get_source().num(); @@ -1277,7 +1277,7 @@ seastar::future<> OSD::handle_peering_op( std::unique_ptr evt(m->get_event()); (void) shard_services.start_operation( *this, - conn->get_shared(), + conn, shard_services, pg_shard_t{from, m->get_spg().shard}, m->get_spg(), diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 2f25fbe79a7d..5b6fcc8448c8 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -96,7 +96,7 @@ class OSD final : public crimson::net::Dispatcher, OSDSuperblock superblock; // Dispatcher methods - std::tuple> ms_dispatch(crimson::net::Connection*, MessageRef) final; + std::tuple> ms_dispatch(crimson::net::ConnectionRef, MessageRef) final; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; @@ -136,7 +136,7 @@ public: void dump_pg_state_history(Formatter*) const; void print(std::ostream&) const; - seastar::future<> send_incremental_map(crimson::net::Connection* conn, + seastar::future<> send_incremental_map(crimson::net::ConnectionRef conn, epoch_t first); /// @return the seq id of the pg stats being sent @@ -178,21 +178,21 @@ private: seastar::future> handle_pg_create_info( std::unique_ptr info); - seastar::future<> handle_osd_map(crimson::net::Connection* conn, + seastar::future<> handle_osd_map(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_osd_op(crimson::net::Connection* conn, + seastar::future<> handle_osd_op(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_rep_op(crimson::net::Connection* conn, + seastar::future<> handle_rep_op(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_rep_op_reply(crimson::net::Connection* conn, + seastar::future<> handle_rep_op_reply(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_peering_op(crimson::net::Connection* conn, + seastar::future<> handle_peering_op(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_recovery_subreq(crimson::net::Connection* conn, + seastar::future<> handle_recovery_subreq(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_scrub(crimson::net::Connection* conn, + seastar::future<> handle_scrub(crimson::net::ConnectionRef conn, Ref m); - seastar::future<> handle_mark_me_down(crimson::net::Connection* conn, + seastar::future<> handle_mark_me_down(crimson::net::ConnectionRef conn, Ref m); seastar::future<> committed_osd_maps(version_t first, @@ -201,7 +201,7 @@ private: void check_osdmap_features(); - seastar::future<> handle_command(crimson::net::Connection* conn, + seastar::future<> handle_command(crimson::net::ConnectionRef conn, Ref m); seastar::future<> start_asok_admin(); diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index f2fa0e205075..97e5f9524759 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -70,7 +70,7 @@ seastar::future<> ClientRequest::start() }).then([this, opref](Ref pgref) { PG &pg = *pgref; if (pg.can_discard_op(*m)) { - return osd.send_incremental_map(conn.get(), m->get_map_epoch()); + return osd.send_incremental_map(conn, m->get_map_epoch()); } return with_blocking_future( handle.enter(pp(pg).await_map) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 7598326379c7..548391cc27da 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -946,7 +946,7 @@ seastar::future<> PG::handle_rep_op(Ref req) }); } -void PG::handle_rep_op_reply(crimson::net::Connection* conn, +void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn, const MOSDRepOpReply& m) { if (!can_discard_replica_op(m)) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index bccb74b198a0..bd5d43f7db80 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -513,7 +513,7 @@ public: with_obc_func_t&& f); seastar::future<> handle_rep_op(Ref m); - void handle_rep_op_reply(crimson::net::Connection* conn, + void handle_rep_op_reply(crimson::net::ConnectionRef conn, const MOSDRepOpReply& m); void print(std::ostream& os) const; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index d5699f975268..5afb6f55f325 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -58,7 +58,7 @@ static seastar::future<> test_echo(unsigned rounds, crimson::auth::DummyAuthClientServer dummy_auth; std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); } @@ -104,15 +104,15 @@ static seastar::future<> test_echo(unsigned rounds, unsigned rounds; std::bernoulli_distribution keepalive_dist; crimson::net::MessengerRef msgr; - std::map> pending_conns; - std::map sessions; + std::map> pending_conns; + std::map sessions; crimson::auth::DummyAuthClientServer dummy_auth; Client(unsigned rounds, double keepalive_ratio) : rounds(rounds), keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} - PingSessionRef find_session(crimson::net::Connection* c) { + PingSessionRef find_session(crimson::net::ConnectionRef c) { auto found = sessions.find(c); if (found == sessions.end()) { ceph_assert(false); @@ -122,13 +122,13 @@ static seastar::future<> test_echo(unsigned rounds, void ms_handle_connect(crimson::net::ConnectionRef conn) override { auto session = seastar::make_shared(); - auto [i, added] = sessions.emplace(conn.get(), session); + auto [i, added] = sessions.emplace(conn, session); std::ignore = i; ceph_assert(added); session->connected_time = mono_clock::now(); } std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef c, MessageRef m) override { auto session = find_session(c); ++(session->count); if (verbose) { @@ -165,9 +165,9 @@ static seastar::future<> test_echo(unsigned rounds, mono_time start_time = mono_clock::now(); auto conn = msgr->connect(peer_addr, entity_name_t::TYPE_OSD); return seastar::futurize_invoke([this, conn] { - return do_dispatch_pingpong(conn.get()); + return do_dispatch_pingpong(conn); }).then([this, conn, start_time] { - auto session = find_session(conn.get()); + auto session = find_session(conn); std::chrono::duration dur_handshake = session->connected_time - start_time; std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; logger().info("{}: handshake {}, pingpong {}", @@ -176,7 +176,7 @@ static seastar::future<> test_echo(unsigned rounds, } private: - seastar::future<> do_dispatch_pingpong(crimson::net::Connection* conn) { + seastar::future<> do_dispatch_pingpong(crimson::net::ConnectionRef conn) { auto [i, added] = pending_conns.emplace(conn, seastar::promise<>()); std::ignore = i; ceph_assert(added); @@ -278,7 +278,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) crimson::auth::DummyAuthClientServer dummy_auth; std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef, MessageRef m) override { switch (++count) { case 1: // block on the first request until we reenter with the second @@ -320,7 +320,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) crimson::auth::DummyAuthClientServer dummy_auth; std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef, MessageRef m) override { return {true, seastar::now()}; } @@ -379,7 +379,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { crimson::auth::DummyAuthClientServer dummy_auth; std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef c, MessageRef m) override { std::ignore = c->send(make_message()); return {true, seastar::now()}; } @@ -420,7 +420,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { seastar::promise<> stopped_send_promise; std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef, MessageRef m) override { return {true, seastar::now()}; } @@ -813,14 +813,14 @@ class FailoverSuite : public Dispatcher { unsigned pending_peer_receive = 0; unsigned pending_receive = 0; - std::tuple> ms_dispatch(Connection* c, MessageRef m) override { - auto result = interceptor.find_result(c->shared_from_this()); + std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + auto result = interceptor.find_result(c); if (result == nullptr) { logger().error("Untracked ms dispatched connection: {}", *c); ceph_abort(); } - if (tracked_conn != c->shared_from_this()) { + if (tracked_conn != c) { logger().error("[{}] {} got op, but doesn't match tracked_conn [{}] {}", result->index, *c, tracked_index, *tracked_conn); ceph_abort(); @@ -1209,7 +1209,7 @@ class FailoverTest : public Dispatcher { std::unique_ptr test_suite; - std::tuple> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { switch (m->get_type()) { case CEPH_MSG_PING: ceph_assert(recv_pong); @@ -1407,10 +1407,10 @@ class FailoverSuitePeer : public Dispatcher { ConnectionRef tracked_conn; unsigned pending_send = 0; - std::tuple> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { logger().info("[TestPeer] got op from Test"); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); - ceph_assert(tracked_conn == c->shared_from_this()); + ceph_assert(tracked_conn == c); std::ignore = op_callback(); return {true, seastar::now()}; } @@ -1537,8 +1537,8 @@ class FailoverTestPeer : public Dispatcher { const entity_addr_t test_peer_addr; std::unique_ptr test_suite; - std::tuple> ms_dispatch(Connection* c, MessageRef m) override { - ceph_assert(cmd_conn == c->shared_from_this()); + std::tuple> ms_dispatch(ConnectionRef c, MessageRef m) override { + ceph_assert(cmd_conn == c); switch (m->get_type()) { case CEPH_MSG_PING: std::ignore = c->send(make_message()); diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index 4acd01e0d32c..e20a570292f1 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -153,7 +153,7 @@ static seastar::future<> run( } std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); // server replies with MOSDOp to generate server-side write workload @@ -308,7 +308,7 @@ static seastar::future<> run( conn_stats.connected_time = mono_clock::now(); } std::tuple> ms_dispatch( - crimson::net::Connection* c, MessageRef m) override { + crimson::net::ConnectionRef, MessageRef m) override { // server replies with MOSDOp to generate server-side write workload ceph_assert(m->get_type() == CEPH_MSG_OSD_OP);