]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: keep listen addr in ServerSocket, pass to new connections
authorSage Weil <sage@redhat.com>
Fri, 26 Oct 2018 21:36:59 +0000 (16:36 -0500)
committerSage Weil <sage@redhat.com>
Fri, 21 Dec 2018 21:30:18 +0000 (15:30 -0600)
When we accept a connection, we want to know what listening addr we
accepted on.  Because the addr can change after we create teh listening socket
(when we learn the addr and fill in the IP portion), instead store the position
in our myaddrs addrvec.

Signed-off-by: Sage Weil <sage@redhat.com>
13 files changed:
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/PosixStack.cc
src/msg/async/PosixStack.h
src/msg/async/Protocol.cc
src/msg/async/Stack.h
src/msg/async/rdma/RDMAIWARPServerSocketImpl.cc
src/msg/async/rdma/RDMAServerSocketImpl.cc
src/msg/async/rdma/RDMAStack.cc
src/msg/async/rdma/RDMAStack.h
src/test/msgr/test_async_networkstack.cc

index 5bc5c69fad96737a007a5525bde14c77dedf023d..f3be67904ade36e6c1db875eff4f3fd4ad9ecc96 100644 (file)
@@ -462,16 +462,19 @@ void AsyncConnection::_connect()
   center->dispatch_event_external(read_handler);
 }
 
-void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
+void AsyncConnection::accept(ConnectedSocket socket,
+                            const entity_addr_t &listen_addr,
+                            const entity_addr_t &peer_addr)
 {
   ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd()
-                            << " on " << addr << dendl;
+                            << " listen_addr " << listen_addr
+                            << " peer_addr " << peer_addr << dendl;
   ceph_assert(socket.fd() >= 0);
 
   std::lock_guard<std::mutex> l(lock);
   cs = std::move(socket);
-  socket_addr = addr;
-  target_addr = addr; // until we know better
+  socket_addr = listen_addr;
+  target_addr = peer_addr; // until we know better
   state = STATE_ACCEPTING;
   protocol->accept();
   // rescheduler connection in order to avoid lock dep
@@ -485,7 +488,7 @@ int AsyncConnection::send_message(Message *m)
                   1) << "-- " << async_msgr->get_myaddrs() << " --> "
                      << get_peer_addrs() << " -- "
                      << *m << " -- " << m << " con "
-                     << m->get_connection().get()
+                     << this
                      << dendl;
 
   // optimistic think it's ok to encode(actually may broken now)
