peer_type = peer.name.type();
set_peer_addr(peer.addr);
+ Messenger::Policy policy;
+ int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
+ int xopt;
+
+ policy = m->get_policy(peer_type);
+
+ if (policy.throttler_messages) {
+ max_msgs = policy.throttler_messages->get_max();
+ ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
+ }
+
+ xopt = m->cct->_conf->xio_queue_depth;
+ if (max_msgs > xopt)
+ xopt = max_msgs;
+
+ /* set high mark for send, reserved 20% for credits */
+ q_high_mark = xopt * 4 / 5;
+ q_low_mark = q_high_mark/2;
+
+ /* set send & receive msgs queue depth */
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
+ &xopt, sizeof(xopt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
+ &xopt, sizeof(xopt));
+
+ if (policy.throttler_bytes) {
+ max_bytes = policy.throttler_bytes->get_max();
+ ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
+ }
+
+ bytes_opt = (2 << 28); /* default: 512 MB */
+ if (max_bytes > bytes_opt)
+ bytes_opt = max_bytes;
+
+ /* set send & receive total bytes throttle */
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
+ &bytes_opt, sizeof(bytes_opt));
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
+ &bytes_opt, sizeof(bytes_opt));
+
+ ldout(m->cct,0) << "Peer type: " << peer.name.type_str() <<
+ " throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
+
/* XXXX fake features, aieee! */
set_features(XIO_ALL_FEATURES);
}
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
&xopt, sizeof(xopt));
- xopt = cct->_conf->xio_queue_depth; // defaults to 512
- xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
- &xopt, sizeof(xopt));
- xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
- &xopt, sizeof(xopt));
+ /* enable flow-control */
+ xopt = 1;
+ xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
+ &xopt, sizeof(xopt));
/* and set threshold for buffer callouts */
xopt = 16384;
void *entry()
{
int size, code = 0;
- uint32_t xio_qdepth;
+ uint32_t xio_qdepth_high;
XioSubmit::Queue send_q;
XioSubmit::Queue::iterator q_iter;
struct xio_msg *msg = NULL;
xmsg = static_cast<XioMsg*>(xs);
/* guard Accelio send queue */
- xio_qdepth = xcon->xio_queue_depth();
- if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth)) {
+ xio_qdepth_high = xcon->xio_qdepth_high_mark();
+ if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
++q_iter;
continue;
}