]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Now uses separate messenger for heartbeats.
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 4 Dec 2009 22:37:28 +0000 (14:37 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Sat, 5 Dec 2009 00:03:12 +0000 (16:03 -0800)
Includes changes to OSDMonitor and OSDMap so things keep working

src/cosd.cc
src/messages/MOSDBoot.h
src/mon/OSDMonitor.cc
src/osd/OSD.cc
src/osd/OSDMap.cc
src/osd/OSDMap.h

index eee8d71841ed914997662f452eb8b97a6204c49c..f4267a89736a0e6f3e155f88b8ffe86f7702da66 100644 (file)
@@ -135,8 +135,12 @@ int main(int argc, const char **argv)
   }
 
   // start up network
-  SimpleMessenger rank;
+  g_my_addr.ss_addr() = mc.get_my_addr().ss_addr();
+  g_my_addr.set_port(0);
+
+  SimpleMessenger rank, rank_hb;
   rank.bind();
+  rank_hb.bind();
 
   cout << "starting osd" << whoami
        << " at " << rank.get_rank_addr() 
@@ -151,7 +155,7 @@ int main(int argc, const char **argv)
   assert_warn(m);
   if (!m)
     return 1;
-  Messenger *hbm = rank.register_entity(entity_name_t::OSD(whoami));
+  Messenger *hbm = rank_hb.register_entity(entity_name_t::OSD(whoami));
   assert_warn(hbm);
   if (!hbm)
     return 1;
@@ -161,6 +165,7 @@ int main(int argc, const char **argv)
   rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
 
   rank.start();
+  rank_hb.start(true);  // only need to daemon() once
 
   // start osd
   OSD *osd = new OSD(whoami, m, hbm, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal);
@@ -170,6 +175,7 @@ int main(int argc, const char **argv)
   }
 
   rank.wait();
+  rank_hb.wait();
 
   // done
   delete osd;
index ca4ad0700929984902f6de3e50cff0a85cfcfac0..4cde49af9a27d8e625979c174ac0751d5eadbdbb 100644 (file)
 class MOSDBoot : public PaxosServiceMessage {
  public:
   OSDSuperblock sb;
+  entity_addr_t hb_addr;
 
   MOSDBoot() : PaxosServiceMessage( MSG_OSD_BOOT, 0){}
-  MOSDBoot(OSDSuperblock& s) : 
-    PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch), sb(s) {
+  MOSDBoot(OSDSuperblock& s, entity_addr_t& hb_addr_ref) : 
+    PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch),
+    sb(s), hb_addr(hb_addr_ref) {
   }
 
   const char *get_type_name() { return "osd_boot"; }
@@ -37,11 +39,13 @@ class MOSDBoot : public PaxosServiceMessage {
   void encode_payload() {
     paxos_encode();
     ::encode(sb, payload);
+    ::encode(hb_addr, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
     paxos_decode(p);
     ::decode(sb, p);
+    ::decode(hb_addr, p);
   }
 };
 
index a7d94f69411ac613d04d29e63a0bbadd66a7a30a..af6c3378f097ffe468e2708195c7c5811555cf33 100644 (file)
@@ -437,6 +437,7 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m)
     // mark new guy up.
     down_pending_out.erase(from);  // if any
     pending_inc.new_up[from] = m->get_orig_source_addr();
+    pending_inc.new_hb_up[from] = m->hb_addr;
     
     // mark in?
     pending_inc.new_weight[from] = CEPH_OSD_IN;
index f808dcf79227df61174d19ad30c1f9c4161f5ed9..f79b0de093b12c92bd68e2d0836ed23b090e1279 100644 (file)
@@ -1264,7 +1264,8 @@ void OSD::ms_handle_connect(Connection *con)
 void OSD::send_boot()
 {
   dout(10) << "send_boot" << dendl;
-  monc->send_mon_message(new MOSDBoot(superblock));
+  entity_addr_t hb_addr = heartbeat_messenger->get_myaddr();
+  monc->send_mon_message(new MOSDBoot(superblock, hb_addr));
 }
 
 void OSD::queue_want_up_thru(epoch_t want)
index 334950ebb63f0733bc9c1f1cb01b2017a359b34a..54d6d44625bd0c5fe01c53981e5b487e49ca6919 100644 (file)
@@ -52,7 +52,7 @@ void OSDMap::print(ostream& out)
          << " down_at " << info.down_at
          << " last_clean " << info.last_clean_first << "-" << info.last_clean_last << ")";
       if (is_up(i))
-       out << " " << get_addr(i);
+       out << " " << get_addr(i) << " " << get_hb_addr(i);
       out << "\n";
     }
   }
