From: Sage Weil Date: Fri, 20 May 2011 21:45:36 +0000 (-0700) Subject: osd: more heartbeat rework X-Git-Tag: v0.28.1~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a51bf3e9df027bb9ed58679666ee4207b4185961;p=ceph.git osd: more heartbeat rework A few things: - track Connection* instead of entity_inst_t for hb peers - we can only send maps over the cluster_messenger - if peer is still alive, do that - if peer is not, send dying MOSDPing ping with YOU_DIED flag --- diff --git a/src/messages/MOSDPing.h b/src/messages/MOSDPing.h index f27122bea66..d6b40768ed4 100644 --- a/src/messages/MOSDPing.h +++ b/src/messages/MOSDPing.h @@ -23,13 +23,27 @@ class MOSDPing : public Message { public: + enum { + HEARTBEAT = 0, + REQUEST_HEARTBEAT = 1, + YOU_DIED = 2, + }; + const char *get_op_name(int op) { + switch (op) { + case HEARTBEAT: return "heartbeat"; + case REQUEST_HEARTBEAT: return "request_heartbeat"; + case YOU_DIED: return "you_died"; + default: return "???"; + } + } + ceph_fsid_t fsid; epoch_t map_epoch, peer_as_of_epoch; - bool ack; + __u8 op; osd_peer_stat_t peer_stat; - MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, bool a=false) : - Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), ack(a), peer_stat(ps) { } + MOSDPing(const ceph_fsid_t& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, __u8 o=HEARTBEAT) : + Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) { } MOSDPing() {} private: ~MOSDPing() {} @@ -40,23 +54,21 @@ public: ::decode(fsid, p); ::decode(map_epoch, p); ::decode(peer_as_of_epoch, p); - ::decode(ack, p); + ::decode(op, p); ::decode(peer_stat, p); } void encode_payload() { ::encode(fsid, payload); ::encode(map_epoch, payload); ::encode(peer_as_of_epoch, payload); - ::encode(ack, payload); + ::encode(op, payload); ::encode(peer_stat, payload); } const char *get_type_name() { return "osd_ping"; } void print(ostream& out) { - out << "osd_ping(e" << map_epoch << " as_of " << peer_as_of_epoch; - if (ack) - out << " ACK"; - out << ")"; + out << "osd_ping(e" << map_epoch << " as_of " << peer_as_of_epoch + << " " << get_op_name(op) << ")"; } }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 316e4ae1526..f564d2dda84 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1408,10 +1408,10 @@ void OSD::update_heartbeat_peers() old_from_stamp.swap(heartbeat_from_stamp); map old_to, old_from; - map old_inst; + map old_con; old_to.swap(heartbeat_to); old_from.swap(heartbeat_from); - old_inst.swap(heartbeat_inst); + old_con.swap(heartbeat_con); utime_t now = g_clock.now(); @@ -1422,10 +1422,11 @@ void OSD::update_heartbeat_peers() p != old_to.end(); p++) { if (p->second > osdmap->get_epoch()) { - dout(10) << "update_heartbeat_peers: keeping newer _to peer " << old_inst[p->first] + dout(10) << "update_heartbeat_peers: keeping newer _to peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() << " as of " << p->second << dendl; heartbeat_to[p->first] = p->second; - heartbeat_inst[p->first] = old_inst[p->first]; + heartbeat_con[p->first] = old_con[p->first]; } } @@ -1442,9 +1443,10 @@ void OSD::update_heartbeat_peers() if (heartbeat_to.count(p)) continue; heartbeat_to[p] = osdmap->get_epoch(); - heartbeat_inst[p] = osdmap->get_hb_inst(p); - if (old_to.count(p) == 0 || old_inst[p] != heartbeat_inst[p]) - dout(10) << "update_heartbeat_peers: new _to osd" << p << " " << heartbeat_inst[p] << dendl; + heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p)); + if (old_to.count(p) == 0 || old_con[p] != heartbeat_con[p]) + dout(10) << "update_heartbeat_peers: new _to osd" << p + << " " << heartbeat_con[p]->get_peer_addr() << dendl; } else if (pg->get_role() == 0) { assert(pg->acting[0] == whoami); @@ -1454,74 +1456,91 @@ void OSD::update_heartbeat_peers() if (heartbeat_from.count(p)) continue; heartbeat_from[p] = osdmap->get_epoch(); - heartbeat_inst[p] = osdmap->get_hb_inst(p); + heartbeat_con[p] = heartbeat_messenger->get_connection(osdmap->get_hb_inst(p)); if (old_from_stamp.count(p) && old_from.count(p) && - old_inst[p] == heartbeat_inst[p]) { + old_con[p] == heartbeat_con[p]) { // have a stamp _AND_ i'm not new to the set heartbeat_from_stamp[p] = old_from_stamp[p]; } else { - dout(10) << "update_heartbeat_peers: new _from osd" << p << " " << heartbeat_inst[p] << dendl; + dout(10) << "update_heartbeat_peers: new _from osd" << p + << " " << heartbeat_con[p]->get_peer_addr() << dendl; heartbeat_from_stamp[p] = now; - MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, true); // request hb - heartbeat_messenger->send_message(m, heartbeat_inst[p]); + MOSDPing *m = new MOSDPing(osdmap->get_fsid(), 0, heartbeat_epoch, my_stat, + MOSDPing::REQUEST_HEARTBEAT); + heartbeat_messenger->send_message(m, heartbeat_con[p]); } } } } + + map down; + for (map::iterator p = old_to.begin(); p != old_to.end(); p++) { - assert(old_inst.count(p->first)); - if (heartbeat_to.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first]) + assert(old_con.count(p->first)); + if (heartbeat_to.count(p->first) && heartbeat_con[p->first] == old_con[p->first]) continue; assert(p->second <= osdmap->get_epoch()); // share latest map with this peer so they know not to expect // heartbeats from us. otherwise they may mark us down! - dout(10) << "update_heartbeat_peers: sharing map with old _to peer " << old_inst[p->first] - << " as of " << p->second << dendl; - _share_map_outgoing(old_inst[p->first]); + if (osdmap->is_up(p->first)) { + dout(10) << "update_heartbeat_peers: sharing map with old _to peer osd" << p->first << dendl; + _share_map_outgoing(osdmap->get_cluster_inst(p->first)); + } - if (heartbeat_from.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) { - dout(10) << "update_heartbeat_peers: old _to peer " << old_inst[p->first] + if (heartbeat_from.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) { + dout(10) << "update_heartbeat_peers: old _to peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() << " is still a _from peer, not marking down" << dendl; } else { - dout(10) << "update_heartbeat_peers: marking down old _to peer " << old_inst[p->first] + dout(10) << "update_heartbeat_peers: will mark down old _to peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() << " as of " << p->second << dendl; - Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]); - heartbeat_messenger->mark_disposable(con); - heartbeat_messenger->mark_down_on_empty(con); - con->put(); - if (!osdmap->is_up(p->first)) - forget_peer_epoch(p->first, osdmap->get_epoch()); + down[p->first] = old_con[p->first]; } } for (map::iterator p = old_from.begin(); p != old_from.end(); p++) { - assert(old_inst.count(p->first)); - if (heartbeat_from.count(p->first) && heartbeat_inst[p->first] == old_inst[p->first]) + assert(old_con.count(p->first)); + if (heartbeat_from.count(p->first) && heartbeat_con[p->first] == old_con[p->first]) continue; // share latest map with this peer, just to be nice. - dout(10) << "update_heartbeat_peers: sharing map with old _from peer " << old_inst[p->first] - << dendl; - _share_map_outgoing(old_inst[p->first]); + if (osdmap->is_up(p->first)) { + dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd" << p->first << dendl; + _share_map_outgoing(osdmap->get_cluster_inst(p->first)); + } - if (heartbeat_to.count(p->first) && old_inst[p->first] == heartbeat_inst[p->first]) { - dout(10) << "update_heartbeat_peers: old _from peer " << old_inst[p->first] + if (heartbeat_to.count(p->first) && old_con[p->first] == heartbeat_con[p->first]) { + dout(10) << "update_heartbeat_peers: old _from peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() << " is still a _to peer, not marking down" << dendl; } else { - dout(10) << "update_heartbeat_peers: marking down old _from peer " << old_inst[p->first] + dout(10) << "update_heartbeat_peers: will mark down old _from peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() << " as of " << p->second << dendl; - Connection *con = heartbeat_messenger->get_connection(old_inst[p->first]); - heartbeat_messenger->mark_disposable(con); - heartbeat_messenger->mark_down_on_empty(con); - con->put(); - if (!osdmap->is_up(p->first)) - forget_peer_epoch(p->first, osdmap->get_epoch()); + down[p->first] = old_con[p->first]; } } + for (map::iterator p = down.begin(); p != down.end(); ++p) { + Connection *con = p->second; + heartbeat_messenger->mark_disposable(con); + if (!osdmap->is_up(p->first)) { + dout(10) << "update_heartbeat_peers: telling old peer osd" << p->first + << " " << old_con[p->first]->get_peer_addr() + << " they are down" << dendl; + heartbeat_messenger->send_message(new MOSDPing(osdmap->get_fsid(), heartbeat_epoch, + heartbeat_epoch, my_stat, + MOSDPing::YOU_DIED), con); + } + heartbeat_messenger->mark_down_on_empty(con); + con->put(); + if (!osdmap->is_up(p->first)) + forget_peer_epoch(p->first, osdmap->get_epoch()); + } dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl; dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl; @@ -1545,7 +1564,10 @@ void OSD::reset_heartbeat_peers() heartbeat_to.clear(); heartbeat_from.clear(); heartbeat_from_stamp.clear(); - heartbeat_inst.clear(); + while (!heartbeat_con.empty()) { + heartbeat_con.begin()->second->put(); + heartbeat_con.erase(heartbeat_con.begin()); + } failure_queue.clear(); heartbeat_lock.Unlock(); @@ -1565,7 +1587,8 @@ void OSD::handle_osd_ping(MOSDPing *m) bool locked = map_lock.try_get_read(); - if (m->ack) { + switch (m->op) { + case MOSDPing::REQUEST_HEARTBEAT: if (heartbeat_to.count(from) && m->peer_as_of_epoch <= heartbeat_to[from]) { dout(5) << "handle_osd_ping ignoring peer " << m->get_source_inst() << " request for heartbeats as_of " << m->peer_as_of_epoch @@ -1574,41 +1597,52 @@ void OSD::handle_osd_ping(MOSDPing *m) dout(5) << "handle_osd_ping peer " << m->get_source_inst() << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl; heartbeat_to[from] = m->peer_as_of_epoch; - heartbeat_inst[from] = m->get_source_inst(); + heartbeat_con[from] = m->get_connection(); + heartbeat_con[from]->get(); if (locked && m->map_epoch && !is_booting()) _share_map_incoming(m->get_source_inst(), m->map_epoch, (Session*) m->get_connection()->get_priv()); } - } + break; - if (heartbeat_from.count(from) && - heartbeat_inst[from] == m->get_source_inst()) { - // only take peer stat or share map now if map_lock is uncontended - if (locked) { - dout(20) << "handle_osd_ping " << m->get_source_inst() - << " took stat " << m->peer_stat << dendl; - if (m->map_epoch && !is_booting()) - _share_map_incoming(m->get_source_inst(), m->map_epoch, - (Session*) m->get_connection()->get_priv()); - take_peer_stat(from, m->peer_stat); // only with map_lock held! + case MOSDPing::HEARTBEAT: + if (heartbeat_from.count(from) && + heartbeat_con[from] == m->get_connection()) { + // only take peer stat or share map now if map_lock is uncontended + if (locked) { + dout(20) << "handle_osd_ping " << m->get_source_inst() + << " took stat " << m->peer_stat << dendl; + if (m->map_epoch && !is_booting()) + _share_map_incoming(m->get_source_inst(), m->map_epoch, + (Session*) m->get_connection()->get_priv()); + take_peer_stat(from, m->peer_stat); // only with map_lock held! + } else { + dout(20) << "handle_osd_ping " << m->get_source_inst() + << " dropped stat " << m->peer_stat << dendl; + } + + note_peer_epoch(from, m->map_epoch); + + heartbeat_from_stamp[from] = g_clock.now(); // don't let _my_ lag interfere. + + // remove from failure lists if needed + if (failure_pending.count(from)) { + send_still_alive(from); + failure_pending.erase(from); + } + failure_queue.erase(from); } else { - dout(20) << "handle_osd_ping " << m->get_source_inst() - << " dropped stat " << m->peer_stat << dendl; + dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl; } + break; - note_peer_epoch(from, m->map_epoch); - - heartbeat_from_stamp[from] = g_clock.now(); // don't let _my_ lag interfere. - - // remove from failure lists if needed - if (failure_pending.count(from)) { - send_still_alive(from); - failure_pending.erase(from); - } - failure_queue.erase(from); - } else { - dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl; + case MOSDPing::YOU_DIED: + dout(10) << "handle_osd_ping " << m->get_source_inst() << " says i am down in " << m->map_epoch + << dendl; + monc->sub_want("osdmap", m->map_epoch, CEPH_SUBSCRIBE_ONETIME); + monc->renew_subs(); + break; } if (locked) @@ -1616,6 +1650,7 @@ void OSD::handle_osd_ping(MOSDPing *m) heartbeat_lock.Unlock(); m->put(); + } void OSD::heartbeat_entry() @@ -1707,7 +1742,7 @@ void OSD::heartbeat() i != heartbeat_to.end(); i++) { int peer = i->first; - if (heartbeat_inst.count(peer)) { + if (heartbeat_con.count(peer)) { my_stat_on_peer[peer] = my_stat; dout(30) << "heartbeat allocating ping for osd" << peer << dendl; Message *m = new MOSDPing(osdmap->get_fsid(), @@ -1716,7 +1751,7 @@ void OSD::heartbeat() my_stat); m->set_priority(CEPH_MSG_PRIO_HIGH); dout(30) << "heartbeat sending ping to osd" << peer << dendl; - heartbeat_messenger->send_message(m, heartbeat_inst[peer]); + heartbeat_messenger->send_message(m, heartbeat_con[peer]); } } @@ -2361,13 +2396,6 @@ void OSD::forget_peer_epoch(int peer, epoch_t as_of) } } -/* - * share incremental maps with peers. it's possible we could send - * incremental 3->4 on cluster msgr - * incremental 4->5 on heartbeat msgr - * and the second incremental would arrive first. who cares. the peer - * will figure it out sooner or later. - */ bool OSD::_share_map_incoming(const entity_inst_t& inst, epoch_t epoch, Session* session) @@ -2432,8 +2460,10 @@ void OSD::_share_map_outgoing(const entity_inst_t& inst) if (pe < osdmap->get_epoch()) { send_incremental_map(pe, inst); note_peer_epoch(peer, osdmap->get_epoch()); - } + } else + dout(20) << "_share_map_outgoing " << inst << " already has epoch " << pe << dendl; } else { + dout(20) << "_share_map_outgoing " << inst << " don't know epoch, doing nothing" << dendl; // no idea about peer's epoch. // ??? send recent ??? // do nothing. @@ -3392,11 +3422,8 @@ void OSD::activate_map(ObjectStore::Transaction& t, list& tfin) } -void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy) +MOSDMap *OSD::build_incremental_map_msg(epoch_t since) { - dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() - << " to " << inst << dendl; - MOSDMap *m = new MOSDMap(monc->get_fsid()); for (epoch_t e = osdmap->get_epoch(); @@ -3413,6 +3440,15 @@ void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool la assert(0); // we should have all maps. } } + return m; +} + +void OSD::send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy) +{ + dout(10) << "send_incremental_map " << since << " -> " << osdmap->get_epoch() + << " to " << inst << dendl; + + MOSDMap *m = build_incremental_map_msg(since); Messenger *msgr = client_messenger; if (entity_name_t::TYPE_OSD == inst.name._type) msgr = cluster_messenger; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 5fdd78e35a3..8572f0693f9 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -238,7 +238,7 @@ private: epoch_t heartbeat_epoch; map heartbeat_to, heartbeat_from; map heartbeat_from_stamp; - map heartbeat_inst; + map heartbeat_con; utime_t last_mon_heartbeat; Messenger *heartbeat_messenger; @@ -491,6 +491,7 @@ private: bool get_inc_map_bl(epoch_t e, bufferlist& bl); bool get_inc_map(epoch_t e, OSDMap::Incremental &inc); + MOSDMap *build_incremental_map_msg(epoch_t since); void send_incremental_map(epoch_t since, const entity_inst_t& inst, bool lazy=false); protected: diff --git a/src/vstart.sh b/src/vstart.sh index 845c5fc1ec3..887c7fedbac 100755 --- a/src/vstart.sh +++ b/src/vstart.sh @@ -161,7 +161,7 @@ else debug ms = 1' COSDDEBUG=' lockdep = 1 - debug ms = 1 + debug ms = 20 debug osd = 25 debug monc = 20 debug journal = 20