]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
xio: split XioMsg into base XioSend and derived XioMsg
authorAvner BenHanoch <avnerb@mellanox.com>
Thu, 14 Apr 2016 10:55:39 +0000 (13:55 +0300)
committerAvner BenHanoch <avnerb@mellanox.com>
Sun, 1 May 2016 09:56:11 +0000 (12:56 +0300)
The base class will allow creating additional derived classes for
accelio messages that do not contain a Ceph Message like keepalives
and acks (till now this was not supported)

Signed-off-by: Avner BenHanoch <avnerb@mellanox.com>
src/msg/xio/XioConnection.cc
src/msg/xio/XioConnection.h
src/msg/xio/XioMsg.h
src/msg/xio/XioPortal.h

index 8049da4a21c0b933765bc85c2a27d75e82ff080f..460196d30b6f64aa917863537274c48eb3aac4c6 100644 (file)
@@ -455,17 +455,22 @@ int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
   /* requester send complete (one-way) */
   uint64_t rc = ++scount;
 
-  XioMsg* xmsg = static_cast<XioMsg*>(req->user_context);
+  XioSend* xsend = static_cast<XioSend*>(req->user_context);
   if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
     if (unlikely((rc % 1000000) == 0)) {
       std::cout << "xio finished " << rc << " " << time(0) << std::endl;
     }
   } /* trace ctr */
 
-  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xmsg->xcon <<
-    " session: " << session << " msg: " << req << " sn: " << req->sn <<
-    " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
-    " seq: " << xmsg->m->get_seq() << dendl;
+  ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
+    " session: " << session << " msg: " << req << " sn: " << req->sn << dendl;
+
+  XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
+  if (xmsg) {
+    ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
+      " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
+      " seq: " << xmsg->m->get_seq() << dendl;
+  }
 
   --send_ctr; /* atomic, because portal thread */
 
@@ -475,29 +480,29 @@ int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
     if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
        (1 /* XXX memory <= memory low-water mark */))  {
       cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
-      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xmsg->xcon <<
+      ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon <<
         " session: " << session << " up_ready from flow_controlled" << dendl;
     }
   }
 
-  xmsg->put();
+  xsend->put();
 
   return 0;
 }  /* on_msg_delivered */
 
-void XioConnection::msg_send_fail(XioMsg *xmsg, int code)
+void XioConnection::msg_send_fail(XioSend *xsend, int code)
 {
   ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this <<
-    " xmsg: " << xmsg->get_xio_msg() << " code=" << code <<
+    " msg: " << xsend->get_xio_msg() << " code=" << code <<
     " (" << xio_strerror(code) << ")" << dendl;
   /* return refs taken for each xio_msg */
-  xmsg->put_msg_refs();
+  xsend->put_msg_refs();
 } /* msg_send_fail */
 
 void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
 {
   ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
-    " xmsg: " << msg <<  "code=" << code <<
+    " msg: " << msg <<  "code=" << code <<
     " (" << xio_strerror(code) << ")" << dendl;
 } /* msg_release_fail */
 
@@ -533,7 +538,7 @@ int XioConnection::discard_input_queue(uint32_t flags)
 
   /* the two send queues contain different objects:
    * - anything on the mqueue is a Message
-   * - anything on the requeue is an XioMsg
+   * - anything on the requeue is an XioSend
    */
   Message::Queue::const_iterator i1 = disc_q.end();
   disc_q.splice(i1, outgoing.mqueue);
