From: Sage Weil Date: Thu, 22 Mar 2012 14:50:44 +0000 (-0700) Subject: osd: simplify heartbeat logic X-Git-Tag: v0.45~28^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4e2f0d149d9bb9310dde12c201417d7747c52a85;p=ceph.git osd: simplify heartbeat logic Simplify heartbeats to use a simple request/reply model. - avoid any weirdness with map update timing - no from/to distinction - lossy client/server model Signed-off-by: Sage Weil --- diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index 340fecefd90e..e1e51190cce3 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -355,6 +355,11 @@ int main(int argc, const char **argv) cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT, Messenger::Policy::stateless_server(0, 0)); + messenger_hbin->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::client(0, 0)); + messenger_hbout->set_policy(entity_name_t::TYPE_OSD, + Messenger::Policy::stateless_server(0, 0)); + r = client_messenger->bind(g_conf->public_addr); if (r < 0) exit(1); diff --git a/src/messages/MOSDPing.h b/src/messages/MOSDPing.h index 6c3d7e8b3878..4451a478b1f7 100644 --- a/src/messages/MOSDPing.h +++ b/src/messages/MOSDPing.h @@ -22,12 +22,18 @@ class MOSDPing : public Message { + + static const int HEAD_VERSION = 2; + static const int COMPAT_VERSION = 1; + public: enum { HEARTBEAT = 0, START_HEARTBEAT = 1, YOU_DIED = 2, STOP_HEARTBEAT = 3, + PING = 4, + PING_REPLY = 5, }; const char *get_op_name(int op) const { switch (op) { @@ -35,6 +41,8 @@ class MOSDPing : public Message { case START_HEARTBEAT: return "start_heartbeat"; case STOP_HEARTBEAT: return "stop_heartbeat"; case YOU_DIED: return "you_died"; + case PING: return "ping"; + case PING_REPLY: return "ping_reply"; default: return "???"; } } @@ -43,12 +51,15 @@ class MOSDPing : public Message { epoch_t map_epoch, peer_as_of_epoch; __u8 op; osd_peer_stat_t peer_stat; + utime_t stamp; - MOSDPing(const uuid_d& f, epoch_t e, epoch_t pe, __u8 o) : - Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o) { } - MOSDPing(const uuid_d& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, __u8 o=HEARTBEAT) : - Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) { } - MOSDPing() : Message(MSG_OSD_PING) {} + MOSDPing(const uuid_d& f, epoch_t e, __u8 o, utime_t s) + : Message(MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION), + fsid(f), map_epoch(e), peer_as_of_epoch(0), op(o), stamp(s) + { } + MOSDPing() + : Message(MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION) + {} private: ~MOSDPing() {} @@ -60,6 +71,8 @@ public: ::decode(peer_as_of_epoch, p); ::decode(op, p); ::decode(peer_stat, p); + if (header.version >= 2) + ::decode(stamp, p); } void encode_payload(uint64_t features) { ::encode(fsid, payload); @@ -67,11 +80,16 @@ public: ::encode(peer_as_of_epoch, payload); ::encode(op, payload); ::encode(peer_stat, payload); + ::encode(stamp, payload); } const char *get_type_name() const { return "osd_ping"; } void print(ostream& out) const { - out << "osd_ping(" << get_op_name(op) << " e" << map_epoch << " as_of " << peer_as_of_epoch << ")"; + out << "osd_ping(" << get_op_name(op) + << " e" << map_epoch + //<< " as_of " << peer_as_of_epoch + << " stamp " << stamp + << ")"; } }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7ef3b8d7f461..0361f5905acf 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1430,37 +1430,29 @@ void OSD::update_osd_stat() osd_stat.kb_avail = stbuf.f_bavail * stbuf.f_bsize / 1024; osd_stat.hb_in.clear(); - for (map::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++) + for (map::iterator p = heartbeat_peers.begin(); p != heartbeat_peers.end(); p++) osd_stat.hb_in.push_back(p->first); osd_stat.hb_out.clear(); - for (map::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++) - osd_stat.hb_out.push_back(p->first); dout(20) << "update_osd_stat " << osd_stat << dendl; } -void OSD::_add_heartbeat_source(int p, map& old_from, map& old_from_stamp, - map& old_con) +void OSD::_add_heartbeat_peer(int p) { if (p == whoami) return; - if (heartbeat_from.count(p)) - return; - - heartbeat_from[p] = osdmap->get_epoch(); - Connection *con = hbin_messenger->get_connection(osdmap->get_hb_inst(p)); - heartbeat_from_con[p] = con; - if (old_from_stamp.count(p) && old_from.count(p) && old_con[p] == con) { - // have a stamp _AND_ i'm not new to the set - heartbeat_from_stamp[p] = old_from_stamp[p]; + HeartbeatInfo *hi; + + map::iterator i = heartbeat_peers.find(p); + if (i == heartbeat_peers.end()) { + hi = &heartbeat_peers[p]; + hi->con = hbin_messenger->get_connection(osdmap->get_hb_inst(p)); + dout(10) << "_add_heartbeat_peer: new peer osd." << p + << " " << hi->con->get_peer_addr() << dendl; } else { - dout(10) << "update_heartbeat_peers: new _from osd." << p - << " " << con->get_peer_addr() << dendl; - heartbeat_from_stamp[p] = ceph_clock_now(g_ceph_context); - MOSDPing *m = new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch, - MOSDPing::START_HEARTBEAT); - hbin_messenger->send_message(m, con); + hi = &i->second; } + hi->epoch = osdmap->get_epoch(); } void OSD::need_heartbeat_peer_update() @@ -1480,16 +1472,6 @@ void OSD::maybe_update_heartbeat_peers() return; heartbeat_need_update = false; - // filter heartbeat_from_stamp to only include osds that remain in - // heartbeat_from. - map old_from_stamp; - old_from_stamp.swap(heartbeat_from_stamp); - - map old_from; - map old_con; - old_from.swap(heartbeat_from); - old_con.swap(heartbeat_from_con); - heartbeat_epoch = osdmap->get_epoch(); // build heartbeat from set @@ -1503,73 +1485,37 @@ void OSD::maybe_update_heartbeat_peers() p != pg->heartbeat_peers.end(); ++p) if (osdmap->is_up(*p)) - _add_heartbeat_source(*p, old_from, old_from_stamp, old_con); + _add_heartbeat_peer(*p); pg->heartbeat_peer_lock.Unlock(); } - for (map::iterator p = old_from.begin(); - p != old_from.end(); - p++) { - assert(old_con.count(p->first)); - Connection *con = old_con[p->first]; - - if (heartbeat_from.count(p->first) && heartbeat_from_con[p->first] == con) { - con->put(); - continue; - } - - // share latest map with this peer, just to be nice. - if (osdmap->is_up(p->first) && is_active()) { - dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd." << p->first << dendl; - _share_map_outgoing(osdmap->get_cluster_inst(p->first)); - } - - dout(10) << "update_heartbeat_peers: will mark down old _from peer osd." << p->first - << " " << con->get_peer_addr() - << " as of " << p->second << dendl; - - if (!osdmap->is_up(p->first) && is_active()) { - dout(10) << "update_heartbeat_peers: telling old peer osd." << p->first - << " " << old_con[p->first]->get_peer_addr() - << " they are down" << dendl; - hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), heartbeat_epoch, - heartbeat_epoch, - MOSDPing::YOU_DIED), con); - hbin_messenger->mark_disposable(con); - hbin_messenger->mark_down_on_empty(con); + map::iterator p = heartbeat_peers.begin(); + while (p != heartbeat_peers.end()) { + if (p->second.epoch < osdmap->get_epoch()) { + dout(20) << " removing heartbeat peer osd." << p->first + << " " << p->second.con->get_peer_addr() + << dendl; + hbin_messenger->mark_down(p->second.con); + p->second.con->put(); + heartbeat_peers.erase(p++); } else { - // tell them to stop sending heartbeats - hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch, - MOSDPing::STOP_HEARTBEAT), con); - } - if (!osdmap->is_up(p->first)) { - forget_peer_epoch(p->first, osdmap->get_epoch()); + ++p; } - con->put(); } - - dout(10) << "update_heartbeat_peers: hb to: " << heartbeat_to << dendl; - dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl; + dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers" << dendl; } void OSD::reset_heartbeat_peers() { dout(10) << "reset_heartbeat_peers" << dendl; heartbeat_lock.Lock(); - heartbeat_to.clear(); - heartbeat_from.clear(); - heartbeat_from_stamp.clear(); - while (!heartbeat_to_con.empty()) { - heartbeat_to_con.begin()->second->put(); - heartbeat_to_con.erase(heartbeat_to_con.begin()); - } - while (!heartbeat_from_con.empty()) { - heartbeat_from_con.begin()->second->put(); - heartbeat_from_con.erase(heartbeat_from_con.begin()); + while (!heartbeat_peers.empty()) { + hbin_messenger->mark_down(heartbeat_peers.begin()->second.con); + heartbeat_peers.begin()->second.con->put(); + heartbeat_peers.erase(heartbeat_peers.begin()); } failure_queue.clear(); heartbeat_lock.Unlock(); - } void OSD::handle_osd_ping(MOSDPing *m) @@ -1581,91 +1527,42 @@ void OSD::handle_osd_ping(MOSDPing *m) return; } - heartbeat_lock.Lock(); int from = m->get_source().num(); + + heartbeat_lock.Lock(); + bool locked = map_lock.try_get_read(); switch (m->op) { - case MOSDPing::START_HEARTBEAT: - if (heartbeat_to.count(from)) { - if (heartbeat_to_con[from] != m->get_connection()) { - // different connection - if (heartbeat_to[from] > m->peer_as_of_epoch) { - dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst() - << " after old start message from epoch " << m->peer_as_of_epoch - << " <= current " << heartbeat_to[from] << dendl; - hbout_messenger->mark_down(m->get_connection()); - } else { - dout(5) << "handle_osd_ping replacing old peer " - << heartbeat_to_con[from]->get_peer_addr() - << " epoch " << heartbeat_to[from] - << " with new peer " << m->get_source_inst() - << " epoch " << m->peer_as_of_epoch - << dendl; - hbout_messenger->mark_down(heartbeat_to_con[from]); - heartbeat_to_con[from]->put(); - - // remember new connection - heartbeat_to[from] = m->peer_as_of_epoch; - heartbeat_to_con[from] = m->get_connection(); - heartbeat_to_con[from]->get(); - } - } else { - // same connection - dout(5) << "handle_osd_ping dup peer " << m->get_source_inst() - << " request for heartbeats as_of " << m->peer_as_of_epoch - << ", same connection" << dendl; - heartbeat_to[from] = m->peer_as_of_epoch; - } - } else { - // new - dout(5) << "handle_osd_ping peer " << m->get_source_inst() - << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl; - heartbeat_to[from] = m->peer_as_of_epoch; - heartbeat_to_con[from] = m->get_connection(); - heartbeat_to_con[from]->get(); - - if (locked && m->map_epoch && is_active()) { - _share_map_incoming(m->get_source_inst(), m->map_epoch, - (Session*) m->get_connection()->get_priv()); - } - } - break; - case MOSDPing::STOP_HEARTBEAT: + case MOSDPing::PING: { - if (heartbeat_to.count(from) && heartbeat_to_con[from] == m->get_connection()) { - dout(5) << "handle_osd_ping peer " << m->get_source_inst() - << " stopping heartbeats as_of " << m->peer_as_of_epoch << dendl; - heartbeat_to.erase(from); - //hbout_messenger->mark_down(heartbeat_to_con[from]); - heartbeat_to_con[from]->put(); - heartbeat_to_con.erase(from); - } else { - dout(5) << "handle_osd_ping peer " << m->get_source_inst() - << " ignoring stop request as_of " << m->peer_as_of_epoch << dendl; + Message *r = new MOSDPing(monc->get_fsid(), + locked ? osdmap->get_epoch():0, + MOSDPing::PING_REPLY, + m->stamp); + hbout_messenger->send_message(r, m->get_connection()); + + if (osdmap->is_up(from)) { + note_peer_epoch(from, m->map_epoch); + if (locked && is_active()) + _share_map_outgoing(osdmap->get_cluster_inst(from)); } } break; - case MOSDPing::HEARTBEAT: - if (heartbeat_from.count(from) && heartbeat_from_con[from] == m->get_connection()) { - dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl; + case MOSDPing::PING_REPLY: + { + map::iterator i = heartbeat_peers.find(from); + if (i != heartbeat_peers.end()) { + i->second.last_rx = m->stamp; + } - note_peer_epoch(from, m->map_epoch); - if (locked && is_active()) - _share_map_outgoing(osdmap->get_cluster_inst(from)); - - heartbeat_from_stamp[from] = ceph_clock_now(g_ceph_context); // don't let _my_ lag interfere. - - // remove from failure lists if needed - if (failure_pending.count(from)) { - send_still_alive(failure_pending[from]); - failure_pending.erase(from); + if (osdmap->is_up(from)) { + note_peer_epoch(from, m->map_epoch); + if (locked && is_active()) + _share_map_outgoing(osdmap->get_cluster_inst(from)); } - failure_queue.erase(from); - } else { - dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl; } break; @@ -1682,7 +1579,6 @@ void OSD::handle_osd_ping(MOSDPing *m) heartbeat_lock.Unlock(); m->put(); - } void OSD::heartbeat_entry() @@ -1707,26 +1603,32 @@ void OSD::heartbeat_check() // we should also have map_lock rdlocked. // check for incoming heartbeats (move me elsewhere?) - utime_t grace = ceph_clock_now(g_ceph_context); - grace -= g_conf->osd_heartbeat_grace; - for (map::iterator p = heartbeat_from.begin(); - p != heartbeat_from.end(); + utime_t cutoff = ceph_clock_now(g_ceph_context); + cutoff -= g_conf->osd_heartbeat_grace; + for (map::iterator p = heartbeat_peers.begin(); + p != heartbeat_peers.end(); p++) { - if (heartbeat_from_stamp.count(p->first) && - heartbeat_from_stamp[p->first] < grace) { - derr << "heartbeat_check: no heartbeat from osd." << p->first - << " since " << heartbeat_from_stamp[p->first] - << " (cutoff " << grace << ")" << dendl; - queue_failure(p->first); - + if (p->second.last_rx == utime_t()) { + if (p->second.last_tx > cutoff) + continue; // just started sending recently + derr << "heartbeat_check: no reply from osd." << p->first + << " ever, first ping sent " << p->second.first_tx + << " (cutoff " << cutoff << ")" << dendl; + } else { + if (p->second.last_rx > cutoff) + continue; // got recent reply + derr << "heartbeat_check: no reply from osd." << p->first + << " since " << p->second.last_rx + << " (cutoff " << cutoff << ")" << dendl; } + + // fail! + queue_failure(p->first); } } void OSD::heartbeat() { - utime_t now = ceph_clock_now(g_ceph_context); - dout(30) << "heartbeat" << dendl; // get CPU load avg @@ -1747,18 +1649,23 @@ void OSD::heartbeat() bool map_locked = map_lock.try_get_read(); dout(30) << "heartbeat map_locked=" << map_locked << dendl; + utime_t now = ceph_clock_now(g_ceph_context); + // send heartbeats - for (map::iterator i = heartbeat_to.begin(); - i != heartbeat_to.end(); + for (map::iterator i = heartbeat_peers.begin(); + i != heartbeat_peers.end(); i++) { int peer = i->first; dout(30) << "heartbeat allocating ping for osd." << peer << dendl; Message *m = new MOSDPing(monc->get_fsid(), map_locked ? osdmap->get_epoch():0, - i->second, MOSDPing::HEARTBEAT); - m->set_priority(CEPH_MSG_PRIO_HIGH); + MOSDPing::PING, + now); + i->second.last_tx = now; + if (i->second.first_tx == utime_t()) + i->second.first_tx = now; dout(30) << "heartbeat sending ping to osd." << peer << dendl; - hbout_messenger->send_message(m, heartbeat_to_con[peer]); + hbout_messenger->send_message(m, i->second.con); } if (map_locked) { @@ -1769,13 +1676,13 @@ void OSD::heartbeat() } if (logger) { - logger->set(l_osd_hb_to, heartbeat_to.size()); - logger->set(l_osd_hb_from, heartbeat_from.size()); + logger->set(l_osd_hb_to, heartbeat_peers.size()); + logger->set(l_osd_hb_from, 0); } // hmm.. am i all alone? dout(30) << "heartbeat lonely?" << dendl; - if (heartbeat_from.empty() || heartbeat_to.empty()) { + if (heartbeat_peers.empty()) { if (now - last_mon_heartbeat > g_conf->osd_mon_heartbeat_interval && is_active()) { last_mon_heartbeat = now; dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl; @@ -3111,22 +3018,8 @@ void OSD::note_down_osd(int peer) cluster_messenger->mark_down(osdmap->get_cluster_addr(peer)); heartbeat_lock.Lock(); - - // clean out down osds i am sending heartbeats _to_. - // update_heartbeat_peers() will clean out peers i expect heartbeast _from_. - if (heartbeat_to.count(peer) && - heartbeat_to[peer] < osdmap->get_epoch()) { - dout(10) << "note_down_osd osd." << peer << " marking down hbout connection " - << heartbeat_to_con[peer]->get_peer_addr() << dendl; - hbout_messenger->mark_down(heartbeat_to_con[peer]); - heartbeat_to_con[peer]->put(); - heartbeat_to_con.erase(peer); - heartbeat_to.erase(peer); - } - failure_queue.erase(peer); failure_pending.erase(peer); - heartbeat_lock.Unlock(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 8e1f5f1dfa4b..b8621303ae80 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -255,18 +255,24 @@ public: private: // -- heartbeat -- + /// information about a heartbeat peer + struct HeartbeatInfo { + Connection *con; ///< peer connection + utime_t first_tx; ///< time we sent our first ping request + utime_t last_tx; ///< last time we sent a ping request + utime_t last_rx; ///< last time we got a ping reply + epoch_t epoch; ///< most recent epoch we wanted this peer + }; Mutex heartbeat_lock; Cond heartbeat_cond; - bool heartbeat_stop, heartbeat_need_update; - epoch_t heartbeat_epoch; - map heartbeat_to, heartbeat_from; - map heartbeat_from_stamp; - map heartbeat_to_con, heartbeat_from_con; + bool heartbeat_stop; + bool heartbeat_need_update; ///< true if we need to refresh our heartbeat peers + epoch_t heartbeat_epoch; ///< last epoch we updated our heartbeat peers + map heartbeat_peers; ///< map of osd id to HeartbeatInfo utime_t last_mon_heartbeat; Messenger *hbin_messenger, *hbout_messenger; - void _add_heartbeat_source(int p, map& old_from, map& old_from_stamp, - map& old_con); + void _add_heartbeat_peer(int p); void maybe_update_heartbeat_peers(); void reset_heartbeat_peers(); void heartbeat();