]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: simplify heartbeat logic
authorSage Weil <sage@newdream.net>
Thu, 22 Mar 2012 14:50:44 +0000 (07:50 -0700)
committerSage Weil <sage@newdream.net>
Fri, 30 Mar 2012 15:44:26 +0000 (08:44 -0700)
Simplify heartbeats to use a simple request/reply model.

 - avoid any weirdness with map update timing
 - no from/to distinction
 - lossy client/server model

Signed-off-by: Sage Weil <sage@newdream.net>
src/ceph_osd.cc
src/messages/MOSDPing.h
src/osd/OSD.cc
src/osd/OSD.h

index 340fecefd90ea01ba22817009fe3af9d06c72de7..e1e51190cce3709b5e5950f14d1279ac1fb324ea 100644 (file)
@@ -355,6 +355,11 @@ int main(int argc, const char **argv)
   cluster_messenger->set_policy(entity_name_t::TYPE_CLIENT,
                                Messenger::Policy::stateless_server(0, 0));
 
+  messenger_hbin->set_policy(entity_name_t::TYPE_OSD,
+                            Messenger::Policy::client(0, 0));
+  messenger_hbout->set_policy(entity_name_t::TYPE_OSD,
+                            Messenger::Policy::stateless_server(0, 0));
+
   r = client_messenger->bind(g_conf->public_addr);
   if (r < 0)
     exit(1);
index 6c3d7e8b38782d5e61b438973ea143991efe9561..4451a478b1f7e303ad5927e0ca75fc37e28d9044 100644 (file)
 
 
 class MOSDPing : public Message {
+
+  static const int HEAD_VERSION = 2;
+  static const int COMPAT_VERSION = 1;
+
  public:
   enum {
     HEARTBEAT = 0,
     START_HEARTBEAT = 1,
     YOU_DIED = 2,
     STOP_HEARTBEAT = 3,
+    PING = 4,
+    PING_REPLY = 5,
   };
   const char *get_op_name(int op) const {
     switch (op) {
@@ -35,6 +41,8 @@ class MOSDPing : public Message {
     case START_HEARTBEAT: return "start_heartbeat";
     case STOP_HEARTBEAT: return "stop_heartbeat";
     case YOU_DIED: return "you_died";
+    case PING: return "ping";
+    case PING_REPLY: return "ping_reply";
     default: return "???";
     }
   }
@@ -43,12 +51,15 @@ class MOSDPing : public Message {
   epoch_t map_epoch, peer_as_of_epoch;
   __u8 op;
   osd_peer_stat_t peer_stat;
+  utime_t stamp;
 
-  MOSDPing(const uuid_d& f, epoch_t e, epoch_t pe, __u8 o) : 
-    Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o) { }
-  MOSDPing(const uuid_d& f, epoch_t e, epoch_t pe, osd_peer_stat_t& ps, __u8 o=HEARTBEAT) : 
-    Message(MSG_OSD_PING), fsid(f), map_epoch(e), peer_as_of_epoch(pe), op(o), peer_stat(ps) { }
-  MOSDPing() : Message(MSG_OSD_PING) {}
+  MOSDPing(const uuid_d& f, epoch_t e, __u8 o, utime_t s)
+    : Message(MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION),
+      fsid(f), map_epoch(e), peer_as_of_epoch(0), op(o), stamp(s)
+  { }
+  MOSDPing()
+    : Message(MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION)
+  {}
 private:
   ~MOSDPing() {}
 
@@ -60,6 +71,8 @@ public:
     ::decode(peer_as_of_epoch, p);
     ::decode(op, p);
     ::decode(peer_stat, p);
+    if (header.version >= 2)
+      ::decode(stamp, p);
   }
   void encode_payload(uint64_t features) {
     ::encode(fsid, payload);
@@ -67,11 +80,16 @@ public:
     ::encode(peer_as_of_epoch, payload);
     ::encode(op, payload);
     ::encode(peer_stat, payload);
+    ::encode(stamp, payload);
   }
 
   const char *get_type_name() const { return "osd_ping"; }
   void print(ostream& out) const {
-    out << "osd_ping(" << get_op_name(op) << " e" << map_epoch << " as_of " << peer_as_of_epoch << ")";
+    out << "osd_ping(" << get_op_name(op)
+       << " e" << map_epoch
+      //<< " as_of " << peer_as_of_epoch
+       << " stamp " << stamp
+       << ")";
   }
 };
 
index 7ef3b8d7f461c58769d022243a4714ca3d0f9dee..0361f5905acfacd48af0760a31a8b578f50ed259 100644 (file)
@@ -1430,37 +1430,29 @@ void OSD::update_osd_stat()
   osd_stat.kb_avail = stbuf.f_bavail * stbuf.f_bsize / 1024;
 
   osd_stat.hb_in.clear();
-  for (map<int,epoch_t>::iterator p = heartbeat_from.begin(); p != heartbeat_from.end(); p++)
+  for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin(); p != heartbeat_peers.end(); p++)
     osd_stat.hb_in.push_back(p->first);
   osd_stat.hb_out.clear();
-  for (map<int,epoch_t>::iterator p = heartbeat_to.begin(); p != heartbeat_to.end(); p++)
-    osd_stat.hb_out.push_back(p->first);
 
   dout(20) << "update_osd_stat " << osd_stat << dendl;
 }
 
-void OSD::_add_heartbeat_source(int p, map<int, epoch_t>& old_from, map<int, utime_t>& old_from_stamp,
-                               map<int,Connection*>& old_con)
+void OSD::_add_heartbeat_peer(int p)
 {
   if (p == whoami)
     return;
-  if (heartbeat_from.count(p))
-    return;
-
-  heartbeat_from[p] = osdmap->get_epoch();
-  Connection *con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));
-  heartbeat_from_con[p] = con;
-  if (old_from_stamp.count(p) && old_from.count(p) && old_con[p] == con) {
-    // have a stamp _AND_ i'm not new to the set
-    heartbeat_from_stamp[p] = old_from_stamp[p];
+  HeartbeatInfo *hi;
+
+  map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
+  if (i == heartbeat_peers.end()) {
+    hi = &heartbeat_peers[p];
+    hi->con = hbin_messenger->get_connection(osdmap->get_hb_inst(p));
+    dout(10) << "_add_heartbeat_peer: new peer osd." << p
+            << " " << hi->con->get_peer_addr() << dendl;
   } else {
-    dout(10) << "update_heartbeat_peers: new _from osd." << p
-            << " " << con->get_peer_addr() << dendl;
-    heartbeat_from_stamp[p] = ceph_clock_now(g_ceph_context);  
-    MOSDPing *m = new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch,
-                              MOSDPing::START_HEARTBEAT);
-    hbin_messenger->send_message(m, con);
+    hi = &i->second;
   }
+  hi->epoch = osdmap->get_epoch();
 }
 
 void OSD::need_heartbeat_peer_update()
@@ -1480,16 +1472,6 @@ void OSD::maybe_update_heartbeat_peers()
     return;
   heartbeat_need_update = false;
 
-  // filter heartbeat_from_stamp to only include osds that remain in
-  // heartbeat_from.
-  map<int, utime_t> old_from_stamp;
-  old_from_stamp.swap(heartbeat_from_stamp);
-
-  map<int, epoch_t> old_from;
-  map<int, Connection*> old_con;
-  old_from.swap(heartbeat_from);
-  old_con.swap(heartbeat_from_con);
-
   heartbeat_epoch = osdmap->get_epoch();
 
   // build heartbeat from set
@@ -1503,73 +1485,37 @@ void OSD::maybe_update_heartbeat_peers()
         p != pg->heartbeat_peers.end();
         ++p)
       if (osdmap->is_up(*p))
-       _add_heartbeat_source(*p, old_from, old_from_stamp, old_con);
+       _add_heartbeat_peer(*p);
     pg->heartbeat_peer_lock.Unlock();
   }
 
-  for (map<int,epoch_t>::iterator p = old_from.begin();
-       p != old_from.end();
-       p++) {
-    assert(old_con.count(p->first));
-    Connection *con = old_con[p->first];
-
-    if (heartbeat_from.count(p->first) && heartbeat_from_con[p->first] == con) {
-      con->put();
-      continue;
-    }
-
-    // share latest map with this peer, just to be nice.
-    if (osdmap->is_up(p->first) && is_active()) {
-      dout(10) << "update_heartbeat_peers: sharing map with old _from peer osd." << p->first << dendl;
-      _share_map_outgoing(osdmap->get_cluster_inst(p->first));
-    }
-
-    dout(10) << "update_heartbeat_peers: will mark down old _from peer osd." << p->first
-            << " " << con->get_peer_addr()
-            << " as of " << p->second << dendl;
-    
-    if (!osdmap->is_up(p->first) && is_active()) {
-      dout(10) << "update_heartbeat_peers: telling old peer osd." << p->first
-              << " " << old_con[p->first]->get_peer_addr()
-              << " they are down" << dendl;
-      hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), heartbeat_epoch,
-                                               heartbeat_epoch,
-                                               MOSDPing::YOU_DIED), con);
-      hbin_messenger->mark_disposable(con);
-      hbin_messenger->mark_down_on_empty(con);
+  map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
+  while (p != heartbeat_peers.end()) {
+    if (p->second.epoch < osdmap->get_epoch()) {
+      dout(20) << " removing heartbeat peer osd." << p->first
+              << " " << p->second.con->get_peer_addr()
+              << dendl;
+      hbin_messenger->mark_down(p->second.con);
+      p->second.con->put();
+      heartbeat_peers.erase(p++);
     } else {
-      // tell them to stop sending heartbeats
-      hbin_messenger->send_message(new MOSDPing(monc->get_fsid(), 0, heartbeat_epoch,
-                                               MOSDPing::STOP_HEARTBEAT), con);
-    }
-    if (!osdmap->is_up(p->first)) {
-      forget_peer_epoch(p->first, osdmap->get_epoch());
+      ++p;
     }
-    con->put();
   }