@@ -556,13 +561,13 @@ int XioConnection::discard_input_queue(uint32_t flags)
   while (!deferred_q.empty()) {
     XioSubmit::Queue::iterator q_iter = deferred_q.begin();
     XioSubmit* xs = &(*q_iter);
-    XioMsg* xmsg;
+    XioSend* xsend;
     switch (xs->type) {
       case XioSubmit::OUTGOING_MSG:
-       xmsg = static_cast<XioMsg*>(xs);
+       xsend = static_cast<XioSend*>(xs);
        deferred_q.erase(q_iter);
        // release once for each chained xio_msg
-       xmsg->put(xmsg->get_msg_count());
+       xsend->put(xsend->get_msg_count());
        break;
       case XioSubmit::INCOMING_MSG_RELEASE:
        deferred_q.erase(q_iter);
@@ -606,9 +611,9 @@ int XioConnection::on_msg_error(struct xio_session *session,
                                struct xio_msg  *msg,
                                void *conn_user_context)
 {
-  XioMsg *xmsg = static_cast<XioMsg*>(msg->user_context);
-  if (xmsg)
-    xmsg->put();
+  XioSend *xsend = static_cast<XioSend*>(msg->user_context);
+  if (xsend)
+    xsend->put();
 
   --send_ctr; /* atomic, because portal thread */
   return 0;
index e439cbcb936a2e9b79b8a7edfd8d1a8f58606cdc..0b7f46166d0021b6e04222d92641d9aa5fcf3e56 100644 (file)
@@ -37,7 +37,7 @@ namespace bi = boost::intrusive;
 
 class XioPortal;
 class XioMessenger;
-class XioMsg;
+class XioSend;
 
 class XioConnection : public Connection
 {
@@ -231,7 +231,7 @@ private:
   friend class XioMessenger;
   friend class XioDispatchHook;
   friend class XioMarkDownHook;
-  friend class XioMsg;
+  friend class XioSend;
 
   int on_disconnect_event() {
     connected.set(false);
@@ -319,7 +319,7 @@ public:
                              void *conn_user_context);
   int on_msg_error(struct xio_session *session, enum xio_status error,
                   struct xio_msg  *msg, void *conn_user_context);
-  void msg_send_fail(XioMsg *xmsg, int code);
+  void msg_send_fail(XioSend *xsend, int code);
   void msg_release_fail(struct xio_msg *msg, int code);
   int flush_out_queues(uint32_t flags);
   int discard_input_queue(uint32_t flags);
index 1e1fa889692b6547220547a5bba9c489b9622a5e..2f44b6f80fa998711f82a80fd620ed970697657a 100644 (file)
@@ -181,22 +181,65 @@ struct xio_msg_ex
   }
 };
 
-struct XioMsg : public XioSubmit
+class XioSend : public XioSubmit
 {
 public:
-  Message* m;
-  XioMsgHdr hdr;
+  virtual void print_debug(CephContext *cct, const char *tag) const {};
+  const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
+  struct xio_msg * get_xio_msg() {return &req_0.msg;}
+  virtual size_t get_msg_count() const {return 1;}
+
+  XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt) :
+    XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+    req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
+  {
+    xpool_inc_msgcnt();
+    xcon->get();
+  }
+
+  XioSend* get() { nrefs.inc(); return this; };
+
+  void put(int n) {
+    int refs = nrefs.sub(n);
+    if (refs == 0) {
+      struct xio_reg_mem *mp = &this->mp_this;
+      this->~XioSend();
+      xpool_free(sizeof(XioSend), mp);
+    }
+  }
+
+  void put() {
+    put(1);
+  }
+
+  void put_msg_refs() {
+    put(get_msg_count());
+  }
+
+  virtual ~XioSend() {
+    xpool_dec_msgcnt();
+    xcon->put();
+  }
+
+private:
   xio_msg_ex req_0;
-  xio_msg_ex* req_arr;
   struct xio_reg_mem mp_this;
   atomic_t nrefs;
+};
+
+struct XioMsg : public XioSend
+{
+public:
+  Message* m;
+  XioMsgHdr hdr;
+  xio_msg_ex* req_arr;
 
 public:
   XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
         int _ex_cnt) :
-    XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+    XioSend(_xcon, _mp, _ex_cnt),
     m(_m), hdr(m->get_header(), m->get_footer()),
-    req_0(this), req_arr(NULL), mp_this(_mp), nrefs(_ex_cnt+1)
+    req_arr(NULL)
     {
       const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
       hdr.peer_type = inst.name.type();
@@ -208,35 +251,11 @@ public:
       if (unlikely(_ex_cnt > 0)) {
        alloc_trailers(_ex_cnt);
       }
-
-      xpool_inc_msgcnt();
-
-      // submit queue ref
-      xcon->get();
-    }
-
-  void print_debug(CephContext *cct, const char *tag) const;
-  const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
-  struct xio_msg * get_xio_msg() {return &req_0.msg;}
-  size_t get_msg_count() const {return hdr.msg_cnt;}
-
-  XioMsg* get() { nrefs.inc(); return this; };
-
-  void put(int n) {
-    int refs = nrefs.sub(n);
-    if (refs == 0) {
-      struct xio_reg_mem *mp = &this->mp_this;
-      this->~XioMsg();
-      xpool_free(sizeof(XioMsg), mp);
     }
-  }
 
-  void put() {
-    put(1);
-  }
-
-  void put_msg_refs() {
-    put(get_msg_count());
+  void print_debug(CephContext *cct, const char *tag) const override;
+  size_t get_msg_count() const override {
+    return hdr.msg_cnt;
   }
 
   void alloc_trailers(int cnt) {
@@ -272,11 +291,6 @@ public:
          /* the normal case: done with message */
          m->put();
       }
-
-      xpool_dec_msgcnt();
-
-      /* submit queue ref */
-      xcon->put();
     }
 };
 
