git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: ping both front and back interfaces 312/head
author: Sage Weil <sage@inktank.com>
Wed, 22 May 2013 15:44:52 +0000 (08:44 -0700)
committer: Sage Weil <sage@inktank.com>
Wed, 22 May 2013 23:13:37 +0000 (16:13 -0700)
Send ping requests to both the front and back hb addrs for peer osds.  If
a peer has no front hb addr, send only the back ping and interpret its reply
as coming from both interfaces.  This handles the transition from old to new
OSDs seamlessly.

Record the front and back rx times separately.  Both need to be up to date
in order for the peer to be considered healthy.

Signed-off-by: Sage Weil <sage@inktank.com>
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h

index 79a00c06fbe8c1f9f5764c0be7d4362a721f6f1c..18a64c1d02eb8ebbfd4096a41178f27c721a387d 100644 (file)
@@ -247,6 +247,10 @@ public:
     return pipe != NULL;
   }
 
+  Messenger *get_messenger() {
+    return msgr;
+  }
+
   int get_peer_type() { return peer_type; }
   void set_peer_type(int t) { peer_type = t; }
   
index 5b51b44f24d0d0da31d4c023a2b1326c7f6b9434..a37c95e2c7af184858c952dea5adebbbe8375bd1 100644 (file)
@@ -2250,16 +2250,24 @@ void OSD::_add_heartbeat_peer(int p)
 
   map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
   if (i == heartbeat_peers.end()) {
-    ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
-    if (!con)
+    pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
+    if (!cons.first)
       return;
     hi = &heartbeat_peers[p];
-    hi->con = con.get();
-    hi->con->get();
     hi->peer = p;
-    hi->con->set_priv(new HeartbeatSession(p));
+    HeartbeatSession *s = new HeartbeatSession(p);
+    hi->con_back = cons.first.get();
+    hi->con_back->get();
+    hi->con_back->set_priv(s);
+    if (cons.second) {
+      hi->con_front = cons.second.get();
+      hi->con_front->get();
+      hi->con_front->set_priv(s->get());
+    }
     dout(10) << "_add_heartbeat_peer: new peer osd." << p
-            << " " << hi->con->get_peer_addr() << dendl;
+            << " " << hi->con_back->get_peer_addr()
+            << " " << (hi->con_front ? hi->con_front->get_peer_addr() : entity_addr_t())
+            << dendl;
   } else {
     hi = &i->second;
   }
@@ -2310,10 +2318,15 @@ void OSD::maybe_update_heartbeat_peers()
   while (p != heartbeat_peers.end()) {
     if (p->second.epoch < osdmap->get_epoch()) {
       dout(20) << " removing heartbeat peer osd." << p->first
-              << " " << p->second.con->get_peer_addr()
+              << " " << p->second.con_back->get_peer_addr()
+              << " " << (p->second.con_front ? p->second.con_front->get_peer_addr() : entity_addr_t())
               << dendl;
-      hbclient_messenger->mark_down(p->second.con);
-      p->second.con->put();
+      hbclient_messenger->mark_down(p->second.con_back);
+      p->second.con_back->put();
+      if (p->second.con_front) {
+       hbclient_messenger->mark_down(p->second.con_front);
+       p->second.con_front->put();
+      }
       heartbeat_peers.erase(p++);
     } else {
       ++p;
@@ -2328,8 +2341,13 @@ void OSD::reset_heartbeat_peers()
   dout(10) << "reset_heartbeat_peers" << dendl;
   Mutex::Locker l(heartbeat_lock);
   while (!heartbeat_peers.empty()) {
-    hbclient_messenger->mark_down(heartbeat_peers.begin()->second.con);
-    heartbeat_peers.begin()->second.con->put();
+    HeartbeatInfo& hi = heartbeat_peers.begin()->second;
+    hbclient_messenger->mark_down(hi.con_back);
+    hi.con_back->put();
+    if (hi.con_front) {
+      hbclient_messenger->mark_down(hi.con_front);
+      hi.con_front->put();
+    }
     heartbeat_peers.erase(heartbeat_peers.begin());
   }
   failure_queue.clear();
@@ -2389,7 +2407,7 @@ void OSD::handle_osd_ping(MOSDPing *m)
                                curmap->get_epoch(),
                                MOSDPing::PING_REPLY,
                                m->stamp);
-      hb_back_server_messenger->send_message(r, m->get_connection());
+      m->get_connection()->get_messenger()->send_message(r, m->get_connection());
 
       if (curmap->is_up(from)) {
        note_peer_epoch(from, m->map_epoch);
@@ -2407,12 +2425,26 @@ void OSD::handle_osd_ping(MOSDPing *m)
     {
       map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
       if (i != heartbeat_peers.end()) {
-       dout(25) << "handle_osd_ping got reply from osd." << from
-                << " first_rx " << i->second.first_tx
-                << " last_tx " << i->second.last_tx
-                << " last_rx " << i->second.last_rx << " -> " << m->stamp
-                << dendl;
-       i->second.last_rx = m->stamp;
+       if (m->get_connection() == i->second.con_back) {
+         dout(25) << "handle_osd_ping got reply from osd." << from
+                  << " first_tx " << i->second.first_tx
+                  << " last_tx " << i->second.last_tx
+                  << " last_rx_back " << i->second.last_rx_back << " -> " << m->stamp
+                  << " last_rx_front " << i->second.last_rx_front
+                  << dendl;
+         i->second.last_rx_back = m->stamp;
+         // no front con (old peer): a back reply stands in for both interfaces.
+         if (i->second.con_front == NULL)
+           i->second.last_rx_front = m->stamp;
+       } else if (m->get_connection() == i->second.con_front) {
+         dout(25) << "handle_osd_ping got reply from osd." << from
+                  << " first_tx " << i->second.first_tx
+                  << " last_tx " << i->second.last_tx
+                  << " last_rx_back " << i->second.last_rx_back
+                  << " last_rx_front " << i->second.last_rx_front << " -> " << m->stamp
+                  << dendl;
+         i->second.last_rx_front = m->stamp;
+       }
       }
 
       if (m->map_epoch &&
@@ -2426,12 +2458,19 @@ void OSD::handle_osd_ping(MOSDPing *m)
        }
       }
 
-      // Cancel false reports
-      if (failure_queue.count(from))
-       failure_queue.erase(from);
-      if (failure_pending.count(from)) {
-       send_still_alive(curmap->get_epoch(), failure_pending[from]);
-       failure_pending.erase(from);
+      utime_t cutoff = ceph_clock_now(g_ceph_context);
+      cutoff -= g_conf->osd_heartbeat_grace;
+      if (i != heartbeat_peers.end() && i->second.is_healthy(cutoff)) {
+       // Cancel false reports (only for a tracked peer; i may be end() for an unknown sender)
+       if (failure_queue.count(from)) {
+         dout(10) << "handle_osd_ping canceling queued failure report for osd." << from<< dendl;
+         failure_queue.erase(from);
+       }
+       if (failure_pending.count(from)) {
+         dout(10) << "handle_osd_ping canceling in-flight failure report for osd." << from<< dendl;
+         send_still_alive(curmap->get_epoch(), failure_pending[from]);
+         failure_pending.erase(from);
+       }
       }
     }
     break;
@@ -2486,27 +2525,25 @@ void OSD::heartbeat_check()
     dout(25) << "heartbeat_check osd." << p->first
             << " first_tx " << p->second.first_tx
             << " last_tx " << p->second.last_tx
-            << " last_rx " << p->second.last_rx
+            << " last_rx_back " << p->second.last_rx_back
+            << " last_rx_front " << p->second.last_rx_front
             << dendl;
-    if (p->second.last_rx == utime_t()) {
-      if (p->second.last_tx == utime_t() ||
-         p->second.first_tx > cutoff)
-       continue;  // just started sending recently
-      derr << "heartbeat_check: no reply from osd." << p->first
-          << " ever, first ping sent " << p->second.first_tx
-          << " (cutoff " << cutoff << ")" << dendl;
-
-      // fail
-      failure_queue[p->first] = p->second.last_tx;
-    } else {
-      if (p->second.last_rx > cutoff)
-       continue;  // got recent reply
-      derr << "heartbeat_check: no reply from osd." << p->first
-          << " since " << p->second.last_rx
-          << " (cutoff " << cutoff << ")" << dendl;
-
-      // fail
-      failure_queue[p->first] = p->second.last_rx;
+    if (!p->second.is_healthy(cutoff)) {
+      if (p->second.last_rx_back == utime_t() ||
+         p->second.last_rx_front == utime_t()) {
+       derr << "heartbeat_check: no reply from osd." << p->first
+            << " ever on either front or back, first ping sent " << p->second.first_tx
+            << " (cutoff " << cutoff << ")" << dendl;
+       // fail
+       failure_queue[p->first] = p->second.last_tx;
+      } else {
+       derr << "heartbeat_check: no reply from osd." << p->first
+            << " since back " << p->second.last_rx_back
+            << " front " << p->second.last_rx_front
+            << " (cutoff " << cutoff << ")" << dendl;
+       // fail
+       failure_queue[p->first] = MIN(p->second.last_rx_back, p->second.last_rx_front);
+      }
     }
   }
 }
@@ -2537,16 +2574,21 @@ void OSD::heartbeat()
        i != heartbeat_peers.end();
        ++i) {
     int peer = i->first;
-    dout(30) << "heartbeat allocating ping for osd." << peer << dendl;
-    Message *m = new MOSDPing(monc->get_fsid(),
-                             service.get_osdmap()->get_epoch(),
-                             MOSDPing::PING,
-                             now);
     i->second.last_tx = now;
     if (i->second.first_tx == utime_t())
       i->second.first_tx = now;
     dout(30) << "heartbeat sending ping to osd." << peer << dendl;
-    hbclient_messenger->send_message(m, i->second.con);
+    hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+                                                 service.get_osdmap()->get_epoch(),
+                                                 MOSDPing::PING,
+                                                 now),
+                                    i->second.con_back);
+    if (i->second.con_front)
+      hbclient_messenger->send_message(new MOSDPing(monc->get_fsid(),
+                                                   service.get_osdmap()->get_epoch(),
+                                                   MOSDPing::PING,
+                                                   now),
+                                      i->second.con_front);
   }
 
   dout(30) << "heartbeat check" << dendl;
@@ -2580,20 +2622,30 @@ bool OSD::heartbeat_reset(Connection *con)
     }
     map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
     if (p != heartbeat_peers.end() &&
-       p->second.con == con) {
-      ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
-      if (!newcon) {
+       p->second.con_back == con) {
+      pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+      if (!newcon.first) {
        dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
       } else {
        dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
-       p->second.con = newcon.get();
-       p->second.con->get();
-       p->second.con->set_priv(s);
+       hbclient_messenger->mark_down(p->second.con_back);
+       p->second.con_back = newcon.first.get();
+       p->second.con_back->get();
+       p->second.con_back->set_priv(s);
+       if (p->second.con_front)
+         hbclient_messenger->mark_down(p->second.con_front);
+       if (newcon.second) {
+         p->second.con_front = newcon.second.get();
+         p->second.con_front->get();
+         p->second.con_front->set_priv(s->get());
+       } else {
+         p->second.con_front = NULL;
+       }
       }
     } else {
       dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
+      hbclient_messenger->mark_down(con);
     }
-    hbclient_messenger->mark_down(con);
     heartbeat_lock.Unlock();
     s->put();
   }
@@ -3121,20 +3173,23 @@ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
   return ret;
 }
 
-ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
 {
   Mutex::Locker l(pre_publish_lock);
 
   // service map is always newer/newest
   assert(from_epoch <= next_osdmap->get_epoch());
 
+  pair<ConnectionRef,ConnectionRef> ret;
   if (next_osdmap->is_down(peer) ||
       next_osdmap->get_info(peer).up_from > from_epoch) {
-    return NULL;
+    return ret;
   }
-  ConnectionRef ret(
-    osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer)));
-  ret->put(); // Ref from get_connection
+  ret.first = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_back_inst(peer));
+  ret.first->put(); // Ref from get_connection
+  ret.second = osd->hbclient_messenger->get_connection(next_osdmap->get_hb_front_inst(peer));
+  if (ret.second)
+    ret.second->put(); // Ref from get_connection
   return ret;
 }
 
@@ -4216,8 +4271,12 @@ void OSD::note_down_osd(int peer)
   failure_pending.erase(peer);
   map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(peer);
   if (p != heartbeat_peers.end()) {
-    hbclient_messenger->mark_down(p->second.con);
-    p->second.con->put();
+    hbclient_messenger->mark_down(p->second.con_back);
+    p->second.con_back->put();
+    if (p->second.con_front) {
+      hbclient_messenger->mark_down(p->second.con_front);  // tear down the front con, not the back one again
+      p->second.con_front->put();
+    }
     heartbeat_peers.erase(p);
   }
   heartbeat_lock.Unlock();
index b26e0598f4c8ee4480d0dc94c6d156fe419f441e..428284c85abc6750a21fc02826335779e2075e0e 100644 (file)
@@ -295,7 +295,7 @@ public:
     next_osdmap = map;
   }
   ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
-  ConnectionRef get_con_osd_hb(int peer, epoch_t from_epoch);
+  pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch);  // (back, front)
   void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
   void send_message_osd_cluster(Message *m, Connection *con) {
     cluster_messenger->send_message(m, con);
@@ -696,11 +696,23 @@ private:
   /// information about a heartbeat peer
   struct HeartbeatInfo {
     int peer;           ///< peer
-    Connection *con;    ///< peer connection
+    Connection *con_front;   ///< peer connection (front)
+    Connection *con_back;    ///< peer connection (back)
     utime_t first_tx;   ///< time we sent our first ping request
     utime_t last_tx;    ///< last time we sent a ping request
-    utime_t last_rx;    ///< last time we got a ping reply
+    utime_t last_rx_front;  ///< last time we got a ping reply on the front side
+    utime_t last_rx_back;   ///< last time we got a ping reply on the back side
     epoch_t epoch;      ///< most recent epoch we wanted this peer
+
+    bool is_healthy(utime_t cutoff) {
+      return
+       (last_rx_front > cutoff ||
+        (last_rx_front == utime_t() && (last_tx == utime_t() ||
+                                        first_tx > cutoff))) &&
+       (last_rx_back > cutoff ||
+        (last_rx_back == utime_t() && (last_tx == utime_t() ||
+                                       first_tx > cutoff)));
+    }
   };
   /// state attached to outgoing heartbeat connections
   struct HeartbeatSession : public RefCountedObject {