]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add KEEPALIVE2 feature
authorSage Weil <sage@inktank.com>
Fri, 28 Mar 2014 04:09:13 +0000 (21:09 -0700)
committerSage Weil <sage@inktank.com>
Fri, 28 Mar 2014 23:49:49 +0000 (16:49 -0700)
This is similar to KEEPALIVE, except a timestamp is also exchanged.  It is
sent with the KEEPALIVE, and then returned with the ACK.  The last
received stamp is stored in the Connection so that it can be queried for
liveness.  Since all of the users of keepalive are already regularly
triggering a keepalive, they can check the liveness at the same time.

See #7888.

Signed-off-by: Sage Weil <sage@inktank.com>
(cherry picked from commit d747d79fd5ea8662a809c5636dfd2eaaa9bf8f5d)

Conflicts:

src/include/ceph_features.h

src/include/ceph_features.h
src/include/msgr.h
src/msg/Message.h
src/msg/Pipe.cc
src/msg/Pipe.h

index 362a459bde6831f3e25760e28966c0a73a1f8f00..0ab208557b74c48cafdda4674b28f201ec1b6dd3 100644 (file)
@@ -39,6 +39,7 @@
 #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
 #define CEPH_FEATURE_MON_SCRUB      (1ULL<<33)
 #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34)
+#define CEPH_FEATURE_MSGR_KEEPALIVE2   (1ULL<<42)
 
 /*
  * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature
@@ -101,6 +102,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
         CEPH_FEATURE_OSD_SNAPMAPPER |      \
         CEPH_FEATURE_MON_SCRUB |           \
         CEPH_FEATURE_OSD_PACKED_RECOVERY | \
+        CEPH_FEATURE_MSGR_KEEPALIVE2 | \
         0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
index 585468270ddcc90099255a35585ba2b3f237b8ad..a1ad993ac8bef045e8ef43be46abb6ce0e99fae0 100644 (file)
@@ -90,6 +90,8 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 #define CEPH_MSGR_TAG_FEATURES      12 /* insufficient features */
 #define CEPH_MSGR_TAG_SEQ           13 /* 64-bit int follows with seen seq number */
+#define CEPH_MSGR_TAG_KEEPALIVE2     14
+#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15  /* keepalive reply */
 
 
 /*
index 3ed8ee667d242c07b00e50450901f39c36429b7c..696d3bdba827744385ec797b3291b2e4ffdafa75 100644 (file)
@@ -172,6 +172,7 @@ struct Connection : private RefCountedObject {
   RefCountedObject *priv;
   int peer_type;
   entity_addr_t peer_addr;
+  utime_t last_keepalive_ack;
 private:
   uint64_t features;
 public:
@@ -288,6 +289,10 @@ public:
     Mutex::Locker l(lock);
     rx_buffers.erase(tid);
   }
+
+  utime_t get_last_keepalive_ack() const {
+    return last_keepalive_ack;
+  }
 };
 typedef boost::intrusive_ptr<Connection> ConnectionRef;
 
index 994a4cc13ca0d4989db5c6ef509fb1da142b198c..9f11bc4220a36bbad21a7057d7004559fb980f0f 100644 (file)
@@ -69,6 +69,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
     writer_running(false),
     in_q(&(r->dispatch_queue)),
     send_keepalive(false),
+    send_keepalive_ack(false),
     close_on_empty(false),
     connect_seq(0), peer_global_seq(0),
     out_seq(0), in_seq(0), in_seq_acked(0) {
@@ -1379,6 +1380,37 @@ void Pipe::reader()
       pipe_lock.Lock();
       continue;
     }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
+      ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl;
+      ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp "
+                          << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       send_keepalive_ack = true;
+       keepalive_ack_stamp = utime_t(t);
+       ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp
+                           << dendl;
+       cond.Signal();
+      }
+      continue;
+    }
+    if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+      ldout(msgr->cct,20) << "reader got KEEPALIVE_ACK" << dendl;
+      struct ceph_timespec t;
+      int rc = tcp_read((char*)&t, sizeof(t));
+      pipe_lock.Lock();
+      if (rc < 0) {
+       ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl;
+       fault(true);
+      } else {
+       connection_state->last_keepalive_ack = utime_t(t);
+      }
+      continue;
+    }
 
     // open ...
     if (tag == CEPH_MSGR_TAG_ACK) {
@@ -1529,16 +1561,36 @@ void Pipe::writer()
 
       // keepalive?
       if (send_keepalive) {
-       pipe_lock.Unlock();
-       int rc = write_keepalive();
+       int rc;
+       if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+         pipe_lock.Unlock();
+         rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2,
+                               ceph_clock_now(msgr->cct));
+       } else {
+         pipe_lock.Unlock();
+         rc = write_keepalive();
+       }
        pipe_lock.Lock();
        if (rc < 0) {
-         ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+         ldout(msgr->cct,2) << "writer couldn't write keepalive[2], "
+                            << strerror_r(errno, buf, sizeof(buf)) << dendl;
          fault();
          continue;
        }
        send_keepalive = false;
       }
+      if (send_keepalive_ack) {
+       utime_t t = keepalive_ack_stamp;
+       pipe_lock.Unlock();
+       int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t);
+       pipe_lock.Lock();
+       if (rc < 0) {
+         ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+         fault();
+         continue;
+       }
+       send_keepalive_ack = false;
+      }
 
       // send ack?
       if (in_seq > in_seq_acked) {
@@ -1998,6 +2050,26 @@ int Pipe::write_keepalive()
   return 0;
 }
 
+int Pipe::write_keepalive2(char tag, const utime_t& t)
+{
+  ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl;
+  struct ceph_timespec ts;
+  t.encode_timeval(&ts);
+  struct msghdr msg;
+  memset(&msg, 0, sizeof(msg));
+  struct iovec msgvec[2];
+  msgvec[0].iov_base = &tag;
+  msgvec[0].iov_len = 1;
+  msgvec[1].iov_base = &ts;
+  msgvec[1].iov_len = sizeof(ts);
+  msg.msg_iov = msgvec;
+  msg.msg_iovlen = 2;
+
+  if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0)
+    return -1;
+  return 0;
+}
+
 
 int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist)
 {
index d2f9c49257350fc493bd0be5d70d5a5b42effefa..d8d2402707f32b8a29112594d45a8c4afa354ab6 100644 (file)
@@ -162,6 +162,8 @@ class DispatchQueue;
     list<Message*> sent;
     Cond cond;
     bool send_keepalive;
+    bool send_keepalive_ack;
+    utime_t keepalive_ack_stamp;
     bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
     bool close_on_empty;
     
@@ -195,6 +197,7 @@ class DispatchQueue;
     int do_sendmsg(struct msghdr *msg, int len, bool more=false);
     int write_ack(uint64_t s);
     int write_keepalive();
+    int write_keepalive2(char tag, const utime_t &t);
 
     void fault(bool reader=false);
 
@@ -218,7 +221,7 @@ class DispatchQueue;
 
     __u32 get_out_seq() { return out_seq; }
 
-    bool is_queued() { return !out_q.empty() || send_keepalive; }
+    bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
 
     entity_addr_t& get_peer_addr() { return peer_addr; }