]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
xio: add message tag to protocol and write/read it with standard messages
authorAvner BenHanoch <avnerb@mellanox.com>
Wed, 13 Apr 2016 09:45:59 +0000 (12:45 +0300)
committerAvner BenHanoch <avnerb@mellanox.com>
Sun, 1 May 2016 09:56:11 +0000 (12:56 +0300)
(at this phase the tag is always CEPH_MSGR_TAG_MSG since currently
xio only have standard messages)

Signed-off-by: Avner BenHanoch <avnerb@mellanox.com>
src/msg/xio/XioConnection.cc
src/msg/xio/XioConnection.h
src/msg/xio/XioMessenger.cc
src/msg/xio/XioMsg.h

index 31f1cf685e3bea79e74e5dbff491c8c6e85b0891..8543f0ced3fd6babb721b25b6beb5576850b059d 100644 (file)
@@ -196,7 +196,7 @@ static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
   return xhook;
 }
 
-int XioConnection::on_msg(struct xio_session *session,
+int XioConnection::handle_data_msg(struct xio_session *session,
                              struct xio_msg *msg,
                              int more_in_batch,
                              void *cb_user_context)
@@ -212,9 +212,10 @@ int XioConnection::on_msg(struct xio_session *session,
        xio_release_msg(msg);
        return 0;
     }
+    const size_t sizeof_tag = 1;
     XioMsgCnt msg_cnt(
-      buffer::create_static(tmsg->in.header.iov_len,
-                           (char*) tmsg->in.header.iov_base));
+      buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
+                           ((char*) tmsg->in.header.iov_base)+sizeof_tag));
     ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
       << " msg_cnt " << msg_cnt.msg_cnt
       << " iov_base " << tmsg->in.header.iov_base
@@ -422,6 +423,31 @@ int XioConnection::on_msg(struct xio_session *session,
   return 0;
 }
 
+int XioConnection::on_msg(struct xio_session *session,
+                             struct xio_msg *msg,
+                             int more_in_batch,
+                             void *cb_user_context)
+{
+  char tag = CEPH_MSGR_TAG_MSG;
+  if (msg->in.header.iov_len)
+    tag = *(char*)msg->in.header.iov_base;
+
+  ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
+    << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;
+
+  switch(tag) {
+  case CEPH_MSGR_TAG_MSG:
+    ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
+    return handle_data_msg(session, msg, more_in_batch, cb_user_context);
+
+  default:
+    assert(! "unrecognized message tag");
+  }
+
+  return 0;
+}
+
+
 int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
                                           struct xio_msg *req,
                                           void *conn_user_context)
index e55ea983197ece090332980458adc0f5d544130b..e439cbcb936a2e9b79b8a7edfd8d1a8f58606cdc 100644 (file)
@@ -311,6 +311,8 @@ public:
 
   int passive_setup(); /* XXX */
 
+  int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
+                int more_in_batch, void *cb_user_context);
   int on_msg(struct xio_session *session, struct xio_msg *msg,
                 int more_in_batch, void *cb_user_context);
   int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
index 31ce6fe6a32a3556d746edcd09a267e00cc4a285..8b42832108ced9947d609e82e32cff902c3d46d6 100644 (file)
@@ -860,6 +860,7 @@ int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
   }
 
   ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+       << " tag " << (int)xmsg->hdr.tag
        << " req_0 " << &xmsg->req_0.msg << " msg type " << m->get_type()
        << " features: " << xcon->get_features()
        << " conn " << xcon->conn << " sess " << xcon->session << dendl;
index 10a26511b3f2219ee82b7f898fc86008ffd96786..c0141a85e2364313da7d0ec12884e6e6cbcb7a5a 100644 (file)
@@ -45,6 +45,7 @@ public:
 class XioMsgHdr
 {
 public:
+  char tag;
   __le32 msg_cnt;
   __le32 peer_type;
   entity_addr_t addr; /* XXX hack! */
@@ -53,7 +54,7 @@ public:
   buffer::list bl;
 public:
   XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr)
-    : msg_cnt(0), hdr(&_hdr), ftr(&_ftr)
+    : tag(CEPH_MSGR_TAG_MSG), msg_cnt(0), hdr(&_hdr), ftr(&_ftr)
     { }
 
   XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
@@ -69,6 +70,7 @@ public:
   const buffer::list& get_bl() { encode(bl); return bl; };
 
   inline void encode_hdr(buffer::list& bl) const {
+    ::encode(tag, bl);
     ::encode(msg_cnt, bl);
     ::encode(peer_type, bl);
     ::encode(addr, bl);
@@ -101,6 +103,7 @@ public:
   }
 
   inline void decode_hdr(buffer::list::iterator& bl) {
+    ::decode(tag, bl);
     ::decode(msg_cnt, bl);
     ::decode(peer_type, bl);
     ::decode(addr, bl);