bool MDS::_dispatch(Message *m)
{
+ bool check_from = false;
+
+#define ALLOW_MESSAGES_FROM(peers) \
+do { \
+ if ((m->get_connection()->get_peer_type() & (peers)) == 0) { \
+ dout(0) << "filtered out request, peer=" << m->get_connection()->get_peer_type() \
+ << " allowing=" << #peers << " message=" << *m << dendl; \
+ delete m; \
+ return true; \
+ } \
+ check_from = true; \
+} while (0)
+
// from bad mds?
if (m->get_source().is_mds()) {
int from = m->get_source().num();
switch (m->get_type()) {
case CEPH_MSG_MON_MAP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
delete m;
break;
// MDS
case CEPH_MSG_MDS_MAP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_MDS);
handle_mds_map((MMDSMap*)m);
break;
case MSG_MDS_BEACON:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
handle_mds_beacon((MMDSBeacon*)m);
break;
// misc
case MSG_MON_COMMAND:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
parse_config_option_string(((MMonCommand*)m)->cmd[0]);
delete m;
break;
int port = m->get_type() & 0xff00;
switch (port) {
case MDS_PORT_CACHE:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->dispatch(m);
break;
case MDS_PORT_LOCKER:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
locker->dispatch(m);
break;
case MDS_PORT_MIGRATOR:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
mdcache->migrator->dispatch(m);
break;
case CEPH_MSG_CLIENT_SESSION:
case CEPH_MSG_CLIENT_REQUEST:
case CEPH_MSG_CLIENT_RECONNECT:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
+ server->dispatch(m);
+ break;
case MSG_MDS_SLAVE_REQUEST:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
server->dispatch(m);
break;
case MSG_MDS_HEARTBEAT:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
balancer->proc_message(m);
break;
case MSG_MDS_TABLE_REQUEST:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
{
MMDSTableRequest *req = (MMDSTableRequest*)m;
if (req->op < 0) {
// OSD
case CEPH_MSG_OSD_OPREPLY:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
objecter->handle_osd_op_reply((class MOSDOpReply*)m);
break;
case CEPH_MSG_OSD_MAP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
objecter->handle_osd_map((MOSDMap*)m);
if (is_active() && snapserver)
snapserver->check_osd_map(true);
}
}
+ assert(check_from);
+
if (laggy)
return true;