From 8723218379e80725a449b0594a4b15eb1c236b05 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 27 Mar 2014 21:09:13 -0700 Subject: [PATCH] msgr: add KEEPALIVE2 feature This is similar to KEEPALIVE, except a timestamp is also exchanged. It is sent with the KEEPALIVE, and then returned with the ACK. The last received stamp is stored in the Connection so that it can be queried for liveness. Since all of the users of keepalive are already regularly triggering a keepalive, they can check the liveness at the same time. See #7888. Signed-off-by: Sage Weil (cherry picked from commit d747d79fd5ea8662a809c5636dfd2eaaa9bf8f5d) Conflicts: src/include/ceph_features.h --- src/include/ceph_features.h | 2 + src/include/msgr.h | 2 + src/msg/Message.h | 5 +++ src/msg/Pipe.cc | 78 +++++++++++++++++++++++++++++++++++-- src/msg/Pipe.h | 5 ++- 5 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index 362a459bde683..0ab208557b74c 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -39,6 +39,7 @@ #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32) #define CEPH_FEATURE_MON_SCRUB (1ULL<<33) #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34) +#define CEPH_FEATURE_MSGR_KEEPALIVE2 (1ULL<<42) /* * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature @@ -101,6 +102,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) { CEPH_FEATURE_OSD_SNAPMAPPER | \ CEPH_FEATURE_MON_SCRUB | \ CEPH_FEATURE_OSD_PACKED_RECOVERY | \ + CEPH_FEATURE_MSGR_KEEPALIVE2 | \ 0ULL) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git a/src/include/msgr.h b/src/include/msgr.h index 585468270ddcc..a1ad993ac8bef 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -90,6 +90,8 @@ struct ceph_entity_inst { #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */ #define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */ #define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */ +#define CEPH_MSGR_TAG_KEEPALIVE2 14 +#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive reply */ /* diff --git a/src/msg/Message.h b/src/msg/Message.h index 3ed8ee667d242..696d3bdba8277 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -172,6 +172,7 @@ struct Connection : private RefCountedObject { RefCountedObject *priv; int peer_type; entity_addr_t peer_addr; + utime_t last_keepalive_ack; private: uint64_t features; public: @@ -288,6 +289,10 @@ public: Mutex::Locker l(lock); rx_buffers.erase(tid); } + + utime_t get_last_keepalive_ack() const { + return last_keepalive_ack; + } }; typedef boost::intrusive_ptr ConnectionRef; diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 994a4cc13ca0d..9f11bc4220a36 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -69,6 +69,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) writer_running(false), in_q(&(r->dispatch_queue)), send_keepalive(false), + send_keepalive_ack(false), close_on_empty(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0) { @@ -1379,6 +1380,37 @@ void Pipe::reader() pipe_lock.Lock(); continue; } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { + ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl; + ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " + << cpp_strerror(errno) << dendl; + fault(true); + } else { + send_keepalive_ack = true; + keepalive_ack_stamp = utime_t(t); + ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp + << dendl; + cond.Signal(); + } + continue; + } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ldout(msgr->cct,20) << "reader got KEEPALIVE_ACK" << dendl; + struct ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; + fault(true); + } else { + connection_state->last_keepalive_ack = utime_t(t); + } + continue; + } // open ... if (tag == CEPH_MSGR_TAG_ACK) { @@ -1529,16 +1561,36 @@ void Pipe::writer() // keepalive? if (send_keepalive) { - pipe_lock.Unlock(); - int rc = write_keepalive(); + int rc; + if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + pipe_lock.Unlock(); + rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2, + ceph_clock_now(msgr->cct)); + } else { + pipe_lock.Unlock(); + rc = write_keepalive(); + } pipe_lock.Lock(); if (rc < 0) { - ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "writer couldn't write keepalive[2], " + << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); continue; } send_keepalive = false; } + if (send_keepalive_ack) { + utime_t t = keepalive_ack_stamp; + pipe_lock.Unlock(); + int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + fault(); + continue; + } + send_keepalive_ack = false; + } // send ack? if (in_seq > in_seq_acked) { @@ -1998,6 +2050,26 @@ int Pipe::write_keepalive() return 0; } +int Pipe::write_keepalive2(char tag, const utime_t& t) +{ + ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl; + struct ceph_timespec ts; + t.encode_timeval(&ts); + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &tag; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &ts; + msgvec[1].iov_len = sizeof(ts); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0) + return -1; + return 0; +} + int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index d2f9c49257350..d8d2402707f32 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -162,6 +162,8 @@ class DispatchQueue; list sent; Cond cond; bool send_keepalive; + bool send_keepalive_ack; + utime_t keepalive_ack_stamp; bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it bool close_on_empty; @@ -195,6 +197,7 @@ class DispatchQueue; int do_sendmsg(struct msghdr *msg, int len, bool more=false); int write_ack(uint64_t s); int write_keepalive(); + int write_keepalive2(char tag, const utime_t &t); void fault(bool reader=false); @@ -218,7 +221,7 @@ class DispatchQueue; __u32 get_out_seq() { return out_seq; } - bool is_queued() { return !out_q.empty() || send_keepalive; } + bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } entity_addr_t& get_peer_addr() { return peer_addr; } -- 2.39.5