]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: use cluster/client_messenger as appropriate.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 7 Jul 2010 23:23:43 +0000 (16:23 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 3 Aug 2010 21:56:01 +0000 (14:56 -0700)
Modify send_incremental_map to choose based on the type of the entity.

src/osd/OSD.cc

index 442072be923c4e0432010f9ca6fb0bd52cf8d574..f40121ef3b314206735b11a52262b80a793dd6f0 100644 (file)
@@ -337,7 +337,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, M
   monc(mc),
   logger(NULL), logger_started(false),
   store(NULL),
-  logclient(messenger, &mc->monmap, mc),
+  logclient(cluster_messenger, &mc->monmap, mc),
   whoami(id),
   dev_path(dev), journal_path(jdev),
   dispatch_running(false),
@@ -378,7 +378,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, M
   scrub_wq(this, &disk_tp),
   remove_wq(this, &disk_tp)
 {
-  monc->set_messenger(messenger);
+  monc->set_messenger(cluster_messenger);
   
   osdmap = 0;
 
@@ -1358,7 +1358,7 @@ void OSD::heartbeat()
     dout(0) << "got SIGTERM, shutting down" << dendl;
     Message *m = new MGenericMessage(CEPH_MSG_SHUTDOWN);
     m->set_priority(CEPH_MSG_PRIO_HIGHEST);
-    messenger->send_message(m, messenger->get_myinst());
+    cluster_messenger->send_message(m, cluster_messenger->get_myinst());
     return;
   }
 
@@ -1437,8 +1437,8 @@ void OSD::tick()
 
   if (got_sigterm) {
     dout(0) << "got SIGTERM, shutting down" << dendl;
-    messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
-                           messenger->get_myinst());
+    cluster_messenger->send_message(new MGenericMessage(CEPH_MSG_SHUTDOWN),
+                           cluster_messenger->get_myinst());
     return;
   }
 
@@ -1468,7 +1468,7 @@ void OSD::tick()
         q++) {
       if (osdmap->is_up(q->first)) {
        MOSDPGRemove *m = new MOSDPGRemove(p->first, q->second);
-       messenger->send_message(m, osdmap->get_inst(q->first));
+       cluster_messenger->send_message(m, osdmap->get_cluster_inst(q->first));
       }
     }
   remove_list.clear();
@@ -1525,7 +1525,7 @@ void OSD::send_boot()
   entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
   if (hb_addr.is_blank_addr()) {
     int port = hb_addr.get_port();
-    hb_addr = messenger->get_myaddr();
+    hb_addr = cluster_messenger->get_myaddr();
     hb_addr.set_port(port);
   }
   monc->send_mon_message(new MOSDBoot(superblock, hb_addr));
@@ -2104,7 +2104,7 @@ void OSD::wait_for_new_map(Message *m)
 
 void OSD::note_down_osd(int osd)
 {
-  messenger->mark_down(osdmap->get_addr(osd));
+  cluster_messenger->mark_down(osdmap->get_cluster_addr(osd));
 
   heartbeat_lock.Lock();
 
@@ -2369,7 +2369,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   // all the way?
   if (advanced && cur == superblock.newest_map) {
     if (osdmap->is_up(whoami) &&
-       osdmap->get_addr(whoami) == messenger->get_myaddr()) {
+       osdmap->get_addr(whoami) == client_messenger->get_myaddr()) {
       // yay!
       activate_map(t, fin->contexts);
 
@@ -2392,7 +2392,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   if (osdmap->get_epoch() > 0 &&
       state != STATE_BOOTING &&
       (!osdmap->exists(whoami) || 
-       (!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == messenger->get_myaddr()))) {
+       (!osdmap->is_up(whoami) && osdmap->get_addr(whoami) == client_messenger->get_myaddr()))) {
     dout(0) << "map says i am down.  switching to boot state." << dendl;
     //shutdown();
 
@@ -2458,7 +2458,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
 
   if (!up_epoch &&
       osdmap->is_up(whoami) &&
-      osdmap->get_inst(whoami) == messenger->get_myinst()) {
+      osdmap->get_inst(whoami) == client_messenger->get_myinst()) {
     up_epoch = osdmap->get_epoch();
     dout(10) << "up_epoch is " << up_epoch << dendl;
     if (!boot_epoch) {
@@ -2798,11 +2798,12 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la
       assert(0);  // we should have all maps.
     }
   }
-
+  Messenger *msgr = client_messenger;
+  if (entity_name_t::TYPE_OSD == inst.name._type) msgr = cluster_messenger;
   if (lazy)
-    messenger->lazy_send_message(m, inst);  // only if we already have an open connection
+    msgr->lazy_send_message(m, inst);  // only if we already have an open connection
   else
-    messenger->send_message(m, inst);
+    msgr->send_message(m, inst);
 }
 
 bool OSD::get_map_bl(epoch_t e, bufferlist& bl)
@@ -3304,8 +3305,8 @@ void OSD::do_notifies(map< int, vector<PG::Info> >& notify_list)
     }
     dout(7) << "do_notify osd" << it->first << " on " << it->second.size() << " PGs" << dendl;
     MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second);