index f7f0770efe81dc8fc5cba3ae43c078e39199596d..907e29297fc5a6ebc441b9b499c667c4121ec642 100644 (file)
@@ -187,8 +187,8 @@ public:
       switch(xs->type) {
       case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
       {
-       XioMsg* xmsg = static_cast<XioMsg*>(xs);
-       xs->xcon->msg_send_fail(xmsg, -EINVAL);
+       XioSend* xsend = static_cast<XioSend*>(xs);
+       xs->xcon->msg_send_fail(xsend, -EINVAL);
       }
        break;
       default:
@@ -236,7 +236,7 @@ public:
       struct xio_msg *msg = NULL;
       XioConnection *xcon;
       XioSubmit *xs;
-      XioMsg *xmsg;
+      XioSend *xsend;
 
       do {
        submit_q.deq(send_q);
@@ -248,7 +248,7 @@ public:
        size = send_q.size();
 
        if (_shutdown) {
-         // XXX XioMsg queues for flow-controlled connections may require
+         // XXX XioSend queues for flow-controlled connections may require
          // cleanup
          drained = true;
        }
@@ -261,7 +261,7 @@ public:
 
            switch (xs->type) {
            case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
-             xmsg = static_cast<XioMsg*>(xs);
+             xsend = static_cast<XioSend*>(xs);
              if (unlikely(!xcon->conn || !xcon->is_connected()))
                code = ENOTCONN;
              else {
@@ -269,17 +269,17 @@ public:
                 * on Accelio's check on below, but this assures that
                 * all chained xio_msg are accounted) */
                xio_qdepth_high = xcon->xio_qdepth_high_mark();
-               if (unlikely((xcon->send_ctr + xmsg->get_msg_count()) >
+               if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
                             xio_qdepth_high)) {
                  requeue_all_xcon(xcon, q_iter, send_q);
                  goto restart;
                }
 
-               msg = xmsg->get_xio_msg();
+               msg = xsend->get_xio_msg();
                code = xio_send_msg(xcon->conn, msg);
                /* header trace moved here to capture xio serial# */
                if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
-                 xmsg->print_debug(msgr->cct, "xio_send_msg");
+                 xsend->print_debug(msgr->cct, "xio_send_msg");
                }
                /* get the right Accelio's errno code */
                if (unlikely(code)) {
@@ -306,13 +306,13 @@ public:
                  break;
                default:
                  q_iter = send_q.erase(q_iter);
-                 xcon->msg_send_fail(xmsg, code);
+                 xcon->msg_send_fail(xsend, code);
                  continue;
                  break;
                };
              } else {
                xcon->send.set(msg->timestamp); // need atomic?
-               xcon->send_ctr += xmsg->get_msg_count(); // only inc if cb promised
+               xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
              }
              break;
            default: