From 7607633db942f79bb2d489ac207b452f41206d41 Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Tue, 18 Feb 2025 12:41:52 -0500 Subject: [PATCH] msg: add alternate statuses for ms_dispatch2 handling Many dispatchers return false to allow other dispatchers also common messages like MOSDMap or MFSMap. They implicitly depend on some dispatcher which is always at the "tail" of the dispatcher queue to return "true" indicating the msg was processed to avoid messages like: 2025-02-18T05:31:17.738+0000 7f5206546640 0 ms_deliver_dispatch: unhandled message 0x5632d05f0700 fsmap(e 9) from mon.0 v2:172.21.3.230:40412/0 but this cannot always happen when some libraries like the RadosClient used standalone. So, add a variant for encapsulating other indications for how the message was processed by dispatch2. For example, a message may be "acknowledged" but explicitly allow other dispatchers to try processing the message. Note: we're using a variant to avoid updating all of the ms_dispatch code to use the sentinel classes. Signed-off-by: Patrick Donnelly (cherry picked from commit c9d0913f53b0c4d632746075c05c2f2187d77c02) Conflicts: src/nvmeof/NVMeofGwMonitorClient.cc: not present in squid src/nvmeof/NVMeofGwMonitorClient.h: not present in squid --- src/client/Client.cc | 2 +- src/client/Client.h | 2 +- src/mds/Beacon.cc | 2 +- src/mds/Beacon.h | 2 +- src/mds/MDSDaemon.cc | 2 +- src/mds/MDSDaemon.h | 2 +- src/mds/MetricAggregator.cc | 2 +- src/mds/MetricAggregator.h | 2 +- src/mds/MetricsHandler.cc | 2 +- src/mds/MetricsHandler.h | 2 +- src/mgr/DaemonServer.cc | 2 +- src/mgr/DaemonServer.h | 2 +- src/mgr/MgrClient.cc | 2 +- src/mgr/MgrClient.h | 2 +- src/mgr/MgrStandby.cc | 2 +- src/mgr/MgrStandby.h | 2 +- src/msg/Dispatcher.h | 21 ++++++++++++++++++++- src/msg/Messenger.h | 8 +++++++- src/tools/cephfs_mirror/ClusterWatcher.cc | 2 +- src/tools/cephfs_mirror/ClusterWatcher.h | 2 +- 20 files changed, 45 insertions(+), 20 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index 9aca9e9d4b299..f62136f8d3bc3 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -3065,7 +3065,7 @@ void Client::handle_osd_map(const MConstRef& m) // incoming messages -bool Client::ms_dispatch2(const MessageRef &m) +Dispatcher::dispatch_result_t Client::ms_dispatch2(const MessageRef &m) { RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED); if (!iref_reader.is_state_satisfied()) { diff --git a/src/client/Client.h b/src/client/Client.h index 19cc7a81d6585..76b9a1d8ea4ec 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -1160,7 +1160,7 @@ protected: void dump_status(Formatter *f); // debug - bool ms_dispatch2(const MessageRef& m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override; void ms_handle_connect(Connection *con) override; bool ms_handle_reset(Connection *con) override; diff --git a/src/mds/Beacon.cc b/src/mds/Beacon.cc index 4673a010b43df..01d016130960b 100644 --- a/src/mds/Beacon.cc +++ b/src/mds/Beacon.cc @@ -103,7 +103,7 @@ void Beacon::init(const MDSMap &mdsmap) }); } -bool Beacon::ms_dispatch2(const ref_t& m) +Dispatcher::dispatch_result_t Beacon::ms_dispatch2(const ref_t& m) { dout(25) << __func__ << ": processing " << m << dendl; if (m->get_type() == MSG_MDS_BEACON) { diff --git a/src/mds/Beacon.h b/src/mds/Beacon.h index fb7d1cffa2941..3f0c336f2bc50 100644 --- a/src/mds/Beacon.h +++ b/src/mds/Beacon.h @@ -53,7 +53,7 @@ public: void init(const MDSMap &mdsmap); void shutdown(); - bool ms_dispatch2(const ref_t &m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ref_t &m) override; void ms_handle_connect(Connection *c) override {} bool ms_handle_reset(Connection *c) override {return false;} void ms_handle_remote_reset(Connection *c) override {} diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 6e54b4b9a3ad4..60fb3a8742297 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -984,7 +984,7 @@ void MDSDaemon::respawn() -bool MDSDaemon::ms_dispatch2(const ref_t &m) +Dispatcher::dispatch_result_t MDSDaemon::ms_dispatch2(const ref_t &m) { dout(25) << __func__ << ": processing " << m << dendl; std::lock_guard l(mds_lock); diff --git a/src/mds/MDSDaemon.h b/src/mds/MDSDaemon.h index c1999a32029c8..e7cb3151bfa16 100644 --- a/src/mds/MDSDaemon.h +++ b/src/mds/MDSDaemon.h @@ -145,7 +145,7 @@ class MDSDaemon : public Dispatcher { class MDSSocketHook *asok_hook = nullptr; private: - bool ms_dispatch2(const ref_t &m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ref_t &m) override; bool ms_handle_fast_authentication(Connection *con) override; void ms_handle_accept(Connection *con) override; void ms_handle_connect(Connection *con) override; diff --git a/src/mds/MetricAggregator.cc b/src/mds/MetricAggregator.cc index 6cdc3ab193e98..d854d37dde681 100644 --- a/src/mds/MetricAggregator.cc +++ b/src/mds/MetricAggregator.cc @@ -126,7 +126,7 @@ void MetricAggregator::shutdown() { } } -bool MetricAggregator::ms_dispatch2(const ref_t &m) { +Dispatcher::dispatch_result_t MetricAggregator::ms_dispatch2(const ref_t &m) { dout(25) << " processing " << m << dendl; if (m->get_type() == MSG_MDS_METRICS && m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) { diff --git a/src/mds/MetricAggregator.h b/src/mds/MetricAggregator.h index 72c37217e624d..677cf39829c57 100644 --- a/src/mds/MetricAggregator.h +++ b/src/mds/MetricAggregator.h @@ -34,7 +34,7 @@ public: void notify_mdsmap(const MDSMap &mdsmap); - bool ms_dispatch2(const ref_t &m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ref_t &m) override; void ms_handle_connect(Connection *c) override { } diff --git a/src/mds/MetricsHandler.cc b/src/mds/MetricsHandler.cc index 833d7ef19984c..6b7484ea9e95c 100644 --- a/src/mds/MetricsHandler.cc +++ b/src/mds/MetricsHandler.cc @@ -20,7 +20,7 @@ MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds) mds(mds) { } -bool MetricsHandler::ms_dispatch2(const ref_t &m) { +Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t &m) { if (m->get_type() == CEPH_MSG_CLIENT_METRICS && m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) { handle_client_metrics(ref_cast(m)); diff --git a/src/mds/MetricsHandler.h b/src/mds/MetricsHandler.h index 25ee208aa9562..9f6fc11b55f7e 100644 --- a/src/mds/MetricsHandler.h +++ b/src/mds/MetricsHandler.h @@ -25,7 +25,7 @@ class MetricsHandler : public Dispatcher { public: MetricsHandler(CephContext *cct, MDSRank *mds); - bool ms_dispatch2(const ref_t &m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ref_t &m) override; void ms_handle_connect(Connection *c) override { } diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 60c0637d551a0..34e18b0bf74de 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -339,7 +339,7 @@ bool DaemonServer::ms_handle_refused(Connection *con) return false; } -bool DaemonServer::ms_dispatch2(const ref_t& m) +Dispatcher::dispatch_result_t DaemonServer::ms_dispatch2(const ref_t& m) { // Note that we do *not* take ::lock here, in order to avoid // serializing all message handling. It's up to each handler diff --git a/src/mgr/DaemonServer.h b/src/mgr/DaemonServer.h index c1c92883f3d5b..719b44634b652 100644 --- a/src/mgr/DaemonServer.h +++ b/src/mgr/DaemonServer.h @@ -273,7 +273,7 @@ public: LogChannelRef auditcl); ~DaemonServer() override; - bool ms_dispatch2(const ceph::ref_t& m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t& m) override; bool ms_handle_fast_authentication(Connection *con) override; void ms_handle_accept(Connection *con) override; bool ms_handle_reset(Connection *con) override; diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc index 6250ea3b9f18e..291ca5c5531cc 100644 --- a/src/mgr/MgrClient.cc +++ b/src/mgr/MgrClient.cc @@ -97,7 +97,7 @@ void MgrClient::shutdown() } } -bool MgrClient::ms_dispatch2(const ref_t& m) +Dispatcher::dispatch_result_t MgrClient::ms_dispatch2(const ref_t& m) { std::lock_guard l(lock); diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h index 1f9bb397fbec1..8f7feaac7dd57 100644 --- a/src/mgr/MgrClient.h +++ b/src/mgr/MgrClient.h @@ -120,7 +120,7 @@ public: void set_mgr_optional(bool optional_) {mgr_optional = optional_;} - bool ms_dispatch2(const ceph::ref_t& m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t& m) override; bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override; diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc index 3fc14d9eb29d7..cb2bd10f92dd0 100644 --- a/src/mgr/MgrStandby.cc +++ b/src/mgr/MgrStandby.cc @@ -402,7 +402,7 @@ void MgrStandby::handle_mgr_map(ref_t mmap) } } -bool MgrStandby::ms_dispatch2(const ref_t& m) +Dispatcher::dispatch_result_t MgrStandby::ms_dispatch2(const ref_t& m) { std::lock_guard l(lock); dout(10) << state_str() << " " << *m << dendl; diff --git a/src/mgr/MgrStandby.h b/src/mgr/MgrStandby.h index 35ce43c0c8085..ab5a8d05cbf02 100644 --- a/src/mgr/MgrStandby.h +++ b/src/mgr/MgrStandby.h @@ -71,7 +71,7 @@ public: MgrStandby(int argc, const char **argv); ~MgrStandby() override; - bool ms_dispatch2(const ceph::ref_t& m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t& m) override; bool ms_handle_reset(Connection *con) override { return false; } void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override; diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 8b3c80542ac6c..312336a79e629 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -22,6 +22,8 @@ #include "include/common_fwd.h" #include "msg/MessageRef.h" +#include + class Messenger; class Connection; class CryptoKey; @@ -124,7 +126,24 @@ public: } /* ms_dispatch2 because otherwise the child must define both */ - virtual bool ms_dispatch2(const MessageRef &m) { + struct HANDLED {}; + struct UNHANDLED {}; + struct ACKNOWLEDGED {}; + typedef std::variant dispatch_result_t; + + static inline dispatch_result_t fold_dispatch_result(dispatch_result_t r) { + if (std::holds_alternative(r)) { + if (std::get(r)) { + return HANDLED(); + } else { + return UNHANDLED(); + } + } else { + return r; + } + } + + virtual dispatch_result_t ms_dispatch2(const MessageRef &m) { /* allow old style dispatch handling that expects a Message * with a floating ref */ MessageRef mr(m); if (ms_dispatch(mr.get())) { diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 078418fe715fc..53ffab4e67ef6 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -736,11 +736,17 @@ public: */ void ms_deliver_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); + bool acked = false; for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { - if (dispatcher->ms_dispatch2(m)) { + auto r = Dispatcher::fold_dispatch_result(dispatcher->ms_dispatch2(m)); + if (std::holds_alternative(r)) { return; + } else if (std::holds_alternative(r)) { + acked = true; } } + if (acked) + return; lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); diff --git a/src/tools/cephfs_mirror/ClusterWatcher.cc b/src/tools/cephfs_mirror/ClusterWatcher.cc index 8d53a1c725412..133133a9d1190 100644 --- a/src/tools/cephfs_mirror/ClusterWatcher.cc +++ b/src/tools/cephfs_mirror/ClusterWatcher.cc @@ -31,7 +31,7 @@ ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon ClusterWatcher::~ClusterWatcher() { } -bool ClusterWatcher::ms_dispatch2(const ref_t &m) { +Dispatcher::dispatch_result_t ClusterWatcher::ms_dispatch2(const ref_t &m) { if (m->get_type() == CEPH_MSG_FS_MAP) { if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) { handle_fsmap(ref_cast(m)); diff --git a/src/tools/cephfs_mirror/ClusterWatcher.h b/src/tools/cephfs_mirror/ClusterWatcher.h index 312aeccd63c4e..5b9c3e85dc137 100644 --- a/src/tools/cephfs_mirror/ClusterWatcher.h +++ b/src/tools/cephfs_mirror/ClusterWatcher.h @@ -38,7 +38,7 @@ public: Listener &listener); ~ClusterWatcher(); - bool ms_dispatch2(const ref_t &m) override; + Dispatcher::dispatch_result_t ms_dispatch2(const ref_t &m) override; void ms_handle_connect(Connection *c) override { } -- 2.39.5