index 959390aca6acbcea93b48aa08a468aad91981bb0..139a179b4bb1fc2f23272bb852830b6700a85e49 100644 (file)
@@ -155,10 +155,11 @@ public:
 
     map<entity_addr_t,utime_t> new_blacklist;
     vector<entity_addr_t> old_blacklist;
+    map<int32_t, entity_addr_t> new_hb_up;
 
     void encode(bufferlist& bl) {
       // base
-      __u16 v = 2;
+      __u16 v = 3;
       ::encode(v, bl);
       ::encode(fsid, bl);
       ::encode(epoch, bl); 
@@ -176,6 +177,7 @@ public:
       ::encode(new_pg_temp, bl);
 
       // extended
+      ::encode(new_hb_up, bl);
       ::encode(new_pool_names, bl);
       ::encode(new_up_thru, bl);
       ::encode(new_last_clean_interval, bl);
@@ -204,6 +206,8 @@ public:
        ::decode(new_pg_temp, p);
       
       // extended
+      if (v >= 3)
+       ::decode(new_hb_up, p);
       ::decode(new_pool_names, p);
       ::decode(new_up_thru, p);
       ::decode(new_last_clean_interval, p);
@@ -240,6 +244,7 @@ private:
   int32_t max_osd;
   vector<uint8_t> osd_state;
   vector<entity_addr_t> osd_addr;
+  vector<entity_addr_t> osd_hb_addr;
   vector<__u32>   osd_weight;   // 16.16 fixed point, 0x10000 = "in", 0 = "out"
   vector<osd_info_t> osd_info;
   map<pg_t,vector<int> > pg_temp;  // temp pg mapping (e.g. while we rebuild)
@@ -301,6 +306,7 @@ private:
       osd_weight[o] = CEPH_OSD_OUT;
     }
     osd_addr.resize(m);
+    osd_hb_addr.resize(m);
   }
 
   void get_all_osds(set<int32_t>& ls) { 
@@ -390,6 +396,10 @@ private:
     assert(exists(osd));
     return osd_addr[osd];
   }
+  const entity_addr_t &get_hb_addr(int osd) {
+    assert(exists(osd));
+    return osd_hb_addr[osd];
+  }
   entity_inst_t get_inst(int osd) {
     assert(exists(osd) && is_up(osd));
     return entity_inst_t(entity_name_t::OSD(osd),
@@ -406,8 +416,7 @@ private:
   entity_inst_t get_hb_inst(int osd) {
     assert(exists(osd));
     entity_inst_t i(entity_name_t::OSD(osd),
-                   osd_addr[osd]);
-    i.addr.erank = i.addr.erank + 1;  // heartbeat addr erank is regular addr erank + 1
+                   osd_hb_addr[osd]);
     return i;
   }
 
@@ -493,12 +502,18 @@ private:
       //cout << "epoch " << epoch << " down osd" << i->first << endl;
     }
     for (map<int32_t,entity_addr_t>::iterator i = inc.new_up.begin();
-         i != inc.new_up.end(); 
+         i != inc.new_up.end();
          i++) {
       osd_state[i->first] |= CEPH_OSD_UP;
       osd_addr[i->first] = i->second;
+      if (inc.new_hb_up.empty()) {
+       //this is a backward-compatibility hack
+       osd_hb_addr[i->first] = i->second;
+       osd_hb_addr[i->first].erank = osd_hb_addr[i->first].erank + 1;
+      }
+      else osd_hb_addr[i->first] = inc.new_hb_up[i->first];
       osd_info[i->first].up_from = epoch;
-      //cout << "epoch " << epoch << " up osd" << i->first << " at " << i->second << endl;
+      //cout << "epoch " << epoch << " up osd" << i->first << " at " << i->second << "with hb addr" << osd_hb_addr[i->first] << std::endl;
     }
 
     // info
@@ -542,7 +557,7 @@ private:
 
   // serialize, unserialize
   void encode(bufferlist& bl) {
-    __u16 v = 2;
+    __u16 v = 3;
     ::encode(v, bl);
 
     // base
@@ -572,9 +587,9 @@ private:
     ::encode(cbl, bl);
 
     // extended
+    ::encode(osd_hb_addr, bl);
     ::encode(osd_info, bl);
     ::encode(pool_name, bl);
-
     ::encode(blacklist, bl);
   }
   
@@ -609,8 +624,8 @@ private:
     crush.decode(cblp);
 
     // extended
+    ::decode(osd_hb_addr, p);
     ::decode(osd_info, p);
-
     ::decode(pool_name, p);
     name_pool.clear();
     for (map<int,nstring>::iterator i = pool_name.begin(); i != pool_name.end(); i++)