return xhook;
}
-int XioConnection::on_msg(struct xio_session *session,
+int XioConnection::handle_data_msg(struct xio_session *session,
struct xio_msg *msg,
int more_in_batch,
void *cb_user_context)
xio_release_msg(msg);
return 0;
}
+ const size_t sizeof_tag = 1;
XioMsgCnt msg_cnt(
- buffer::create_static(tmsg->in.header.iov_len,
- (char*) tmsg->in.header.iov_base));
+ buffer::create_static(tmsg->in.header.iov_len-sizeof_tag,
+ ((char*) tmsg->in.header.iov_base)+sizeof_tag));
ldout(msgr->cct,10) << __func__ << " receive msg " << "tmsg " << tmsg
<< " msg_cnt " << msg_cnt.msg_cnt
<< " iov_base " << tmsg->in.header.iov_base
return 0;
}
+int XioConnection::on_msg(struct xio_session *session,
+ struct xio_msg *msg,
+ int more_in_batch,
+ void *cb_user_context)
+{
+ char tag = CEPH_MSGR_TAG_MSG;
+ if (msg->in.header.iov_len)
+ tag = *(char*)msg->in.header.iov_base;
+
+ ldout(msgr->cct,8) << __func__ << " receive msg with iov_len "
+ << (int) msg->in.header.iov_len << " tag " << (int)tag << dendl;
+
+ switch(tag) {
+ case CEPH_MSGR_TAG_MSG:
+ ldout(msgr->cct, 20) << __func__ << " got data message" << dendl;
+ return handle_data_msg(session, msg, more_in_batch, cb_user_context);
+
+ default:
+ assert(! "unrecognized message tag");
+ }
+
+ return 0;
+}
+
+
int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
struct xio_msg *req,
void *conn_user_context)
int passive_setup(); /* XXX */
+ int handle_data_msg(struct xio_session *session, struct xio_msg *msg,
+ int more_in_batch, void *cb_user_context);
int on_msg(struct xio_session *session, struct xio_msg *msg,
int more_in_batch, void *cb_user_context);
int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
}
ldout(cct,4) << __func__ << " " << m << " new XioMsg " << xmsg
+ << " tag " << (int)xmsg->hdr.tag
<< " req_0 " << &xmsg->req_0.msg << " msg type " << m->get_type()
<< " features: " << xcon->get_features()
<< " conn " << xcon->conn << " sess " << xcon->session << dendl;
class XioMsgHdr
{
public:
+ char tag;
__le32 msg_cnt;
__le32 peer_type;
entity_addr_t addr; /* XXX hack! */
buffer::list bl;
public:
XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer& _ftr)
- : msg_cnt(0), hdr(&_hdr), ftr(&_ftr)
+ : tag(CEPH_MSGR_TAG_MSG), msg_cnt(0), hdr(&_hdr), ftr(&_ftr)
{ }
XioMsgHdr(ceph_msg_header& _hdr, ceph_msg_footer &_ftr, buffer::ptr p)
const buffer::list& get_bl() { encode(bl); return bl; };
inline void encode_hdr(buffer::list& bl) const {
+ ::encode(tag, bl);
::encode(msg_cnt, bl);
::encode(peer_type, bl);
::encode(addr, bl);
}
inline void decode_hdr(buffer::list::iterator& bl) {
+ ::decode(tag, bl);
::decode(msg_cnt, bl);
::decode(peer_type, bl);
::decode(addr, bl);