/* requester send complete (one-way) */
uint64_t rc = ++scount;
- XioMsg* xmsg = static_cast<XioMsg*>(req->user_context);
+ XioSend* xsend = static_cast<XioSend*>(req->user_context);
if (unlikely(magic & MSG_MAGIC_TRACE_CTR)) {
if (unlikely((rc % 1000000) == 0)) {
std::cout << "xio finished " << rc << " " << time(0) << std::endl;
}
} /* trace ctr */
- ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xmsg->xcon <<
- " session: " << session << " msg: " << req << " sn: " << req->sn <<
- " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
- " seq: " << xmsg->m->get_seq() << dendl;
+ ldout(msgr->cct,11) << "on_msg_delivered xcon: " << xsend->xcon <<
+ " session: " << session << " msg: " << req << " sn: " << req->sn << dendl;
+
+ XioMsg *xmsg = dynamic_cast<XioMsg*>(xsend);
+ if (xmsg) {
+ ldout(msgr->cct,11) << "on_msg_delivered xcon: " <<
+ " type: " << xmsg->m->get_type() << " tid: " << xmsg->m->get_tid() <<
+ " seq: " << xmsg->m->get_seq() << dendl;
+ }
--send_ctr; /* atomic, because portal thread */
if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
(1 /* XXX memory <= memory low-water mark */)) {
cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
- ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xmsg->xcon <<
+ ldout(msgr->cct,2) << "on_msg_delivered xcon: " << xsend->xcon <<
" session: " << session << " up_ready from flow_controlled" << dendl;
}
}
- xmsg->put();
+ xsend->put();
return 0;
} /* on_msg_delivered */
-void XioConnection::msg_send_fail(XioMsg *xmsg, int code)
+void XioConnection::msg_send_fail(XioSend *xsend, int code)
{
ldout(msgr->cct,2) << "xio_send_msg FAILED xcon: " << this <<
- " xmsg: " << xmsg->get_xio_msg() << " code=" << code <<
+ " msg: " << xsend->get_xio_msg() << " code=" << code <<
" (" << xio_strerror(code) << ")" << dendl;
/* return refs taken for each xio_msg */
- xmsg->put_msg_refs();
+ xsend->put_msg_refs();
} /* msg_send_fail */
void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
{
ldout(msgr->cct,2) << "xio_release_msg FAILED xcon: " << this <<
- " xmsg: " << msg << "code=" << code <<
+ " msg: " << msg << "code=" << code <<
" (" << xio_strerror(code) << ")" << dendl;
} /* msg_release_fail */
/* the two send queues contain different objects:
* - anything on the mqueue is a Message
- * - anything on the requeue is an XioMsg
+ * - anything on the requeue is an XioSend
*/
Message::Queue::const_iterator i1 = disc_q.end();
disc_q.splice(i1, outgoing.mqueue);
while (!deferred_q.empty()) {
XioSubmit::Queue::iterator q_iter = deferred_q.begin();
XioSubmit* xs = &(*q_iter);
- XioMsg* xmsg;
+ XioSend* xsend;
switch (xs->type) {
case XioSubmit::OUTGOING_MSG:
- xmsg = static_cast<XioMsg*>(xs);
+ xsend = static_cast<XioSend*>(xs);
deferred_q.erase(q_iter);
// release once for each chained xio_msg
- xmsg->put(xmsg->get_msg_count());
+ xsend->put(xsend->get_msg_count());
break;
case XioSubmit::INCOMING_MSG_RELEASE:
deferred_q.erase(q_iter);
struct xio_msg *msg,
void *conn_user_context)
{
- XioMsg *xmsg = static_cast<XioMsg*>(msg->user_context);
- if (xmsg)
- xmsg->put();
+ XioSend *xsend = static_cast<XioSend*>(msg->user_context);
+ if (xsend)
+ xsend->put();
--send_ctr; /* atomic, because portal thread */
return 0;
class XioPortal;
class XioMessenger;
-class XioMsg;
+class XioSend;
class XioConnection : public Connection
{
friend class XioMessenger;
friend class XioDispatchHook;
friend class XioMarkDownHook;
- friend class XioMsg;
+ friend class XioSend;
int on_disconnect_event() {
connected.set(false);
void *conn_user_context);
int on_msg_error(struct xio_session *session, enum xio_status error,
struct xio_msg *msg, void *conn_user_context);
- void msg_send_fail(XioMsg *xmsg, int code);
+ void msg_send_fail(XioSend *xsend, int code);
void msg_release_fail(struct xio_msg *msg, int code);
int flush_out_queues(uint32_t flags);
int discard_input_queue(uint32_t flags);
}
};
-struct XioMsg : public XioSubmit
+class XioSend : public XioSubmit
{
public:
- Message* m;
- XioMsgHdr hdr;
+ virtual void print_debug(CephContext *cct, const char *tag) const {};
+ const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
+ struct xio_msg * get_xio_msg() {return &req_0.msg;}
+ virtual size_t get_msg_count() const {return 1;}
+
+ XioSend(XioConnection *_xcon, struct xio_reg_mem& _mp, int _ex_cnt) :
+ XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+ req_0(this), mp_this(_mp), nrefs(_ex_cnt+1)
+ {
+ xpool_inc_msgcnt();
+ xcon->get();
+ }
+
+ XioSend* get() { nrefs.inc(); return this; };
+
+ void put(int n) {
+ int refs = nrefs.sub(n);
+ if (refs == 0) {
+ struct xio_reg_mem *mp = &this->mp_this;
+ this->~XioSend();
+ xpool_free(sizeof(XioSend), mp);
+ }
+ }
+
+ void put() {
+ put(1);
+ }
+
+ void put_msg_refs() {
+ put(get_msg_count());
+ }
+
+ virtual ~XioSend() {
+ xpool_dec_msgcnt();
+ xcon->put();
+ }
+
+private:
xio_msg_ex req_0;
- xio_msg_ex* req_arr;
struct xio_reg_mem mp_this;
atomic_t nrefs;
+};
+
+struct XioMsg : public XioSend
+{
+public:
+ Message* m;
+ XioMsgHdr hdr;
+ xio_msg_ex* req_arr;
public:
XioMsg(Message *_m, XioConnection *_xcon, struct xio_reg_mem& _mp,
int _ex_cnt) :
- XioSubmit(XioSubmit::OUTGOING_MSG, _xcon),
+ XioSend(_xcon, _mp, _ex_cnt),
m(_m), hdr(m->get_header(), m->get_footer()),
- req_0(this), req_arr(NULL), mp_this(_mp), nrefs(_ex_cnt+1)
+ req_arr(NULL)
{
const entity_inst_t &inst = xcon->get_messenger()->get_myinst();
hdr.peer_type = inst.name.type();
if (unlikely(_ex_cnt > 0)) {
alloc_trailers(_ex_cnt);
}
-
- xpool_inc_msgcnt();
-
- // submit queue ref
- xcon->get();
- }
-
- void print_debug(CephContext *cct, const char *tag) const;
- const struct xio_msg * get_xio_msg() const {return &req_0.msg;}
- struct xio_msg * get_xio_msg() {return &req_0.msg;}
- size_t get_msg_count() const {return hdr.msg_cnt;}
-
- XioMsg* get() { nrefs.inc(); return this; };
-
- void put(int n) {
- int refs = nrefs.sub(n);
- if (refs == 0) {
- struct xio_reg_mem *mp = &this->mp_this;
- this->~XioMsg();
- xpool_free(sizeof(XioMsg), mp);
}
- }
- void put() {
- put(1);
- }
-
- void put_msg_refs() {
- put(get_msg_count());
+ void print_debug(CephContext *cct, const char *tag) const override;
+ size_t get_msg_count() const override {
+ return hdr.msg_cnt;
}
void alloc_trailers(int cnt) {
/* the normal case: done with message */
m->put();
}
-
- xpool_dec_msgcnt();
-
- /* submit queue ref */
- xcon->put();
}
};
switch(xs->type) {
case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
{
- XioMsg* xmsg = static_cast<XioMsg*>(xs);
- xs->xcon->msg_send_fail(xmsg, -EINVAL);
+ XioSend* xsend = static_cast<XioSend*>(xs);
+ xs->xcon->msg_send_fail(xsend, -EINVAL);
}
break;
default:
struct xio_msg *msg = NULL;
XioConnection *xcon;
XioSubmit *xs;
- XioMsg *xmsg;
+ XioSend *xsend;
do {
submit_q.deq(send_q);
size = send_q.size();
if (_shutdown) {
- // XXX XioMsg queues for flow-controlled connections may require
+ // XXX XioSend queues for flow-controlled connections may require
// cleanup
drained = true;
}
switch (xs->type) {
case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
- xmsg = static_cast<XioMsg*>(xs);
+ xsend = static_cast<XioSend*>(xs);
if (unlikely(!xcon->conn || !xcon->is_connected()))
code = ENOTCONN;
else {
* on Accelio's check on below, but this assures that
* all chained xio_msg are accounted) */
xio_qdepth_high = xcon->xio_qdepth_high_mark();
- if (unlikely((xcon->send_ctr + xmsg->get_msg_count()) >
+ if (unlikely((xcon->send_ctr + xsend->get_msg_count()) >
xio_qdepth_high)) {
requeue_all_xcon(xcon, q_iter, send_q);
goto restart;
}
- msg = xmsg->get_xio_msg();
+ msg = xsend->get_xio_msg();
code = xio_send_msg(xcon->conn, msg);
/* header trace moved here to capture xio serial# */
if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
- xmsg->print_debug(msgr->cct, "xio_send_msg");
+ xsend->print_debug(msgr->cct, "xio_send_msg");
}
/* get the right Accelio's errno code */
if (unlikely(code)) {
break;
default:
q_iter = send_q.erase(q_iter);
- xcon->msg_send_fail(xmsg, code);
+ xcon->msg_send_fail(xsend, code);
continue;
break;
};
} else {
xcon->send.set(msg->timestamp); // need atomic?
- xcon->send_ctr += xmsg->get_msg_count(); // only inc if cb promised
+ xcon->send_ctr += xsend->get_msg_count(); // only inc if cb promised
}
break;
default: