From: Raju Kurunkad Date: Tue, 5 May 2015 14:10:38 +0000 (+0530) Subject: Update XIO client connection IP and nonce X-Git-Tag: v9.0.2~201^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6158f18fb4ef4c7f58f58a4131f1227cb9a48f5c;p=ceph.git Update XIO client connection IP and nonce Obtain the local IP of the client and save the nonce provided when the messenger was created. This is required for RBD lock/unlock Fix script error in RBD concurrent test Reset did_bind during messenger shutdown Signed-off-by: Raju Kurunkad --- diff --git a/qa/workunits/rbd/concurrent.sh b/qa/workunits/rbd/concurrent.sh index ceb4563567fe..8b30b4ab19b1 100755 --- a/qa/workunits/rbd/concurrent.sh +++ b/qa/workunits/rbd/concurrent.sh @@ -283,7 +283,7 @@ function rbd_write_image() { # Offset and size here are meant to ensure beginning and end # cross both (4K or 64K) page and (4MB) rbd object boundaries. # It assumes the SOURCE_DATA file has size 66 * 2048 bytes - dd "${SOURCE_DATA}" of="/dev/rbd${id}" bs=2048 seek=2015 \ + dd if="${SOURCE_DATA}" of="/dev/rbd${id}" bs=2048 seek=2015 \ > /dev/null 2>&1 } @@ -323,7 +323,7 @@ function rbd_read_image() { # zero-fills unwritten data when the target object doesn't # exist. dd if="/dev/rbd${id}" of=/dev/null bs=2048 count=34 skip=4098 \ - /dev/null 2>&1 + > /dev/null 2>&1 } function rbd_unmap_image() { diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc index 3b63320a5c04..83784955c012 100644 --- a/src/msg/xio/XioConnection.cc +++ b/src/msg/xio/XioConnection.cc @@ -106,7 +106,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, if (policy.throttler_messages) { max_msgs = policy.throttler_messages->get_max(); - ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl; + ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl; } xopt = m->cct->_conf->xio_queue_depth; @@ -125,7 +125,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, if (policy.throttler_bytes) { max_bytes = policy.throttler_bytes->get_max(); - ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl; + ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl; } bytes_opt = (2 << 28); /* default: 512 MB */ @@ -138,7 +138,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES, &bytes_opt, sizeof(bytes_opt)); - ldout(m->cct,0) << "Peer type: " << peer.name.type_str() << + ldout(m->cct,4) << "Peer type: " << peer.name.type_str() << " throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl; /* XXXX fake features, aieee! */ diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc index 7d0a88cf58e9..198fb00c4ad1 100644 --- a/src/msg/xio/XioMessenger.cc +++ b/src/msg/xio/XioMessenger.cc @@ -250,9 +250,9 @@ static string xio_uri_from_entity(const string &type, /* XioMessenger */ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t nonce, + string mname, uint64_t _nonce, DispatchStrategy *ds) - : SimplePolicyMessenger(cct, name, mname, nonce), + : SimplePolicyMessenger(cct, name, mname, _nonce), nsessions(0), shutdown_called(false), portals(this, cct->_conf->xio_portal_threads), @@ -260,7 +260,10 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, loop_con(new XioLoopbackConnection(this)), special_handling(0), sh_mtx("XioMessenger session mutex"), - sh_cond() + sh_cond(), + need_addr(true), + did_bind(false), + nonce(_nonce) { if (cct->_conf->xio_trace_xcon) @@ -388,6 +391,30 @@ int XioMessenger::pool_hint(uint32_t dsize) { XMSG_MEMPOOL_QUANTUM); } +void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_inst.addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return; + + sh_mtx.Lock(); + if (need_addr) { + entity_addr_t t = peer_addr_for_me; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.addr = t.addr; + ldout(cct,2) << "learned my addr " << my_inst.addr << dendl; + need_addr = false; + // init_local_connection(); + } + sh_mtx.Unlock(); + +} + int XioMessenger::new_session(struct xio_session *session, struct xio_new_session_req *req, void *cb_user_context) @@ -410,27 +437,45 @@ int XioMessenger::session_event(struct xio_session *session, switch (event_data->event) { case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT: + { + struct xio_connection *conn = event_data->conn; + struct xio_connection_attr xcona; + entity_addr_t peer_addr_for_me, paddr; + xcon = static_cast(event_data->conn_user_context); ldout(cct,2) << "connection established " << event_data->conn << " session " << session << " xcon " << xcon << dendl; + (void) xio_query_connection(conn, &xcona, + XIO_CONNECTION_ATTR_LOCAL_ADDR| + XIO_CONNECTION_ATTR_PEER_ADDR); + (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr); + (void) entity_addr_from_sockaddr(&paddr, (struct sockaddr *) &xcona.peer_addr); + //set_myaddr(peer_addr_for_me); + learned_addr(peer_addr_for_me); + ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl; + /* notify hook */ this->ms_deliver_handle_connect(xcon); - break; + } + break; case XIO_SESSION_NEW_CONNECTION_EVENT: { struct xio_connection *conn = event_data->conn; struct xio_connection_attr xcona; entity_inst_t s_inst; + entity_addr_t peer_addr_for_me; (void) xio_query_connection(conn, &xcona, XIO_CONNECTION_ATTR_CTX| - XIO_CONNECTION_ATTR_PEER_ADDR); + XIO_CONNECTION_ATTR_PEER_ADDR| + XIO_CONNECTION_ATTR_LOCAL_ADDR); /* XXX assumes RDMA */ (void) entity_addr_from_sockaddr(&s_inst.addr, (struct sockaddr *) &xcona.peer_addr); + (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr); xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst); xcon->session = session; @@ -460,6 +505,7 @@ int XioMessenger::session_event(struct xio_session *session, ldout(cct,2) << "new connection session " << session << " xcon " << xcon << dendl; + ldout(cct,2) << "server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl; } break; case XIO_SESSION_CONNECTION_ERROR_EVENT: @@ -664,6 +710,7 @@ int XioMessenger::bind(const entity_addr_t& addr) if (*ep) { ldout(cct,0) << "WARNING: 'rdma local trailing garbage ignored: '" << ep << dendl; } + ldout(cct, 2) << "Found rdma_local address " << rdma_local_str.c_str() << dendl; int p = _addr.get_port(); _addr.set_sockaddr(reinterpret_cast( &local_rdma_addr.ss_addr())); @@ -685,6 +732,7 @@ int XioMessenger::bind(const entity_addr_t& addr) if (r == 0) { shift_addr.set_port(port0); set_myaddr(shift_addr); + did_bind = true; } return r; } /* bind */ @@ -699,6 +747,9 @@ int XioMessenger::start() { portals.start(); dispatch_strategy->start(); + if (!did_bind) { + my_inst.addr.nonce = nonce; + } started = true; return 0; } @@ -897,6 +948,7 @@ int XioMessenger::shutdown() } portals.shutdown(); dispatch_strategy->shutdown(); + did_bind = false; started = false; return 0; } /* shutdown */ diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h index fe947fa663be..3814eccd092f 100644 --- a/src/msg/xio/XioMessenger.h +++ b/src/msg/xio/XioMessenger.h @@ -43,6 +43,11 @@ private: uint32_t special_handling; Mutex sh_mtx; Cond sh_cond; + bool need_addr; + bool did_bind; + + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; friend class XioConnection; @@ -132,6 +137,15 @@ public: void ds_dispatch(Message *m) { dispatch_strategy->ds_dispatch(m); } + /** + * Tell the XioMessenger its full IP address. + * + * This is used by clients when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + + protected: virtual void ready() { }