-
-  dout(10) << "update_heartbeat_peers: hb   to: " << heartbeat_to << dendl;
-  dout(10) << "update_heartbeat_peers: hb from: " << heartbeat_from << dendl;
+  dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers" << dendl;
 }
 
 void OSD::reset_heartbeat_peers()
 {
   dout(10) << "reset_heartbeat_peers" << dendl;
   heartbeat_lock.Lock();
-  heartbeat_to.clear();
-  heartbeat_from.clear();
-  heartbeat_from_stamp.clear();
-  while (!heartbeat_to_con.empty()) {
-    heartbeat_to_con.begin()->second->put();
-    heartbeat_to_con.erase(heartbeat_to_con.begin());
-  }
-  while (!heartbeat_from_con.empty()) {
-    heartbeat_from_con.begin()->second->put();
-    heartbeat_from_con.erase(heartbeat_from_con.begin());
+  while (!heartbeat_peers.empty()) {
+    hbin_messenger->mark_down(heartbeat_peers.begin()->second.con);
+    heartbeat_peers.begin()->second.con->put();
+    heartbeat_peers.erase(heartbeat_peers.begin());
   }
   failure_queue.clear();
   heartbeat_lock.Unlock();
-
 }
 
 void OSD::handle_osd_ping(MOSDPing *m)
@@ -1581,91 +1527,42 @@ void OSD::handle_osd_ping(MOSDPing *m)
     return;
   }
 
-  heartbeat_lock.Lock();
   int from = m->get_source().num();
