From 90a77bcebec1b351c9ae5a36ce1e34caf89caa7b Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Tue, 14 May 2024 14:15:21 -0400 Subject: [PATCH] msg: add priority to dispatcher invocation order So we can ensure that e.g. MDSRank::ms_dispatch is lowest priority so that we do not acquire the mds_lock when looking at beacons. This change maintains the current behavior when the priority is unset: the use of std::stable_sort will ensure that the add_dispatcher_head and add_dispatcher_tail calls will preserve order when dispatcher priorities are equal. Fixes: 7fc04be9332704946ba6f0e95cfcd1afc34fc0fe Signed-off-by: Patrick Donnelly (cherry picked from commit b463d93b08f392ebd636c24bf5f0fa4249600256) --- src/msg/Dispatcher.h | 6 ++++ src/msg/Messenger.h | 82 +++++++++++++++++++++++++++----------------- 2 files changed, 57 insertions(+), 31 deletions(-) diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 885f1843b31..b04ba361485 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -29,6 +29,12 @@ class KeyStore; class Dispatcher { public: + /* Ordering of dispatch for a list of Dispatchers. */ + using priority_t = uint32_t; + static constexpr priority_t PRIORITY_HIGH = std::numeric_limits::max() / 4; + static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits::max() / 2; + static constexpr priority_t PRIORITY_LOW = (std::numeric_limits::max() / 4) * 3; + explicit Dispatcher(CephContext *cct_) : cct(cct_) { diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 71b7d2549c5..9f767a788aa 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -17,9 +17,9 @@ #ifndef CEPH_MESSENGER_H #define CEPH_MESSENGER_H -#include #include #include +#include #include #include @@ -92,8 +92,18 @@ struct Interceptor { class Messenger { private: - std::deque dispatchers; - std::deque fast_dispatchers; + struct PriorityDispatcher { + using priority_t = Dispatcher::priority_t; + priority_t priority; + Dispatcher* dispatcher; + + bool operator<(const PriorityDispatcher& other) const { + return priority < other.priority; + } + }; + std::vector dispatchers; + std::vector fast_dispatchers; + ZTracer::Endpoint trace_endpoint; protected: @@ -390,11 +400,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_head(Dispatcher *d) { + void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_front(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_front(d); + dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -405,11 +418,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_tail(Dispatcher *d) { + void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_back(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_back(d); + dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -668,9 +684,10 @@ public: * @param m The Message we are testing. */ bool ms_can_fast_dispatch(const ceph::cref_t& m) { - for (const auto &dispatcher : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch2(m)) - return true; + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { + if (dispatcher->ms_can_fast_dispatch2(m)) { + return true; + } } return false; } @@ -683,10 +700,10 @@ public: */ void ms_fast_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) { - dispatcher->ms_fast_dispatch2(m); - return; + dispatcher->ms_fast_dispatch2(m); + return; } } ceph_abort(); @@ -698,7 +715,7 @@ public: * */ void ms_fast_preprocess(const ceph::ref_t &m) { - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_fast_preprocess2(m); } } @@ -711,9 +728,10 @@ public: */ void ms_deliver_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : dispatchers) { - if (dispatcher->ms_dispatch2(m)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_dispatch2(m)) { + return; + } } lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; @@ -730,7 +748,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_connect(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_connect(con); } } @@ -743,7 +761,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_connect(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_connect(con); } } @@ -755,7 +773,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_accept(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_accept(con); } } @@ -767,7 +785,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_accept(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_accept(con); } } @@ -780,9 +798,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_reset(con)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_reset(con)) { + return; + } } } /** @@ -793,7 +812,7 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_remote_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_remote_reset(con); } } @@ -807,9 +826,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_refused(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_refused(con)) + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_refused(con)) { return; + } } } -- 2.39.5