From: Patrick Donnelly Date: Tue, 14 May 2024 18:15:21 +0000 (-0400) Subject: msg: add priority to dispatcher invocation order X-Git-Tag: v20.0.0~1881^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b463d93b08f392ebd636c24bf5f0fa4249600256;p=ceph.git msg: add priority to dispatcher invocation order So we can ensure that e.g. MDSRank::ms_dispatch is lowest priority so that we do not acquire the mds_lock when looking at beacons. This change maintains the current behavior when the priority is unset: the use of std::stable_sort will ensure that the add_dispatcher_head and add_dispatcher_tail calls will preserve order when dispatcher priorities are equal. Fixes: 7fc04be9332704946ba6f0e95cfcd1afc34fc0fe Signed-off-by: Patrick Donnelly --- diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index ddf273288b3..799a4a9de91 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -29,6 +29,12 @@ class KeyStore; class Dispatcher { public: + /* Ordering of dispatch for a list of Dispatchers. */ + using priority_t = uint32_t; + static constexpr priority_t PRIORITY_HIGH = std::numeric_limits::max() / 4; + static constexpr priority_t PRIORITY_DEFAULT = std::numeric_limits::max() / 2; + static constexpr priority_t PRIORITY_LOW = (std::numeric_limits::max() / 4) * 3; + explicit Dispatcher(CephContext *cct_) : cct(cct_) { diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index fe8d7a72b38..830ae9050bb 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -17,9 +17,9 @@ #ifndef CEPH_MESSENGER_H #define CEPH_MESSENGER_H -#include #include #include +#include #include #include @@ -92,8 +92,18 @@ struct Interceptor { class Messenger { private: - std::deque dispatchers; - std::deque fast_dispatchers; + struct PriorityDispatcher { + using priority_t = Dispatcher::priority_t; + priority_t priority; + Dispatcher* dispatcher; + + bool operator<(const PriorityDispatcher& other) const { + return priority < other.priority; + } + }; + std::vector dispatchers; + std::vector fast_dispatchers; + ZTracer::Endpoint trace_endpoint; protected: @@ -389,11 +399,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_head(Dispatcher *d) { + void add_dispatcher_head(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_front(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_front(d); + dispatchers.insert(dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.insert(fast_dispatchers.begin(), PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -404,11 +417,14 @@ public: * * @param d The Dispatcher to insert into the list. */ - void add_dispatcher_tail(Dispatcher *d) { + void add_dispatcher_tail(Dispatcher *d, PriorityDispatcher::priority_t priority=Dispatcher::PRIORITY_DEFAULT) { bool first = dispatchers.empty(); - dispatchers.push_back(d); - if (d->ms_can_fast_dispatch_any()) - fast_dispatchers.push_back(d); + dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(dispatchers.begin(), dispatchers.end()); + if (d->ms_can_fast_dispatch_any()) { + fast_dispatchers.push_back(PriorityDispatcher{priority, d}); + std::stable_sort(fast_dispatchers.begin(), fast_dispatchers.end()); + } if (first) ready(); } @@ -667,9 +683,10 @@ public: * @param m The Message we are testing. */ bool ms_can_fast_dispatch(const ceph::cref_t& m) { - for (const auto &dispatcher : fast_dispatchers) { - if (dispatcher->ms_can_fast_dispatch2(m)) - return true; + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { + if (dispatcher->ms_can_fast_dispatch2(m)) { + return true; + } } return false; } @@ -682,10 +699,10 @@ public: */ void ms_fast_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { if (dispatcher->ms_can_fast_dispatch2(m)) { - dispatcher->ms_fast_dispatch2(m); - return; + dispatcher->ms_fast_dispatch2(m); + return; } } ceph_abort(); @@ -697,7 +714,7 @@ public: * */ void ms_fast_preprocess(const ceph::ref_t &m) { - for (const auto &dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_fast_preprocess2(m); } } @@ -710,9 +727,10 @@ public: */ void ms_deliver_dispatch(const ceph::ref_t &m) { m->set_dispatch_stamp(ceph_clock_now()); - for (const auto &dispatcher : dispatchers) { - if (dispatcher->ms_dispatch2(m)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_dispatch2(m)) { + return; + } } lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " << m->get_source_inst() << dendl; @@ -729,7 +747,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_connect(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_connect(con); } } @@ -742,7 +760,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_connect(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_connect(con); } } @@ -754,7 +772,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_accept(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_accept(con); } } @@ -766,7 +784,7 @@ public: * @param con Pointer to the new Connection. */ void ms_deliver_handle_fast_accept(Connection *con) { - for (const auto& dispatcher : fast_dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : fast_dispatchers) { dispatcher->ms_handle_fast_accept(con); } } @@ -779,9 +797,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_reset(con)) - return; + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_reset(con)) { + return; + } } } /** @@ -792,7 +811,7 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_remote_reset(Connection *con) { - for (const auto& dispatcher : dispatchers) { + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { dispatcher->ms_handle_remote_reset(con); } } @@ -806,9 +825,10 @@ public: * @param con Pointer to the broken Connection. */ void ms_deliver_handle_refused(Connection *con) { - for (const auto& dispatcher : dispatchers) { - if (dispatcher->ms_handle_refused(con)) + for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) { + if (dispatcher->ms_handle_refused(con)) { return; + } } }