]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
xio: support send and receive of keepalives and their acks
authorAvner BenHanoch <avnerb@mellanox.com>
Wed, 20 Apr 2016 10:42:12 +0000 (13:42 +0300)
committerAvner BenHanoch <avnerb@mellanox.com>
Wed, 4 May 2016 18:49:34 +0000 (21:49 +0300)
xio will now support the following additional tags:
 + CEPH_MSGR_TAG_KEEPALIVE
 + CEPH_MSGR_TAG_KEEPALIVE2
 + CEPH_MSGR_TAG_KEEPALIVE2_ACK

Signed-off-by: Avner BenHanoch <avnerb@mellanox.com>
src/msg/xio/XioConnection.cc
src/msg/xio/XioConnection.h
src/msg/xio/XioMessenger.cc
src/msg/xio/XioMessenger.h
src/msg/xio/XioMsg.h

index 460196d30b6f64aa917863537274c48eb3aac4c6..0da5dbcabeb35b4cc3c191eb3a77cac0068ad20e 100644 (file)
@@ -149,6 +149,66 @@ int XioConnection::send_message(Message *m)
   return ms->_send_message(m, this);
 }
 
+void XioConnection::send_keepalive_or_ack(bool ack, const utime_t *tp)
+{
+  /* If con is not in READY state, we need to queue the request */
+  if (cstate.session_state.read() != XioConnection::UP) {
+    pthread_spin_lock(&sp);
+    if (cstate.session_state.read() != XioConnection::UP) {
+      if (ack) {
+       outgoing.ack = true;
+       outgoing.ack_time = *tp;
+      }
+      else {
+       outgoing.keepalive = true;
+      }
+      pthread_spin_unlock(&sp);
+      return;
+    }
+    pthread_spin_unlock(&sp);
+  }
+
+  send_keepalive_or_ack_internal(ack, tp);
+}
+
+void XioConnection::send_keepalive_or_ack_internal(bool ack, const utime_t *tp)
+{
+  XioCommand *xcmd = pool_alloc_xio_command(this);
+  if (! xcmd) {
+    /* could happen if Accelio has been shutdown */
+    return;
+  }
+
+  struct ceph_timespec ts;
+  if (ack) {
+    assert(tp);
+    tp->encode_timeval(&ts);
+    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2_ACK);
+    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
+  } else if (has_feature(CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+    utime_t t = ceph_clock_now(msgr->cct);
+    t.encode_timeval(&ts);
+    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE2);
+    xcmd->get_bl_ref().append((char*)&ts, sizeof(ts));
+  } else {
+    xcmd->get_bl_ref().append(CEPH_MSGR_TAG_KEEPALIVE);
+  }
+
+  const std::list<buffer::ptr>& header = xcmd->get_bl_ref().buffers();
+  assert(header.size() == 1);  /* accelio header must be without scatter gather */
+  list<bufferptr>::const_iterator pb = header.begin();
+  assert(pb->length() < XioMsgHdr::get_max_encoded_length());
+  struct xio_msg * msg = xcmd->get_xio_msg();
+  msg->out.header.iov_base = (char*) pb->c_str();
+  msg->out.header.iov_len = pb->length();
+
+  ldout(msgr->cct,8) << __func__ << " sending command with tag " << (int)(*(char*)msg->out.header.iov_base)
+       << " len " << msg->out.header.iov_len << dendl;
+
+  portal->enqueue_for_send(this, xcmd);
+}
+
+
 int XioConnection::passive_setup()
 {
   /* XXX passive setup is a placeholder for (potentially active-side
@@ -435,15 +495,53 @@ int XioConnection::on_msg(struct xio_session *session,
   ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
     << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;
 
+  //header_len_without_tag is only meaningful in case we have tag
+  size_t header_len_without_tag = msg->in.header.iov_len - sizeof(tag);
+
   switch(tag) {
   case CEPH_MSGR_TAG_MSG:
     ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
     return handle_data_msg(session, msg, more_in_batch, cb_user_context);
 
+  case CEPH_MSGR_TAG_KEEPALIVE:
+    ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
+    set_last_keepalive(ceph_clock_now(nullptr));
+    break;
+
+  case CEPH_MSGR_TAG_KEEPALIVE2:
+    if (header_len_without_tag < sizeof(ceph_timespec)) {
+      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2: got " << header_len_without_tag <<
+         " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+    }
+    else {
+      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+      utime_t kp_t = utime_t(*t);
+      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2 with timestamp" << kp_t << dendl;
+      send_keepalive_or_ack(true, &kp_t);
+      set_last_keepalive(ceph_clock_now(nullptr));
+    }
+
+    break;
+
+  case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+    if (header_len_without_tag < sizeof(ceph_timespec)) {
+      lderr(msgr->cct) << __func__ << " too few data for KEEPALIVE2_ACK: got " << header_len_without_tag <<
+         " bytes instead of " << sizeof(ceph_timespec) << " bytes" << dendl;
+    }
+    else {
+      ceph_timespec *t = (ceph_timespec *) ((char*)msg->in.header.iov_base + sizeof(tag));
+      utime_t kp_t(*t);
+      ldout(msgr->cct, 20) << __func__ << " got KEEPALIVE2_ACK with timestamp" << kp_t << dendl;
+      set_last_keepalive_ack(kp_t);
+    }
+    break;
+
   default:
-    assert(! "unrecognized message tag");
+    lderr(msgr->cct) << __func__ << " unsupported message tag " << (int) tag << dendl;
+    assert(! "unsupported message tag");
   }
 
+  xio_release_msg(msg);
   return 0;
 }
 
@@ -511,6 +609,16 @@ int XioConnection::flush_out_queues(uint32_t flags) {
   if (! (flags & CState::OP_FLAG_LOCKED))
     pthread_spin_lock(&sp);
 
+  if (outgoing.keepalive) {
+    outgoing.keepalive = false;
+    send_keepalive_or_ack_internal();
+  }
+
+  if (outgoing.ack) {
+    outgoing.ack = false;
+    send_keepalive_or_ack_internal(true, &outgoing.ack_time);
+  }
+
   // send deferred 1 (direct backpresssure)
   if (outgoing.requeue.size() > 0)
     portal->requeue(this, outgoing.requeue);
@@ -546,6 +654,8 @@ int XioConnection::discard_input_queue(uint32_t flags)
   XioSubmit::Queue::const_iterator i2 = deferred_q.end();
   deferred_q.splice(i2, outgoing.requeue);
 
+  outgoing.keepalive = outgoing.ack = false;
+
   if (! (flags & CState::OP_FLAG_LOCKED))
     pthread_spin_unlock(&sp);
 
@@ -736,3 +846,10 @@ int XioLoopbackConnection::send_message(Message *m)
   ms->ds_dispatch(m);
   return 0;
 }
+
+void XioLoopbackConnection::send_keepalive()
+{
+  utime_t t = ceph_clock_now(nullptr);
+  set_last_keepalive(t);
+  set_last_keepalive_ack(t);
+}
index 0b7f46166d0021b6e04222d92641d9aa5fcf3e56..4ea9a703c257f579bef83adfb83e25d99d17b2a0 100644 (file)
@@ -28,8 +28,8 @@ extern "C" {
 #include "include/atomic.h"
 #include "auth/AuthSessionHandler.h"
 
-#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
-                         ~CEPH_FEATURE_MSGR_KEEPALIVE2)
+#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL)
+
 
 #define XIO_NOP_TAG_MARKDOWN 0x0001
 
@@ -195,8 +195,13 @@ private:
 
   // message submission queue
   struct SendQ {
+    bool keepalive;
+    bool ack;
+    utime_t ack_time;
     Message::Queue mqueue; // deferred
     XioSubmit::Queue requeue;
+
+    SendQ():keepalive(false), ack(false){}
   } outgoing;
 
   // conns_entity_map comparison functor
@@ -271,7 +276,8 @@ public:
   bool is_connected() override { return connected.read(); }
 
   int send_message(Message *m) override;
-  void send_keepalive() override {}
+  void send_keepalive() override {send_keepalive_or_ack();}
+  void send_keepalive_or_ack(bool ack = false, const utime_t *tp = nullptr);
   void mark_down() override;
   int _mark_down(uint32_t flags);
   void mark_disposable() override;
@@ -324,6 +330,8 @@ public:
   int flush_out_queues(uint32_t flags);
   int discard_input_queue(uint32_t flags);
   int adjust_clru(uint32_t flags);
+private:
+  void send_keepalive_or_ack_internal(bool ack = false, const utime_t *tp = nullptr);
 };
 
 typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
@@ -348,7 +356,7 @@ public:
   bool is_connected() override { return true; }
 
   int send_message(Message *m) override;
-  void send_keepalive() override {}
+  void send_keepalive() override;
   void mark_down() override {}
   void mark_disposable() override {}
 
index e354458ae2ccff3fc6a58b9a29623120ab1878f4..e3519e686771d88015c30b213d0bb3fd8e2dcb77 100644 (file)
@@ -790,6 +790,18 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
   return xmsg;
 }
 
+XioCommand* pool_alloc_xio_command(XioConnection *xcon)
+{
+  struct xio_reg_mem mp_mem;
+  int e = xpool_alloc(xio_msgr_noreg_mpool, sizeof(XioCommand), &mp_mem);
+  if (!!e)
+    return NULL;
+  XioCommand *xcmd = reinterpret_cast<XioCommand*>(mp_mem.addr);
+  assert(!!xcmd);
+  new (xcmd) XioCommand(xcon, mp_mem);
+  return xcmd;
+}
+
 int XioMessenger::_send_message(Message *m, Connection *con)
 {
   if (con == loop_con.get() /* intrusive_ptr get() */) {
index a267962225795a943876760e690c5baaa2c9b3b7..38390c408fdd7966b42da62294d70be0ffe83c87 100644 (file)
@@ -130,12 +130,6 @@ public:
 
   virtual ConnectionRef get_loopback_connection();
 
-  virtual int send_keepalive(const entity_inst_t& dest)
-    { return EINVAL; }
-
-  virtual int send_keepalive(Connection *con)
-    { return EINVAL; }
-
   virtual void mark_down(const entity_addr_t& a);
   virtual void mark_down(Connection *con);
   virtual void mark_down_all();
@@ -165,4 +159,7 @@ public:
   uint64_t local_features;
 };
 
+XioCommand* pool_alloc_xio_command(XioConnection *xcon);
+
+
 #endif /* XIO_MESSENGER_H */
index 2f44b6f80fa998711f82a80fd620ed970697657a..e7c3a3942dac5181ed4d980b6971d750d4d4ecb8 100644 (file)
@@ -189,7 +189,7 @@ public:
   struct xio_msg * get_xio_msg() {return &req_0.msg;}
   virtual size_t get_msg_count() const {return 1;}
 
-  XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt) :
+  XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt=0) :
     XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
     req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
   {
@@ -227,6 +227,18 @@ private:
   atomic_t nrefs;
 };
 
+class XioCommand : public XioSend
+{
+public:
+  XioCommand(XioConnection *_xcon, struct xio_reg_mem& _mp):XioSend(_xcon, _mp) {
+  }
+
+  buffer::list& get_bl_ref() { return bl; };
+
+private:
+  buffer::list bl;
+};
+
 struct XioMsg : public XioSend
 {
 public: