From ff2c3b597de4c5707e18529dfa6bed162026014c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 27 Nov 2020 14:03:43 +0800 Subject: [PATCH] crimson/net: cleanup interfaces to start and stop a messenger There is no on-going iterations in the new ChainedDispatchers::ms_dispatch() implementation, so we no longer need to worry about removing dispatchers when stopping the messenger. So the ""boost::intrusive::slist" is not needed, and we can use cleaner interfaces to start and stop the messenger. Also fixed an regression issue in perf_crimson_msgr caused by ChainedDispatchers. Signed-off-by: Yingxin Cheng --- src/crimson/net/Dispatcher.h | 9 +--- src/crimson/net/Fwd.h | 4 +- src/crimson/net/Messenger.h | 23 ++++++--- src/crimson/net/Protocol.cc | 8 +-- src/crimson/net/Protocol.h | 4 +- src/crimson/net/ProtocolV1.cc | 10 ++-- src/crimson/net/ProtocolV1.h | 2 +- src/crimson/net/ProtocolV2.cc | 14 ++--- src/crimson/net/ProtocolV2.h | 2 +- src/crimson/net/SocketConnection.cc | 6 +-- src/crimson/net/SocketConnection.h | 4 +- src/crimson/net/SocketMessenger.cc | 9 ++-- src/crimson/net/SocketMessenger.h | 25 ++++----- src/crimson/net/chained_dispatchers.cc | 15 ++++-- src/crimson/net/chained_dispatchers.h | 30 +++++------ src/crimson/osd/heartbeat.cc | 21 +++----- src/crimson/osd/heartbeat.h | 4 +- src/crimson/osd/osd.cc | 28 ++++------ src/crimson/osd/osd.h | 1 - src/test/crimson/test_messenger.cc | 71 +++++++++----------------- src/tools/crimson/perf_crimson_msgr.cc | 12 ++--- 21 files changed, 130 insertions(+), 172 deletions(-) diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index e106f7a4b4e5f..00cd7d474a2a3 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -14,20 +14,13 @@ #pragma once -#include -#include -#include - -#include "crimson/common/gated.h" #include "Fwd.h" class AuthAuthorizer; namespace crimson::net { -class Dispatcher : public boost::intrusive::slist_base_hook< - boost::intrusive::link_mode< - boost::intrusive::safe_link>> { +class Dispatcher { public: virtual ~Dispatcher() {} diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 2221533967243..8dab402b39621 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -14,6 +14,8 @@ #pragma once +#include +#include #include #include @@ -34,7 +36,7 @@ using stop_t = seastar::stop_iteration; class Connection; using ConnectionRef = seastar::shared_ptr; -class Dispatcher; +class ChainedDispatchers; class Messenger; using MessengerRef = seastar::shared_ptr; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 1ba943665926d..60326135bc6ea 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -14,11 +14,10 @@ #pragma once -#include +#include #include "Fwd.h" #include "crimson/common/throttle.h" -#include "crimson/net/chained_dispatchers.h" #include "msg/Message.h" #include "msg/Policy.h" @@ -35,6 +34,8 @@ namespace crimson::net { class Interceptor; #endif +class Dispatcher; + using Throttle = crimson::common::Throttle; using SocketPolicy = ceph::net::Policy; @@ -77,7 +78,13 @@ public: uint32_t min_port, uint32_t max_port) = 0; /// start the messenger - virtual seastar::future<> start(ChainedDispatchersRef) = 0; + virtual seastar::future<> start(const std::list&) = 0; + + seastar::future<> start(Dispatcher& dispatcher) { + std::list dispatchers; + dispatchers.push_back(&dispatcher); + return start(dispatchers); + } /// either return an existing connection to the peer, /// or a new pending connection @@ -94,11 +101,13 @@ public: // wait for messenger shutdown virtual seastar::future<> wait() = 0; - virtual void remove_dispatcher(Dispatcher&) = 0; + // stop dispatching events and messages + virtual void stop() = 0; + + virtual bool is_started() const = 0; - virtual bool dispatcher_chain_empty() const = 0; - /// stop listenening and wait for all connections to close. safe to destruct - /// after this future becomes available + // free internal resources before destruction, must be called after stopped, + // and must be called if is bound. virtual seastar::future<> shutdown() = 0; uint32_t get_crc_flags() const { diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 6bbb8e7bff77c..541f227e45d20 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -7,7 +7,7 @@ #include "crimson/common/log.h" #include "crimson/net/Errors.h" -#include "crimson/net/Dispatcher.h" +#include "crimson/net/chained_dispatchers.h" #include "crimson/net/Socket.h" #include "crimson/net/SocketConnection.h" #include "msg/Message.h" @@ -21,10 +21,10 @@ namespace { namespace crimson::net { Protocol::Protocol(proto_t type, - ChainedDispatchersRef& dispatcher, + ChainedDispatchers& dispatchers, SocketConnection& conn) : proto_type(type), - dispatcher(dispatcher), + dispatchers(dispatchers), conn(conn), auth_meta{seastar::make_lw_shared()} {} @@ -73,7 +73,7 @@ void Protocol::close(bool dispatch_reset, auto gate_closed = gate.close(); if (dispatch_reset) { - dispatcher->ms_handle_reset( + dispatchers.ms_handle_reset( seastar::static_pointer_cast(conn.shared_from_this()), is_replace); } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 3deb706acb4eb..dc4e4f2af8f33 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -50,7 +50,7 @@ class Protocol { virtual void print(std::ostream&) const = 0; protected: Protocol(proto_t type, - ChainedDispatchersRef& dispatcher, + ChainedDispatchers& dispatchers, SocketConnection& conn); virtual void trigger_close() = 0; @@ -71,7 +71,7 @@ class Protocol { SocketRef socket; protected: - ChainedDispatchersRef dispatcher; + ChainedDispatchers& dispatchers; SocketConnection &conn; AuthConnectionMetaRef auth_meta; diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 9b6c59f892320..95afb61c1cc9e 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -15,7 +15,7 @@ #include "crimson/auth/AuthClient.h" #include "crimson/auth/AuthServer.h" #include "crimson/common/log.h" -#include "Dispatcher.h" +#include "chained_dispatchers.h" #include "Errors.h" #include "Socket.h" #include "SocketConnection.h" @@ -125,10 +125,10 @@ void discard_up_to(std::deque* queue, namespace crimson::net { -ProtocolV1::ProtocolV1(ChainedDispatchersRef& dispatcher, +ProtocolV1::ProtocolV1(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger) - : Protocol(proto_t::v1, dispatcher, conn), messenger{messenger} {} + : Protocol(proto_t::v1, dispatchers, conn), messenger{messenger} {} ProtocolV1::~ProtocolV1() {} @@ -917,10 +917,10 @@ void ProtocolV1::execute_open(open_t type) set_write_state(write_state_t::open); if (type == open_t::connected) { - dispatcher->ms_handle_connect( + dispatchers.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); } else { // type == open_t::accepted - dispatcher->ms_handle_accept( + dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); } diff --git a/src/crimson/net/ProtocolV1.h b/src/crimson/net/ProtocolV1.h index d7d642c572757..c71af598bcfc5 100644 --- a/src/crimson/net/ProtocolV1.h +++ b/src/crimson/net/ProtocolV1.h @@ -12,7 +12,7 @@ namespace crimson::net { class ProtocolV1 final : public Protocol { public: - ProtocolV1(ChainedDispatchersRef& dispatcher, + ProtocolV1(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV1() override; diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index de1d6f55f810c..194b6af486bc2 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -12,7 +12,7 @@ #include "crimson/auth/AuthServer.h" #include "crimson/common/formatter.h" -#include "Dispatcher.h" +#include "chained_dispatchers.h" #include "Errors.h" #include "Socket.h" #include "SocketConnection.h" @@ -143,10 +143,10 @@ seastar::future<> ProtocolV2::Timer::backoff(double seconds) }); } -ProtocolV2::ProtocolV2(ChainedDispatchersRef& dispatcher, +ProtocolV2::ProtocolV2(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger) - : Protocol(proto_t::v2, dispatcher, conn), + : Protocol(proto_t::v2, dispatchers, conn), messenger{messenger}, protocol_timer{conn} {} @@ -385,7 +385,7 @@ void ProtocolV2::reset_session(bool full) client_cookie = generate_client_cookie(); peer_global_seq = 0; reset_write(); - dispatcher->ms_handle_remote_reset( + dispatchers.ms_handle_remote_reset( seastar::static_pointer_cast(conn.shared_from_this())); } } @@ -1601,7 +1601,7 @@ void ProtocolV2::execute_establishing( accept_me(); } - dispatcher->ms_handle_accept( + dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); gated_execute("execute_establishing", [this] { @@ -1699,7 +1699,7 @@ void ProtocolV2::trigger_replacing(bool reconnect, if (socket) { socket->shutdown(); } - dispatcher->ms_handle_accept( + dispatchers.ms_handle_accept( seastar::static_pointer_cast(conn.shared_from_this())); gate.dispatch_in_background("trigger_replacing", *this, [this, @@ -1942,7 +1942,7 @@ void ProtocolV2::execute_ready(bool dispatch_connect) assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0)); trigger_state(state_t::READY, write_state_t::open, false); if (dispatch_connect) { - dispatcher->ms_handle_connect( + dispatchers.ms_handle_connect( seastar::static_pointer_cast(conn.shared_from_this())); } #ifdef UNIT_TESTS_BUILT diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index d4672c4ce49e3..be9a228166875 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -13,7 +13,7 @@ namespace crimson::net { class ProtocolV2 final : public Protocol { public: - ProtocolV2(ChainedDispatchersRef& dispatcher, + ProtocolV2(ChainedDispatchers& dispatchers, SocketConnection& conn, SocketMessenger& messenger); ~ProtocolV2() override; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index b0c7197eedb2f..623dca32f0b1c 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -26,14 +26,14 @@ using namespace crimson::net; using crimson::common::local_conf; SocketConnection::SocketConnection(SocketMessenger& messenger, - ChainedDispatchersRef& dispatcher, + ChainedDispatchers& dispatchers, bool is_msgr2) : messenger(messenger) { if (is_msgr2) { - protocol = std::make_unique(dispatcher, *this, messenger); + protocol = std::make_unique(dispatchers, *this, messenger); } else { - protocol = std::make_unique(dispatcher, *this, messenger); + protocol = std::make_unique(dispatchers, *this, messenger); } #ifdef UNIT_TESTS_BUILT if (messenger.interceptor) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 0af08e0e4f28c..9c977c7cf66c3 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -18,13 +18,11 @@ #include "msg/Policy.h" #include "crimson/common/throttle.h" -#include "crimson/net/chained_dispatchers.h" #include "crimson/net/Connection.h" #include "crimson/net/Socket.h" namespace crimson::net { -class Dispatcher; class Protocol; class SocketMessenger; class SocketConnection; @@ -55,7 +53,7 @@ class SocketConnection : public Connection { public: SocketConnection(SocketMessenger& messenger, - ChainedDispatchersRef& dispatcher, + ChainedDispatchers& dispatchers, bool is_msgr2); ~SocketConnection() override; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 11914d71bd3d1..07a86bfdfa9a3 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -19,7 +19,6 @@ #include "auth/Auth.h" #include "Errors.h" -#include "Dispatcher.h" #include "Socket.h" namespace { @@ -111,10 +110,10 @@ SocketMessenger::try_bind(const entity_addrvec_t& addrs, }); } -seastar::future<> SocketMessenger::start(ChainedDispatchersRef chained_dispatchers) { +seastar::future<> SocketMessenger::start(const std::list& _dispatchers) { assert(seastar::this_shard_id() == master_sid); - dispatchers = chained_dispatchers; + dispatchers.assign(_dispatchers); if (listener) { // make sure we have already bound to a valid address ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); @@ -154,9 +153,7 @@ seastar::future<> SocketMessenger::shutdown() { assert(seastar::this_shard_id() == master_sid); return seastar::futurize_invoke([this] { - if (dispatchers) { - assert(dispatchers->empty()); - } + assert(dispatchers.empty()); if (listener) { auto d_listener = listener; listener = nullptr; diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 90c38accf1086..33da5f1a4805b 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -14,9 +14,10 @@ #pragma once +#include #include -#include #include +#include #include #include #include @@ -35,15 +36,7 @@ class SocketMessenger final : public Messenger { seastar::promise<> shutdown_promise; FixedCPUServerSocket* listener = nullptr; - // as we want to unregister a dispatcher from the messengers when stopping - // that dispatcher, we have to use intrusive slist which, when used with - // "boost::intrusive::linear", can tolerate ongoing iteration of the - // list when removing an element. However, the downside of this is that an - // element can only be attached to one slist. So, as we need to make multiple - // messenger reference the same set of dispatchers, we have to make them share - // the same ChainedDispatchers, which means registering/unregistering an element - // to one messenger will affect other messengers that share the same ChainedDispatchers. - ChainedDispatchersRef dispatchers; + ChainedDispatchers dispatchers; std::map connections; std::set accepting_conns; std::vector closing_conns; @@ -73,7 +66,7 @@ class SocketMessenger final : public Messenger { seastar::future<> try_bind(const entity_addrvec_t& addr, uint32_t min_port, uint32_t max_port) override; - seastar::future<> start(ChainedDispatchersRef dispatchers) override; + seastar::future<> start(const std::list& dispatchers) override; ConnectionRef connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name) override; @@ -83,12 +76,14 @@ class SocketMessenger final : public Messenger { return shutdown_promise.get_future(); } - void remove_dispatcher(Dispatcher& disp) override { - dispatchers->erase(disp); + void stop() override { + dispatchers.clear(); } - bool dispatcher_chain_empty() const override { - return !dispatchers || dispatchers->empty(); + + bool is_started() const override { + return !dispatchers.empty(); } + seastar::future<> shutdown() override; void print(ostream& out) const override { diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index 717a2db748ece..6aa9b93293135 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -1,6 +1,7 @@ #include "crimson/common/log.h" #include "crimson/net/chained_dispatchers.h" #include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" #include "msg/Message.h" namespace { @@ -9,12 +10,14 @@ namespace { } } +namespace crimson::net { + seastar::future<> ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { try { for (auto& dispatcher : dispatchers) { - auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m); + auto [dispatched, throttle_future] = dispatcher->ms_dispatch(conn, m); if (dispatched) { return std::move(throttle_future ).handle_exception([conn] (std::exception_ptr eptr) { @@ -40,7 +43,7 @@ void ChainedDispatchers::ms_handle_accept(crimson::net::ConnectionRef conn) { try { for (auto& dispatcher : dispatchers) { - dispatcher.ms_handle_accept(conn); + dispatcher->ms_handle_accept(conn); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_accept() {}", @@ -53,7 +56,7 @@ void ChainedDispatchers::ms_handle_connect(crimson::net::ConnectionRef conn) { try { for(auto& dispatcher : dispatchers) { - dispatcher.ms_handle_connect(conn); + dispatcher->ms_handle_connect(conn); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_connect() {}", @@ -66,7 +69,7 @@ void ChainedDispatchers::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { try { for (auto& dispatcher : dispatchers) { - dispatcher.ms_handle_reset(conn, is_replace); + dispatcher->ms_handle_reset(conn, is_replace); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_reset() {}", @@ -79,7 +82,7 @@ void ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { try { for (auto& dispatcher : dispatchers) { - dispatcher.ms_handle_remote_reset(conn); + dispatcher->ms_handle_remote_reset(conn); } } catch (...) { logger().error("{} got unexpected exception in ms_handle_remote_reset() {}", @@ -87,3 +90,5 @@ ChainedDispatchers::ms_handle_remote_reset(crimson::net::ConnectionRef conn) { ceph_abort(); } } + +} diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index 139a825e9445e..9df7b36f1f77a 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -3,27 +3,24 @@ #pragma once -#include +#include -#include "crimson/net/Dispatcher.h" +#include "Fwd.h" #include "crimson/common/log.h" -using crimson::net::Dispatcher; +namespace crimson::net { + +class Dispatcher; class ChainedDispatchers { - boost::intrusive::slist< - Dispatcher, - boost::intrusive::linear, - boost::intrusive::cache_last> dispatchers; public: - void push_front(Dispatcher& dispatcher) { - dispatchers.push_front(dispatcher); - } - void push_back(Dispatcher& dispatcher) { - dispatchers.push_back(dispatcher); + void assign(const std::list _dispatchers) { + assert(empty()); + assert(!_dispatchers.empty()); + dispatchers = _dispatchers; } - void erase(Dispatcher& dispatcher) { - dispatchers.erase(dispatchers.iterator_to(dispatcher)); + void clear() { + dispatchers.clear(); } bool empty() const { return dispatchers.empty(); @@ -33,6 +30,9 @@ public: void ms_handle_connect(crimson::net::ConnectionRef conn); void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace); void ms_handle_remote_reset(crimson::net::ConnectionRef conn); + + private: + std::list dispatchers; }; -using ChainedDispatchersRef = seastar::lw_shared_ptr; +} diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 0f8b20768e46d..fcf1f30b4e77d 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -57,14 +57,10 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, SocketPolicy::lossy_client(0)); back_msgr->set_policy(entity_name_t::TYPE_OSD, SocketPolicy::lossy_client(0)); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); return seastar::when_all_succeed(start_messenger(*front_msgr, - front_addrs, - chained_dispatchers), + front_addrs), start_messenger(*back_msgr, - back_addrs, - chained_dispatchers)) + back_addrs)) .then_unpack([this] { timer.arm_periodic( std::chrono::seconds(local_conf()->osd_heartbeat_interval)); @@ -73,14 +69,13 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, seastar::future<> Heartbeat::start_messenger(crimson::net::Messenger& msgr, - const entity_addrvec_t& addrs, - ChainedDispatchersRef chained_dispatchers) + const entity_addrvec_t& addrs) { return msgr.try_bind(addrs, local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([&msgr, chained_dispatchers]() mutable { - return msgr.start(chained_dispatchers); + .then([this, &msgr]() mutable { + return msgr.start(*this); }); } @@ -88,10 +83,8 @@ seastar::future<> Heartbeat::stop() { logger().info("{}", __func__); timer.cancel(); - if (!front_msgr->dispatcher_chain_empty()) - front_msgr->remove_dispatcher(*this); - if (!back_msgr->dispatcher_chain_empty()) - back_msgr->remove_dispatcher(*this); + front_msgr->stop(); + back_msgr->stop(); return gate.close().then([this] { return seastar::when_all_succeed(front_msgr->shutdown(), back_msgr->shutdown()); diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 9d85b526ca253..3e875565696a9 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -6,7 +6,6 @@ #include #include #include "common/ceph_time.h" -#include "crimson/net/chained_dispatchers.h" #include "crimson/net/Dispatcher.h" #include "crimson/net/Fwd.h" @@ -71,8 +70,7 @@ private: void add_reporter_peers(int whoami); seastar::future<> start_messenger(crimson::net::Messenger& msgr, - const entity_addrvec_t& addrs, - ChainedDispatchersRef); + const entity_addrvec_t& addrs); private: const osd_id_t whoami; const crimson::osd::ShardServices& service; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index c169ece16000b..eeb8915e668d4 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -265,22 +265,22 @@ seastar::future<> OSD::start() cluster_msgr->set_policy(entity_name_t::TYPE_CLIENT, SocketPolicy::stateless_server(0)); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_front(*mgrc); - chained_dispatchers->push_front(*monc); - chained_dispatchers->push_front(*this); + std::list dispatchers; + dispatchers.push_front(mgrc.get()); + dispatchers.push_front(monc.get()); + dispatchers.push_front(this); return seastar::when_all_succeed( cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this, chained_dispatchers]() mutable { - return cluster_msgr->start(chained_dispatchers); + .then([this, dispatchers]() mutable { + return cluster_msgr->start(dispatchers); }), public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), local_conf()->ms_bind_port_min, local_conf()->ms_bind_port_max) - .then([this, chained_dispatchers]() mutable { - return public_msgr->start(chained_dispatchers); + .then([this, dispatchers]() mutable { + return public_msgr->start(dispatchers); })); }).then_unpack([this] { return seastar::when_all_succeed(monc->start(), @@ -452,16 +452,8 @@ seastar::future<> OSD::stop() return prepare_to_stop().then([this] { state.set_stopping(); logger().debug("prepared to stop"); - if (!public_msgr->dispatcher_chain_empty()) { - public_msgr->remove_dispatcher(*this); - public_msgr->remove_dispatcher(*mgrc); - public_msgr->remove_dispatcher(*monc); - } - if (!cluster_msgr->dispatcher_chain_empty()) { - cluster_msgr->remove_dispatcher(*this); - cluster_msgr->remove_dispatcher(*mgrc); - cluster_msgr->remove_dispatcher(*monc); - } + public_msgr->stop(); + cluster_msgr->stop(); auto gate_close_fut = gate.close(); return asok->stop().then([this] { return heartbeat->stop(); diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index eac8be594279e..2f25fbe79a7d1 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -13,7 +13,6 @@ #include "crimson/common/type_helpers.h" #include "crimson/common/auth_handler.h" #include "crimson/common/gated.h" -#include "crimson/net/chained_dispatchers.h" #include "crimson/admin/admin_socket.h" #include "crimson/common/simple_lru.h" #include "crimson/common/shared_lru.h" diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index d50e082e16600..3691a68537c2c 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -67,15 +67,13 @@ static seastar::future<> test_echo(unsigned rounds, msgr->set_require_authorizer(false); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { - return msgr->start(chained_dispatchers); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(*this); }); } seastar::future<> shutdown() { ceph_assert(msgr); - msgr->remove_dispatcher(*this); + msgr->stop(); return msgr->shutdown(); } }; @@ -140,14 +138,12 @@ static seastar::future<> test_echo(unsigned rounds, msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->start(chained_dispatchers); + return msgr->start(*this); } seastar::future<> shutdown() { ceph_assert(msgr); - msgr->remove_dispatcher(*this); + msgr->stop(); return msgr->shutdown(); } @@ -295,10 +291,8 @@ static seastar::future<> test_concurrent_dispatch(bool v2) msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { - return msgr->start(chained_dispatchers); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(*this); }); } }; @@ -320,9 +314,7 @@ static seastar::future<> test_concurrent_dispatch(bool v2) msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->start(chained_dispatchers); + return msgr->start(*this); } }; }; @@ -352,11 +344,11 @@ static seastar::future<> test_concurrent_dispatch(bool v2) return server->wait(); }).finally([client] { logger().info("client shutdown..."); - client->msgr->remove_dispatcher(*client); + client->msgr->stop(); return client->msgr->shutdown(); }).finally([server] { logger().info("server shutdown..."); - server->msgr->remove_dispatcher(*server); + server->msgr->stop(); return server->msgr->shutdown(); }).finally([server, client] { logger().info("test_concurrent_dispatch() done!\n"); @@ -385,17 +377,15 @@ seastar::future<> test_preemptive_shutdown(bool v2) { msgr->set_default_policy(crimson::net::SocketPolicy::stateless_server(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { - return msgr->start(chained_dispatchers); + return msgr->bind(entity_addrvec_t{addr}).then([this] { + return msgr->start(*this); }); } entity_addr_t get_addr() const { return msgr->get_myaddr(); } seastar::future<> shutdown() { - msgr->remove_dispatcher(*this); + msgr->stop(); return msgr->shutdown(); } }; @@ -421,9 +411,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); msgr->set_auth_client(&dummy_auth); msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->start(chained_dispatchers); + return msgr->start(*this); } void send_pings(const entity_addr_t& addr) { auto conn = msgr->connect(addr, entity_name_t::TYPE_OSD); @@ -440,7 +428,7 @@ seastar::future<> test_preemptive_shutdown(bool v2) { }); } seastar::future<> shutdown() { - msgr->remove_dispatcher(*this); + msgr->stop(); return msgr->shutdown().then([this] { stop_send = true; return stopped_send_promise.get_future(); @@ -918,10 +906,8 @@ class FailoverSuite : public Dispatcher { test_msgr->set_auth_client(&dummy_auth); test_msgr->set_auth_server(&dummy_auth); test_msgr->interceptor = &interceptor; - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return test_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { - return test_msgr->start(chained_dispatchers); + return test_msgr->bind(entity_addrvec_t{addr}).then([this] { + return test_msgr->start(*this); }); } @@ -1042,7 +1028,7 @@ class FailoverSuite : public Dispatcher { } seastar::future<> shutdown() { - test_msgr->remove_dispatcher(*this); + test_msgr->stop(); return test_msgr->shutdown(); } @@ -1266,9 +1252,7 @@ class FailoverTest : public Dispatcher { cmd_msgr->set_default_policy(SocketPolicy::lossy_client(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return cmd_msgr->start(chained_dispatchers).then([this, cmd_peer_addr] { + return cmd_msgr->start(*this).then([this, cmd_peer_addr] { logger().info("CmdCli connect to CmdSrv({}) ...", cmd_peer_addr); cmd_conn = cmd_msgr->connect(cmd_peer_addr, entity_name_t::TYPE_OSD); return pingpong(); @@ -1291,7 +1275,7 @@ class FailoverTest : public Dispatcher { return cmd_conn->send(m).then([] { return seastar::sleep(200ms); }).finally([this] { - cmd_msgr->remove_dispatcher(*this); + cmd_msgr->stop(); return cmd_msgr->shutdown(); }); } @@ -1427,10 +1411,8 @@ class FailoverSuitePeer : public Dispatcher { peer_msgr->set_default_policy(policy); peer_msgr->set_auth_client(&dummy_auth); peer_msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - return peer_msgr->bind(entity_addrvec_t{addr}).then([this, chained_dispatchers]() mutable { - chained_dispatchers->push_back(*this); - return peer_msgr->start(chained_dispatchers); + return peer_msgr->bind(entity_addrvec_t{addr}).then([this] { + return peer_msgr->start(*this); }); } @@ -1462,7 +1444,7 @@ class FailoverSuitePeer : public Dispatcher { : peer_msgr(peer_msgr), op_callback(op_callback) { } seastar::future<> shutdown() { - peer_msgr->remove_dispatcher(*this); + peer_msgr->stop(); return peer_msgr->shutdown(); } @@ -1609,11 +1591,8 @@ class FailoverTestPeer : public Dispatcher { cmd_msgr->set_default_policy(SocketPolicy::stateless_server(0)); cmd_msgr->set_auth_client(&dummy_auth); cmd_msgr->set_auth_server(&dummy_auth); - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then( - [this, chained_dispatchers]() mutable { - return cmd_msgr->start(chained_dispatchers); + return cmd_msgr->bind(entity_addrvec_t{cmd_peer_addr}).then([this] { + return cmd_msgr->start(*this); }).handle_exception_type([cmd_peer_addr](const std::system_error& e) { if (e.code() == std::errc::address_in_use) { logger().error("FailoverTestPeer::init({}) " diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index 9e939a125fd78..48f82f776f0b8 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -182,9 +182,7 @@ static seastar::future<> run( msgr->set_crc_data(); } return msgr->bind(entity_addrvec_t{addr}).then([this] { - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return msgr->start(chained_dispatchers); + return msgr->start(*this); }); }); } @@ -192,6 +190,7 @@ static seastar::future<> run( logger().info("{} shutdown...", lname); return seastar::smp::submit_to(msgr_sid, [this] { ceph_assert(msgr); + msgr->stop(); return msgr->shutdown(); }); } @@ -333,9 +332,7 @@ static seastar::future<> run( } seastar::future<> init(bool v1_crc_enabled) { - auto chained_dispatchers = seastar::make_lw_shared(); - chained_dispatchers->push_back(*this); - return container().invoke_on_all([v1_crc_enabled, chained_dispatchers] (auto& client) mutable { + return container().invoke_on_all([v1_crc_enabled] (auto& client) { if (client.is_active()) { client.msgr = crimson::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid); client.msgr->set_default_policy(crimson::net::SocketPolicy::lossy_client(0)); @@ -346,7 +343,7 @@ static seastar::future<> run( client.msgr->set_crc_header(); client.msgr->set_crc_data(); } - return client.msgr->start(chained_dispatchers); + return client.msgr->start(client); } return seastar::now(); }); @@ -357,6 +354,7 @@ static seastar::future<> run( if (client.is_active()) { logger().info("{} shutdown...", client.lname); ceph_assert(client.msgr); + client.msgr->stop(); return client.msgr->shutdown().then([&client] { return client.stop_dispatch_messages(); }); -- 2.39.5