Many dispatchers return false to allow other dispatchers also common messages
like MOSDMap or MFSMap. They implicitly depend on some dispatcher which is
always at the "tail" of the dispatcher queue to return "true" indicating the
msg was processed to avoid messages like:
2025-02-18T05:31:17.738+0000
7f5206546640 0 ms_deliver_dispatch: unhandled message 0x5632d05f0700 fsmap(e 9) from mon.0 v2:172.21.3.230:40412/0
but this cannot always happen when some libraries like the RadosClient used standalone.
So, add a variant for encapsulating other indications for how the message was
processed by dispatch2. For example, a message may be "acknowledged" but
explicitly allow other dispatchers to try processing the message.
Note: we're using a variant to avoid updating all of the ms_dispatch code to
use the sentinel classes.
Signed-off-by: Patrick Donnelly <pdonnell@ibm.com>
// incoming messages
-bool Client::ms_dispatch2(const MessageRef &m)
+Dispatcher::dispatch_result_t Client::ms_dispatch2(const MessageRef &m)
{
RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED);
if (!iref_reader.is_state_satisfied()) {
void dump_status(Formatter *f); // debug
- bool ms_dispatch2(const MessageRef& m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const MessageRef& m) override;
void ms_handle_connect(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
});
}
-bool Beacon::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t Beacon::ms_dispatch2(const ref_t<Message>& m)
{
dout(25) << __func__ << ": processing " << m << dendl;
if (m->get_type() == MSG_MDS_BEACON) {
void init(const MDSMap &mdsmap);
void shutdown();
- bool ms_dispatch2(const ref_t<Message> &m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
void ms_handle_connect(Connection *c) override {}
bool ms_handle_reset(Connection *c) override {return false;}
void ms_handle_remote_reset(Connection *c) override {}
-bool MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
+Dispatcher::dispatch_result_t MDSDaemon::ms_dispatch2(const ref_t<Message> &m)
{
dout(25) << __func__ << ": processing " << m << dendl;
std::lock_guard l(mds_lock);
class MDSSocketHook *asok_hook = nullptr;
private:
- bool ms_dispatch2(const ref_t<Message> &m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
bool ms_handle_fast_authentication(Connection *con) override;
void ms_handle_accept(Connection *con) override;
void ms_handle_connect(Connection *con) override;
}
}
-bool MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t MetricAggregator::ms_dispatch2(const ref_t<Message> &m) {
dout(25) << " processing " << m << dendl;
if (m->get_type() == MSG_MDS_METRICS &&
m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MDS) {
void notify_mdsmap(const MDSMap &mdsmap);
- bool ms_dispatch2(const ref_t<Message> &m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
void ms_handle_connect(Connection *c) override {
}
mds(mds) {
}
-bool MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t MetricsHandler::ms_dispatch2(const ref_t<Message> &m) {
if (m->get_type() == CEPH_MSG_CLIENT_METRICS &&
m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT) {
handle_client_metrics(ref_cast<MClientMetrics>(m));
public:
MetricsHandler(CephContext *cct, MDSRank *mds);
- bool ms_dispatch2(const ref_t<Message> &m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
void ms_handle_connect(Connection *c) override {
}
return false;
}
-bool DaemonServer::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t DaemonServer::ms_dispatch2(const ref_t<Message>& m)
{
// Note that we do *not* take ::lock here, in order to avoid
// serializing all message handling. It's up to each handler
LogChannelRef auditcl);
~DaemonServer() override;
- bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
bool ms_handle_fast_authentication(Connection *con) override;
void ms_handle_accept(Connection *con) override;
bool ms_handle_reset(Connection *con) override;
}
}
-bool MgrClient::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t MgrClient::ms_dispatch2(const ref_t<Message>& m)
{
std::lock_guard l(lock);
void set_mgr_optional(bool optional_) {mgr_optional = optional_;}
- bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
bool ms_handle_reset(Connection *con) override;
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
}
}
-bool MgrStandby::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t MgrStandby::ms_dispatch2(const ref_t<Message>& m)
{
std::lock_guard l(lock);
dout(10) << state_str() << " " << *m << dendl;
MgrStandby(int argc, const char **argv);
~MgrStandby() override;
- bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
bool ms_handle_reset(Connection *con) override { return false; }
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
#include "include/common_fwd.h"
#include "msg/MessageRef.h"
+#include <variant>
+
class Messenger;
class Connection;
class CryptoKey;
}
/* ms_dispatch2 because otherwise the child must define both */
- virtual bool ms_dispatch2(const MessageRef &m) {
+ struct HANDLED {};
+ struct UNHANDLED {};
+ struct ACKNOWLEDGED {};
+ typedef std::variant<bool, HANDLED, UNHANDLED, ACKNOWLEDGED> dispatch_result_t;
+
+ static inline dispatch_result_t fold_dispatch_result(dispatch_result_t r) {
+ if (std::holds_alternative<bool>(r)) {
+ if (std::get<bool>(r)) {
+ return HANDLED();
+ } else {
+ return UNHANDLED();
+ }
+ } else {
+ return r;
+ }
+ }
+
+ virtual dispatch_result_t ms_dispatch2(const MessageRef &m) {
/* allow old style dispatch handling that expects a Message * with a floating ref */
MessageRef mr(m);
if (ms_dispatch(mr.get())) {
*/
void ms_deliver_dispatch(const ceph::ref_t<Message> &m) {
m->set_dispatch_stamp(ceph_clock_now());
+ bool acked = false;
for ([[maybe_unused]] const auto& [priority, dispatcher] : dispatchers) {
- if (dispatcher->ms_dispatch2(m)) {
+ auto r = Dispatcher::fold_dispatch_result(dispatcher->ms_dispatch2(m));
+ if (std::holds_alternative<Dispatcher::HANDLED>(r)) {
return;
+ } else if (std::holds_alternative<Dispatcher::ACKNOWLEDGED>(r)) {
+ acked = true;
}
}
+ if (acked)
+ return;
lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from "
<< m->get_source_inst() << dendl;
ceph_assert(!cct->_conf->ms_die_on_unhandled_msg);
map = new_map;
}
-bool NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Message>& m)
+Dispatcher::dispatch_result_t NVMeofGwMonitorClient::ms_dispatch2(const ref_t<Message>& m)
{
std::lock_guard l(lock);
dout(10) << "got map type " << m->get_type() << dendl;
~NVMeofGwMonitorClient() override;
// Dispatcher interface
- bool ms_dispatch2(const ceph::ref_t<Message>& m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ceph::ref_t<Message>& m) override;
bool ms_handle_reset(Connection *con) override { return false; }
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override { return false; };
ClusterWatcher::~ClusterWatcher() {
}
-bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
+Dispatcher::dispatch_result_t ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
if (m->get_type() == CEPH_MSG_FS_MAP) {
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
handle_fsmap(ref_cast<MFSMap>(m));
Listener &listener);
~ClusterWatcher();
- bool ms_dispatch2(const ref_t<Message> &m) override;
+ Dispatcher::dispatch_result_t ms_dispatch2(const ref_t<Message> &m) override;
void ms_handle_connect(Connection *c) override {
}