]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add KEEPALIVE2 feature
authorSage Weil <sage@inktank.com>
Fri, 28 Mar 2014 04:09:13 +0000 (21:09 -0700)
committerSage Weil <sage@inktank.com>
Fri, 28 Mar 2014 23:09:26 +0000 (16:09 -0700)
This is similar to KEEPALIVE, except a timestamp is also exchanged.  It is
sent with the KEEPALIVE, and then returned with the ACK.  The last
received stamp is stored in the Connection so that it can be queried for
liveness.  Since all of the users of keepalive are already regularly
triggering a keepalive, they can check the liveness at the same time.

See #7888.

Signed-off-by: Sage Weil <sage@inktank.com>
src/include/ceph_features.h
src/include/msgr.h
src/msg/Message.h
src/msg/Pipe.cc
src/msg/Pipe.h

index f2b8a851a45d8d5c0e61138dfdf1e781f7edb3a6..d6c7d6f5f2ff9893d3150c1f52794b2ec7307d53 100644 (file)
@@ -50,6 +50,7 @@
 #define CEPH_FEATURE_MDS_INLINE_DATA     (1ULL<<40)
 #define CEPH_FEATURE_CRUSH_TUNABLES3     (1ULL<<41)
 #define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41)  /* overlap w/ tunables3 */
+#define CEPH_FEATURE_MSGR_KEEPALIVE2   (1ULL<<42)
 
 /*
  * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
@@ -120,6 +121,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
         CEPH_FEATURE_MDS_INLINE_DATA |     \
         CEPH_FEATURE_CRUSH_TUNABLES3 |     \
         CEPH_FEATURE_OSD_PRIMARY_AFFINITY |    \
+        CEPH_FEATURE_MSGR_KEEPALIVE2 | \
         0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
index 585468270ddcc90099255a35585ba2b3f237b8ad..a1ad993ac8bef045e8ef43be46abb6ce0e99fae0 100644 (file)
@@ -90,6 +90,8 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 #define CEPH_MSGR_TAG_FEATURES      12 /* insufficient features */
 #define CEPH_MSGR_TAG_SEQ           13 /* 64-bit int follows with seen seq number */
+#define CEPH_MSGR_TAG_KEEPALIVE2     14
+#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15  /* keepalive reply */
 
 
 /*
index 3a833726ac3222d382f143dac676011002155482..a69944f839c49fc86b4ac288e0086a76ba46e048 100644 (file)
@@ -177,6 +177,7 @@ struct Connection : private RefCountedObject {
   RefCountedObject *priv;
   int peer_type;
   entity_addr_t peer_addr;
+  utime_t last_keepalive_ack;
 private:
   uint64_t features;
 public:
@@ -293,6 +294,10 @@ public:
     Mutex::Locker l(lock);
     rx_buffers.erase(tid);
   }
+
+  utime_t get_last_keepalive_ack() const {
+    return last_keepalive_ack;
+  }
 };
 typedef boost::intrusive_ptr<Connection> ConnectionRef;
 
index 5c298b51986b5245b95b357d526b92dfa478d026..3d86789a54be98628b26a7cf95b8c83509470d9d 100644 (file)
@@ -87,6 +87,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
     writer_running(false),
     in_q(&(r->dispatch_queue)),
     send_keepalive(false),
+    send_keepalive_ack(false),
     close_on_empty(false),
     connect_seq(0), peer_global_seq(0),
     out_seq(0), in_seq(0), in_seq_acked(0) {
@@ -1420,6 +1421,37 @@ void Pipe::reader()
       pipe_lock.Lock();
       continue;
     }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
+      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
+      ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
+                          << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       send_keepalive_ack = true;
+       keepalive_ack_stamp = utime_t(t);
+       ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
+                           << dendl;
+       cond.Signal();
+      }
+      continue;
+    }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+      ldout(msgr->cct,20) << "reader got KEEPALIVE_ACK" << dendl;
+      struct ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       connection_state->last_keepalive_ack = utime_t(t);
+      }
+      continue;
+    }
 
     // open ...
     if (tag == CEPH_MSGR_TAG_ACK) {
@@ -1570,16 +1602,36 @@ void Pipe::writer()
 
       // keepalive?
       if (send_keepalive) {
-       pipe_lock.Unlock();
-       int rc = write_keepalive();
+       int rc;
+       if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+         pipe_lock.Unlock();
+         rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
+                               ceph_clock_now(msgr->cct));
+       } else {
+         pipe_lock.Unlock();
+         rc = write_keepalive();
+       }
        pipe_lock.Lock();
        if (rc < 0) {
-         ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+         ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
+                            << strerror_r(errno, buf, sizeof(buf)) << dendl;
          fault();
          continue;
        }
        send_keepalive = false;
       }
+      if (send_keepalive_ack) {
+       utime_t t = keepalive_ack_stamp;
+       pipe_lock.Unlock();
+       int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
+       pipe_lock.Lock();
+       if (rc < 0) {
+         ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+         fault();
+         continue;
+       }
+       send_keepalive_ack = false;
+      }
 
       // send ack?
       if (in_seq > in_seq_acked) {
@@ -2039,6 +2091,26 @@ int Pipe::write_keepalive()
   return 0;
 }
 
+int Pipe::write_keepalive2(char tag, const utime_t& t)
+{
+  ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
+  struct ceph_timespec ts;
+  t.encode_timeval(&ts);
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &tag;
+  msgvec[0].iov_len = 1;
+  msgvec[1].iov_base = &ts;
+  msgvec[1].iov_len = sizeof(ts);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 2;
+
+  if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
+    return -1;
+  return 0;
+}
+
 
 int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist)
 {
index 6e6a363640129a9fb189dd98ff7d5f8f3c2802a4..29d7958ecf80d71ceaeb5bdde5b35dfc627e5164 100644 (file)
@@ -162,6 +162,8 @@ class DispatchQueue;
     list<Message*> sent;
     Cond cond;
     bool send_keepalive;
+    bool send_keepalive_ack;
+    utime_t keepalive_ack_stamp;
     bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
     bool close_on_empty;
     
@@ -195,6 +197,7 @@ class DispatchQueue;
     int do_sendmsg(struct msghdr *msg, int len, bool more=false);
     int write_ack(uint64_t s);
     int write_keepalive();
+    int write_keepalive2(char tag, const utime_t &t);
 
     void fault(bool reader=false);
 
@@ -218,7 +221,7 @@ class DispatchQueue;
 
     __u32 get_out_seq() { return out_seq; }
 
-    bool is_queued() { return !out_q.empty() || send_keepalive; }
+    bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
 
     entity_addr_t& get_peer_addr() { return peer_addr; }