]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "osd: replace the ALLOW_MESSAGES_FROM macro with use of OSDCaps functions"
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 13 Apr 2010 18:03:07 +0000 (11:03 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 13 Apr 2010 18:03:45 +0000 (11:03 -0700)
This reverts commit 9ceb3be9a62f7b777b23a4d65b1cc302ec834e6b.
This will be coming back in another form shortly, but apparently we have
more connections without sessions on the OSD.

src/osd/OSD.cc

index 0dac62dd45950efb24a99333f5490be7817786ef..e7fafdd7574e4ac4c1c2f9778ca66041375a6672 100644 (file)
@@ -1507,15 +1507,6 @@ void OSD::send_pg_stats()
 void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
 {
   dout(10) << "handle_pg_stats_ack " << dendl;
-  Session *session = (Session *)ack->get_connection()->get_priv();
-  if (!session->caps.is_mon()) {
-    dout(0) << "handle_pg_stats_ack received from non-osd connection "
-           << ack->get_connection() << dendl;
-    delete ack;
-    session->put();
-    return;
-  }
-  session->put();
 
   pg_stat_queue_lock.Lock();
 
@@ -1552,15 +1543,6 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
 
 void OSD::handle_command(MMonCommand *m)
 {
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_mon()) {
-    dout(0) << "handle_command received from non-mon connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
   dout(20) << "handle_command args: " << m->cmd << dendl;
   if (m->cmd[0] == "injectargs")
     parse_config_option_string(m->cmd[1]);
@@ -1766,13 +1748,27 @@ void OSD::do_waiters()
 
 void OSD::_dispatch(Message *m)
 {
+  bool check_from = false;
+
+#define ALLOW_MESSAGES_FROM(peers) \
+do { \
+  if (m->get_connection() && (m->get_connection()->get_peer_type() & (peers)) == 0) { \
+    dout(0) << "filtered out request, peer=" << m->get_connection()->get_peer_type() \
+           << " allowing=" << #peers << " message=" << *m << dendl; \
+    delete m; \
+    return; \
+  } \
+  check_from = true; \
+} while (0)
+
   assert(osd_lock.is_locked());
   dout(20) << "_dispatch " << m << " " << *m << dendl;
-  Session *session = NULL;
+
   switch (m->get_type()) {
 
     // -- don't need lock -- 
   case CEPH_MSG_PING:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
     dout(10) << "ping from " << m->get_source() << dendl;
     delete m;
     break;
@@ -1781,32 +1777,34 @@ void OSD::_dispatch(Message *m)
 
     // map and replication
   case CEPH_MSG_OSD_MAP:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
     handle_osd_map((MOSDMap*)m);
     break;
 
     // osd
   case CEPH_MSG_SHUTDOWN:
-    session = (Session *)m->get_connection()->get_priv();
-    if (session->caps.is_mon() || session->caps.is_osd()) shutdown();
-    else dout(0) << "shutdown message from connection with insufficient privs!"
-                << m->get_connection() << dendl;
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD);
+    shutdown();
     delete m;
-    session->put();
     break;
 
   case MSG_PGSTATSACK:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     handle_pg_stats_ack((MPGStatsAck*)m);
     break;
 
   case MSG_MON_COMMAND:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     handle_command((MMonCommand*) m);
     break;
 
   case MSG_OSD_SCRUB:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     handle_scrub((MOSDScrub*)m);
     break;    
 
   case MSG_CLASS:
+    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
     handle_class((MClass*)m);
     break;
 
@@ -1816,6 +1814,7 @@ void OSD::_dispatch(Message *m)
     {
       // no map?  starting up?
       if (!osdmap) {
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
         dout(7) << "no OSDMap, not booted" << dendl;
         waiting_for_osdmap.push_back(m);
         break;
@@ -1825,59 +1824,62 @@ void OSD::_dispatch(Message *m)
       switch (m->get_type()) {
 
       case MSG_OSD_PG_CREATE:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
        handle_pg_create((MOSDPGCreate*)m);
        break;
         
       case MSG_OSD_PG_NOTIFY:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_notify((MOSDPGNotify*)m);
         break;
       case MSG_OSD_PG_QUERY:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_query((MOSDPGQuery*)m);
         break;
       case MSG_OSD_PG_LOG:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_log((MOSDPGLog*)m);
         break;
       case MSG_OSD_PG_REMOVE:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_remove((MOSDPGRemove*)m);
         break;
       case MSG_OSD_PG_INFO:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_info((MOSDPGInfo*)m);
         break;
       case MSG_OSD_PG_TRIM:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_pg_trim((MOSDPGTrim*)m);
         break;
 
        // client ops
       case CEPH_MSG_OSD_OP:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
         handle_op((MOSDOp*)m);
         break;
         
         // for replication etc.
       case MSG_OSD_SUBOP:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
        handle_sub_op((MOSDSubOp*)m);
        break;
       case MSG_OSD_SUBOPREPLY:
+        ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_OSD);
         handle_sub_op_reply((MOSDSubOpReply*)m);
         break;
         
       }
     }
   }
+  assert(check_from);
 }
 
 
 void OSD::handle_scrub(MOSDScrub *m)
 {
   dout(10) << "handle_scrub " << *m << dendl;
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_mon()) {
-    dout(0) << "handle_scrub received from non-mon connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
+  
   if (ceph_fsid_compare(&m->fsid, &monc->get_fsid())) {
     dout(0) << "handle_scrub fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
     delete m;
@@ -1979,15 +1981,6 @@ void OSD::handle_osd_map(MOSDMap *m)
     return;
   }
 
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!(session->caps.is_mon() || session->caps.is_osd())) {
-    //not enough perms!
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
-
   if (osdmap) {
     dout(3) << "handle_osd_map epochs [" 
             << m->get_first() << "," << m->get_last() 
@@ -2951,15 +2944,7 @@ void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction
 void OSD::handle_pg_create(MOSDPGCreate *m)
 {
   dout(10) << "handle_pg_create " << *m << dendl;
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_mon()) {
-    dout(0) << "handle_pg_create received from non-mon connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
+
   if (!require_same_or_newer_map(m, m->epoch)) return;
 
   map< int, map<pg_t,PG::Query> > query_map;
@@ -3127,15 +3112,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
 {
   dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
   int from = m->get_source().num();
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_pg_notify received from non-osd connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
+
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
 
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
@@ -3359,16 +3336,6 @@ void OSD::handle_pg_log(MOSDPGLog *m)
 {
   dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
 
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
-
   int from = m->get_source().num();
   int created = 0;
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
@@ -3385,15 +3352,6 @@ void OSD::handle_pg_log(MOSDPGLog *m)
 void OSD::handle_pg_info(MOSDPGInfo *m)
 {
   dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
 
   int from = m->get_source().num();
   if (!require_same_or_newer_map(m, m->get_epoch())) return;
@@ -3419,16 +3377,6 @@ void OSD::handle_pg_trim(MOSDPGTrim *m)
 {
   dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
 
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
-
   int from = m->get_source().num();
   if (!require_same_or_newer_map(m, m->epoch)) return;
 
@@ -3474,15 +3422,6 @@ void OSD::handle_pg_trim(MOSDPGTrim *m)
 void OSD::handle_pg_query(MOSDPGQuery *m) 
 {
   assert(osd_lock.is_locked());
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-mon connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
 
   dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << dendl;
   int from = m->get_source().num();
@@ -3612,15 +3551,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
 void OSD::handle_pg_remove(MOSDPGRemove *m)
 {
   assert(osd_lock.is_locked());
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
 
   dout(7) << "handle_pg_remove from " << m->get_source() << " on "
          << m->pg_list.size() << " pgs" << dendl;
@@ -4298,16 +4228,6 @@ void OSD::handle_sub_op(MOSDSubOp *op)
     return;
   }
 
-  Session *session = (Session *)op->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << op->get_connection() << dendl;
-    delete op;
-    session->put();
-    return;
-  }
-  session->put();
-
   // must be a rep op.
   assert(op->get_source().is_osd());
   
@@ -4357,16 +4277,6 @@ void OSD::handle_sub_op_reply(MOSDSubOpReply *op)
     return;
   }
 
-  Session *session = (Session *)op->get_connection()->get_priv();
-  if (!session->caps.is_osd()) {
-    dout(0) << "handle_scrub received from non-osd connection "
-           << op->get_connection() << dendl;
-    delete op;
-    session->put();
-    return;
-  }
-  session->put();
-
   // must be a rep op.
   assert(op->get_source().is_osd());
   
@@ -4540,15 +4450,6 @@ void OSD::got_class(const nstring& cname)
 
 void OSD::handle_class(MClass *m)
 {
-  Session *session = (Session *)m->get_connection()->get_priv();
-  if (!session->caps.is_mon()) {
-    dout(0) << "handle_class received from non-mon connection "
-           << m->get_connection() << dendl;
-    delete m;
-    session->put();
-    return;
-  }
-  session->put();
   Mutex::Locker l(class_lock);
   dout(0) << "handle_class action=" << m->action << dendl;