From: Sage Weil Date: Fri, 26 Oct 2018 21:36:59 +0000 (-0500) Subject: msg/async: keep listen addr in ServerSocket, pass to new connections X-Git-Tag: v14.1.0~484^2~106 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3a52d3cf037e021c06f6c0d6bd7260af8600f924;p=ceph.git msg/async: keep listen addr in ServerSocket, pass to new connections When we accept a connection, we want to know what listening addr we accepted on. Because the addr can change after we create teh listening socket (when we learn the addr and fill in the IP portion), instead store the position in our myaddrs addrvec. Signed-off-by: Sage Weil --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5bc5c69fad9..f3be67904ad 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -462,16 +462,19 @@ void AsyncConnection::_connect() center->dispatch_event_external(read_handler); } -void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) +void AsyncConnection::accept(ConnectedSocket socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr) { ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() - << " on " << addr << dendl; + << " listen_addr " << listen_addr + << " peer_addr " << peer_addr << dendl; ceph_assert(socket.fd() >= 0); std::lock_guard l(lock); cs = std::move(socket); - socket_addr = addr; - target_addr = addr; // until we know better + socket_addr = listen_addr; + target_addr = peer_addr; // until we know better state = STATE_ACCEPTING; protocol->accept(); // rescheduler connection in order to avoid lock dep @@ -485,7 +488,7 @@ int AsyncConnection::send_message(Message *m) 1) << "-- " << async_msgr->get_myaddrs() << " --> " << get_peer_addrs() << " -- " << *m << " -- " << m << " con " - << m->get_connection().get() + << this << dendl; // optimistic think it's ok to encode(actually may broken now) diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 17bd1b39a93..26a3b949d0d 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -120,7 +120,9 @@ class AsyncConnection : public Connection { void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target); // Only call when AsyncConnection first construct - void accept(ConnectedSocket socket, entity_addr_t &addr); + void accept(ConnectedSocket socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr); int send_message(Message *m) override; void send_keepalive() override; @@ -191,8 +193,8 @@ class AsyncConnection : public Connection { // Accepting state bool msgr2 = false; - entity_addr_t socket_addr; - entity_addr_t target_addr; // which of the peer_addrs we're using + entity_addr_t socket_addr; ///< local socket addr + entity_addr_t target_addr; ///< which of the peer_addrs we're using // used only by "read_until" uint64_t state_offset; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 062fbdb34ff..bcb74c4bd26 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -88,9 +88,10 @@ int Processor::bind(const entity_addrvec_t &bind_addrs, } if (listen_addr.get_port()) { - worker->center.submit_to(worker->center.get_id(), - [this, k, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_sockets[k]); + worker->center.submit_to( + worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); }, false); if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr @@ -106,9 +107,10 @@ int Processor::bind(const entity_addrvec_t &bind_addrs, continue; listen_addr.set_port(port); - worker->center.submit_to(worker->center.get_id(), - [this, k, &listen_addr, &opts, &r]() { - r = worker->listen(listen_addr, opts, &listen_sockets[k]); + worker->center.submit_to( + worker->center.get_id(), + [this, k, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, k, opts, &listen_sockets[k]); }, false); if (r == 0) break; @@ -186,7 +188,10 @@ void Processor::accept() ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl; - msgr->add_accept(w, std::move(cli_socket), addr); + msgr->add_accept( + w, std::move(cli_socket), + msgr->get_myaddrs().v[listen_socket.get_addr_slot()], + addr); accept_error_num = 0; continue; } else { @@ -549,12 +554,14 @@ void AsyncMessenger::wait() started = false; } -void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr) +void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr) { lock.Lock(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w, - addr.is_msgr2(), false); - conn->accept(std::move(cli_socket), addr); + listen_addr.is_msgr2(), false); + conn->accept(std::move(cli_socket), listen_addr, peer_addr); accepting_conns.insert(conn); lock.Unlock(); } diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 3f2333b9500..b3a826a92c4 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -373,7 +373,9 @@ public: } void learned_addr(const entity_addr_t &peer_addr_for_me); - void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr); + void add_accept(Worker *w, ConnectedSocket cli_socket, + const entity_addr_t &listen_addr, + const entity_addr_t &peer_addr); NetworkStack *get_stack() { return stack; } diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 9411a31de31..0b3f678390e 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -170,8 +170,9 @@ class PosixServerSocketImpl : public ServerSocketImpl { int _fd; public: - explicit PosixServerSocketImpl(NetHandler &h, int f, int type) - : ServerSocketImpl(type), + explicit PosixServerSocketImpl(NetHandler &h, int f, + const entity_addr_t& listen_addr, unsigned slot) + : ServerSocketImpl(listen_addr.get_type(), slot), handler(h), _fd(f) {} int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; void abort_accept() override { @@ -218,7 +219,9 @@ void PosixWorker::initialize() { } -int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, +int PosixWorker::listen(entity_addr_t &sa, + unsigned addr_slot, + const SocketOptions &opt, ServerSocket *sock) { int listen_sd = net.create_socket(sa.get_family(), true); @@ -257,7 +260,7 @@ int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, *sock = ServerSocket( std::unique_ptr( - new PosixServerSocketImpl(net, listen_sd, sa.get_type()))); + new PosixServerSocketImpl(net, listen_sd, sa, addr_slot))); return 0; } diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index e70fa650b0e..bc55c4d9eb6 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -30,8 +30,10 @@ class PosixWorker : public Worker { public: PosixWorker(CephContext *c, unsigned i) : Worker(c, i), net(c) {} - int listen(entity_addr_t &sa, const SocketOptions &opt, - ServerSocket *socks) override; + int listen(entity_addr_t &sa, + unsigned addr_slot, + const SocketOptions &opt, + ServerSocket *socks) override; int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; }; diff --git a/src/msg/async/Protocol.cc b/src/msg/async/Protocol.cc index 5e202412502..0585fabeb72 100644 --- a/src/msg/async/Protocol.cc +++ b/src/msg/async/Protocol.cc @@ -1727,10 +1727,13 @@ CtPtr ProtocolV1::send_server_banner() { auto legacy = messenger->get_myaddrs().legacy_addr(); encode(legacy, bl, 0); // legacy connection->port = legacy.get_port(); - encode(connection->socket_addr, bl, 0); // legacy + encode(connection->target_addr, bl, 0); // legacy - ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() << " " - << connection->socket_addr << dendl; + ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() + << " legacy " << legacy + << " socket_addr " << connection->socket_addr + << " target_addr " << connection->target_addr + << dendl; return WRITE(bl, handle_server_banner_write); } @@ -1785,7 +1788,7 @@ CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) { if (peer_addr.is_blank_ip()) { // peer apparently doesn't know what ip they have; figure it out for them. int port = peer_addr.get_port(); - peer_addr.u = connection->socket_addr.u; + peer_addr.u = connection->target_addr.u; peer_addr.set_port(port); ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 32f9a5b2ee0..bccdaab5e65 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -47,8 +47,10 @@ struct SocketOptions { /// \cond internal class ServerSocketImpl { public: - int addr_type = 0; - ServerSocketImpl(int t) : addr_type(t) {} + unsigned addr_type; ///< entity_addr_t::TYPE_* + unsigned addr_slot; ///< position of our addr in myaddrs().v + ServerSocketImpl(unsigned type, unsigned slot) + : addr_type(type), addr_slot(slot) {} virtual ~ServerSocketImpl() {} virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0; virtual void abort_accept() = 0; @@ -178,6 +180,11 @@ class ServerSocket { return _ssi->fd(); } + /// get listen/bind addr + unsigned get_addr_slot() { + return _ssi->addr_slot; + } + explicit operator bool() const { return _ssi.get(); } @@ -250,7 +257,7 @@ class Worker { } } - virtual int listen(entity_addr_t &addr, + virtual int listen(entity_addr_t &addr, unsigned addr_slot, const SocketOptions &opts, ServerSocket *) = 0; virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) = 0; diff --git a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc index f0f82f53f93..7ffb8fbe56a 100644 --- a/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc @@ -7,13 +7,15 @@ #undef dout_prefix #define dout_prefix *_dout << " RDMAIWARPServerSocketImpl " -RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband* i, - RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a) - : RDMAServerSocketImpl(cct, i, s, w, a) +RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl( + CephContext *cct, Infiniband* i, + RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot) + : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot) { } -int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt) +int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, + const SocketOptions &opt) { ldout(cct, 20) << __func__ << " bind to rdma point" << dendl; cm_channel = rdma_create_event_channel(); diff --git a/src/msg/async/rdma/RDMAServerSocketImpl.cc b/src/msg/async/rdma/RDMAServerSocketImpl.cc index 1bdf5e71658..98402cfd354 100644 --- a/src/msg/async/rdma/RDMAServerSocketImpl.cc +++ b/src/msg/async/rdma/RDMAServerSocketImpl.cc @@ -26,8 +26,8 @@ RDMAServerSocketImpl::RDMAServerSocketImpl( CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, - entity_addr_t& a) - : ServerSocketImpl(a.get_type()), + entity_addr_t& a, unsigned slot) + : ServerSocketImpl(a.get_type(), slot), cct(cct), net(cct), server_setup_socket(-1), infiniband(i), dispatcher(s), worker(w), sa(a) { diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 9c05f23be9d..d8e2a4957cb 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -483,15 +483,20 @@ void RDMAWorker::initialize() } } -int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock) +int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot, + const SocketOptions &opt,ServerSocket *sock) { get_stack()->get_infiniband().init(); dispatcher->polling_start(); RDMAServerSocketImpl *p; if (cct->_conf->ms_async_rdma_type == "iwarp") { - p = new RDMAIWARPServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa); + p = new RDMAIWARPServerSocketImpl( + cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, + sa, addr_slot); } else { - p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa); + p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), + &get_stack()->get_dispatcher(), this, sa, + addr_slot); } int r = p->listen(sa, opt); if (r < 0) { diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h index f363d2fefed..c10a5389bcf 100644 --- a/src/msg/async/rdma/RDMAStack.h +++ b/src/msg/async/rdma/RDMAStack.h @@ -147,7 +147,9 @@ class RDMAWorker : public Worker { PerfCounters *perf_logger; explicit RDMAWorker(CephContext *c, unsigned i); virtual ~RDMAWorker(); - virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override; + virtual int listen(entity_addr_t &addr, + unsigned addr_slot, + const SocketOptions &opts, ServerSocket *) override; virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; virtual void initialize() override; RDMAStack *get_stack() { return stack; } @@ -309,7 +311,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl { public: RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, - RDMAWorker *w, entity_addr_t& a); + RDMAWorker *w, entity_addr_t& a, unsigned slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt); virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; @@ -320,7 +322,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl { class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl { public: - RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a); + RDMAIWARPServerSocketImpl( + CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, + entity_addr_t& addr, unsigned addr_slot); virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override; virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; virtual void abort_accept() override; diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc index 921539b1f9b..88a196b05b6 100644 --- a/src/test/msgr/test_async_networkstack.cc +++ b/src/test/msgr/test_async_networkstack.cc @@ -185,7 +185,7 @@ TEST_P(NetworkWorkerTest, SimpleTest) { EventCenter *center = &worker->center; ssize_t r = 0; if (stack->support_local_listen_table() || worker->id == 0) - r = worker->listen(bind_addr, options, &bind_socket); + r = worker->listen(bind_addr, 0, options, &bind_socket); ASSERT_EQ(0, r); ConnectedSocket cli_socket, srv_socket; @@ -287,7 +287,7 @@ TEST_P(NetworkWorkerTest, ConnectFailedTest) { ServerSocket bind_socket; int r = 0; if (stack->support_local_listen_table() || worker->id == 0) - r = worker->listen(bind_addr, options, &bind_socket); + r = worker->listen(bind_addr, 0, options, &bind_socket); ASSERT_EQ(0, r); ConnectedSocket cli_socket1, cli_socket2; @@ -328,10 +328,10 @@ TEST_P(NetworkWorkerTest, ListenTest) { ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); SocketOptions options; ServerSocket bind_socket1, bind_socket2; - int r = worker->listen(bind_addr, options, &bind_socket1); + int r = worker->listen(bind_addr, 0, options, &bind_socket1); ASSERT_EQ(0, r); - r = worker->listen(bind_addr, options, &bind_socket2); + r = worker->listen(bind_addr, 0, options, &bind_socket2); ASSERT_EQ(-EADDRINUSE, r); } @@ -350,7 +350,7 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) { { ServerSocket bind_socket; if (stack->support_local_listen_table() || worker->id == 0) - r = worker->listen(bind_addr, options, &bind_socket); + r = worker->listen(bind_addr, 0, options, &bind_socket); ASSERT_EQ(0, r); ConnectedSocket srv_socket, cli_socket; @@ -457,7 +457,7 @@ TEST_P(NetworkWorkerTest, ComplexTest) { ServerSocket bind_socket; int r = 0; if (stack->support_local_listen_table() || worker->id == 0) { - r = worker->listen(bind_addr, options, &bind_socket); + r = worker->listen(bind_addr, 0, options, &bind_socket); ASSERT_EQ(0, r); *listen_p = true; } @@ -1010,7 +1010,7 @@ class StressFactory { t_data.worker = worker; ServerSocket bind_socket; if (stack->support_local_listen_table() || worker->id == 0) { - r = worker->listen(bind_addr, options, &bind_socket); + r = worker->listen(bind_addr, 0, options, &bind_socket); ASSERT_EQ(0, r); already_bind = true; }