]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: add alternate statuses for ms_dispatch2 handling
authorPatrick Donnelly <pdonnell@ibm.com>
Tue, 18 Feb 2025 17:41:52 +0000 (12:41 -0500)
committerPatrick Donnelly <pdonnell@ibm.com>
Fri, 28 Feb 2025 00:55:47 +0000 (19:55 -0500)
Many dispatchers return false to allow other dispatchers also common messages
like MOSDMap or MFSMap. They implicitly depend on some dispatcher which is
always at the "tail" of the dispatcher queue to return "true" indicating the
msg was processed to avoid messages like:

    2025-02-18T05:31:17.738+0000 7f5206546640  0 ms_deliver_dispatch: unhandled message 0x5632d05f0700 fsmap(e 9) from mon.0 v2:172.21.3.230:40412/0

but this cannot always happen when some libraries like the RadosClient used standalone.

So, add a variant for encapsulating other indications for how the message was
processed by dispatch2.  For example, a message may be "acknowledged" but
explicitly allow other dispatchers to try processing the message.

Note: we're using a variant to avoid updating all of the ms_dispatch code to
use the sentinel classes.

Signed-off-by: Patrick Donnelly <pdonnell@ibm.com>
22 files changed:
src/client/Client.cc
src/client/Client.h
src/mds/Beacon.cc
src/mds/Beacon.h
src/mds/MDSDaemon.cc
src/mds/MDSDaemon.h
src/mds/MetricAggregator.cc
src/mds/MetricAggregator.h
src/mds/MetricsHandler.cc
src/mds/MetricsHandler.h
src/mgr/DaemonServer.cc
src/mgr/DaemonServer.h
src/mgr/MgrClient.cc
src/mgr/MgrClient.h
src/mgr/MgrStandby.cc
src/mgr/MgrStandby.h
src/msg/Dispatcher.h
src/msg/Messenger.h
src/nvmeof/NVMeofGwMonitorClient.cc
src/nvmeof/NVMeofGwMonitorClient.h
src/tools/cephfs_mirror/ClusterWatcher.cc
src/tools/cephfs_mirror/ClusterWatcher.h

index 28d1a0f3afde9096faf801df4faeb1eac0e5ad1e..f134f2e355d3718c70fb52c097fef4028308a788 100644 (file)
@@ -3080,7 +3080,7 @@ void Client::handle_osd_map(const MConstRef<MOSDMap>& m)
 // incoming messages
 
 
-bool Client::ms_dispatch2(const MessageRef &m)
+Dispatcher::dispatch_result_t Client::ms_dispatch2(const MessageRef &m)
 {
   RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED);
   if (!iref_reader.is_state_satisfied()) {
index eb3020fc0e55724f6313880696addc29042fccb0..ac600f434b28bbc1257ff73bd65c28f369261819 100644 (file)
@@ -1162,7 +1162,7 @@ protected:
 
   void dump_status(Formatter *f);  // debug
 
-  bool ms_dispatch2(const MessageRef& m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override;
 
   void ms_handle_connect(Connection *con) override;
   bool ms_handle_reset(Connection *con) override;
index 1c1eeb4ecf8a50ca82ff97ff163320fa5f5a7401..14d2a605c70c208a4c6b725a91972603cae4c842 100644 (file)
@@ -106,7 +106,7 @@ void Beacon::init(const MDSMap &mdsmap)
   });
 }
 
