if (policy.throttler_messages) {
max_msgs = policy.throttler_messages->get_max();
- ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
+ ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
}
xopt = m->cct->_conf->xio_queue_depth;
if (policy.throttler_bytes) {
max_bytes = policy.throttler_bytes->get_max();
- ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
+ ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
}
bytes_opt = (2 << 28); /* default: 512 MB */
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
&bytes_opt, sizeof(bytes_opt));
- ldout(m->cct,0) << "Peer type: " << peer.name.type_str() <<
+ ldout(m->cct,4) << "Peer type: " << peer.name.type_str() <<
" throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
/* XXXX fake features, aieee! */
/* XioMessenger */
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t nonce,
+ string mname, uint64_t _nonce,
DispatchStrategy *ds)
- : SimplePolicyMessenger(cct, name, mname, nonce),
+ : SimplePolicyMessenger(cct, name, mname, _nonce),
nsessions(0),
shutdown_called(false),
portals(this, cct->_conf->xio_portal_threads),
loop_con(new XioLoopbackConnection(this)),
special_handling(0),
sh_mtx("XioMessenger session mutex"),
- sh_cond()
+ sh_cond(),
+ need_addr(true),
+ did_bind(false),
+ nonce(_nonce)
{
if (cct->_conf->xio_trace_xcon)
XMSG_MEMPOOL_QUANTUM);
}
+void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+{
+ // be careful here: multiple threads may block here, and readers of
+ // my_inst.addr do NOT hold any lock.
+
+ // this always goes from true -> false under the protection of the
+ // mutex. if it is already false, we need not retake the mutex at
+ // all.
+ if (!need_addr)
+ return;
+
+ sh_mtx.Lock();
+ if (need_addr) {
+ entity_addr_t t = peer_addr_for_me;
+ t.set_port(my_inst.addr.get_port());
+ my_inst.addr.addr = t.addr;
+ ldout(cct,2) << "learned my addr " << my_inst.addr << dendl;
+ need_addr = false;
+ // init_local_connection();
+ }
+ sh_mtx.Unlock();
+
+}
+
int XioMessenger::new_session(struct xio_session *session,
struct xio_new_session_req *req,
void *cb_user_context)
switch (event_data->event) {
case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT:
+ {
+ struct xio_connection *conn = event_data->conn;
+ struct xio_connection_attr xcona;
+ entity_addr_t peer_addr_for_me, paddr;
+
xcon = static_cast<XioConnection*>(event_data->conn_user_context);
ldout(cct,2) << "connection established " << event_data->conn
<< " session " << session << " xcon " << xcon << dendl;
+ (void) xio_query_connection(conn, &xcona,
+ XIO_CONNECTION_ATTR_LOCAL_ADDR|
+ XIO_CONNECTION_ATTR_PEER_ADDR);
+ (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr);
+ (void) entity_addr_from_sockaddr(&paddr, (struct sockaddr *) &xcona.peer_addr);
+ //set_myaddr(peer_addr_for_me);
+ learned_addr(peer_addr_for_me);
+ ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl;
+
/* notify hook */
this->ms_deliver_handle_connect(xcon);
- break;
+ }
+ break;
case XIO_SESSION_NEW_CONNECTION_EVENT:
{
struct xio_connection *conn = event_data->conn;
struct xio_connection_attr xcona;
entity_inst_t s_inst;
+ entity_addr_t peer_addr_for_me;
(void) xio_query_connection(conn, &xcona,
XIO_CONNECTION_ATTR_CTX|
- XIO_CONNECTION_ATTR_PEER_ADDR);
+ XIO_CONNECTION_ATTR_PEER_ADDR|
+ XIO_CONNECTION_ATTR_LOCAL_ADDR);
/* XXX assumes RDMA */
(void) entity_addr_from_sockaddr(&s_inst.addr,
(struct sockaddr *) &xcona.peer_addr);
+ (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr);
xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst);
xcon->session = session;
ldout(cct,2) << "new connection session " << session
<< " xcon " << xcon << dendl;
+ ldout(cct,2) << "server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl;
}
break;
case XIO_SESSION_CONNECTION_ERROR_EVENT:
if (*ep) {
ldout(cct,0) << "WARNING: 'rdma local trailing garbage ignored: '" << ep << dendl;
}
+ ldout(cct, 2) << "Found rdma_local address " << rdma_local_str.c_str() << dendl;
int p = _addr.get_port();
_addr.set_sockaddr(reinterpret_cast<struct sockaddr *>(
&local_rdma_addr.ss_addr()));
if (r == 0) {
shift_addr.set_port(port0);
set_myaddr(shift_addr);
+ did_bind = true;
}
return r;
} /* bind */
{
portals.start();
dispatch_strategy->start();
+ if (!did_bind) {
+ my_inst.addr.nonce = nonce;
+ }
started = true;
return 0;
}
}
portals.shutdown();
dispatch_strategy->shutdown();
+ did_bind = false;
started = false;
return 0;
} /* shutdown */