index 17bd1b39a93cb9f9b082520d1e9ba26305850cce..26a3b949d0dd5e348d7d06dd4b9b690d897dac0a 100644 (file)
@@ -120,7 +120,9 @@ class AsyncConnection : public Connection {
   void connect(const entity_addrvec_t& addrs, int type, entity_addr_t& target);
 
   // Only call when AsyncConnection first construct
-  void accept(ConnectedSocket socket, entity_addr_t &addr);
+  void accept(ConnectedSocket socket,
+             const entity_addr_t &listen_addr,
+             const entity_addr_t &peer_addr);
   int send_message(Message *m) override;
 
   void send_keepalive() override;
@@ -191,8 +193,8 @@ class AsyncConnection : public Connection {
 
   // Accepting state
   bool msgr2 = false;
-  entity_addr_t socket_addr;
-  entity_addr_t target_addr;  // which of the peer_addrs we're using
+  entity_addr_t socket_addr;  ///< local socket addr
+  entity_addr_t target_addr;  ///< which of the peer_addrs we're using
 
   // used only by "read_until"
   uint64_t state_offset;
index 062fbdb34ff8762268449561f33986d992f0cb4c..bcb74c4bd260cbc4f4d4861ff9f1f65b87b9f2ea 100644 (file)
@@ -88,9 +88,10 @@ int Processor::bind(const entity_addrvec_t &bind_addrs,
       }
 
       if (listen_addr.get_port()) {
-       worker->center.submit_to(worker->center.get_id(),
-                                [this, k, &listen_addr, &opts, &r]() {
-           r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+       worker->center.submit_to(
+         worker->center.get_id(),
+         [this, k, &listen_addr, &opts, &r]() {
+           r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
          }, false);
        if (r < 0) {
          lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
@@ -106,9 +107,10 @@ int Processor::bind(const entity_addrvec_t &bind_addrs,
            continue;
 
          listen_addr.set_port(port);
-         worker->center.submit_to(worker->center.get_id(),
-                                  [this, k, &listen_addr, &opts, &r]() {
-             r = worker->listen(listen_addr, opts, &listen_sockets[k]);
+         worker->center.submit_to(
+           worker->center.get_id(),
+           [this, k, &listen_addr, &opts, &r]() {
+             r = worker->listen(listen_addr, k, opts, &listen_sockets[k]);
            }, false);
          if (r == 0)
            break;
@@ -186,7 +188,10 @@ void Processor::accept()
        ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd "
                             << cli_socket.fd() << dendl;
 
-       msgr->add_accept(w, std::move(cli_socket), addr);
+       msgr->add_accept(
+         w, std::move(cli_socket),
+         msgr->get_myaddrs().v[listen_socket.get_addr_slot()],
+         addr);
        accept_error_num = 0;
        continue;
       } else {
@@ -549,12 +554,14 @@ void AsyncMessenger::wait()
   started = false;
 }
 
-void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
+void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket,
+                               const entity_addr_t &listen_addr,
+                               const entity_addr_t &peer_addr)
 {
   lock.Lock();
   AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w,
-                                               addr.is_msgr2(), false);
-  conn->accept(std::move(cli_socket), addr);
+                                               listen_addr.is_msgr2(), false);
+  conn->accept(std::move(cli_socket), listen_addr, peer_addr);
   accepting_conns.insert(conn);
   lock.Unlock();
 }
index 3f2333b9500393608c6202f18ad93c09ed97d60e..b3a826a92c433e6fcf69d07febaddf5844b82161 100644 (file)
@@ -373,7 +373,9 @@ public:
   }
 
   void learned_addr(const entity_addr_t &peer_addr_for_me);
-  void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
+  void add_accept(Worker *w, ConnectedSocket cli_socket,
+                 const entity_addr_t &listen_addr,
+                 const entity_addr_t &peer_addr);
   NetworkStack *get_stack() {
     return stack;
   }
index 9411a31de311c38359e7da1ba87a065326cb087c..0b3f678390ed85ef0a717047f3662a0e71d5ce82 100644 (file)
@@ -170,8 +170,9 @@ class PosixServerSocketImpl : public ServerSocketImpl {
   int _fd;
 
  public:
-  explicit PosixServerSocketImpl(NetHandler &h, int f, int type)
-    : ServerSocketImpl(type),
+  explicit PosixServerSocketImpl(NetHandler &h, int f,
+                                const entity_addr_t& listen_addr, unsigned slot)
+    : ServerSocketImpl(listen_addr.get_type(), slot),
       handler(h), _fd(f) {}
   int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
   void abort_accept() override {
@@ -218,7 +219,9 @@ void PosixWorker::initialize()
 {
 }
 
-int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
+int PosixWorker::listen(entity_addr_t &sa,
+                       unsigned addr_slot,
+                       const SocketOptions &opt,
                         ServerSocket *sock)
 {
   int listen_sd = net.create_socket(sa.get_family(), true);
@@ -257,7 +260,7 @@ int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
 
   *sock = ServerSocket(
           std::unique_ptr<PosixServerSocketImpl>(
-           new PosixServerSocketImpl(net, listen_sd, sa.get_type())));
+           new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
   return 0;
 }
 
index e70fa650b0e35c8d20f8b1a93a4ae9dfc51e719e..bc55c4d9eb6b6f4cf84988ad8cf8a1fae23a098c 100644 (file)
@@ -30,8 +30,10 @@ class PosixWorker : public Worker {
  public:
   PosixWorker(CephContext *c, unsigned i)
       : Worker(c, i), net(c) {}
-  int listen(entity_addr_t &sa, const SocketOptions &opt,
-                     ServerSocket *socks) override;
+  int listen(entity_addr_t &sa,
+            unsigned addr_slot,
+            const SocketOptions &opt,
+            ServerSocket *socks) override;
   int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
 };
 
index 5e202412502bcdf344dd4d1baca2a781c8c9f20c..0585fabeb72ff155b0d72fdc779b7a7a690653ed 100644 (file)
@@ -1727,10 +1727,13 @@ CtPtr ProtocolV1::send_server_banner() {
   auto legacy = messenger->get_myaddrs().legacy_addr();
   encode(legacy, bl, 0);  // legacy
   connection->port = legacy.get_port();
-  encode(connection->socket_addr, bl, 0);  // legacy
+  encode(connection->target_addr, bl, 0);  // legacy
 
-  ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd() << " "
-                << connection->socket_addr << dendl;
+  ldout(cct, 1) << __func__ << " sd=" << connection->cs.fd()
+               << " legacy " << legacy
+               << " socket_addr " << connection->socket_addr
+               << " target_addr " << connection->target_addr
+               << dendl;
 
   return WRITE(bl, handle_server_banner_write);
 }
@@ -1785,7 +1788,7 @@ CtPtr ProtocolV1::handle_client_banner(char *buffer, int r) {
   if (peer_addr.is_blank_ip()) {
     // peer apparently doesn't know what ip they have; figure it out for them.
     int port = peer_addr.get_port();
-    peer_addr.u = connection->socket_addr.u;
+    peer_addr.u = connection->target_addr.u;
     peer_addr.set_port(port);
 
     ldout(cct, 0) << __func__ << " accept peer addr is really " << peer_addr
index 32f9a5b2ee02f57a34de7dfa00c16168d0f8b8ba..bccdaab5e65e603b2457c429eddca710217f8efb 100644 (file)
@@ -47,8 +47,10 @@ struct SocketOptions {
 /// \cond internal
 class ServerSocketImpl {
  public:
-  int addr_type = 0;
-  ServerSocketImpl(int t) : addr_type(t) {}
+  unsigned addr_type; ///< entity_addr_t::TYPE_*
+  unsigned addr_slot; ///< position of our addr in myaddrs().v
+  ServerSocketImpl(unsigned type, unsigned slot)
+    : addr_type(type), addr_slot(slot) {}
   virtual ~ServerSocketImpl() {}
   virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
   virtual void abort_accept() = 0;
@@ -178,6 +180,11 @@ class ServerSocket {
     return _ssi->fd();
   }
 
+  /// get listen/bind addr
+  unsigned get_addr_slot() {
+    return _ssi->addr_slot;
+  }
+
   explicit operator bool() const {
     return _ssi.get();
   }
@@ -250,7 +257,7 @@ class Worker {
     }
   }
 
-  virtual int listen(entity_addr_t &addr,
+  virtual int listen(entity_addr_t &addr, unsigned addr_slot,
                      const SocketOptions &opts, ServerSocket *) = 0;
   virtual int connect(const entity_addr_t &addr,
                       const SocketOptions &opts, ConnectedSocket *socket) = 0;
index f0f82f53f93b408a3fa386752bed33388348a476..7ffb8fbe56a5178aea08db805cf32e99e9bfede3 100644 (file)
@@ -7,13 +7,15 @@
 #undef dout_prefix
 #define dout_prefix *_dout << " RDMAIWARPServerSocketImpl "
 
-RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband* i,
-    RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a)
-  : RDMAServerSocketImpl(cct, i, s, w, a)
+RDMAIWARPServerSocketImpl::RDMAIWARPServerSocketImpl(
+  CephContext *cct, Infiniband* i,
+  RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a, unsigned addr_slot)
+  : RDMAServerSocketImpl(cct, i, s, w, a, addr_slot)
 {
 }
 
-int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa, const SocketOptions &opt)
+int RDMAIWARPServerSocketImpl::listen(entity_addr_t &sa,
+                                     const SocketOptions &opt)
 {
   ldout(cct, 20) << __func__ << " bind to rdma point" << dendl;
   cm_channel = rdma_create_event_channel();
index 1bdf5e716588b2694c80843fb186ad40c8d460c1..98402cfd35469a2a852819ce3ab26ac7167e93d1 100644 (file)
@@ -26,8 +26,8 @@
 
 RDMAServerSocketImpl::RDMAServerSocketImpl(
   CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w,
-  entity_addr_t& a)
-  : ServerSocketImpl(a.get_type()),
+  entity_addr_t& a, unsigned slot)
+  : ServerSocketImpl(a.get_type(), slot),
     cct(cct), net(cct), server_setup_socket(-1), infiniband(i),
     dispatcher(s), worker(w), sa(a)
 {
index 9c05f23be9d4698592df824a9d362e9224430ed2..d8e2a4957cb3b9c15c4b93f20fcbbcaba6208648 100644 (file)
@@ -483,15 +483,20 @@ void RDMAWorker::initialize()
   }
 }
 
-int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
+int RDMAWorker::listen(entity_addr_t &sa, unsigned addr_slot,
+                      const SocketOptions &opt,ServerSocket *sock)
 {
   get_stack()->get_infiniband().init();
   dispatcher->polling_start();
   RDMAServerSocketImpl *p;
   if (cct->_conf->ms_async_rdma_type == "iwarp") {
-    p = new RDMAIWARPServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+    p = new RDMAIWARPServerSocketImpl(
+      cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this,
+      sa, addr_slot);
   } else {
-    p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(), &get_stack()->get_dispatcher(), this, sa);
+    p = new RDMAServerSocketImpl(cct, &get_stack()->get_infiniband(),
+                                &get_stack()->get_dispatcher(), this, sa,
+                                addr_slot);
   }
   int r = p->listen(sa, opt);
   if (r < 0) {
index f363d2fefed041efaafa351262c025969f06e876..c10a5389bcf3af6fae290fbfd6429db1a62ec617 100644 (file)
@@ -147,7 +147,9 @@ class RDMAWorker : public Worker {
   PerfCounters *perf_logger;
   explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker();
-  virtual int listen(entity_addr_t &addr, const SocketOptions &opts, ServerSocket *) override;
+  virtual int listen(entity_addr_t &addr,
+                    unsigned addr_slot,
+                    const SocketOptions &opts, ServerSocket *) override;
   virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override;
   virtual void initialize() override;
   RDMAStack *get_stack() { return stack; }
@@ -309,7 +311,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 
  public:
   RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s,
-      RDMAWorker *w, entity_addr_t& a);
+                      RDMAWorker *w, entity_addr_t& a, unsigned slot);
 
   virtual int listen(entity_addr_t &sa, const SocketOptions &opt);
   virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
@@ -320,7 +322,9 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 
 class RDMAIWARPServerSocketImpl : public RDMAServerSocketImpl {
   public:
-    RDMAIWARPServerSocketImpl(CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+    RDMAIWARPServerSocketImpl(
+      CephContext *cct, Infiniband *i, RDMADispatcher *s, RDMAWorker *w,
+      entity_addr_t& addr, unsigned addr_slot);
     virtual int listen(entity_addr_t &sa, const SocketOptions &opt) override;
     virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
     virtual void abort_accept() override;
index 921539b1f9bd58ba3f684ccdf0699a29217ba2d1..88a196b05b6fe89e0c15e5bdcd665b5282e3d7e4 100644 (file)
@@ -185,7 +185,7 @@ TEST_P(NetworkWorkerTest, SimpleTest) {
     EventCenter *center = &worker->center;
     ssize_t r = 0;
     if (stack->support_local_listen_table() || worker->id == 0)
-      r = worker->listen(bind_addr, options, &bind_socket);
+      r = worker->listen(bind_addr, 0, options, &bind_socket);
     ASSERT_EQ(0, r);
 
     ConnectedSocket cli_socket, srv_socket;
@@ -287,7 +287,7 @@ TEST_P(NetworkWorkerTest, ConnectFailedTest) {
     ServerSocket bind_socket;
     int r = 0;
     if (stack->support_local_listen_table() || worker->id == 0)
-      r = worker->listen(bind_addr, options, &bind_socket);
+      r = worker->listen(bind_addr, 0, options, &bind_socket);
     ASSERT_EQ(0, r);
 
     ConnectedSocket cli_socket1, cli_socket2;
@@ -328,10 +328,10 @@ TEST_P(NetworkWorkerTest, ListenTest) {
   ASSERT_TRUE(bind_addr.parse(get_addr().c_str()));
   SocketOptions options;
   ServerSocket bind_socket1, bind_socket2;
-  int r = worker->listen(bind_addr, options, &bind_socket1);
+  int r = worker->listen(bind_addr, 0, options, &bind_socket1);
   ASSERT_EQ(0, r);
 
-  r = worker->listen(bind_addr, options, &bind_socket2);
+  r = worker->listen(bind_addr, 0, options, &bind_socket2);
   ASSERT_EQ(-EADDRINUSE, r);
 }
 
@@ -350,7 +350,7 @@ TEST_P(NetworkWorkerTest, AcceptAndCloseTest) {
     {
       ServerSocket bind_socket;
       if (stack->support_local_listen_table() || worker->id == 0)
-        r = worker->listen(bind_addr, options, &bind_socket);
+        r = worker->listen(bind_addr, 0, options, &bind_socket);
       ASSERT_EQ(0, r);
 
       ConnectedSocket srv_socket, cli_socket;
@@ -457,7 +457,7 @@ TEST_P(NetworkWorkerTest, ComplexTest) {
     ServerSocket bind_socket;
     int r = 0;
     if (stack->support_local_listen_table() || worker->id == 0) {
-      r = worker->listen(bind_addr, options, &bind_socket);
+      r = worker->listen(bind_addr, 0, options, &bind_socket);
       ASSERT_EQ(0, r);
       *listen_p = true;
     }
@@ -1010,7 +1010,7 @@ class StressFactory {
     t_data.worker = worker;
     ServerSocket bind_socket;
     if (stack->support_local_listen_table() || worker->id == 0) {
-      r = worker->listen(bind_addr, options, &bind_socket);
+      r = worker->listen(bind_addr, 0, options, &bind_socket);
       ASSERT_EQ(0, r);
       already_bind = true;
     }