]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
test/crimson: configure seastar to accept on a fixed core
authorYingxin Cheng <yingxin.cheng@intel.com>
Sun, 19 Jan 2020 07:07:51 +0000 (15:07 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 12 Feb 2020 02:46:44 +0000 (10:46 +0800)
Adopt FixedCPUServerSocket and don't move sockets after
connected/accepted.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/test/crimson/test_socket.cc

index ec306700bd7fda69e8e6b643563d00f77d6793f5..29dd2cdb37bc817aee002544f6b89e5fde7f90ec 100644 (file)
 
 namespace {
 
+using seastar::engine;
 using seastar::future;
 using crimson::net::error;
+using crimson::net::FixedCPUServerSocket;
 using crimson::net::Socket;
 using crimson::net::SocketFRef;
+using crimson::net::SocketRef;
 using crimson::net::stop_t;
 
 static seastar::logger logger{"crimsontest"};
-
-template <typename ConcreteService>
-class SocketFactoryBase
-    : public seastar::peering_sharded_service<ConcreteService> {
-  static constexpr const char* server_addr = "127.0.0.1:9020";
-
-  seastar::gate shutdown_gate;
-  std::optional<seastar::server_socket> listener;
-
- public:
-  virtual ~SocketFactoryBase() = default;
-
-  virtual future<> bind_accept() {
-    return this->container().invoke_on_all([] (auto& factory) {
-      entity_addr_t addr;
-      addr.parse(server_addr, nullptr);
-      seastar::socket_address s_addr(addr.in4_addr());
-      seastar::listen_options lo;
-      lo.reuse_address = true;
-      factory.listener = seastar::listen(s_addr, lo);
-    }).then([this] {
-      return this->container().invoke_on_all([] (auto& factory) {
-        // gate accepting
-        // SocketFactoryBase::shutdown() will drain the continuations in the gate
-        // so ignore the returned future
-        std::ignore = seastar::with_gate(factory.shutdown_gate, [&factory] {
-          return seastar::keep_doing([&factory] {
-            return Socket::accept(*factory.listener).then(
-              [&factory] (SocketFRef socket,
-                         entity_addr_t peer_addr) {
-              // gate socket dispatching
-              std::ignore = seastar::with_gate(factory.shutdown_gate,
-                  [&factory, socket = std::move(socket)] () mutable {
-                return factory.handle_server_socket(std::move(socket))
-                .handle_exception([] (auto eptr) {
-                  logger.error("handle_server_socket():"
-                               "got unexpected exception {}", eptr);
-                  ceph_abort();
-                });
-              });
-            });
-          }).handle_exception_type([] (const std::system_error& e) {
-            if (e.code() != error::connection_aborted &&
-                e.code() != error::invalid_argument) {
-              logger.error("accepting: got unexpected error {}", e);
-              ceph_abort();
-            }
-            // successful
-          }).handle_exception([] (auto eptr) {
-            logger.error("accepting: got unexpected exception {}", eptr);
-            ceph_abort();
-          });
-        });
-      });
-    });
-  }
-
-  future<> shutdown() {
-    return this->container().invoke_on_all([] (auto& factory) {
-      if (factory.listener) {
-        factory.listener.value().abort_accept();
-      }
-      return factory.shutdown_gate.close();
-    });
-  }
-
-  future<> stop() { return seastar::now(); }
-
-  static future<SocketFRef> connect() {
-    entity_addr_t addr;
-    addr.parse(server_addr, nullptr);
-    return Socket::connect(addr);
-  }
-
- protected:
-  virtual future<> handle_server_socket(SocketFRef&& socket) = 0;
-};
-
-class AcceptTest final
-    : public SocketFactoryBase<AcceptTest> {
- public:
-  future<> handle_server_socket(SocketFRef&& socket) override {
-    return seastar::sleep(100ms
-    ).then([socket = std::move(socket)] () mutable {
-      return socket->close()
-      .finally([socket = std::move(socket)] {});
-    });
-  }
-};
+static entity_addr_t server_addr = [] {
+  entity_addr_t saddr;
+  saddr.parse("127.0.0.1:9020", nullptr);
+  return saddr;
+} ();
+
+future<SocketRef> socket_connect() {
+  logger.debug("socket_connect()...");
+  return Socket::connect(server_addr).then([] (auto socket) {
+    logger.debug("socket_connect() connected");
+    return socket.release();
+  });
+}
 
 future<> test_refused() {
   logger.info("test_refused()...");
-  return AcceptTest::connect().discard_result(
-  ).then([] {
+  return socket_connect().discard_result().then([] {
     ceph_abort_msg("connection is not refused");
   }).handle_exception_type([] (const std::system_error& e) {
     if (e.code() != error::connection_refused) {
       logger.error("test_refused() got unexpeted error {}", e);
       ceph_abort();
+    } else {
+      logger.info("test_refused() ok\n");
     }
-    // successful
   }).handle_exception([] (auto eptr) {
     logger.error("test_refused() got unexpeted exception {}", eptr);
     ceph_abort();
@@ -129,27 +56,28 @@ future<> test_refused() {
 
 future<> test_bind_same() {
   logger.info("test_bind_same()...");
-  return crimson::net::create_sharded<AcceptTest>().then(
-    [] (AcceptTest* factory) {
-    return factory->bind_accept().then([] {
+  return FixedCPUServerSocket::create().then([] (auto pss1) {
+    return pss1->listen(server_addr).then([] {
       // try to bind the same address
-      return crimson::net::create_sharded<AcceptTest>().then(
-        [] (AcceptTest* factory2) {
-        return factory2->bind_accept().then([] {
-          ceph_abort_msg("bind should raise addr-in-use");
-         return seastar::now();
-        }).finally([factory2] {
-          return factory2->shutdown();
+      return FixedCPUServerSocket::create().then([] (auto pss2) {
+        return pss2->listen(server_addr).then([] {
+          ceph_abort("Should raise address_in_use!");
+        }).handle_exception_type([] (const std::system_error& e) {
+          assert(e.code() == std::errc::address_in_use);
+          // successful!
+        }).finally([pss2] {
+          return pss2->destroy();
         }).handle_exception_type([] (const std::system_error& e) {
           if (e.code() != error::address_in_use) {
             logger.error("test_bind_same() got unexpeted error {}", e);
             ceph_abort();
+          } else {
+            logger.info("test_bind_same() ok\n");
           }
-          // successful
         });
       });
-    }).finally([factory] {
-      return factory->shutdown();
+    }).finally([pss1] {
+      return pss1->destroy();
     }).handle_exception([] (auto eptr) {
       logger.error("test_bind_same() got unexpeted exception {}", eptr);
       ceph_abort();
@@ -159,22 +87,30 @@ future<> test_bind_same() {
 
 future<> test_accept() {
   logger.info("test_accept()");
-  return crimson::net::create_sharded<AcceptTest>(
-  ).then([] (AcceptTest* factory) {
-    return factory->bind_accept().then([factory] {
+  return FixedCPUServerSocket::create().then([] (auto pss) {
+    return pss->listen(server_addr).then([pss] {
+      return pss->accept([] (auto socket, auto paddr) {
+        // simple accept
+        return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable {
+          return socket->close().finally([cleanup = std::move(socket)] {});
+        });
+      });
+    }).then([] {
       return seastar::when_all(
-        factory->connect().then([] (auto socket) {
+        socket_connect().then([] (auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); }),
-        factory->connect().then([] (auto socket) {
+        socket_connect().then([] (auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); }),
-        factory->connect().then([] (auto socket) {
+        socket_connect().then([] (auto socket) {
           return socket->close().finally([cleanup = std::move(socket)] {}); })
       ).discard_result();
     }).then([] {
       // should be enough to be connected locally
       return seastar::sleep(50ms);
-    }).finally([factory] {
-      return factory->shutdown();
+    }).then([] {
+      logger.info("test_accept() ok\n");
+    }).finally([pss] {
+      return pss->destroy();
     }).handle_exception([] (auto eptr) {
       logger.error("test_accept() got unexpeted exception {}", eptr);
       ceph_abort();
@@ -182,44 +118,78 @@ future<> test_accept() {
   });
 }
 
-class SocketFactory final
-    : public SocketFactoryBase<SocketFactory> {
-  const seastar::shard_id target_shard;
-  seastar::promise<SocketFRef> socket_promise;
-
-  future<> bind_accept() override {
-    return SocketFactoryBase<SocketFactory>::bind_accept();
-  }
-
-  future<SocketFRef> get_accepted() {
-    return socket_promise.get_future();
-  }
+class SocketFactory {
+  SocketRef client_socket;
+  SocketFRef server_socket;
+  FixedCPUServerSocket *pss = nullptr;
+  seastar::promise<> server_connected;
 
  public:
-  SocketFactory(seastar::shard_id shard) : target_shard{shard} {}
-
-  future<> handle_server_socket(SocketFRef&& socket) override {
-    return container().invoke_on(target_shard,
-        [socket = std::move(socket)] (auto& factory) mutable {
-      factory.socket_promise.set_value(std::move(socket));
-    });
-  }
-
-  static future<tuple<SocketFRef, SocketFRef>> get_sockets() {
-    return crimson::net::create_sharded<SocketFactory>(seastar::engine().cpu_id()
-    ).then([] (SocketFactory* factory) {
-      return factory->bind_accept().then([factory] {
-        return connect();
-      }).then([factory] (auto fp_client_socket) {
-        return factory->get_accepted().then(
-          [fp_client_socket = std::move(fp_client_socket)](auto fp_server_socket) mutable {
-         return seastar::make_ready_future<tuple<SocketFRef, SocketFRef>>(
-            std::make_tuple(std::move(fp_client_socket), std::move(fp_server_socket)));
-        });
-      }).finally([factory] {
-        return factory->shutdown();
+  // cb_client() on CPU#0, cb_server() on CPU#1
+  template <typename FuncC, typename FuncS>
+  static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
+    assert(engine().cpu_id() == 0u);
+    auto owner = std::make_unique<SocketFactory>();
+    auto psf = owner.get();
+    return seastar::smp::submit_to(1u, [psf] {
+      return FixedCPUServerSocket::create().then([psf] (auto pss) {
+        psf->pss = pss;
+        return pss->listen(server_addr);
       });
-    });
+    }).then([psf] {
+      return seastar::when_all_succeed(
+        seastar::smp::submit_to(0u, [psf] {
+          return socket_connect().then([psf] (auto socket) {
+            psf->client_socket = std::move(socket);
+          });
+        }),
+        seastar::smp::submit_to(1u, [psf] {
+          return psf->pss->accept([psf] (auto socket, auto paddr) {
+            psf->server_socket = seastar::make_foreign(std::move(socket));
+            return seastar::smp::submit_to(0u, [psf] {
+              psf->server_connected.set_value();
+            });
+          });
+        })
+      );
+    }).then([psf] {
+      return psf->server_connected.get_future();
+    }).finally([psf] {
+      if (psf->pss) {
+        return seastar::smp::submit_to(1u, [psf] {
+          return psf->pss->destroy();
+        });
+      }
+      return seastar::now();
+    }).then([psf,
+             cb_client = std::move(cb_client),
+             cb_server = std::move(cb_server)] () mutable {
+      logger.debug("dispatch_sockets(): client/server socket are ready");
+      return seastar::when_all_succeed(
+        seastar::smp::submit_to(0u, [socket = psf->client_socket.get(),
+                                     cb_client = std::move(cb_client)] {
+          return cb_client(socket).finally([socket] {
+            logger.debug("closing client socket...");
+            return socket->close();
+          }).handle_exception([] (auto eptr) {
+            logger.error("dispatch_sockets():"
+                " cb_client() got unexpeted exception {}", eptr);
+            ceph_abort();
+          });
+        }),
+        seastar::smp::submit_to(1u, [socket = psf->server_socket.get(),
+                                     cb_server = std::move(cb_server)] {
+          return cb_server(socket).finally([socket] {
+            logger.debug("closing server socket...");
+            return socket->close();
+          }).handle_exception([] (auto eptr) {
+            logger.error("dispatch_sockets():"
+                " cb_server() got unexpeted exception {}", eptr);
+            ceph_abort();
+          });
+        })
+      );
+    }).finally([cleanup = std::move(owner)] {});
   }
 };
 
@@ -233,20 +203,25 @@ class Connection {
     ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL);
   }
 
-  SocketFRef socket;
+  Socket* socket = nullptr;
   uint64_t write_count = 0;
   uint64_t read_count = 0;
 
-  Connection(SocketFRef&& socket) : socket{std::move(socket)} {
+  Connection(Socket* socket) : socket{socket} {
+    assert(socket);
     data[DATA_SIZE - 1] = DATA_TAIL;
   }
 
   future<> dispatch_write(unsigned round = 0, bool force_shut = false) {
+    logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut);
     return seastar::repeat([this, round, force_shut] {
       if (round != 0 && round <= write_count) {
         return seastar::futurize_apply([this, force_shut] {
           if (force_shut) {
+            logger.debug("dispatch_write() done, force shutdown output");
             socket->force_shutdown_out();
+          } else {
+            logger.debug("dispatch_write() done");
           }
         }).then([] {
           return seastar::make_ready_future<stop_t>(stop_t::yes);
@@ -269,7 +244,7 @@ class Connection {
     return dispatch_write(
     ).then([] {
       ceph_abort();
-    }).handle_exception_type([] (const std::system_error& e) {
+    }).handle_exception_type([this] (const std::system_error& e) {
       if (e.code() != error::broken_pipe &&
           e.code() != error::connection_reset) {
         logger.error("dispatch_write_unbounded(): "
@@ -279,15 +254,20 @@ class Connection {
       // successful
       logger.debug("dispatch_write_unbounded(): "
                    "expected error {}", e);
+      shutdown();
     });
   }
 
   future<> dispatch_read(unsigned round = 0, bool force_shut = false) {
+    logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut);
     return seastar::repeat([this, round, force_shut] {
       if (round != 0 && round <= read_count) {
         return seastar::futurize_apply([this, force_shut] {
           if (force_shut) {
+            logger.debug("dispatch_read() done, force shutdown input");
             socket->force_shutdown_in();
+          } else {
+            logger.debug("dispatch_read() done");
           }
         }).then([] {
           return seastar::make_ready_future<stop_t>(stop_t::yes);
@@ -322,7 +302,7 @@ class Connection {
     return dispatch_read(
     ).then([] {
       ceph_abort();
-    }).handle_exception_type([] (const std::system_error& e) {
+    }).handle_exception_type([this] (const std::system_error& e) {
       if (e.code() != error::read_eof
        && e.code() != error::connection_reset) {
         logger.error("dispatch_read_unbounded(): "
@@ -332,6 +312,7 @@ class Connection {
       // successful
       logger.debug("dispatch_read_unbounded(): "
                    "expected error {}", e);
+      shutdown();
     });
   }
 
@@ -339,68 +320,49 @@ class Connection {
     socket->shutdown();
   }
 
-  future<> close() {
-    return socket->close();
-  }
-
  public:
-  static future<> dispatch_rw_bounded(SocketFRef&& socket, bool is_client,
-                                      unsigned round, bool force_shut = false) {
-    return seastar::smp::submit_to(is_client ? 0 : 1,
-        [socket = std::move(socket), round, force_shut] () mutable {
-      return seastar::do_with(Connection{std::move(socket)},
-                              [round, force_shut] (auto& conn) {
-        ceph_assert(round != 0);
-        return seastar::when_all_succeed(
-          conn.dispatch_write(round, force_shut),
-          conn.dispatch_read(round, force_shut)
-        ).finally([&conn] {
-          return conn.close();
-        });
-      });
-    }).handle_exception([is_client] (auto eptr) {
-      logger.error("dispatch_rw_bounded(): {} got unexpected exception {}",
-                   is_client ? "client" : "server", eptr);
-      ceph_abort();
+  static future<> dispatch_rw_bounded(Socket* socket, unsigned round,
+                                      bool force_shut = false) {
+    logger.debug("dispatch_rw_bounded(round={}, force_shut={})...",
+                 round, force_shut);
+    return seastar::do_with(Connection{socket},
+                            [round, force_shut] (auto& conn) {
+      ceph_assert(round != 0);
+      return seastar::when_all_succeed(
+        conn.dispatch_write(round, force_shut),
+        conn.dispatch_read(round, force_shut)
+      );
     });
   }
 
-  static future<> dispatch_rw_unbounded(SocketFRef&& socket, bool is_client,
-                                        bool preemptive_shut = false) {
-    return seastar::smp::submit_to(is_client ? 0 : 1,
-        [socket = std::move(socket), preemptive_shut, is_client] () mutable {
-      return seastar::do_with(Connection{std::move(socket)},
-                              [preemptive_shut, is_client] (auto& conn) {
-        return seastar::when_all_succeed(
-          conn.dispatch_write_unbounded(),
-          conn.dispatch_read_unbounded(),
-          seastar::futurize_apply([&conn, preemptive_shut] {
-            if (preemptive_shut) {
-              return seastar::sleep(100ms).then([&conn] { conn.shutdown(); });
-            } else {
-              return seastar::now();
-            }
-          })
-        ).finally([&conn] {
-          return conn.close();
-        });
-      });
-    }).handle_exception([is_client] (auto eptr) {
-      logger.error("dispatch_rw_unbounded(): {} got unexpected exception {}",
-                   is_client ? "client" : "server", eptr);
-      ceph_abort();
+  static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) {
+    logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut);
+    return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) {
+      return seastar::when_all_succeed(
+        conn.dispatch_write_unbounded(),
+        conn.dispatch_read_unbounded(),
+        seastar::futurize_apply([&conn, preemptive_shut] {
+          if (preemptive_shut) {
+            return seastar::sleep(100ms).then([&conn] {
+              logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)");
+              conn.shutdown();
+            });
+          } else {
+            return seastar::now();
+          }
+        })
+      );
     });
   }
 };
 
 future<> test_read_write() {
   logger.info("test_read_write()...");
-  return SocketFactory::get_sockets().then([] (auto sockets) {
-    auto [client_socket, server_socket] = std::move(sockets);
-    return seastar::when_all_succeed(
-      Connection::dispatch_rw_bounded(std::move(client_socket), true, 128),
-      Connection::dispatch_rw_bounded(std::move(server_socket), false, 128)
-    );
+  return SocketFactory::dispatch_sockets(
+    [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
+    [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
+  ).then([] {
+    logger.info("test_read_write() ok\n");
   }).handle_exception([] (auto eptr) {
     logger.error("test_read_write() got unexpeted exception {}", eptr);
     ceph_abort();
@@ -409,12 +371,11 @@ future<> test_read_write() {
 
 future<> test_unexpected_down() {
   logger.info("test_unexpected_down()...");
-  return SocketFactory::get_sockets().then([] (auto sockets) {
-    auto [client_socket, server_socket] = std::move(sockets);
-    return seastar::when_all_succeed(
-      Connection::dispatch_rw_bounded(std::move(client_socket), true, 128, true),
-      Connection::dispatch_rw_unbounded(std::move(server_socket), false)
-    );
+  return SocketFactory::dispatch_sockets(
+    [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true); },
+    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+  ).then([] {
+    logger.info("test_unexpected_down() ok\n");
   }).handle_exception([] (auto eptr) {
     logger.error("test_unexpected_down() got unexpeted exception {}", eptr);
     ceph_abort();
@@ -423,14 +384,15 @@ future<> test_unexpected_down() {
 
 future<> test_shutdown_propagated() {
   logger.info("test_shutdown_propagated()...");
-  return SocketFactory::get_sockets().then([] (auto sockets) {
-    auto [client_socket, server_socket] = std::move(sockets);
-    client_socket->shutdown();
-    return Connection::dispatch_rw_unbounded(std::move(server_socket), false
-    ).finally([client_socket = std::move(client_socket)] () mutable {
-      return client_socket->close(
-      ).finally([cleanup = std::move(client_socket)] {});
-    });
+  return SocketFactory::dispatch_sockets(
+    [] (auto cs) {
+      logger.debug("test_shutdown_propagated() shutdown client socket");
+      cs->shutdown();
+      return seastar::now();
+    },
+    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+  ).then([] {
+    logger.info("test_shutdown_propagated() ok\n");
   }).handle_exception([] (auto eptr) {
     logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr);
     ceph_abort();
@@ -439,12 +401,11 @@ future<> test_shutdown_propagated() {
 
 future<> test_preemptive_down() {
   logger.info("test_preemptive_down()...");
-  return SocketFactory::get_sockets().then([] (auto sockets) {
-    auto [client_socket, server_socket] = std::move(sockets);
-    return seastar::when_all_succeed(
-      Connection::dispatch_rw_unbounded(std::move(client_socket), true, true),
-      Connection::dispatch_rw_unbounded(std::move(server_socket), false)
-    );
+  return SocketFactory::dispatch_sockets(
+    [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
+    [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); }
+  ).then([] {
+    logger.info("test_preemptive_down() ok\n");
   }).handle_exception([] (auto eptr) {
     logger.error("test_preemptive_down() got unexpeted exception {}", eptr);
     ceph_abort();
@@ -457,7 +418,9 @@ int main(int argc, char** argv)
 {
   seastar::app_template app;
   return app.run(argc, argv, [] {
-    return test_refused().then([] {
+    return seastar::futurize_apply([] {
+      return test_refused();
+    }).then([] {
       return test_bind_same();
     }).then([] {
       return test_accept();