From 1ebcebf6fff056a0c0bdf82dde69356e271be27e Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Fri, 4 Dec 2009 14:37:28 -0800 Subject: [PATCH] osd: Now uses separate messenger for heartbeats. Includes changes to OSDMonitor and OSDMap so things keep working --- src/cosd.cc | 10 ++++++++-- src/messages/MOSDBoot.h | 8 ++++++-- src/mon/OSDMonitor.cc | 1 + src/osd/OSD.cc | 3 ++- src/osd/OSDMap.cc | 2 +- src/osd/OSDMap.h | 31 +++++++++++++++++++++++-------- 6 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/cosd.cc b/src/cosd.cc index eee8d71841ed9..f4267a89736a0 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -135,8 +135,12 @@ int main(int argc, const char **argv) } // start up network - SimpleMessenger rank; + g_my_addr.ss_addr() = mc.get_my_addr().ss_addr(); + g_my_addr.set_port(0); + + SimpleMessenger rank, rank_hb; rank.bind(); + rank_hb.bind(); cout << "starting osd" << whoami << " at " << rank.get_rank_addr() @@ -151,7 +155,7 @@ int main(int argc, const char **argv) assert_warn(m); if (!m) return 1; - Messenger *hbm = rank.register_entity(entity_name_t::OSD(whoami)); + Messenger *hbm = rank_hb.register_entity(entity_name_t::OSD(whoami)); assert_warn(hbm); if (!hbm) return 1; @@ -161,6 +165,7 @@ int main(int argc, const char **argv) rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer()); rank.start(); + rank_hb.start(true); // only need to daemon() once // start osd OSD *osd = new OSD(whoami, m, hbm, &mc, g_conf.osd_data, g_conf.osd_journal, mkjournal); @@ -170,6 +175,7 @@ int main(int argc, const char **argv) } rank.wait(); + rank_hb.wait(); // done delete osd; diff --git a/src/messages/MOSDBoot.h b/src/messages/MOSDBoot.h index ca4ad07009299..4cde49af9a27d 100644 --- a/src/messages/MOSDBoot.h +++ b/src/messages/MOSDBoot.h @@ -23,10 +23,12 @@ class MOSDBoot : public PaxosServiceMessage { public: OSDSuperblock sb; + entity_addr_t hb_addr; MOSDBoot() : PaxosServiceMessage( MSG_OSD_BOOT, 0){} - MOSDBoot(OSDSuperblock& s) : - PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch), sb(s) { + MOSDBoot(OSDSuperblock& s, entity_addr_t& hb_addr_ref) : + PaxosServiceMessage(MSG_OSD_BOOT, s.current_epoch), + sb(s), hb_addr(hb_addr_ref) { } const char *get_type_name() { return "osd_boot"; } @@ -37,11 +39,13 @@ class MOSDBoot : public PaxosServiceMessage { void encode_payload() { paxos_encode(); ::encode(sb, payload); + ::encode(hb_addr, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); paxos_decode(p); ::decode(sb, p); + ::decode(hb_addr, p); } }; diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index a7d94f69411ac..af6c3378f097f 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -437,6 +437,7 @@ bool OSDMonitor::prepare_boot(MOSDBoot *m) // mark new guy up. down_pending_out.erase(from); // if any pending_inc.new_up[from] = m->get_orig_source_addr(); + pending_inc.new_hb_up[from] = m->hb_addr; // mark in? pending_inc.new_weight[from] = CEPH_OSD_IN; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f808dcf79227d..f79b0de093b12 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1264,7 +1264,8 @@ void OSD::ms_handle_connect(Connection *con) void OSD::send_boot() { dout(10) << "send_boot" << dendl; - monc->send_mon_message(new MOSDBoot(superblock)); + entity_addr_t hb_addr = heartbeat_messenger->get_myaddr(); + monc->send_mon_message(new MOSDBoot(superblock, hb_addr)); } void OSD::queue_want_up_thru(epoch_t want) diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc index 334950ebb63f0..54d6d44625bd0 100644 --- a/src/osd/OSDMap.cc +++ b/src/osd/OSDMap.cc @@ -52,7 +52,7 @@ void OSDMap::print(ostream& out) << " down_at " << info.down_at << " last_clean " << info.last_clean_first << "-" << info.last_clean_last << ")"; if (is_up(i)) - out << " " << get_addr(i); + out << " " << get_addr(i) << " " << get_hb_addr(i); out << "\n"; } } diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index 959390aca6acb..139a179b4bb1f 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -155,10 +155,11 @@ public: map new_blacklist; vector old_blacklist; + map new_hb_up; void encode(bufferlist& bl) { // base - __u16 v = 2; + __u16 v = 3; ::encode(v, bl); ::encode(fsid, bl); ::encode(epoch, bl); @@ -176,6 +177,7 @@ public: ::encode(new_pg_temp, bl); // extended + ::encode(new_hb_up, bl); ::encode(new_pool_names, bl); ::encode(new_up_thru, bl); ::encode(new_last_clean_interval, bl); @@ -204,6 +206,8 @@ public: ::decode(new_pg_temp, p); // extended + if (v >= 3) + ::decode(new_hb_up, p); ::decode(new_pool_names, p); ::decode(new_up_thru, p); ::decode(new_last_clean_interval, p); @@ -240,6 +244,7 @@ private: int32_t max_osd; vector osd_state; vector osd_addr; + vector osd_hb_addr; vector<__u32> osd_weight; // 16.16 fixed point, 0x10000 = "in", 0 = "out" vector osd_info; map > pg_temp; // temp pg mapping (e.g. while we rebuild) @@ -301,6 +306,7 @@ private: osd_weight[o] = CEPH_OSD_OUT; } osd_addr.resize(m); + osd_hb_addr.resize(m); } void get_all_osds(set& ls) { @@ -390,6 +396,10 @@ private: assert(exists(osd)); return osd_addr[osd]; } + const entity_addr_t &get_hb_addr(int osd) { + assert(exists(osd)); + return osd_hb_addr[osd]; + } entity_inst_t get_inst(int osd) { assert(exists(osd) && is_up(osd)); return entity_inst_t(entity_name_t::OSD(osd), @@ -406,8 +416,7 @@ private: entity_inst_t get_hb_inst(int osd) { assert(exists(osd)); entity_inst_t i(entity_name_t::OSD(osd), - osd_addr[osd]); - i.addr.erank = i.addr.erank + 1; // heartbeat addr erank is regular addr erank + 1 + osd_hb_addr[osd]); return i; } @@ -493,12 +502,18 @@ private: //cout << "epoch " << epoch << " down osd" << i->first << endl; } for (map::iterator i = inc.new_up.begin(); - i != inc.new_up.end(); + i != inc.new_up.end(); i++) { osd_state[i->first] |= CEPH_OSD_UP; osd_addr[i->first] = i->second; + if (inc.new_hb_up.empty()) { + //this is a backward-compatibility hack + osd_hb_addr[i->first] = i->second; + osd_hb_addr[i->first].erank = osd_hb_addr[i->first].erank + 1; + } + else osd_hb_addr[i->first] = inc.new_hb_up[i->first]; osd_info[i->first].up_from = epoch; - //cout << "epoch " << epoch << " up osd" << i->first << " at " << i->second << endl; + //cout << "epoch " << epoch << " up osd" << i->first << " at " << i->second << "with hb addr" << osd_hb_addr[i->first] << std::endl; } // info @@ -542,7 +557,7 @@ private: // serialize, unserialize void encode(bufferlist& bl) { - __u16 v = 2; + __u16 v = 3; ::encode(v, bl); // base @@ -572,9 +587,9 @@ private: ::encode(cbl, bl); // extended + ::encode(osd_hb_addr, bl); ::encode(osd_info, bl); ::encode(pool_name, bl); - ::encode(blacklist, bl); } @@ -609,8 +624,8 @@ private: crush.decode(cblp); // extended + ::decode(osd_hb_addr, p); ::decode(osd_info, p); - ::decode(pool_name, p); name_pool.clear(); for (map::iterator i = pool_name.begin(); i != pool_name.end(); i++) -- 2.39.5