]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg: add priority to dispatcher invocation order
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 14 May 2024 18:15:21 +0000 (14:15 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Thu, 23 May 2024 19:39:02 +0000 (15:39 -0400)
So we can ensure that e.g. MDSRank::ms_dispatch is lowest priority so that we
do not acquire the mds_lock when looking at beacons.

This change maintains the current behavior when the priority is unset: the use
of std::stable_sort will ensure that the add_dispatcher_head and
add_dispatcher_tail calls will preserve order when dispatcher priorities are
equal.

Fixes: 7fc04be9332704946ba6f0e95cfcd1afc34fc0fe
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit b463d93b08f392ebd636c24bf5f0fa4249600256)

src/msg/Dispatcher.h
src/msg/Messenger.h

index 885f1843b31c417c35f766fbecbb390f4ef0510c..b04ba36148572ab52553c45d02ce110f63ccf6e2 100644 (file)
@@ -29,6 +29,12 @@ class KeyStore;
 
 class Dispatcher {
 public:
+  /* Ordering of dispatch for a list of Dispatchers. */
+  using priority_t = uint32_t;
+  static constexpr priority_t PRIORITY_HIGH = std::numeric_limits<priority_t>::max() / 4;
+  static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits<priority_t>::max() / 2;
+  static constexpr priority_t PRIORITY_LOW = (std::numeric_limits<priority_t>::max() / 4) * 3;
+
   explicit Dispatcher(CephContext *cct_)
     : cct(cct_)
   {
index 71b7d2549c51f12e9618356da53555968fbbdca3..9f767a788aa04224b0fa9117e1fcf9c6be5f00f3 100644 (file)
@@ -17,9 +17,9 @@
 #ifndef CEPH_MESSENGER_H
 #define CEPH_MESSENGER_H
 
-#include <deque>
 #include <map>
 #include <optional>
+#include <vector>
 
 #include <errno.h>
 #include <sstream>
@@ -92,8 +92,18 @@ struct Interceptor {
 
 class Messenger {
 private:
-  std::deque<Dispatcher*> dispatchers;
-  std::deque<Dispatcher*> fast_dispatchers;
+  struct PriorityDispatcher {
+    using priority_t = Dispatcher::priority_t;
+    priority_t priority;
+    Dispatcher* dispatcher;
+
+    bool operator<(const PriorityDispatcher& other) const {
+      return priority < other.priority;
+    }
+  };
+  std::vector<PriorityDispatcher> dispatchers;
+  std::vector<PriorityDispatcher> fast_dispatchers;
+
   ZTracer::Endpoint trace_endpoint;
 
 protected:
@@ -390,11 +400,14 @@ public:
    *
    * @param d The Dispatcher to insert into the list.
    */
-  void add_dispatcher_head(Dispatcher *d) {
+  void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
     bool first = dispatchers.empty();
-    dispatchers.push_front(d);
-    if (d->ms_can_fast_dispatch_any())
-      fast_dispatchers.push_front(d);
+    dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d});
+    std::stable_sort(dispatchers.begin(), dispatchers.end());
+    if (d->ms_can_fast_dispatch_any()) {
+      fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d});
+      std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
+    }
     if (first)
       ready();
   }
@@ -405,11 +418,14 @@ public:
    *
    * @param d The Dispatcher to insert into the list.
    */
-  void add_dispatcher_tail(Dispatcher *d) {
+  void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) {
     bool first = dispatchers.empty();
-    dispatchers.push_back(d);
-    if (d->ms_can_fast_dispatch_any())
-      fast_dispatchers.push_back(d);
+    dispatchers.push_back(PriorityDispatcher{priority, d});
+    std::stable_sort(dispatchers.begin(), dispatchers.end());
+    if (d->ms_can_fast_dispatch_any()) {
+      fast_dispatchers.push_back(PriorityDispatcher{priority, d});
+      std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end());
+    }
     if (first)
       ready();
   }
@@ -668,9 +684,10 @@ public:
    * @param m The Message we are testing.
    */
   bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) {
-    for (const auto &dispatcher : fast_dispatchers) {
-      if (dispatcher->ms_can_fast_dispatch2(m))
-       return true;
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
+      if (dispatcher->ms_can_fast_dispatch2(m)) {
+        return true;
+      }
     }
     return false;
   }
@@ -683,10 +700,10 @@ public:
    */
   void ms_fast_dispatch(const ceph::ref_t<Message> &m) {
     m->set_dispatch_stamp(ceph_clock_now());
-    for (const auto &dispatcher : fast_dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
       if (dispatcher->ms_can_fast_dispatch2(m)) {
-       dispatcher->ms_fast_dispatch2(m);
-       return;
+        dispatcher->ms_fast_dispatch2(m);
+        return;
       }
     }
     ceph_abort();
@@ -698,7 +715,7 @@ public:
    *
    */
   void ms_fast_preprocess(const ceph::ref_t<Message> &m) {
-    for (const auto &dispatcher : fast_dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
       dispatcher->ms_fast_preprocess2(m);
     }
   }
@@ -711,9 +728,10 @@ public:
    */
   void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
     m->set_dispatch_stamp(ceph_clock_now());
-    for (const auto &dispatcher : dispatchers) {
-      if (dispatcher->ms_dispatch2(m))
-       return;
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+      if (dispatcher->ms_dispatch2(m)) {
+        return;
+      }
     }
     lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
                         << m->get_source_inst() << dendl;
@@ -730,7 +748,7 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_connect(Connection *con) {
-    for (const auto& dispatcher : dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
       dispatcher->ms_handle_connect(con);
     }
   }
@@ -743,7 +761,7 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_fast_connect(Connection *con) {
-    for (const auto& dispatcher : fast_dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
       dispatcher->ms_handle_fast_connect(con);
     }
   }
@@ -755,7 +773,7 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_accept(Connection *con) {
-    for (const auto& dispatcher : dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
       dispatcher->ms_handle_accept(con);
     }
   }
@@ -767,7 +785,7 @@ public:
    * @param con Pointer to the new Connection.
    */
   void ms_deliver_handle_fast_accept(Connection *con) {
-    for (const auto& dispatcher : fast_dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) {
       dispatcher->ms_handle_fast_accept(con);
     }
   }
@@ -780,9 +798,10 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_reset(Connection *con) {
-    for (const auto& dispatcher : dispatchers) {
-      if (dispatcher->ms_handle_reset(con))
-       return;
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+      if (dispatcher->ms_handle_reset(con)) {
+        return;
+      }
     }
   }
   /**
@@ -793,7 +812,7 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_remote_reset(Connection *con) {
-    for (const auto& dispatcher : dispatchers) {
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
       dispatcher->ms_handle_remote_reset(con);
     }
   }
@@ -807,9 +826,10 @@ public:
    * @param con Pointer to the broken Connection.
    */
   void ms_deliver_handle_refused(Connection *con) {
-    for (const auto& dispatcher : dispatchers) {
-      if (dispatcher->ms_handle_refused(con))
+    for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
+      if (dispatcher->ms_handle_refused(con)) {
         return;
+      }
     }
   }