From c69bf0c002f1b62283b3eb5df62fdcb65963b4ae Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 19 Apr 2019 16:50:25 +0800 Subject: [PATCH] crimson: pass `Connection*` to Dispatch::ms_dispatch() currently, we use a `with_gate()` in `ProtocolV2::read_message()` for ensuring that `this` (or `Connection` holding this protocol instance) will outlive the continuation of `dispatcher.ms_dispatch()` which references `this->dispatcher`. but we also pass a strong reference of connection to dispatcher. in short, we have *two* safeguards for the same purpose. in this change, one of these safeguards is removed -- to pass the raw pointer of `Connection` to `Dispatch::ms_dispatch()`. the reason why the `with_gate()` is kept is that, if we have removed `with_gate()` in Protocol, we need to 1. let `Dispatcher::ms_dispatch()` return `void`, as it should not block any succeeding calls. 2. add a `with_gate()` in `Dispatcher::ms_dispatch()` to ensure that `this` is alive during the lifecycle of the continuation(s) in `Dispatcher::ms_dispatch()`. Signed-off-by: Yingxin Cheng Signed-off-by: Kefu Chai --- src/crimson/mgr/client.cc | 6 +++--- src/crimson/mgr/client.h | 6 +++--- src/crimson/mon/MonClient.cc | 6 +++--- src/crimson/mon/MonClient.h | 6 +++--- src/crimson/net/Dispatcher.h | 2 +- src/crimson/net/ProtocolV1.cc | 5 +---- src/crimson/net/ProtocolV2.cc | 7 ++----- src/crimson/osd/chained_dispatchers.cc | 2 +- src/crimson/osd/chained_dispatchers.h | 2 +- src/crimson/osd/heartbeat.cc | 8 ++++---- src/crimson/osd/heartbeat.h | 8 ++++---- src/test/crimson/perf_crimson_msgr.cc | 4 ++-- src/test/crimson/test_alien_echo.cc | 4 ++-- src/test/crimson/test_messenger.cc | 20 ++++++++++---------- 14 files changed, 40 insertions(+), 46 deletions(-) diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index bdce3f14fa734..fd617543423b8 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -49,7 +49,7 @@ seastar::future<> Client::stop() }); } -seastar::future<> Client::ms_dispatch(ceph::net::ConnectionRef conn, +seastar::future<> Client::ms_dispatch(ceph::net::Connection* conn, MessageRef m) { switch(m->get_type()) { @@ -90,7 +90,7 @@ seastar::future<> Client::reconnect() }); } -seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef, +seastar::future<> Client::handle_mgr_map(ceph::net::Connection*, Ref m) { mgrmap = m->get_map(); @@ -104,7 +104,7 @@ seastar::future<> Client::handle_mgr_map(ceph::net::ConnectionRef, } } -seastar::future<> Client::handle_mgr_conf(ceph::net::ConnectionRef conn, +seastar::future<> Client::handle_mgr_conf(ceph::net::Connection* conn, Ref m) { logger().info("{} {}", __func__, *m); diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 016044bebbb9c..1a63b60fb6815 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -37,12 +37,12 @@ public: seastar::future<> start(); seastar::future<> stop(); private: - seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + seastar::future<> ms_dispatch(ceph::net::Connection* conn, Ref m) override; seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; - seastar::future<> handle_mgr_map(ceph::net::ConnectionRef conn, + seastar::future<> handle_mgr_map(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_mgr_conf(ceph::net::ConnectionRef conn, + seastar::future<> handle_mgr_conf(ceph::net::Connection* conn, Ref m); seastar::future<> reconnect(); void report(); diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 6676a26883a0a..4a527b574525b 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -433,7 +433,7 @@ bool Client::is_hunting() const { } seastar::future<> -Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +Client::ms_dispatch(ceph::net::Connection* conn, MessageRef m) { // we only care about these message types switch (m->get_type()) { @@ -689,7 +689,7 @@ int Client::handle_auth_bad_method(ceph::net::ConnectionRef conn, } } -seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn, +seastar::future<> Client::handle_monmap(ceph::net::Connection* conn, Ref m) { monmap.decode(m->monmapbl); @@ -707,7 +707,7 @@ seastar::future<> Client::handle_monmap(ceph::net::ConnectionRef conn, } } -seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn, +seastar::future<> Client::handle_auth_reply(ceph::net::Connection* conn, Ref m) { logger().info("mon {} => {} returns {}: {}", diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index 940574eaf5350..852494db3984b 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -143,14 +143,14 @@ private: private: void tick(); - seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + seastar::future<> ms_dispatch(ceph::net::Connection* conn, MessageRef m) override; seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override; - seastar::future<> handle_monmap(ceph::net::ConnectionRef conn, + seastar::future<> handle_monmap(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_auth_reply(ceph::net::ConnectionRef conn, + seastar::future<> handle_auth_reply(ceph::net::Connection* conn, Ref m); seastar::future<> handle_subscribe_ack(Ref m); seastar::future<> handle_get_version_reply(Ref m); diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index 8bcc47c917cf9..7a725e2fccff2 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -27,7 +27,7 @@ class Dispatcher { public: virtual ~Dispatcher() {} - virtual seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef m) { + virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) { return seastar::make_ready_future<>(); } diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 3730e23601363..bc88913aaa0ad 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -733,10 +733,7 @@ seastar::future<> ProtocolV1::read_message() seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { logger().debug("{} <= {}@{} === {}", messenger, msg->get_source(), conn.peer_addr, *msg); - return dispatcher.ms_dispatch( - seastar::static_pointer_cast( - conn.shared_from_this()), - std::move(msg)) + return dispatcher.ms_dispatch(&conn, std::move(msg)) .handle_exception([this] (std::exception_ptr eptr) { logger().error("{} ms_dispatch caught exception: {}", conn, eptr); ceph_assert(false); diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 086914da95d6d..6fdc7163e0491 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -1466,11 +1466,8 @@ seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp) // TODO: change MessageRef with seastar::shared_ptr auto msg_ref = MessageRef{message, false}; seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] { - return dispatcher.ms_dispatch( - seastar::static_pointer_cast( - conn.shared_from_this()), - std::move(msg)) - .handle_exception([this] (std::exception_ptr eptr) { + return dispatcher.ms_dispatch(&conn, std::move(msg)) + .handle_exception([this] (std::exception_ptr eptr) { logger().error("{} ms_dispatch caught exception: {}", conn, eptr); ceph_assert(false); }); diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc index dfca51e8cbad0..0473c8b00fe4b 100644 --- a/src/crimson/osd/chained_dispatchers.cc +++ b/src/crimson/osd/chained_dispatchers.cc @@ -3,7 +3,7 @@ seastar::future<> -ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn, +ChainedDispatchers::ms_dispatch(ceph::net::Connection* conn, MessageRef m) { return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { return dispatcher->ms_dispatch(conn, m); diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h index 8368021c86998..1871e7416d04d 100644 --- a/src/crimson/osd/chained_dispatchers.h +++ b/src/crimson/osd/chained_dispatchers.h @@ -22,7 +22,7 @@ public: void push_back(Dispatcher* dispatcher) { dispatchers.push_back(dispatcher); } - seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_dispatch(ceph::net::Connection* conn, MessageRef m) override; seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override; seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index a44fd7e79a8f9..c3f86a3fea738 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -201,7 +201,7 @@ seastar::future<> Heartbeat::remove_peer(osd_id_t peer) }); } -seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn, +seastar::future<> Heartbeat::ms_dispatch(ceph::net::Connection* conn, MessageRef m) { switch (m->get_type()) { @@ -229,7 +229,7 @@ seastar::future<> Heartbeat::ms_handle_reset(ceph::net::ConnectionRef conn) }); } -seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn, +seastar::future<> Heartbeat::handle_osd_ping(ceph::net::Connection* conn, Ref m) { switch (m->op) { @@ -244,7 +244,7 @@ seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn, } } -seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn, +seastar::future<> Heartbeat::handle_ping(ceph::net::Connection* conn, Ref m) { auto min_message = static_cast( @@ -258,7 +258,7 @@ seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn, return conn->send(reply); } -seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn, +seastar::future<> Heartbeat::handle_reply(ceph::net::Connection* conn, Ref m) { const osd_id_t from = m->get_source().num(); diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 081c06445ea8f..4046b208a9e92 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -42,17 +42,17 @@ public: const entity_addrvec_t& get_back_addrs() const; // Dispatcher methods - seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, + seastar::future<> ms_dispatch(ceph::net::Connection* conn, MessageRef m) override; seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; AuthAuthorizer* ms_get_authorizer(peer_type_t peer) const override; private: - seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn, + seastar::future<> handle_osd_ping(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_ping(ceph::net::ConnectionRef conn, + seastar::future<> handle_ping(ceph::net::Connection* conn, Ref m); - seastar::future<> handle_reply(ceph::net::ConnectionRef conn, + seastar::future<> handle_reply(ceph::net::Connection* conn, Ref m); seastar::future<> handle_you_died(); diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index 07f67210a9115..ee678ccedf0d3 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -128,7 +128,7 @@ static seastar::future<> run( seastar::future<> stop() { return seastar::make_ready_future<>(); } - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); @@ -224,7 +224,7 @@ static seastar::future<> run( active_session->connected_time = mono_clock::now(); return seastar::now(); } - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { // server replies with MOSDOp to generate server-side write workload ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 8e3272cab81ca..ff7c2df6d441e 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -42,7 +42,7 @@ struct Server { struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { std::cout << "server got ping " << *m << std::endl; // reply with a pong @@ -79,7 +79,7 @@ struct Client { struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { std::cout << "client got pong " << *m << std::endl; ++count; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index de80376b8630a..ea942618ce40e 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -45,7 +45,7 @@ static seastar::future<> test_echo(unsigned rounds, seastar::future<> stop() { return seastar::make_ready_future<>(); } - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); @@ -93,7 +93,7 @@ static seastar::future<> test_echo(unsigned rounds, std::bernoulli_distribution keepalive_dist; ceph::net::Messenger *msgr = nullptr; std::map> pending_conns; - std::map sessions; + std::map sessions; MessageRef msg_ping{new MPing(), false}; ceph::auth::DummyAuthClientServer dummy_auth; @@ -101,7 +101,7 @@ static seastar::future<> test_echo(unsigned rounds, : rounds(rounds), keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} - PingSessionRef find_session(ceph::net::ConnectionRef c) { + PingSessionRef find_session(ceph::net::Connection* c) { auto found = sessions.find(c); if (found == sessions.end()) { ceph_assert(false); @@ -118,13 +118,13 @@ static seastar::future<> test_echo(unsigned rounds, seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); auto session = seastar::make_shared(); - auto [i, added] = sessions.emplace(conn, session); + auto [i, added] = sessions.emplace(conn.get(), session); std::ignore = i; ceph_assert(added); session->connected_time = mono_clock::now(); return seastar::now(); } - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { auto session = find_session(c); ++(session->count); @@ -133,10 +133,10 @@ static seastar::future<> test_echo(unsigned rounds, } if (session->count == rounds) { - logger().info("{}: finished receiving {} pongs", *c.get(), session->count); + logger().info("{}: finished receiving {} pongs", *c, session->count); session->finish_time = mono_clock::now(); - return container().invoke_on_all([conn = c.get()](auto &client) { - auto found = client.pending_conns.find(conn); + return container().invoke_on_all([c](auto &client) { + auto found = client.pending_conns.find(c); ceph_assert(found != client.pending_conns.end()); found->second.set_value(); }); @@ -181,7 +181,7 @@ static seastar::future<> test_echo(unsigned rounds, } }).finally([this, conn, start_time] { return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { - auto session = client.find_session((*conn)->shared_from_this()); + auto session = client.find_session(&**conn); std::chrono::duration dur_handshake = session->connected_time - start_time; std::chrono::duration dur_pingpong = session->finish_time - session->connected_time; logger().info("{}: handshake {}, pingpong {}", @@ -305,7 +305,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) seastar::promise<> on_done; // satisfied when first dispatch unblocks ceph::auth::DummyAuthClientServer dummy_auth; - seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + seastar::future<> ms_dispatch(ceph::net::Connection* c, MessageRef m) override { switch (++count) { case 1: -- 2.39.5