Lane qlane[nlanes];
- SubmitQueue()
+ int ix; /* atomicity by portal thread */
+
+ SubmitQueue() : ix(0)
{
int ix;
Lane* lane;
void deq(XioSubmit::Queue& send_q)
{
- int ix;
Lane* lane;
-
- for (ix = 0; ix < nlanes; ++ix) {
+ int cnt;
+ for (cnt = 0; cnt < nlanes; ++cnt, ++ix, ix = ix % nlanes) {
lane = &qlane[ix];
pthread_spin_lock(&lane->sp);
if (lane->size > 0) {
XioSubmit::Queue::const_iterator i1 = send_q.end();
send_q.splice(i1, lane->q);
lane->size = 0;
+ ++ix, ix = ix % nlanes;
+ pthread_spin_unlock(&lane->sp);
+ break;
}
pthread_spin_unlock(&lane->sp);
}
}
- };
+ }; /* SubmitQueue */
Messenger *msgr;
struct xio_context *ctx;
while (q_iter != send_q.end()) {
xs = &(*q_iter);
xcon = xs->xcon;
- xmsg = static_cast<XioMsg*>(xs);
-
- /* guard Accelio send queue */
- xio_qdepth_high = xcon->xio_qdepth_high_mark();
- if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
- xio_qdepth_high)) {
- requeue_all_xcon(xmsg, xcon, q_iter, send_q);
- goto restart;
- }
-
- q_iter = send_q.erase(q_iter);
switch (xs->type) {
case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
+ xmsg = static_cast<XioMsg*>(xs);
if (unlikely(!xs->xcon->conn))
code = ENOTCONN;
else {
+ /* XXX guard Accelio send queue (should be safe to rely
+ * on Accelio's check on below, but this assures that
+ * all chained xio_msg are accounted) */
+ xio_qdepth_high = xcon->xio_qdepth_high_mark();
+ if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
+ xio_qdepth_high)) {
+ requeue_all_xcon(xmsg, xcon, q_iter, send_q);
+ goto restart;
+ }
+
msg = &xmsg->req_0.msg;
code = xio_send_msg(xcon->conn, msg);
/* header trace moved here to capture xio serial# */
print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
}
- }
+ } /* !ENOTCONN */
if (unlikely(code)) {
switch (code) {
case XIO_E_TX_QUEUE_OVERFLOW:
break;
default:
/* INCOMING_MSG_RELEASE */
+ q_iter = send_q.erase(q_iter);
release_xio_rsp(static_cast<XioRsp*>(xs));
- break;
- }
- }
- }
+ continue;
+ } /* switch (xs->type) */
+ q_iter = send_q.erase(q_iter);
+ } /* while */
+ } /* size > 0 */
pthread_spin_unlock(&sp);
xio_context_run_loop(ctx, 300);
void shutdown()
{
pthread_spin_lock(&sp);
- xio_context_stop_loop(ctx);
_shutdown = true;
pthread_spin_unlock(&sp);
}