-    _share_map_outgoing(osdmap->get_inst(it->first));
-    messenger->send_message(m, osdmap->get_inst(it->first));
+    _share_map_outgoing(osdmap->get_cluster_inst(it->first));
+    cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
   }
 }
 
@@ -3322,8 +3323,8 @@ void OSD::do_queries(map< int, map<pg_t,PG::Query> >& query_map)
     dout(7) << "do_queries querying osd" << who
             << " on " << pit->second.size() << " PGs" << dendl;
     MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(), pit->second);
-    _share_map_outgoing(osdmap->get_inst(who));
-    messenger->send_message(m, osdmap->get_inst(who));
+    _share_map_outgoing(osdmap->get_cluster_inst(who));
+    cluster_messenger->send_message(m, osdmap->get_cluster_inst(who));
   }
 }
 
@@ -3333,7 +3334,7 @@ void OSD::do_infos(map<int,MOSDPGInfo*>& info_map)
   for (map<int,MOSDPGInfo*>::iterator p = info_map.begin();
        p != info_map.end();
        ++p) 
-    messenger->send_message(p->second, osdmap->get_inst(p->first));
+    cluster_messenger->send_message(p->second, osdmap->get_inst(p->first));
   info_map.clear();
 }
 
@@ -3815,8 +3816,8 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
        dout(10) << *pg << " sending " << mlog->log << " " << mlog->missing << dendl;
        //m->log.print(cout);
        
-       _share_map_outgoing(osdmap->get_inst(from));
-       messenger->send_message(mlog, m->get_connection());
+       _share_map_outgoing(osdmap->get_cluster_inst(from));
+       cluster_messenger->send_message(mlog, m->get_connection());
       }
     }    
 
@@ -4070,8 +4071,8 @@ void OSD::generate_backlog(PG *pg)
     MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->info);
     m->missing = pg->missing;
     m->log = pg->log;
-    _share_map_outgoing(osdmap->get_inst(pg->get_primary()));
-    messenger->send_message(m, osdmap->get_inst(pg->get_primary()));
+    _share_map_outgoing(osdmap->get_cluster_inst(pg->get_primary()));
+    cluster_messenger->send_message(m, osdmap->get_cluster_inst(pg->get_primary()));
   } else {
     dout(10) << *pg << "  generated backlog, peering" << dendl;
 
@@ -4277,7 +4278,10 @@ void OSD::reply_op_error(MOSDOp *op, int err)
     flags = CEPH_OSD_FLAG_ACK;
 
   MOSDOpReply *reply = new MOSDOpReply(op, err, osdmap->get_epoch(), flags);
-  messenger->send_message(reply, op->get_connection());
+  Messenger *msgr = client_messenger;
+  if (op->get_source().is_osd())
+    msgr = cluster_messenger;
+  msgr->send_message(reply, op->get_connection());
   op->put();
 }
 
@@ -4652,7 +4656,7 @@ void OSD::dequeue_op(PG *pg)
     //  do this preemptively while we hold osd_lock and pg->lock
     //  to avoid lock ordering issues later.
     for (unsigned i=1; i<pg->acting.size(); i++) 
-      _share_map_outgoing( osdmap->get_inst(pg->acting[i]) ); 
+      _share_map_outgoing( osdmap->get_cluster_inst(pg->acting[i]) );
   }
   osd_lock.Unlock();