+
+  heartbeat_lock.Lock();
+
   bool locked = map_lock.try_get_read();
   
   switch (m->op) {
-  case MOSDPing::START_HEARTBEAT:
-    if (heartbeat_to.count(from)) {
-      if (heartbeat_to_con[from] != m->get_connection()) {
-       // different connection
-       if (heartbeat_to[from] > m->peer_as_of_epoch) {
-         dout(5) << "handle_osd_ping marking down peer " << m->get_source_inst()
-                 << " after old start message from epoch " << m->peer_as_of_epoch
-                 << " <= current " << heartbeat_to[from] << dendl;
-         hbout_messenger->mark_down(m->get_connection());
-       } else {
-         dout(5) << "handle_osd_ping replacing old peer "
-                 << heartbeat_to_con[from]->get_peer_addr()
-                 << " epoch " << heartbeat_to[from]
-                 << " with new peer " << m->get_source_inst()
-                 << " epoch " << m->peer_as_of_epoch
-                 << dendl;
-         hbout_messenger->mark_down(heartbeat_to_con[from]);
-         heartbeat_to_con[from]->put();
-         
-         // remember new connection
-         heartbeat_to[from] = m->peer_as_of_epoch;
-         heartbeat_to_con[from] = m->get_connection();
-         heartbeat_to_con[from]->get();
-       }
-      } else {
-       // same connection
-       dout(5) << "handle_osd_ping dup peer " << m->get_source_inst()
-               << " request for heartbeats as_of " << m->peer_as_of_epoch
-               << ", same connection" << dendl;
-       heartbeat_to[from] = m->peer_as_of_epoch;
-      }
-    } else {
-      // new
-      dout(5) << "handle_osd_ping peer " << m->get_source_inst()
-             << " requesting heartbeats as_of " << m->peer_as_of_epoch << dendl;
-      heartbeat_to[from] = m->peer_as_of_epoch;
-      heartbeat_to_con[from] = m->get_connection();
-      heartbeat_to_con[from]->get();
-      
-      if (locked && m->map_epoch && is_active()) {
-       _share_map_incoming(m->get_source_inst(), m->map_epoch,
-                           (Session*) m->get_connection()->get_priv());
-      }
-    }
-    break;
 
-  case MOSDPing::STOP_HEARTBEAT:
+  case MOSDPing::PING:
     {
-      if (heartbeat_to.count(from) && heartbeat_to_con[from] == m->get_connection()) {
-       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
-               << " stopping heartbeats as_of " << m->peer_as_of_epoch << dendl;
-       heartbeat_to.erase(from);
-       //hbout_messenger->mark_down(heartbeat_to_con[from]);
-       heartbeat_to_con[from]->put();
-       heartbeat_to_con.erase(from);
-      } else {
-       dout(5) << "handle_osd_ping peer " << m->get_source_inst()
-               << " ignoring stop request as_of " << m->peer_as_of_epoch << dendl;
+      Message *r = new MOSDPing(monc->get_fsid(),
+                               locked ? osdmap->get_epoch():0, 
+                               MOSDPing::PING_REPLY,
+                               m->stamp);
+      hbout_messenger->send_message(r, m->get_connection());
+
+      if (osdmap->is_up(from)) {
+       note_peer_epoch(from, m->map_epoch);
+       if (locked && is_active())
+         _share_map_outgoing(osdmap->get_cluster_inst(from));
       }
     }
     break;
 
-  case MOSDPing::HEARTBEAT:
-    if (heartbeat_from.count(from) && heartbeat_from_con[from] == m->get_connection()) {
-      dout(20) << "handle_osd_ping " << m->get_source_inst() << dendl;
+  case MOSDPing::PING_REPLY:
+    {
+      map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
+      if (i != heartbeat_peers.end()) {
+       i->second.last_rx = m->stamp;
+      }
 
-      note_peer_epoch(from, m->map_epoch);
-      if (locked && is_active())
-       _share_map_outgoing(osdmap->get_cluster_inst(from));
-      
-      heartbeat_from_stamp[from] = ceph_clock_now(g_ceph_context);  // don't let _my_ lag interfere.
-      
-      // remove from failure lists if needed
-      if (failure_pending.count(from)) {
-       send_still_alive(failure_pending[from]);
-       failure_pending.erase(from);
+      if (osdmap->is_up(from)) {
+       note_peer_epoch(from, m->map_epoch);
+       if (locked && is_active())
+         _share_map_outgoing(osdmap->get_cluster_inst(from));
       }
-      failure_queue.erase(from);
-    } else {
-      dout(10) << "handle_osd_ping ignoring " << m->get_source_inst() << dendl;
     }
     break;
 
@@ -1682,7 +1579,6 @@ void OSD::handle_osd_ping(MOSDPing *m)
 
   heartbeat_lock.Unlock();
   m->put();
-
 }
 
 void OSD::heartbeat_entry()
@@ -1707,26 +1603,32 @@ void OSD::heartbeat_check()
   // we should also have map_lock rdlocked.
 
   // check for incoming heartbeats (move me elsewhere?)
-  utime_t grace = ceph_clock_now(g_ceph_context);
-  grace -= g_conf->osd_heartbeat_grace;
-  for (map<int, epoch_t>::iterator p = heartbeat_from.begin();
-       p != heartbeat_from.end();
+  utime_t cutoff = ceph_clock_now(g_ceph_context);
+  cutoff -= g_conf->osd_heartbeat_grace;
+  for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
+       p != heartbeat_peers.end();
        p++) {
-    if (heartbeat_from_stamp.count(p->first) &&
-       heartbeat_from_stamp[p->first] < grace) {
-      derr << "heartbeat_check: no heartbeat from osd." << p->first
-          << " since " << heartbeat_from_stamp[p->first]
-          << " (cutoff " << grace << ")" << dendl;
-      queue_failure(p->first);
-
+    if (p->second.last_rx == utime_t()) {
+      if (p->second.last_tx > cutoff)
+       continue;  // just started sending recently
+      derr << "heartbeat_check: no reply from osd." << p->first
+          << " ever, first ping sent " << p->second.first_tx
+          << " (cutoff " << cutoff << ")" << dendl;
+    } else {
+      if (p->second.last_rx > cutoff)
+       continue;  // got recent reply
+      derr << "heartbeat_check: no reply from osd." << p->first
+          << " since " << p->second.last_rx
+          << " (cutoff " << cutoff << ")" << dendl;
     }
+
+    // fail!
+    queue_failure(p->first);
   }
 }
 
 void OSD::heartbeat()
 {
-  utime_t now = ceph_clock_now(g_ceph_context);
-
   dout(30) << "heartbeat" << dendl;
 
   // get CPU load avg
@@ -1747,18 +1649,23 @@ void OSD::heartbeat()
   bool map_locked = map_lock.try_get_read();
   dout(30) << "heartbeat map_locked=" << map_locked << dendl;
 
+  utime_t now = ceph_clock_now(g_ceph_context);
+
   // send heartbeats
-  for (map<int, epoch_t>::iterator i = heartbeat_to.begin();
-       i != heartbeat_to.end();
+  for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
+       i != heartbeat_peers.end();
        i++) {
     int peer = i->first;
     dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
     Message *m = new MOSDPing(monc->get_fsid(),
                              map_locked ? osdmap->get_epoch():0, 
-                             i->second, MOSDPing::HEARTBEAT);
-    m->set_priority(CEPH_MSG_PRIO_HIGH);
+                             MOSDPing::PING,
+                             now);
+    i->second.last_tx = now;
+    if (i->second.first_tx == utime_t())
+      i->second.first_tx = now;
     dout(30) << "heartbeat sending ping to osd." << peer << dendl;
-    hbout_messenger->send_message(m, heartbeat_to_con[peer]);
+    hbout_messenger->send_message(m, i->second.con);
   }
 
   if (map_locked) {
@@ -1769,13 +1676,13 @@ void OSD::heartbeat()
   }
 
   if (logger) {
-    logger->set(l_osd_hb_to, heartbeat_to.size());
-    logger->set(l_osd_hb_from, heartbeat_from.size());
+    logger->set(l_osd_hb_to, heartbeat_peers.size());
+    logger->set(l_osd_hb_from, 0);
   }
   
   // hmm.. am i all alone?
   dout(30) << "heartbeat lonely?" << dendl;
-  if (heartbeat_from.empty() || heartbeat_to.empty()) {
+  if (heartbeat_peers.empty()) {
     if (now - last_mon_heartbeat > g_conf->osd_mon_heartbeat_interval && is_active()) {
       last_mon_heartbeat = now;
       dout(10) << "i have no heartbeat peers; checking mon for new map" << dendl;
@@ -3111,22 +3018,8 @@ void OSD::note_down_osd(int peer)
   cluster_messenger->mark_down(osdmap->get_cluster_addr(peer));
 
   heartbeat_lock.Lock();
-
-  // clean out down osds i am sending heartbeats _to_.
-  // update_heartbeat_peers() will clean out peers i expect heartbeast _from_.
-  if (heartbeat_to.count(peer) &&
-      heartbeat_to[peer] < osdmap->get_epoch()) {
-    dout(10) << "note_down_osd osd." << peer << " marking down hbout connection "
-            << heartbeat_to_con[peer]->get_peer_addr() << dendl;
-    hbout_messenger->mark_down(heartbeat_to_con[peer]);
-    heartbeat_to_con[peer]->put();
-    heartbeat_to_con.erase(peer);
-    heartbeat_to.erase(peer);
-  }
-
   failure_queue.erase(peer);
   failure_pending.erase(peer);
-
   heartbeat_lock.Unlock();
 }
 
index 8e1f5f1dfa4b0fdf8b29aadd790a680c84cf983b..b8621303ae80e4d4c1cdf570f9e0673c8c485256 100644 (file)
@@ -255,18 +255,24 @@ public:
 
 private:
   // -- heartbeat --
+  /// information about a heartbeat peer
+  struct HeartbeatInfo {
+    Connection *con;    ///< peer connection
+    utime_t first_tx;   ///< time we sent our first ping request
+    utime_t last_tx;    ///< last time we sent a ping request
+    utime_t last_rx;    ///< last time we got a ping reply
+    epoch_t epoch;      ///< most recent epoch we wanted this peer
+  };
   Mutex heartbeat_lock;
   Cond heartbeat_cond;
-  bool heartbeat_stop, heartbeat_need_update;
-  epoch_t heartbeat_epoch;
-  map<int, epoch_t> heartbeat_to, heartbeat_from;
-  map<int, utime_t> heartbeat_from_stamp;
-  map<int, Connection*> heartbeat_to_con, heartbeat_from_con;
+  bool heartbeat_stop;
+  bool heartbeat_need_update;   ///< true if we need to refresh our heartbeat peers
+  epoch_t heartbeat_epoch;      ///< last epoch we updated our heartbeat peers
+  map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
   utime_t last_mon_heartbeat;
   Messenger *hbin_messenger, *hbout_messenger;
   
-  void _add_heartbeat_source(int p, map<int, epoch_t>& old_from, map<int, utime_t>& old_from_stamp,
-                            map<int,Connection*>& old_con);
+  void _add_heartbeat_peer(int p);
   void maybe_update_heartbeat_peers();
   void reset_heartbeat_peers();
   void heartbeat();