-bool Beacon::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t Beacon::ms_dispatch2(const ref_t<Message>& m)
 {
   dout(25) << __func__ << ": processing " << m << dendl;
   if (m->get_type() == MSG_MDS_BEACON) {
index fb7d1cffa2941fddabeb0ed161e5b74ceea279cb..3f0c336f2bc50c7b38cdab405e11c1d6ddab740f 100644 (file)
@@ -53,7 +53,7 @@ public:
   void init(const MDSMap &mdsmap);
   void shutdown();
 
-  bool ms_dispatch2(const ref_t<Message> &m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
   void ms_handle_connect(Connection *c) override {}
   bool ms_handle_reset(Connection *c) override {return false;}
   void ms_handle_remote_reset(Connection *c) override {}
index 62cd5cc26d24d71a3ec53764f6cb0012a0950e2c..6187c254eae706f3cd9efa420c7ea2caf87816cd 100644 (file)
@@ -989,7 +989,7 @@ void MDSDaemon::respawn()
 
 
 
-bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
+Dispatcher::dispatch_result_t MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
 {
   dout(25) << __func__ << ": processing " << m << dendl;
   std::lock_guard l(mds_lock);
index c1999a32029c832ecf93232f3dfb0b6f922507c6..e7cb3151bfa163f8bc0059225edd6acbdb6e1646 100644 (file)
@@ -145,7 +145,7 @@ class MDSDaemon : public Dispatcher {
   class MDSSocketHook *asok_hook = nullptr;
 
  private:
-  bool ms_dispatch2(const ref_t<Message> &m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
   bool ms_handle_fast_authentication(Connection *con) override;
   void ms_handle_accept(Connection *con) override;
   void ms_handle_connect(Connection *con) override;
index 6cbd9a094c01063cbdecdaf477627819fd49c30d..e8afb04d974708cc05fc0044cbc1ab8b48c8795f 100644 (file)
@@ -127,7 +127,7 @@ void MetricAggregator::shutdown() {
   }
 }
 
-bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
   dout(25) << " processing " << m << dendl;
   if (m->get_type() == MSG_MDS_METRICS &&
       m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
index 72c37217e624d6288db3f8c044fa07d398216365..677cf39829c57f5466b63910ef78da8b1c0d6705 100644 (file)
@@ -34,7 +34,7 @@ public:
 
   void notify_mdsmap(const MDSMap &mdsmap);
 
-  bool ms_dispatch2(const ref_t<Message> &m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
 
   void ms_handle_connect(Connection *c) override {
   }
index d9c09e06b270d3144a3f970411ec768e92ad177a..20ac41c89f1e2ca3703fd1bd89c27ff920a2a25b 100644 (file)
@@ -20,7 +20,7 @@ MetricsHandler::MetricsHandler(CephContext *cct, MDSRank *mds)
     mds(mds) {
 }
 
-bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
   if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
       m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
     handle_client_metrics(ref_cast<MClientMetrics>(m));
index 25ee208aa956226467da27e4294667589a7cfb01..9f6fc11b55f7e85ce17977c02893a9b10763a29e 100644 (file)
@@ -25,7 +25,7 @@ class MetricsHandler : public Dispatcher {
 public:
   MetricsHandler(CephContext *cct, MDSRank *mds);
 
-  bool ms_dispatch2(const ref_t<Message> &m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
 
   void ms_handle_connect(Connection *c) override {
   }
index 5820b3f1abd55044a0ecb46c6be492369d570fda..a9427df8b56f3a4574af2d9c235f011aa6d44c09 100644 (file)
@@ -348,7 +348,7 @@ bool DaemonServer::ms_handle_refused(Connection *con)
   return false;
 }
 
-bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t DaemonServer::ms_dispatch2(const ref_t<Message>& m)
 {
   // Note that we do *not* take ::lock here, in order to avoid
   // serializing all message handling.  It's up to each handler
index ee0c26f94dea2cbe1422be843b0e52899a829334..7e87aba922fcc03f7fbcaf2e5a040ff7465f2f15 100644 (file)
@@ -275,7 +275,7 @@ public:
               LogChannelRef auditcl);
   ~DaemonServer() override;
 
-  bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
   bool ms_handle_fast_authentication(Connection *con) override;
   void ms_handle_accept(Connection *con) override;
   bool ms_handle_reset(Connection *con) override;
index 934d492e2866a9a5f84acf2d40f75977c3843459..01dbf2914eb4ddd5a0d8814a1e3c0eddaa19017d 100644 (file)
@@ -98,7 +98,7 @@ void MgrClient::shutdown()
   }
 }
 
-bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t MgrClient::ms_dispatch2(const ref_t<Message>& m)
 {
   std::lock_guard l(lock);
 
index d8a2bfa623809f8a7185749d8fd85e9b7d652651..65b94b8de4d47f5da5769c9f6a1356e37b3af6e1 100644 (file)
@@ -116,7 +116,7 @@ public:
 
   void set_mgr_optional(bool optional_) {mgr_optional = optional_;}
 
-  bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
   bool ms_handle_reset(Connection *con) override;
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
index 3fc14d9eb29d76fa362fd7b36b6edf18e6acee4c..cb2bd10f92dd02f1028d01490539f0f3ddcb9ee9 100644 (file)
@@ -402,7 +402,7 @@ void MgrStandby::handle_mgr_map(ref_t<MMgrMap> mmap)
   }
 }
 
-bool MgrStandby::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t MgrStandby::ms_dispatch2(const ref_t<Message>& m)
 {
   std::lock_guard l(lock);
   dout(10) << state_str() << " " << *m << dendl;
index 35ce43c0c808582057906565183882c2dbe1c9e0..ab5a8d05cbf02f9461f5c8deff1ba3c51263ac55 100644 (file)
@@ -71,7 +71,7 @@ public:
   MgrStandby(int argc, const char **argv);
   ~MgrStandby() override;
 
-  bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
   bool ms_handle_reset(Connection *con) override { return false; }
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override;
index 8b7a65c795a7910099b0bc7a10f04d9eb5ee1b5e..c871a4543a012a20e37fdfd8b4fae85c7a971726 100644 (file)
@@ -22,6 +22,8 @@
 #include "include/common_fwd.h"
 #include "msg/MessageRef.h"
 
+#include <variant>
+
 class Messenger;
 class Connection;
 class CryptoKey;
@@ -124,7 +126,24 @@ public:
   }
 
   /* ms_dispatch2 because otherwise the child must define both */
-  virtual bool ms_dispatch2(const MessageRef &m) {
+  struct HANDLED {};
+  struct UNHANDLED {};
+  struct ACKNOWLEDGED {};
+  typedef std::variant<bool, HANDLED, UNHANDLED, ACKNOWLEDGED> dispatch_result_t;
+
+  static inline dispatch_result_t fold_dispatch_result(dispatch_result_t r) {
+    if (std::holds_alternative<bool>(r)) {
+      if (std::get<bool>(r)) {
+        return HANDLED();
+      } else {
+        return UNHANDLED();
+      }
+    } else {
+      return r;
+    }
+  }
+
+  virtual dispatch_result_t ms_dispatch2(const MessageRef &m) {
     /* allow old style dispatch handling that expects a Message * with a floating ref */
     MessageRef mr(m);
     if (ms_dispatch(mr.get())) {
index 078418fe715fce45674f2fb157f02cd8db9da2de..53ffab4e67ef650797647320b4d0ae5651865c69 100644 (file)
@@ -736,11 +736,17 @@ public:
    */
   void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
     m->set_dispatch_stamp(ceph_clock_now());
+    bool acked = false;
     for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
-      if (dispatcher->ms_dispatch2(m)) {
+      auto r = Dispatcher::fold_dispatch_result(dispatcher->ms_dispatch2(m));
+      if (std::holds_alternative<Dispatcher::HANDLED>(r)) {
         return;
+      } else if (std::holds_alternative<Dispatcher::ACKNOWLEDGED>(r)) {
+        acked = true;
       }
     }
+    if (acked)
+      return;
     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                         << m->get_source_inst() << dendl;
     ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
index 967ffc59711173ceea34771bb0c35c48cd9d4d0d..e8024d444a2629d77ce54781e341bd88f8d2f510 100644 (file)
@@ -428,7 +428,7 @@ void NVMeofGwMonitorClient::handle_nvmeof_gw_map(ceph::ref_t<MNVMeofGwMap> nmap)
   map = new_map;
 }
 
-bool NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Message>& m)
 {
   std::lock_guard l(lock);
   dout(10) << "got map type " << m->get_type() << dendl;
index cfe1b8971f0bbde86c266728330ef685c8bb3328..d18a53512fae7b2c2a8d1b4cd424d6375972a512 100644 (file)
@@ -74,7 +74,7 @@ public:
   ~NVMeofGwMonitorClient() override;
 
   // Dispatcher interface
-  bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
   bool ms_handle_reset(Connection *con) override { return false; }
   void ms_handle_remote_reset(Connection *con) override {}
   bool ms_handle_refused(Connection *con) override { return false; };
index 8d53a1c725412e618ce81d8646222a3d260750c3..133133a9d1190866a4da2e6fd0c54356d5dfe5da 100644 (file)
@@ -31,7 +31,7 @@ ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon
 ClusterWatcher::~ClusterWatcher() {
 }
 
-bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
   if (m->get_type() == CEPH_MSG_FS_MAP) {
     if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
       handle_fsmap(ref_cast<MFSMap>(m));
index 312aeccd63c4ef5c01232682e43858f1f72cb929..5b9c3e85dc137c5ddd7145898d31e5f1e9d98075 100644 (file)
@@ -38,7 +38,7 @@ public:
                  Listener &listener);
   ~ClusterWatcher();
 
-  bool ms_dispatch2(const ref_t<Message> &m) override;
+  Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
 
   void ms_handle_connect(Connection *c) override {
   }