From: Avner BenHanoch Date: Wed, 20 Apr 2016 10:42:12 +0000 (+0300) Subject: xio: support send and receive of keepalives and their acks X-Git-Tag: v11.0.0~444^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fca1d8613cf565de454246c72413e9fd6b935c00;p=ceph.git xio: support send and receive of keepalives and their acks xio will now support the following additional tags: + CEPH_MSGR_TAG_KEEPALIVE + CEPH_MSGR_TAG_KEEPALIVE2 + CEPH_MSGR_TAG_KEEPALIVE2_ACK Signed-off-by: Avner BenHanoch --- diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc index 460196d30b6f..0da5dbcabeb3 100644 --- a/src/msg/xio/XioConnection.cc +++ b/src/msg/xio/XioConnection.cc @@ -149,6 +149,66 @@ int XioConnection::send_message(Message *m) return ms->_send_message(m, this); } +void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp) +{ + /* If con is not in READY state, we need to queue the request */ + if (cstate.session_state.read() != XioConnection::UP) { + pthread_spin_lock(&sp); + if (cstate.session_state.read() != XioConnection::UP) { + if (ack) { + outgoing.ack = true; + outgoing.ack_time = *tp; + } + else { + outgoing.keepalive = true; + } + pthread_spin_unlock(&sp); + return; + } + pthread_spin_unlock(&sp); + } + + send_keepalive_or_ack_internal(ack, tp); +} + +void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp) +{ + XioCommand *xcmd = pool_alloc_xio_command(this); + if (! 
xcmd) { + /* could happen if Accelio has been shutdown */ + return; + } + + struct ceph_timespec ts; + if (ack) { + assert(tp); + tp->encode_timeval(&ts); + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK); + xcmd->get_bl_ref().append((char*)&ts, sizeof(ts)); + } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) { + utime_t t = ceph_clock_now(msgr->cct); + t.encode_timeval(&ts); + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2); + xcmd->get_bl_ref().append((char*)&ts, sizeof(ts)); + } else { + xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE); + } + + const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers(); + assert(header.size() == 1); /* accelio header must be without scatter gather */ + list<buffer::ptr>::const_iterator pb = header.begin(); + assert(pb->length() < XioMsgHdr::get_max_encoded_length()); + struct xio_msg * msg = xcmd->get_xio_msg(); + msg->out.header.iov_base = (char*) pb->c_str(); + msg->out.header.iov_len = pb->length(); + + ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base) + << " len " << msg->out.header.iov_len << dendl; + + portal->enqueue_for_send(this, xcmd); +} + + int XioConnection::passive_setup() { /* XXX passive setup is a placeholder for (potentially active-side @@ -435,15 +495,53 @@ int XioConnection::on_msg(struct xio_session *session, ldout(msgr->cct,8) << __func__ << " receive msg with iov_len " << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl; + //header_len_without_tag is only meaningful in case we have tag + size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag); + switch(tag) { case CEPH_MSGR_TAG_MSG: ldout(msgr->cct, 20) << __func__ << " got data message" << dendl; return handle_data_msg(session, msg, more_in_batch, cb_user_context); + case CEPH_MSGR_TAG_KEEPALIVE: + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; + set_last_keepalive(ceph_clock_now(nullptr)); + break; + + case CEPH_MSGR_TAG_KEEPALIVE2: + if 
(header_len_without_tag < sizeof(ceph_timespec)) { + lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag << + " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl; + } + else { + ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag)); + utime_t kp_t = utime_t(*t); + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl; + send_keepalive_or_ack(true, &kp_t); + set_last_keepalive(ceph_clock_now(nullptr)); + } + + break; + + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + if (header_len_without_tag < sizeof(ceph_timespec)) { + lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag << + " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl; + } + else { + ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag)); + utime_t kp_t(*t); + ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl; + set_last_keepalive_ack(kp_t); + } + break; + default: - assert(! "unrecognized message tag"); + lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl; + assert(! "unsupported message tag"); } + xio_release_msg(msg); return 0; } @@ -511,6 +609,16 @@ int XioConnection::flush_out_queues(uint32_t flags) { if (! (flags & CState::OP_FLAG_LOCKED)) pthread_spin_lock(&sp); + if (outgoing.keepalive) { + outgoing.keepalive = false; + send_keepalive_or_ack_internal(); + } + + if (outgoing.ack) { + outgoing.ack = false; + send_keepalive_or_ack_internal(true, &outgoing.ack_time); + } + // send deferred 1 (direct backpresssure) if (outgoing.requeue.size() > 0) portal->requeue(this, outgoing.requeue); @@ -546,6 +654,8 @@ int XioConnection::discard_input_queue(uint32_t flags) XioSubmit::Queue::const_iterator i2 = deferred_q.end(); deferred_q.splice(i2, outgoing.requeue); + outgoing.keepalive = outgoing.ack = false; + if (! 
(flags & CState::OP_FLAG_LOCKED)) pthread_spin_unlock(&sp); @@ -736,3 +846,10 @@ int XioLoopbackConnection::send_message(Message *m) ms->ds_dispatch(m); return 0; } + +void XioLoopbackConnection::send_keepalive() +{ + utime_t t = ceph_clock_now(nullptr); + set_last_keepalive(t); + set_last_keepalive_ack(t); +} diff --git a/src/msg/xio/XioConnection.h b/src/msg/xio/XioConnection.h index 0b7f46166d00..4ea9a703c257 100644 --- a/src/msg/xio/XioConnection.h +++ b/src/msg/xio/XioConnection.h @@ -28,8 +28,8 @@ extern "C" { #include "include/atomic.h" #include "auth/AuthSessionHandler.h" -#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \ - ~CEPH_FEATURE_MSGR_KEEPALIVE2) +#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL) + #define XIO_NOP_TAG_MARKDOWN 0x0001 @@ -195,8 +195,13 @@ private: // message submission queue struct SendQ { + bool keepalive; + bool ack; + utime_t ack_time; Message::Queue mqueue; // deferred XioSubmit::Queue requeue; + + SendQ():keepalive(false), ack(false){} } outgoing; // conns_entity_map comparison functor @@ -271,7 +276,8 @@ public: bool is_connected() override { return connected.read(); } int send_message(Message *m) override; - void send_keepalive() override {} + void send_keepalive() override {send_keepalive_or_ack();} + void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr); void mark_down() override; int _mark_down(uint32_t flags); void mark_disposable() override; @@ -324,6 +330,8 @@ public: int flush_out_queues(uint32_t flags); int discard_input_queue(uint32_t flags); int adjust_clru(uint32_t flags); +private: + void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr); }; typedef boost::intrusive_ptr<XioConnection> XioConnectionRef; @@ -348,7 +356,7 @@ public: bool is_connected() override { return true; } int send_message(Message *m) override; - void send_keepalive() override {} + void send_keepalive() override; void mark_down() override {} void mark_disposable() override {} diff --git a/src/msg/xio/XioMessenger.cc 
b/src/msg/xio/XioMessenger.cc index e354458ae2cc..e3519e686771 100644 --- a/src/msg/xio/XioMessenger.cc +++ b/src/msg/xio/XioMessenger.cc @@ -790,6 +790,18 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon, return xmsg; } +XioCommand* pool_alloc_xio_command(XioConnection *xcon) +{ + struct xio_reg_mem mp_mem; + int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem); + if (!!e) + return NULL; + XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr); + assert(!!xcmd); + new (xcmd) XioCommand(xcon, mp_mem); + return xcmd; +} + int XioMessenger::_send_message(Message *m, Connection *con) { if (con == loop_con.get() /* intrusive_ptr get() */) { diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h index a26796222579..38390c408fdd 100644 --- a/src/msg/xio/XioMessenger.h +++ b/src/msg/xio/XioMessenger.h @@ -130,12 +130,6 @@ public: virtual ConnectionRef get_loopback_connection(); - virtual int send_keepalive(const entity_inst_t& dest) - { return EINVAL; } - - virtual int send_keepalive(Connection *con) - { return EINVAL; } - virtual void mark_down(const entity_addr_t& a); virtual void mark_down(Connection *con); virtual void mark_down_all(); @@ -165,4 +159,7 @@ public: uint64_t local_features; }; +XioCommand* pool_alloc_xio_command(XioConnection *xcon); + + #endif /* XIO_MESSENGER_H */ diff --git a/src/msg/xio/XioMsg.h b/src/msg/xio/XioMsg.h index 2f44b6f80fa9..e7c3a3942dac 100644 --- a/src/msg/xio/XioMsg.h +++ b/src/msg/xio/XioMsg.h @@ -189,7 +189,7 @@ public: struct xio_msg * get_xio_msg() {return &req_0.msg;} virtual size_t get_msg_count() const {return 1;} - XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt) : + XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) : XioSubmit(XioSubmit::OUTGOING_MSG, _xcon), req_0(this), mp_this(_mp), nrefs(_ex_cnt+1) { @@ -227,6 +227,18 @@ private: atomic_t nrefs; }; +class XioCommand : public XioSend +{ +public: + XioCommand(XioConnection *_xcon, 
struct xio_reg_mem& _mp):XioSend(_xcon, _mp) { + } + + buffer::list& get_bl_ref() { return bl; }; + +private: + buffer::list bl; +}; + struct XioMsg : public XioSend { public: