__le64 num;
} __attribute__ ((packed));
-#define CEPH_ENTITY_TYPE_MON 1
-#define CEPH_ENTITY_TYPE_MDS 2
-#define CEPH_ENTITY_TYPE_OSD 4
-#define CEPH_ENTITY_TYPE_CLIENT 8
-#define CEPH_ENTITY_TYPE_ADMIN 16
-#define CEPH_ENTITY_TYPE_AUTH 32
+#define CEPH_ENTITY_TYPE_MON 0x01
+#define CEPH_ENTITY_TYPE_MDS 0x02
+#define CEPH_ENTITY_TYPE_OSD 0x04
+#define CEPH_ENTITY_TYPE_CLIENT 0x08
+#define CEPH_ENTITY_TYPE_ADMIN 0x10
+#define CEPH_ENTITY_TYPE_AUTH 0x20
+
+#define CEPH_ENTITY_TYPE_ANY 0xFF
extern const char *ceph_entity_type_name(int type);
void OSD::_dispatch(Message *m)
{
+ bool check_from = false;
+
+#define ALLOW_MESSAGES_FROM(peers) \
+do { \
+ if ((m->get_connection()->get_peer_type() & (peers)) == 0) { \
+ dout(0) << "filtered out request, peer=" << m->get_connection()->get_peer_type() \
+ << " allowing=" << #peers << " message=" << *m << dendl; \
+ delete m; \
+ return; \
+ } \
+ check_from = true; \
+} while (0)
+
assert(osd_lock.is_locked());
dout(20) << "_dispatch " << m << " " << *m << dendl;
// -- don't need lock --
case CEPH_MSG_PING:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
dout(10) << "ping from " << m->get_source() << dendl;
delete m;
break;
// map and replication
case CEPH_MSG_OSD_MAP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
handle_osd_map((MOSDMap*)m);
break;
// osd
case CEPH_MSG_SHUTDOWN:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
shutdown();
delete m;
break;
case MSG_PGSTATSACK:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
handle_pg_stats_ack((MPGStatsAck*)m);
break;
case MSG_MON_COMMAND:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
parse_config_option_string(((MMonCommand*)m)->cmd[0]);
delete m;
break;
case MSG_OSD_SCRUB:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
handle_scrub((MOSDScrub*)m);
break;
case MSG_CLASS:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
handle_class((MClass*)m);
break;
{
// no map? starting up?
if (!osdmap) {
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
dout(7) << "no OSDMap, not booted" << dendl;
waiting_for_osdmap.push_back(m);
break;
switch (m->get_type()) {
case MSG_OSD_PG_CREATE:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_create((MOSDPGCreate*)m);
break;
case MSG_OSD_PG_NOTIFY:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_notify((MOSDPGNotify*)m);
break;
case MSG_OSD_PG_QUERY:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_query((MOSDPGQuery*)m);
break;
case MSG_OSD_PG_LOG:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_log((MOSDPGLog*)m);
break;
case MSG_OSD_PG_REMOVE:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_remove((MOSDPGRemove*)m);
break;
case MSG_OSD_PG_INFO:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_info((MOSDPGInfo*)m);
break;
case MSG_OSD_PG_TRIM:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_pg_trim((MOSDPGTrim*)m);
break;
// client ops
case CEPH_MSG_OSD_OP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
handle_op((MOSDOp*)m);
break;
// for replication etc.
case MSG_OSD_SUBOP:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_sub_op((MOSDSubOp*)m);
break;
case MSG_OSD_SUBOPREPLY:
+ ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
handle_sub_op_reply((MOSDSubOpReply*)m);
break;
}
}
}
+ assert(check_from);
}