From d747d79fd5ea8662a809c5636dfd2eaaa9bf8f5d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 27 Mar 2014 21:09:13 -0700 Subject: [PATCH] msgr: add KEEPALIVE2 feature This is similar to KEEPALIVE, except a timestamp is also exchanged. It is sent with the KEEPALIVE, and then returned with the ACK. The last received stamp is stored in the Connection so that it can be queried for liveness. Since all of the users of keepalive are already regularly triggering a keepalive, they can check the liveness at the same time. See #7888. Signed-off-by: Sage Weil --- src/include/ceph_features.h | 2 + src/include/msgr.h | 2 + src/msg/Message.h | 5 +++ src/msg/Pipe.cc | 78 +++++++++++++++++++++++++++++++++++-- src/msg/Pipe.h | 5 ++- 5 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index f2b8a851a45d8..d6c7d6f5f2ff9 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -50,6 +50,7 @@ #define CEPH_FEATURE_MDS_INLINE_DATA (1ULL<<40) #define CEPH_FEATURE_CRUSH_TUNABLES3 (1ULL<<41) #define CEPH_FEATURE_OSD_PRIMARY_AFFINITY (1ULL<<41) /* overlap w/ tunables3 */ +#define CEPH_FEATURE_MSGR_KEEPALIVE2 (1ULL<<42) /* * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature @@ -120,6 +121,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) { CEPH_FEATURE_MDS_INLINE_DATA | \ CEPH_FEATURE_CRUSH_TUNABLES3 | \ CEPH_FEATURE_OSD_PRIMARY_AFFINITY | \ + CEPH_FEATURE_MSGR_KEEPALIVE2 | \ 0ULL) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git a/src/include/msgr.h b/src/include/msgr.h index 585468270ddcc..a1ad993ac8bef 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -90,6 +90,8 @@ struct ceph_entity_inst { #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */ #define CEPH_MSGR_TAG_FEATURES 12 /* insufficient features */ #define CEPH_MSGR_TAG_SEQ 13 /* 64-bit int follows with seen seq number */ +#define CEPH_MSGR_TAG_KEEPALIVE2 14 +#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive reply */ /* diff --git a/src/msg/Message.h b/src/msg/Message.h index 3a833726ac322..a69944f839c49 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -177,6 +177,7 @@ struct Connection : private RefCountedObject { RefCountedObject *priv; int peer_type; entity_addr_t peer_addr; + utime_t last_keepalive_ack; private: uint64_t features; public: @@ -293,6 +294,10 @@ public: Mutex::Locker l(lock); rx_buffers.erase(tid); } + + utime_t get_last_keepalive_ack() const { + return last_keepalive_ack; + } }; typedef boost::intrusive_ptr ConnectionRef; diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 5c298b51986b5..3d86789a54be9 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -87,6 +87,7 @@ Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) writer_running(false), in_q(&(r->dispatch_queue)), send_keepalive(false), + send_keepalive_ack(false), close_on_empty(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0) { @@ -1420,6 +1421,37 @@ void Pipe::reader() pipe_lock.Lock(); continue; } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2) { + ldout(msgr->cct,30) << "reader got KEEPALIVE2 tag ..." << dendl; + ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " + << cpp_strerror(errno) << dendl; + fault(true); + } else { + send_keepalive_ack = true; + keepalive_ack_stamp = utime_t(t); + ldout(msgr->cct,20) << "reader got KEEPALIVE2 " << keepalive_ack_stamp + << dendl; + cond.Signal(); + } + continue; + } + if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ldout(msgr->cct,20) << "reader got KEEPALIVE_ACK" << dendl; + struct ceph_timespec t; + int rc = tcp_read((char*)&t, sizeof(t)); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "reader couldn't read KEEPALIVE2 stamp " << cpp_strerror(errno) << dendl; + fault(true); + } else { + connection_state->last_keepalive_ack = utime_t(t); + } + continue; + } // open ... if (tag == CEPH_MSGR_TAG_ACK) { @@ -1570,16 +1602,36 @@ void Pipe::writer() // keepalive? if (send_keepalive) { - pipe_lock.Unlock(); - int rc = write_keepalive(); + int rc; + if (connection_state->has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + pipe_lock.Unlock(); + rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2, + ceph_clock_now(msgr->cct)); + } else { + pipe_lock.Unlock(); + rc = write_keepalive(); + } pipe_lock.Lock(); if (rc < 0) { - ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "writer couldn't write keepalive[2], " + << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); continue; } send_keepalive = false; } + if (send_keepalive_ack) { + utime_t t = keepalive_ack_stamp; + pipe_lock.Unlock(); + int rc = write_keepalive2(CEPH_MSGR_TAG_KEEPALIVE2_ACK, t); + pipe_lock.Lock(); + if (rc < 0) { + ldout(msgr->cct,2) << "writer couldn't write keepalive_ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + fault(); + continue; + } + send_keepalive_ack = false; + } // send ack? if (in_seq > in_seq_acked) { @@ -2039,6 +2091,26 @@ int Pipe::write_keepalive() return 0; } +int Pipe::write_keepalive2(char tag, const utime_t& t) +{ + ldout(msgr->cct,10) << "write_keepalive2 " << (int)tag << " " << t << dendl; + struct ceph_timespec ts; + t.encode_timeval(&ts); + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &tag; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &ts; + msgvec[1].iov_len = sizeof(ts); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(&msg, 1 + sizeof(ts)) < 0) + return -1; + return 0; +} + int Pipe::write_message(ceph_msg_header& header, ceph_msg_footer& footer, bufferlist& blist) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 6e6a363640129..29d7958ecf80d 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -162,6 +162,8 @@ class DispatchQueue; list sent; Cond cond; bool send_keepalive; + bool send_keepalive_ack; + utime_t keepalive_ack_stamp; bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it bool close_on_empty; @@ -195,6 +197,7 @@ class DispatchQueue; int do_sendmsg(struct msghdr *msg, int len, bool more=false); int write_ack(uint64_t s); int write_keepalive(); + int write_keepalive2(char tag, const utime_t &t); void fault(bool reader=false); @@ -218,7 +221,7 @@ class DispatchQueue; __u32 get_out_seq() { return out_seq; } - bool is_queued() { return !out_q.empty() || send_keepalive; } + bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } entity_addr_t& get_peer_addr() { return peer_addr; } -- 2.39.5