From 743305cedb725bf56ed2dbe6016b46213f167dc9 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 27 Nov 2020 12:53:24 +0800 Subject: [PATCH] crimson/net: return tuple> from ms_dispatch() Return a boolean to notify whether the Dispatcher has claimed the message. And fix all the ms_dispatch() to return future only for throttling purposes, which is currently always seastar::now(). Signed-off-by: Yingxin Cheng --- src/crimson/mgr/client.cc | 9 ++-- src/crimson/mgr/client.h | 4 +- src/crimson/mon/MonClient.cc | 7 ++- src/crimson/mon/MonClient.h | 4 +- src/crimson/net/Dispatcher.h | 10 +++- src/crimson/net/chained_dispatchers.cc | 24 ++++++--- src/crimson/net/chained_dispatchers.h | 7 --- src/crimson/osd/heartbeat.cc | 9 ++-- src/crimson/osd/heartbeat.h | 4 +- src/crimson/osd/osd.cc | 15 +++--- src/crimson/osd/osd.h | 2 +- src/test/crimson/test_messenger.cc | 75 ++++++++++++++------------ src/tools/crimson/perf_crimson_msgr.cc | 13 ++--- 13 files changed, 105 insertions(+), 78 deletions(-) diff --git a/src/crimson/mgr/client.cc b/src/crimson/mgr/client.cc index fcc8f8326070a..a98480ac8b519 100644 --- a/src/crimson/mgr/client.cc +++ b/src/crimson/mgr/client.cc @@ -47,19 +47,22 @@ seastar::future<> Client::stop() return fut; } -seastar::future<> Client::ms_dispatch(crimson::net::Connection* conn, - MessageRef m) +std::tuple> +Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - return gate.dispatch(__func__, *this, [this, conn, &m] { + bool dispatched = true; + gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { switch(m->get_type()) { case MSG_MGR_MAP: return handle_mgr_map(conn, boost::static_pointer_cast(m)); case MSG_MGR_CONFIGURE: return handle_mgr_conf(conn, boost::static_pointer_cast(m)); default: + dispatched = false; return seastar::now(); } }); + return {dispatched, seastar::now()}; } void Client::ms_handle_connect(crimson::net::ConnectionRef c) diff --git a/src/crimson/mgr/client.h b/src/crimson/mgr/client.h index 19e4cd6ee25d4..78640a3509aba 100644 --- a/src/crimson/mgr/client.h +++ b/src/crimson/mgr/client.h @@ -37,8 +37,8 @@ public: void report(); private: - seastar::future<> ms_dispatch(crimson::net::Connection* conn, - Ref m) override; + std::tuple> ms_dispatch( + crimson::net::Connection* conn, Ref m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_connect(crimson::net::ConnectionRef conn) final; seastar::future<> handle_mgr_map(crimson::net::Connection* conn, diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 9be91ce84972e..835bce01cae7c 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -518,10 +518,11 @@ bool Client::is_hunting() const { return !active_con; } -seastar::future<> +std::tuple> Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - return gate.dispatch(__func__, *this, [this, conn, &m] { + bool dispatched = true; + gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { // we only care about these message types switch (m->get_type()) { case CEPH_MSG_MON_MAP: @@ -545,9 +546,11 @@ Client::ms_dispatch(crimson::net::Connection* conn, MessageRef m) return handle_config( boost::static_pointer_cast(m)); default: + dispatched = false; return seastar::now(); } }); + return {dispatched, seastar::now()}; } void Client::ms_handle_reset(crimson::net::ConnectionRef conn, bool /* is_replace */) diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index 0f651dd5d1769..bf5daa850ca9c 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -140,8 +140,8 @@ private: private: void tick(); - seastar::future<> ms_dispatch(crimson::net::Connection* conn, - MessageRef m) override; + std::tuple> ms_dispatch(crimson::net::Connection* conn, + MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; seastar::future<> handle_monmap(crimson::net::Connection* conn, diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index fd26d146166a7..7c39277f6b14e 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -31,9 +31,15 @@ class Dispatcher : public boost::intrusive::slist_base_hook< public: virtual ~Dispatcher() {} - virtual seastar::future<> ms_dispatch(Connection* conn, MessageRef m) { - return seastar::make_ready_future<>(); + // Dispatchers are put into a chain as described by chain-of-responsibility + // pattern. If any of the dispatchers claims this message, it returns true + // to prevent other dispatchers from processing it, and returns a future + // to throttle the connection if it's too busy. Else, it returns false and + // the second future is ignored. + virtual std::tuple> ms_dispatch(Connection* conn, MessageRef m) { + return {false, seastar::now<>()}; } + virtual void ms_handle_accept(ConnectionRef conn) {} virtual void ms_handle_connect(ConnectionRef conn) {} diff --git a/src/crimson/net/chained_dispatchers.cc b/src/crimson/net/chained_dispatchers.cc index 1977e1da4d747..717a2db748ece 100644 --- a/src/crimson/net/chained_dispatchers.cc +++ b/src/crimson/net/chained_dispatchers.cc @@ -13,19 +13,27 @@ seastar::future<> ChainedDispatchers::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { try { - return seastar::do_for_each(dispatchers, [conn, m](Dispatcher& dispatcher) { - return dispatcher.ms_dispatch(conn, m); - }).handle_exception([conn] (std::exception_ptr eptr) { - logger().error("{} got unexpected exception in ms_dispatch() throttling {}", - *conn, eptr); - ceph_abort(); - }); + for (auto& dispatcher : dispatchers) { + auto [dispatched, throttle_future] = dispatcher.ms_dispatch(conn, m); + if (dispatched) { + return std::move(throttle_future + ).handle_exception([conn] (std::exception_ptr eptr) { + logger().error("{} got unexpected exception in ms_dispatch() throttling {}", + *conn, eptr); + ceph_abort(); + }); + } + assert(throttle_future.available()); + } } catch (...) { logger().error("{} got unexpected exception in ms_dispatch() {}", *conn, std::current_exception()); ceph_abort(); - return seastar::now(); } + if (!dispatchers.empty()) { + logger().error("ms_dispatch unhandled message {}", *m); + } + return seastar::now(); } void diff --git a/src/crimson/net/chained_dispatchers.h b/src/crimson/net/chained_dispatchers.h index a5facef2b96de..139a825e9445e 100644 --- a/src/crimson/net/chained_dispatchers.h +++ b/src/crimson/net/chained_dispatchers.h @@ -10,13 +10,6 @@ using crimson::net::Dispatcher; -// in existing Messenger, dispatchers are put into a chain as described by -// chain-of-responsibility pattern. we could do the same to stop processing -// the message once any of the dispatchers claims this message, and prevent -// other dispatchers from reading it. but this change is more involved as -// it requires changing the ms_ methods to return a bool. so as an intermediate -// solution, we are using an observer dispatcher to notify all the interested -// or unintersted parties. class ChainedDispatchers { boost::intrusive::slist< Dispatcher, diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index d8dc1e550e168..0f8b20768e46d 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -206,17 +206,20 @@ void Heartbeat::remove_peer(osd_id_t peer) peers.erase(peer); } -seastar::future<> Heartbeat::ms_dispatch(crimson::net::Connection* conn, - MessageRef m) +std::tuple> +Heartbeat::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - return gate.dispatch(__func__, *this, [this, conn, &m] { + bool dispatched = true; + gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { switch (m->get_type()) { case MSG_OSD_PING: return handle_osd_ping(conn, boost::static_pointer_cast(m)); default: + dispatched = false; return seastar::now(); } }); + return {dispatched, seastar::now()}; } void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index b8d13ee356712..9d85b526ca253 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -48,8 +48,8 @@ public: void set_require_authorizer(bool); // Dispatcher methods - seastar::future<> ms_dispatch(crimson::net::Connection* conn, - MessageRef m) override; + std::tuple> ms_dispatch( + crimson::net::Connection* conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; void ms_handle_connect(crimson::net::ConnectionRef conn) override; void ms_handle_accept(crimson::net::ConnectionRef conn) override; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 430d8ed2ebb0e..c169ece16000b 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -617,12 +617,14 @@ seastar::future> OSD::load_pg(spg_t pgid) }); } -seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) +std::tuple> +OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) { - return gate.dispatch(__func__, *this, [this, conn, &m] { - if (state.is_stopping()) { - return seastar::now(); - } + if (state.is_stopping()) { + return {false, seastar::now()}; + } + bool dispatched = true; + gate.dispatch_in_background(__func__, *this, [this, conn, &m, &dispatched] { switch (m->get_type()) { case CEPH_MSG_OSD_MAP: return handle_osd_map(conn, boost::static_pointer_cast(m)); @@ -675,10 +677,11 @@ seastar::future<> OSD::ms_dispatch(crimson::net::Connection* conn, MessageRef m) case MSG_OSD_SCRUB2: return handle_scrub(conn, boost::static_pointer_cast(m)); default: - logger().info("ms_dispatch unhandled message {}", *m); + dispatched = false; return seastar::now(); } }); + return {dispatched, seastar::now()}; } void OSD::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 72a892fb2ffbc..eac8be594279e 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -97,7 +97,7 @@ class OSD final : public crimson::net::Dispatcher, OSDSuperblock superblock; // Dispatcher methods - seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) final; + std::tuple> ms_dispatch(crimson::net::Connection*, MessageRef) final; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) final; void ms_handle_remote_reset(crimson::net::ConnectionRef conn) final; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 429509c435235..74c6aeadb3845 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -48,13 +48,14 @@ static seastar::future<> test_echo(unsigned rounds, crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { if (verbose) { logger().info("server got {}", *m); } // reply with a pong - return c->send(make_message()); + std::ignore = c->send(make_message()); + return {true, seastar::now()}; } seastar::future<> init(const entity_name_t& name, @@ -114,8 +115,8 @@ static seastar::future<> test_echo(unsigned rounds, ceph_assert(added); session->connected_time = mono_clock::now(); } - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { auto session = find_session(c); ++(session->count); if (verbose) { @@ -129,7 +130,7 @@ static seastar::future<> test_echo(unsigned rounds, ceph_assert(found != pending_conns.end()); found->second.set_value(); } - return seastar::now(); + return {true, seastar::now()}; } seastar::future<> init(const entity_name_t& name, @@ -268,20 +269,20 @@ static seastar::future<> test_concurrent_dispatch(bool v2) seastar::promise<> on_done; // satisfied when first dispatch unblocks crimson::auth::DummyAuthClientServer dummy_auth; - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { switch (++count) { case 1: // block on the first request until we reenter with the second - return on_second.get_future().then([this] { - on_done.set_value(); - }); + std::ignore = on_second.get_future().then([this] { on_done.set_value(); }); + break; case 2: on_second.set_value(); - return seastar::now(); + break; default: throw std::runtime_error("unexpected count"); } + return {true, seastar::now()}; } seastar::future<> wait() { return on_done.get_future(); } @@ -364,9 +365,10 @@ seastar::future<> test_preemptive_shutdown(bool v2) { crimson::net::MessengerRef msgr; crimson::auth::DummyAuthClientServer dummy_auth; - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { - return c->send(make_message()); + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { + std::ignore = c->send(make_message()); + return {true, seastar::now()}; } public: @@ -401,9 +403,9 @@ seastar::future<> test_preemptive_shutdown(bool v2) { bool stop_send = false; seastar::promise<> stopped_send_promise; - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { - return seastar::now(); + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { + return {true, seastar::now()}; } public: @@ -798,7 +800,7 @@ class FailoverSuite : public Dispatcher { unsigned pending_peer_receive = 0; unsigned pending_receive = 0; - seastar::future<> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(Connection* c, MessageRef m) override { auto result = interceptor.find_result(c->shared_from_this()); if (result == nullptr) { logger().error("Untracked ms dispatched connection: {}", *c); @@ -820,7 +822,7 @@ class FailoverSuite : public Dispatcher { } logger().info("[Test] got op, left {} ops -- [{}] {}", pending_receive, result->index, *c); - return seastar::now(); + return {true, seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { @@ -1192,29 +1194,30 @@ class FailoverTest : public Dispatcher { std::unique_ptr test_suite; - seastar::future<> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(Connection* c, MessageRef m) override { switch (m->get_type()) { case CEPH_MSG_PING: ceph_assert(recv_pong); recv_pong->set_value(); recv_pong = std::nullopt; - return seastar::now(); + break; case MSG_COMMAND_REPLY: ceph_assert(recv_cmdreply); recv_cmdreply->set_value(); recv_cmdreply = std::nullopt; - return seastar::now(); + break; case MSG_COMMAND: { auto m_cmd = boost::static_pointer_cast(m); ceph_assert(static_cast(m_cmd->cmd[0][0]) == cmd_t::suite_recv_op); ceph_assert(test_suite); test_suite->notify_peer_reply(); - return seastar::now(); + break; } default: logger().error("{} got unexpected msg from cmd server: {}", *c, *m); ceph_abort(); } + return {true, seastar::now()}; } private: @@ -1391,11 +1394,12 @@ class FailoverSuitePeer : public Dispatcher { ConnectionRef tracked_conn; unsigned pending_send = 0; - seastar::future<> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(Connection* c, MessageRef m) override { logger().info("[TestPeer] got op from Test"); ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); ceph_assert(tracked_conn == c->shared_from_this()); - return op_callback(); + std::ignore = op_callback(); + return {true, seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { @@ -1518,29 +1522,32 @@ class FailoverTestPeer : public Dispatcher { const entity_addr_t test_peer_addr; std::unique_ptr test_suite; - seastar::future<> ms_dispatch(Connection* c, MessageRef m) override { + std::tuple> ms_dispatch(Connection* c, MessageRef m) override { ceph_assert(cmd_conn == c->shared_from_this()); switch (m->get_type()) { case CEPH_MSG_PING: - return c->send(make_message()); + std::ignore = c->send(make_message()); + break; case MSG_COMMAND: { auto m_cmd = boost::static_pointer_cast(m); auto cmd = static_cast(m_cmd->cmd[0][0]); if (cmd == cmd_t::shutdown) { logger().info("CmdSrv shutdown..."); // forwarded to FailoverTestPeer::wait() - cmd_msgr->remove_dispatcher(*this); - (void) cmd_msgr->shutdown(); - return seastar::now(); + cmd_msgr->stop(); + std::ignore = cmd_msgr->shutdown(); + } else { + std::ignore = handle_cmd(cmd, m_cmd).then([c] { + return c->send(make_message()); + }); } - return handle_cmd(cmd, m_cmd).then([c] { - return c->send(make_message()); - }); + break; } default: logger().error("{} got unexpected msg from cmd client: {}", *c, m); ceph_abort(); } + return {true, seastar::now()}; } void ms_handle_accept(ConnectionRef conn) override { diff --git a/src/tools/crimson/perf_crimson_msgr.cc b/src/tools/crimson/perf_crimson_msgr.cc index 1efef3a2f574c..9e939a125fd78 100644 --- a/src/tools/crimson/perf_crimson_msgr.cc +++ b/src/tools/crimson/perf_crimson_msgr.cc @@ -152,8 +152,8 @@ static seastar::future<> run( msg_data.append_zero(msg_len); } - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); // server replies with MOSDOp to generate server-side write workload @@ -166,7 +166,8 @@ static seastar::future<> run( bufferlist data(msg_data); rep->write(0, msg_len, data); rep->set_tid(m->get_tid()); - return c->send(std::move(rep)); + std::ignore = c->send(std::move(rep)); + return {true, seastar::now()}; } seastar::future<> init(bool v1_crc_enabled, const entity_addr_t& addr) { @@ -302,8 +303,8 @@ static seastar::future<> run( void ms_handle_connect(crimson::net::ConnectionRef conn) override { conn_stats.connected_time = mono_clock::now(); } - seastar::future<> ms_dispatch(crimson::net::Connection* c, - MessageRef m) override { + std::tuple> ms_dispatch( + crimson::net::Connection* c, MessageRef m) override { // server replies with MOSDOp to generate server-side write workload ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); @@ -322,7 +323,7 @@ static seastar::future<> run( ++(conn_stats.received_count); depth.signal(1); - return seastar::now(); + return {true, seastar::now()}; } // should start messenger at this shard? -- 2.39.5