From 6158f18fb4ef4c7f58f58a4131f1227cb9a48f5c Mon Sep 17 00:00:00 2001 From: Raju Kurunkad Date: Tue, 5 May 2015 19:40:38 +0530 Subject: [PATCH] Update XIO client connection IP and nonce Obtain the local IP of the client and save the nonce provided when the messenger was created. This is required for RBD lock/unlock Fix script error in RBD concurrent test Reset did_bind during messenger shutdown Signed-off-by: Raju Kurunkad --- qa/workunits/rbd/concurrent.sh | 4 +-- src/msg/xio/XioConnection.cc | 6 ++-- src/msg/xio/XioMessenger.cc | 62 +++++++++++++++++++++++++++++++--- src/msg/xio/XioMessenger.h | 14 ++++++++ 4 files changed, 76 insertions(+), 10 deletions(-) diff --git a/qa/workunits/rbd/concurrent.sh b/qa/workunits/rbd/concurrent.sh index ceb4563567fe..8b30b4ab19b1 100755 --- a/qa/workunits/rbd/concurrent.sh +++ b/qa/workunits/rbd/concurrent.sh @@ -283,7 +283,7 @@ function rbd_write_image() { # Offset and size here are meant to ensure beginning and end # cross both (4K or 64K) page and (4MB) rbd object boundaries. # It assumes the SOURCE_DATA file has size 66 * 2048 bytes - dd "${SOURCE_DATA}" of="/dev/rbd${id}" bs=2048 seek=2015 \ + dd if="${SOURCE_DATA}" of="/dev/rbd${id}" bs=2048 seek=2015 \ > /dev/null 2>&1 } @@ -323,7 +323,7 @@ function rbd_read_image() { # zero-fills unwritten data when the target object doesn't # exist. dd if="/dev/rbd${id}" of=/dev/null bs=2048 count=34 skip=4098 \ - /dev/null 2>&1 + > /dev/null 2>&1 } function rbd_unmap_image() { diff --git a/src/msg/xio/XioConnection.cc b/src/msg/xio/XioConnection.cc index 3b63320a5c04..83784955c012 100644 --- a/src/msg/xio/XioConnection.cc +++ b/src/msg/xio/XioConnection.cc @@ -106,7 +106,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, if (policy.throttler_messages) { max_msgs = policy.throttler_messages->get_max(); - ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl; + ldout(m->cct,4) << "XioMessenger throttle_msgs: " << max_msgs << dendl; } xopt = m->cct->_conf->xio_queue_depth; @@ -125,7 +125,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, if (policy.throttler_bytes) { max_bytes = policy.throttler_bytes->get_max(); - ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl; + ldout(m->cct,4) << "XioMessenger throttle_bytes: " << max_bytes << dendl; } bytes_opt = (2 << 28); /* default: 512 MB */ @@ -138,7 +138,7 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type, xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES, &bytes_opt, sizeof(bytes_opt)); - ldout(m->cct,0) << "Peer type: " << peer.name.type_str() << + ldout(m->cct,4) << "Peer type: " << peer.name.type_str() << " throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl; /* XXXX fake features, aieee! */ diff --git a/src/msg/xio/XioMessenger.cc b/src/msg/xio/XioMessenger.cc index 7d0a88cf58e9..198fb00c4ad1 100644 --- a/src/msg/xio/XioMessenger.cc +++ b/src/msg/xio/XioMessenger.cc @@ -250,9 +250,9 @@ static string xio_uri_from_entity(const string &type, /* XioMessenger */ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, - string mname, uint64_t nonce, + string mname, uint64_t _nonce, DispatchStrategy *ds) - : SimplePolicyMessenger(cct, name, mname, nonce), + : SimplePolicyMessenger(cct, name, mname, _nonce), nsessions(0), shutdown_called(false), portals(this, cct->_conf->xio_portal_threads), @@ -260,7 +260,10 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name, loop_con(new XioLoopbackConnection(this)), special_handling(0), sh_mtx("XioMessenger session mutex"), - sh_cond() + sh_cond(), + need_addr(true), + did_bind(false), + nonce(_nonce) { if (cct->_conf->xio_trace_xcon) @@ -388,6 +391,30 @@ int XioMessenger::pool_hint(uint32_t dsize) { XMSG_MEMPOOL_QUANTUM); } +void XioMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + // be careful here: multiple threads may block here, and readers of + // my_inst.addr do NOT hold any lock. + + // this always goes from true -> false under the protection of the + // mutex. if it is already false, we need not retake the mutex at + // all. + if (!need_addr) + return; + + sh_mtx.Lock(); + if (need_addr) { + entity_addr_t t = peer_addr_for_me; + t.set_port(my_inst.addr.get_port()); + my_inst.addr.addr = t.addr; + ldout(cct,2) << "learned my addr " << my_inst.addr << dendl; + need_addr = false; + // init_local_connection(); + } + sh_mtx.Unlock(); + +} + int XioMessenger::new_session(struct xio_session *session, struct xio_new_session_req *req, void *cb_user_context) @@ -410,27 +437,45 @@ int XioMessenger::session_event(struct xio_session *session, switch (event_data->event) { case XIO_SESSION_CONNECTION_ESTABLISHED_EVENT: + { + struct xio_connection *conn = event_data->conn; + struct xio_connection_attr xcona; + entity_addr_t peer_addr_for_me, paddr; + xcon = static_cast(event_data->conn_user_context); ldout(cct,2) << "connection established " << event_data->conn << " session " << session << " xcon " << xcon << dendl; + (void) xio_query_connection(conn, &xcona, + XIO_CONNECTION_ATTR_LOCAL_ADDR| + XIO_CONNECTION_ATTR_PEER_ADDR); + (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr); + (void) entity_addr_from_sockaddr(&paddr, (struct sockaddr *) &xcona.peer_addr); + //set_myaddr(peer_addr_for_me); + learned_addr(peer_addr_for_me); + ldout(cct,2) << "client: connected from " << peer_addr_for_me << " to " << paddr << dendl; + /* notify hook */ this->ms_deliver_handle_connect(xcon); - break; + } + break; case XIO_SESSION_NEW_CONNECTION_EVENT: { struct xio_connection *conn = event_data->conn; struct xio_connection_attr xcona; entity_inst_t s_inst; + entity_addr_t peer_addr_for_me; (void) xio_query_connection(conn, &xcona, XIO_CONNECTION_ATTR_CTX| - XIO_CONNECTION_ATTR_PEER_ADDR); + XIO_CONNECTION_ATTR_PEER_ADDR| + XIO_CONNECTION_ATTR_LOCAL_ADDR); /* XXX assumes RDMA */ (void) entity_addr_from_sockaddr(&s_inst.addr, (struct sockaddr *) &xcona.peer_addr); + (void) entity_addr_from_sockaddr(&peer_addr_for_me, (struct sockaddr *) &xcona.local_addr); xcon = new XioConnection(this, XioConnection::PASSIVE, s_inst); xcon->session = session; @@ -460,6 +505,7 @@ int XioMessenger::session_event(struct xio_session *session, ldout(cct,2) << "new connection session " << session << " xcon " << xcon << dendl; + ldout(cct,2) << "server: connected from " << s_inst.addr << " to " << peer_addr_for_me << dendl; } break; case XIO_SESSION_CONNECTION_ERROR_EVENT: @@ -664,6 +710,7 @@ int XioMessenger::bind(const entity_addr_t& addr) if (*ep) { ldout(cct,0) << "WARNING: 'rdma local trailing garbage ignored: '" << ep << dendl; } + ldout(cct, 2) << "Found rdma_local address " << rdma_local_str.c_str() << dendl; int p = _addr.get_port(); _addr.set_sockaddr(reinterpret_cast( &local_rdma_addr.ss_addr())); @@ -685,6 +732,7 @@ int XioMessenger::bind(const entity_addr_t& addr) if (r == 0) { shift_addr.set_port(port0); set_myaddr(shift_addr); + did_bind = true; } return r; } /* bind */ @@ -699,6 +747,9 @@ int XioMessenger::start() { portals.start(); dispatch_strategy->start(); + if (!did_bind) { + my_inst.addr.nonce = nonce; + } started = true; return 0; } @@ -897,6 +948,7 @@ int XioMessenger::shutdown() } portals.shutdown(); dispatch_strategy->shutdown(); + did_bind = false; started = false; return 0; } /* shutdown */ diff --git a/src/msg/xio/XioMessenger.h b/src/msg/xio/XioMessenger.h index fe947fa663be..3814eccd092f 100644 --- a/src/msg/xio/XioMessenger.h +++ b/src/msg/xio/XioMessenger.h @@ -43,6 +43,11 @@ private: uint32_t special_handling; Mutex sh_mtx; Cond sh_cond; + bool need_addr; + bool did_bind; + + /// approximately unique ID set by the Constructor for use in entity_addr_t + uint64_t nonce; friend class XioConnection; @@ -132,6 +137,15 @@ public: void ds_dispatch(Message *m) { dispatch_strategy->ds_dispatch(m); } + /** + * Tell the XioMessenger its full IP address. + * + * This is used by clients when connecting to other endpoints, and + * probably shouldn't be called by anybody else. + */ + void learned_addr(const entity_addr_t& peer_addr_for_me); + + protected: virtual void ready() { } -- 2.47.3