/*
* high priority messages we always process
*/
+
+#define ALLOW_MESSAGES_FROM(peers) \
+ do { \
+ if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
+ dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" \
+ << m->get_connection()->get_peer_type() << " allowing=" \
+ << #peers << " message=" << *m << dendl; \
+ return true; \
+ } \
+ } while (0)
+
bool MDSDaemon::handle_core_message(const cref_t<Message> &m)
{
switch (m->get_type()) {
if (is_stale_message(m)) {
return true;
}
+ // do not proceed if this message cannot be handled
+ if (!is_valid_message(m)) {
+ return false;
+ }
if (beacon.is_laggy()) {
dout(5) << " laggy, deferring " << *m << dendl;
dout(5) << " there are deferred messages, deferring " << *m << dendl;
waiting_for_nolaggy.push_back(m);
} else {
- if (!handle_deferrable_message(m)) {
- return false;
- }
-
+ handle_message(m);
heartbeat_reset();
}
}
}
+// message types that the mds can handle
+bool MDSRank::is_valid_message(const cref_t<Message> &m) {
+ int port = m->get_type() & 0xff00;
+ int type = m->get_type();
+
+ if (port == MDS_PORT_CACHE ||
+ port == MDS_PORT_MIGRATOR ||
+ type == CEPH_MSG_CLIENT_SESSION ||
+ type == CEPH_MSG_CLIENT_RECONNECT ||
+ type == CEPH_MSG_CLIENT_RECLAIM ||
+ type == CEPH_MSG_CLIENT_REQUEST ||
+ type == MSG_MDS_SLAVE_REQUEST ||
+ type == MSG_MDS_HEARTBEAT ||
+ type == MSG_MDS_TABLE_REQUEST ||
+ type == MSG_MDS_LOCK ||
+ type == MSG_MDS_INODEFILECAPS ||
+ type == CEPH_MSG_CLIENT_CAPS ||
+ type == CEPH_MSG_CLIENT_CAPRELEASE ||
+ type == CEPH_MSG_CLIENT_LEASE) {
+ return true;
+ }
+
+ return false;
+}
+
/*
* lower priority messages we defer if we seem laggy
*/
-bool MDSRank::handle_deferrable_message(const cref_t<Message> &m)
+
+#define ALLOW_MESSAGES_FROM(peers) \
+ do { \
+ if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
+ dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" << m->get_connection()->get_peer_type() \
+ << " allowing=" << #peers << " message=" << *m << dendl; \
+ return; \
+ } \
+ } while (0)
+
+void MDSRank::handle_message(const cref_t<Message> &m)
{
int port = m->get_type() & 0xff00;
break;
default:
- return false;
+ derr << "unrecogonized message " << *m << dendl;
}
}
-
- return true;
}
/**
if (!is_stale_message(old)) {
dout(7) << " processing laggy deferred " << *old << dendl;
- if (!handle_deferrable_message(old)) {
- dout(0) << "unrecognized message " << *old << dendl;
- }
+ ceph_assert(is_valid_message(old));
+ handle_message(old);
}
heartbeat_reset();
void inc_dispatch_depth() { ++dispatch_depth; }
void dec_dispatch_depth() { --dispatch_depth; }
void retry_dispatch(const cref_t<Message> &m);
- bool handle_deferrable_message(const cref_t<Message> &m);
+ bool is_valid_message(const cref_t<Message> &m);
+ void handle_message(const cref_t<Message> &m);
void _advance_queues();
bool _dispatch(const cref_t<Message> &m, bool new_msg);
bool is_stale_message(const cref_t<Message> &m) const;
bool ms_dispatch(const cref_t<Message> &m);
};
-// This utility for MDS and MDSRank dispatchers.
-#define ALLOW_MESSAGES_FROM(peers) \
-do { \
- if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
- dout(0) << __FILE__ << "." << __LINE__ << ": filtered out request, peer=" << m->get_connection()->get_peer_type() \
- << " allowing=" << #peers << " message=" << *m << dendl; \
- return true; \
- } \
-} while (0)
-
#endif // MDS_RANK_H_