XioInit(cct),
nsessions(0),
shutdown_called(false),
- portals(this, cct->_conf->xio_portal_threads),
+ portals(this, get_nportals(), get_nconns_per_portal()),
dispatch_strategy(ds),
loop_con(new XioLoopbackConnection(this)),
special_handling(0),
local_features = features;
loop_con->set_features(features);
+ ldout(cct,2) << "Create msgr: " << this << " instance: "
+ << nInstances.read() << " type: " << name.type_str()
+ << " subtype: " << mname << " nportals: " << get_nportals()
+ << " nconns_per_portal: " << get_nconns_per_portal() << " features: "
+ << features << dendl;
+
} /* ctor */
int XioMessenger::pool_hint(uint32_t dsize) {
XMSG_MEMPOOL_QUANTUM, 0);
}
+int XioMessenger::get_nconns_per_portal()
+{
+ return max(cct->_conf->xio_max_conns_per_portal, 32);
+}
+
+int XioMessenger::get_nportals()
+{
+ return max(cct->_conf->xio_portal_threads, 1);
+}
+
void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
// be careful here: multiple threads may block here, and readers of
/* XXXX pre-merge of session startup negotiation ONLY! */
xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
- ldout(cct,2) << "new connection session " << session
- << " xcon " << xcon << dendl;
- ldout(cct,2) << "server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
+ ldout(cct,2) << "New connection session " << session
+ << " xcon " << xcon << " on msgr: " << this << " portal: " << xcon->portal << dendl;
+ ldout(cct,2) << "Server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
}
break;
case XIO_SESSION_CONNECTION_ERROR_EVENT:
/* XXXX pre-merge of session startup negotiation ONLY! */
xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
- ldout(cct,2) << "new connection xcon: " << xcon <<
- " up_ready on session " << xcon->session << dendl;
+ ldout(cct,2) << "New connection xcon: " << xcon <<
+ " up_ready on session " << xcon->session <<
+ " on msgr: " << this << " portal: " << xcon->portal << dendl;
return xcon->get(); /* nref +1 */
}
friend class XioMessenger;
public:
- explicit XioPortal(Messenger *_msgr) :
- msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
- portal_id(NULL), _shutdown(false), drained(false),
- magic(0),
- special_handling(0)
- {
- pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
- pthread_mutex_init(&mtx, NULL);
-
- /* a portal is an xio_context and event loop */
- ctx = xio_context_create(NULL, 0 /* poll timeout */, -1 /* cpu hint */);
-
- /* associate this XioPortal object with the xio_context handle */
- struct xio_context_attr xca;
- xca.user_context = this;
- xio_modify_context(ctx, &xca, XIO_CONTEXT_ATTR_USER_CTX);
-
- if (magic & (MSG_MAGIC_XIO)) {
- printf("XioPortal %p created ev_loop %p ctx %p\n",
- this, ev_loop, ctx);
- }
- }
+ explicit XioPortal(Messenger *_msgr, int max_conns) :
+ msgr(_msgr), ctx(NULL), server(NULL), submit_q(), xio_uri(""),
+ portal_id(NULL), _shutdown(false), drained(false),
+ magic(0),
+ special_handling(0)
+ {
+ pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
+ pthread_mutex_init(&mtx, NULL);
+
+ struct xio_context_params ctx_params;
+ memset(&ctx_params, 0, sizeof(ctx_params));
+ ctx_params.user_context = this;
+ /*
+ * hint to Accelio the total number of connections that will share
+ * this context's resources: internal primary task pool...
+ */
+ ctx_params.max_conns_per_ctx = max_conns;
+
+ /* a portal is an xio_context and event loop */
+ ctx = xio_context_create(&ctx_params, 0 /* poll timeout */, -1 /* cpu hint */);
+ assert(ctx && "Whoops, failed to create portal/ctx");
+ }
int bind(struct xio_session_ops *ops, const string &base_uri,
uint16_t port, uint16_t *assigned_port);
vector<XioPortal*> portals;
char **p_vec;
int n;
- int last_use;
+ int last_unused;
+ int max_conns_per_ctx;
public:
- XioPortals(Messenger *msgr, int _n) : p_vec(NULL)
+ XioPortals(Messenger *msgr, int _n, int nconns) : p_vec(NULL), last_unused(0)
{
+ max_conns_per_ctx = nconns;
+ n = max(_n, 1);
/* portal0 */
- portals.push_back(new XioPortal(msgr));
- last_use = 0;
-
- /* enforce at least two portals if bind */
- if (_n < 2)
- _n = 2;
- n = _n;
-
+ portals.push_back(new XioPortal(msgr, nconns));
/* additional portals allocated on bind() */
}
return n;
}
- int get_last_use()
+ int get_last_unused()
{
- int pix = last_use;
- if (++last_use >= get_portals_len() - 1)
- last_use = 0;
+ int pix = last_unused;
+ if (++last_unused >= get_portals_len())
+ last_unused = 0;
return pix;
}
void *cb_user_context)
{
const char **portals_vec = get_vec();
- int pix = get_last_use();
-
- return xio_accept(session,
- (const char **)&(portals_vec[pix]),
- 1, NULL, 0);
+ int pix = get_last_unused();
+
+ if (pix == 0) {
+ return xio_accept(session, NULL, 0, NULL, 0);
+ } else {
+ return xio_accept(session,
+ (const char **)&(portals_vec[pix]),
+ 1, NULL, 0);
+ }
}
void start()
XioPortal *portal;
int p_ix, nportals = portals.size();
- /* portal_0 is the new-session handler, portal_1+ terminate
- * active sessions */
-
- p_vec = new char*[(nportals-1)];
- for (p_ix = 1; p_ix < nportals; ++p_ix) {
+ p_vec = new char*[nportals];
+ for (p_ix = 0; p_ix < nportals; ++p_ix) {
portal = portals[p_ix];
/* shift left */
- p_vec[(p_ix-1)] = (char*) /* portal->xio_uri.c_str() */
+ p_vec[p_ix] = (char*) /* portal->xio_uri.c_str() */
portal->portal_id;
}