From: Yingxin Cheng Date: Sun, 19 Jan 2020 07:07:51 +0000 (+0800) Subject: test/crimson: configure seastar to accept on a fixed core X-Git-Tag: v15.1.1~394^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=31b931d0e0650cd33f18797c4f2cc3cf074eb59c;p=ceph.git test/crimson: configure seastar to accept on a fixed core Adopt FixedCPUServerSocket and don't move sockets after connected/accepted. Signed-off-by: Yingxin Cheng --- diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index ec306700bd7..29dd2cdb37b 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -13,114 +13,41 @@ namespace { +using seastar::engine; using seastar::future; using crimson::net::error; +using crimson::net::FixedCPUServerSocket; using crimson::net::Socket; using crimson::net::SocketFRef; +using crimson::net::SocketRef; using crimson::net::stop_t; static seastar::logger logger{"crimsontest"}; - -template -class SocketFactoryBase - : public seastar::peering_sharded_service { - static constexpr const char* server_addr = "127.0.0.1:9020"; - - seastar::gate shutdown_gate; - std::optional listener; - - public: - virtual ~SocketFactoryBase() = default; - - virtual future<> bind_accept() { - return this->container().invoke_on_all([] (auto& factory) { - entity_addr_t addr; - addr.parse(server_addr, nullptr); - seastar::socket_address s_addr(addr.in4_addr()); - seastar::listen_options lo; - lo.reuse_address = true; - factory.listener = seastar::listen(s_addr, lo); - }).then([this] { - return this->container().invoke_on_all([] (auto& factory) { - // gate accepting - // SocketFactoryBase::shutdown() will drain the continuations in the gate - // so ignore the returned future - std::ignore = seastar::with_gate(factory.shutdown_gate, [&factory] { - return seastar::keep_doing([&factory] { - return Socket::accept(*factory.listener).then( - [&factory] (SocketFRef socket, - entity_addr_t peer_addr) { - // gate socket dispatching - std::ignore = seastar::with_gate(factory.shutdown_gate, - [&factory, socket = std::move(socket)] () mutable { - return factory.handle_server_socket(std::move(socket)) - .handle_exception([] (auto eptr) { - logger.error("handle_server_socket():" - "got unexpected exception {}", eptr); - ceph_abort(); - }); - }); - }); - }).handle_exception_type([] (const std::system_error& e) { - if (e.code() != error::connection_aborted && - e.code() != error::invalid_argument) { - logger.error("accepting: got unexpected error {}", e); - ceph_abort(); - } - // successful - }).handle_exception([] (auto eptr) { - logger.error("accepting: got unexpected exception {}", eptr); - ceph_abort(); - }); - }); - }); - }); - } - - future<> shutdown() { - return this->container().invoke_on_all([] (auto& factory) { - if (factory.listener) { - factory.listener.value().abort_accept(); - } - return factory.shutdown_gate.close(); - }); - } - - future<> stop() { return seastar::now(); } - - static future connect() { - entity_addr_t addr; - addr.parse(server_addr, nullptr); - return Socket::connect(addr); - } - - protected: - virtual future<> handle_server_socket(SocketFRef&& socket) = 0; -}; - -class AcceptTest final - : public SocketFactoryBase { - public: - future<> handle_server_socket(SocketFRef&& socket) override { - return seastar::sleep(100ms - ).then([socket = std::move(socket)] () mutable { - return socket->close() - .finally([socket = std::move(socket)] {}); - }); - } -}; +static entity_addr_t server_addr = [] { + entity_addr_t saddr; + saddr.parse("127.0.0.1:9020", nullptr); + return saddr; +} (); + +future socket_connect() { + logger.debug("socket_connect()..."); + return Socket::connect(server_addr).then([] (auto socket) { + logger.debug("socket_connect() connected"); + return socket.release(); + }); +} future<> test_refused() { logger.info("test_refused()..."); - return AcceptTest::connect().discard_result( - ).then([] { + return socket_connect().discard_result().then([] { ceph_abort_msg("connection is not refused"); }).handle_exception_type([] (const std::system_error& e) { if (e.code() != error::connection_refused) { logger.error("test_refused() got unexpeted error {}", e); ceph_abort(); + } else { + logger.info("test_refused() ok\n"); } - // successful }).handle_exception([] (auto eptr) { logger.error("test_refused() got unexpeted exception {}", eptr); ceph_abort(); @@ -129,27 +56,28 @@ future<> test_refused() { future<> test_bind_same() { logger.info("test_bind_same()..."); - return crimson::net::create_sharded().then( - [] (AcceptTest* factory) { - return factory->bind_accept().then([] { + return FixedCPUServerSocket::create().then([] (auto pss1) { + return pss1->listen(server_addr).then([] { // try to bind the same address - return crimson::net::create_sharded().then( - [] (AcceptTest* factory2) { - return factory2->bind_accept().then([] { - ceph_abort_msg("bind should raise addr-in-use"); - return seastar::now(); - }).finally([factory2] { - return factory2->shutdown(); + return FixedCPUServerSocket::create().then([] (auto pss2) { + return pss2->listen(server_addr).then([] { + ceph_abort("Should raise address_in_use!"); + }).handle_exception_type([] (const std::system_error& e) { + assert(e.code() == std::errc::address_in_use); + // successful! + }).finally([pss2] { + return pss2->destroy(); }).handle_exception_type([] (const std::system_error& e) { if (e.code() != error::address_in_use) { logger.error("test_bind_same() got unexpeted error {}", e); ceph_abort(); + } else { + logger.info("test_bind_same() ok\n"); } - // successful }); }); - }).finally([factory] { - return factory->shutdown(); + }).finally([pss1] { + return pss1->destroy(); }).handle_exception([] (auto eptr) { logger.error("test_bind_same() got unexpeted exception {}", eptr); ceph_abort(); @@ -159,22 +87,30 @@ future<> test_bind_same() { future<> test_accept() { logger.info("test_accept()"); - return crimson::net::create_sharded( - ).then([] (AcceptTest* factory) { - return factory->bind_accept().then([factory] { + return FixedCPUServerSocket::create().then([] (auto pss) { + return pss->listen(server_addr).then([pss] { + return pss->accept([] (auto socket, auto paddr) { + // simple accept + return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable { + return socket->close().finally([cleanup = std::move(socket)] {}); + }); + }); + }).then([] { return seastar::when_all( - factory->connect().then([] (auto socket) { + socket_connect().then([] (auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }), - factory->connect().then([] (auto socket) { + socket_connect().then([] (auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }), - factory->connect().then([] (auto socket) { + socket_connect().then([] (auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }) ).discard_result(); }).then([] { // should be enough to be connected locally return seastar::sleep(50ms); - }).finally([factory] { - return factory->shutdown(); + }).then([] { + logger.info("test_accept() ok\n"); + }).finally([pss] { + return pss->destroy(); }).handle_exception([] (auto eptr) { logger.error("test_accept() got unexpeted exception {}", eptr); ceph_abort(); @@ -182,44 +118,78 @@ future<> test_accept() { }); } -class SocketFactory final - : public SocketFactoryBase { - const seastar::shard_id target_shard; - seastar::promise socket_promise; - - future<> bind_accept() override { - return SocketFactoryBase::bind_accept(); - } - - future get_accepted() { - return socket_promise.get_future(); - } +class SocketFactory { + SocketRef client_socket; + SocketFRef server_socket; + FixedCPUServerSocket *pss = nullptr; + seastar::promise<> server_connected; public: - SocketFactory(seastar::shard_id shard) : target_shard{shard} {} - - future<> handle_server_socket(SocketFRef&& socket) override { - return container().invoke_on(target_shard, - [socket = std::move(socket)] (auto& factory) mutable { - factory.socket_promise.set_value(std::move(socket)); - }); - } - - static future> get_sockets() { - return crimson::net::create_sharded(seastar::engine().cpu_id() - ).then([] (SocketFactory* factory) { - return factory->bind_accept().then([factory] { - return connect(); - }).then([factory] (auto fp_client_socket) { - return factory->get_accepted().then( - [fp_client_socket = std::move(fp_client_socket)](auto fp_server_socket) mutable { - return seastar::make_ready_future>( - std::make_tuple(std::move(fp_client_socket), std::move(fp_server_socket))); - }); - }).finally([factory] { - return factory->shutdown(); + // cb_client() on CPU#0, cb_server() on CPU#1 + template + static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) { + assert(engine().cpu_id() == 0u); + auto owner = std::make_unique(); + auto psf = owner.get(); + return seastar::smp::submit_to(1u, [psf] { + return FixedCPUServerSocket::create().then([psf] (auto pss) { + psf->pss = pss; + return pss->listen(server_addr); }); - }); + }).then([psf] { + return seastar::when_all_succeed( + seastar::smp::submit_to(0u, [psf] { + return socket_connect().then([psf] (auto socket) { + psf->client_socket = std::move(socket); + }); + }), + seastar::smp::submit_to(1u, [psf] { + return psf->pss->accept([psf] (auto socket, auto paddr) { + psf->server_socket = seastar::make_foreign(std::move(socket)); + return seastar::smp::submit_to(0u, [psf] { + psf->server_connected.set_value(); + }); + }); + }) + ); + }).then([psf] { + return psf->server_connected.get_future(); + }).finally([psf] { + if (psf->pss) { + return seastar::smp::submit_to(1u, [psf] { + return psf->pss->destroy(); + }); + } + return seastar::now(); + }).then([psf, + cb_client = std::move(cb_client), + cb_server = std::move(cb_server)] () mutable { + logger.debug("dispatch_sockets(): client/server socket are ready"); + return seastar::when_all_succeed( + seastar::smp::submit_to(0u, [socket = psf->client_socket.get(), + cb_client = std::move(cb_client)] { + return cb_client(socket).finally([socket] { + logger.debug("closing client socket..."); + return socket->close(); + }).handle_exception([] (auto eptr) { + logger.error("dispatch_sockets():" + " cb_client() got unexpeted exception {}", eptr); + ceph_abort(); + }); + }), + seastar::smp::submit_to(1u, [socket = psf->server_socket.get(), + cb_server = std::move(cb_server)] { + return cb_server(socket).finally([socket] { + logger.debug("closing server socket..."); + return socket->close(); + }).handle_exception([] (auto eptr) { + logger.error("dispatch_sockets():" + " cb_server() got unexpeted exception {}", eptr); + ceph_abort(); + }); + }) + ); + }).finally([cleanup = std::move(owner)] {}); } }; @@ -233,20 +203,25 @@ class Connection { ceph_assert(data[DATA_SIZE - 1] = DATA_TAIL); } - SocketFRef socket; + Socket* socket = nullptr; uint64_t write_count = 0; uint64_t read_count = 0; - Connection(SocketFRef&& socket) : socket{std::move(socket)} { + Connection(Socket* socket) : socket{socket} { + assert(socket); data[DATA_SIZE - 1] = DATA_TAIL; } future<> dispatch_write(unsigned round = 0, bool force_shut = false) { + logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut); return seastar::repeat([this, round, force_shut] { if (round != 0 && round <= write_count) { return seastar::futurize_apply([this, force_shut] { if (force_shut) { + logger.debug("dispatch_write() done, force shutdown output"); socket->force_shutdown_out(); + } else { + logger.debug("dispatch_write() done"); } }).then([] { return seastar::make_ready_future(stop_t::yes); @@ -269,7 +244,7 @@ class Connection { return dispatch_write( ).then([] { ceph_abort(); - }).handle_exception_type([] (const std::system_error& e) { + }).handle_exception_type([this] (const std::system_error& e) { if (e.code() != error::broken_pipe && e.code() != error::connection_reset) { logger.error("dispatch_write_unbounded(): " @@ -279,15 +254,20 @@ class Connection { // successful logger.debug("dispatch_write_unbounded(): " "expected error {}", e); + shutdown(); }); } future<> dispatch_read(unsigned round = 0, bool force_shut = false) { + logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut); return seastar::repeat([this, round, force_shut] { if (round != 0 && round <= read_count) { return seastar::futurize_apply([this, force_shut] { if (force_shut) { + logger.debug("dispatch_read() done, force shutdown input"); socket->force_shutdown_in(); + } else { + logger.debug("dispatch_read() done"); } }).then([] { return seastar::make_ready_future(stop_t::yes); @@ -322,7 +302,7 @@ class Connection { return dispatch_read( ).then([] { ceph_abort(); - }).handle_exception_type([] (const std::system_error& e) { + }).handle_exception_type([this] (const std::system_error& e) { if (e.code() != error::read_eof && e.code() != error::connection_reset) { logger.error("dispatch_read_unbounded(): " @@ -332,6 +312,7 @@ class Connection { // successful logger.debug("dispatch_read_unbounded(): " "expected error {}", e); + shutdown(); }); } @@ -339,68 +320,49 @@ class Connection { socket->shutdown(); } - future<> close() { - return socket->close(); - } - public: - static future<> dispatch_rw_bounded(SocketFRef&& socket, bool is_client, - unsigned round, bool force_shut = false) { - return seastar::smp::submit_to(is_client ? 0 : 1, - [socket = std::move(socket), round, force_shut] () mutable { - return seastar::do_with(Connection{std::move(socket)}, - [round, force_shut] (auto& conn) { - ceph_assert(round != 0); - return seastar::when_all_succeed( - conn.dispatch_write(round, force_shut), - conn.dispatch_read(round, force_shut) - ).finally([&conn] { - return conn.close(); - }); - }); - }).handle_exception([is_client] (auto eptr) { - logger.error("dispatch_rw_bounded(): {} got unexpected exception {}", - is_client ? "client" : "server", eptr); - ceph_abort(); + static future<> dispatch_rw_bounded(Socket* socket, unsigned round, + bool force_shut = false) { + logger.debug("dispatch_rw_bounded(round={}, force_shut={})...", + round, force_shut); + return seastar::do_with(Connection{socket}, + [round, force_shut] (auto& conn) { + ceph_assert(round != 0); + return seastar::when_all_succeed( + conn.dispatch_write(round, force_shut), + conn.dispatch_read(round, force_shut) + ); }); } - static future<> dispatch_rw_unbounded(SocketFRef&& socket, bool is_client, - bool preemptive_shut = false) { - return seastar::smp::submit_to(is_client ? 0 : 1, - [socket = std::move(socket), preemptive_shut, is_client] () mutable { - return seastar::do_with(Connection{std::move(socket)}, - [preemptive_shut, is_client] (auto& conn) { - return seastar::when_all_succeed( - conn.dispatch_write_unbounded(), - conn.dispatch_read_unbounded(), - seastar::futurize_apply([&conn, preemptive_shut] { - if (preemptive_shut) { - return seastar::sleep(100ms).then([&conn] { conn.shutdown(); }); - } else { - return seastar::now(); - } - }) - ).finally([&conn] { - return conn.close(); - }); - }); - }).handle_exception([is_client] (auto eptr) { - logger.error("dispatch_rw_unbounded(): {} got unexpected exception {}", - is_client ? "client" : "server", eptr); - ceph_abort(); + static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) { + logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut); + return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) { + return seastar::when_all_succeed( + conn.dispatch_write_unbounded(), + conn.dispatch_read_unbounded(), + seastar::futurize_apply([&conn, preemptive_shut] { + if (preemptive_shut) { + return seastar::sleep(100ms).then([&conn] { + logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)"); + conn.shutdown(); + }); + } else { + return seastar::now(); + } + }) + ); }); } }; future<> test_read_write() { logger.info("test_read_write()..."); - return SocketFactory::get_sockets().then([] (auto sockets) { - auto [client_socket, server_socket] = std::move(sockets); - return seastar::when_all_succeed( - Connection::dispatch_rw_bounded(std::move(client_socket), true, 128), - Connection::dispatch_rw_bounded(std::move(server_socket), false, 128) - ); + return SocketFactory::dispatch_sockets( + [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, + [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } + ).then([] { + logger.info("test_read_write() ok\n"); }).handle_exception([] (auto eptr) { logger.error("test_read_write() got unexpeted exception {}", eptr); ceph_abort(); @@ -409,12 +371,11 @@ future<> test_read_write() { future<> test_unexpected_down() { logger.info("test_unexpected_down()..."); - return SocketFactory::get_sockets().then([] (auto sockets) { - auto [client_socket, server_socket] = std::move(sockets); - return seastar::when_all_succeed( - Connection::dispatch_rw_bounded(std::move(client_socket), true, 128, true), - Connection::dispatch_rw_unbounded(std::move(server_socket), false) - ); + return SocketFactory::dispatch_sockets( + [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true); }, + [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + ).then([] { + logger.info("test_unexpected_down() ok\n"); }).handle_exception([] (auto eptr) { logger.error("test_unexpected_down() got unexpeted exception {}", eptr); ceph_abort(); @@ -423,14 +384,15 @@ future<> test_unexpected_down() { future<> test_shutdown_propagated() { logger.info("test_shutdown_propagated()..."); - return SocketFactory::get_sockets().then([] (auto sockets) { - auto [client_socket, server_socket] = std::move(sockets); - client_socket->shutdown(); - return Connection::dispatch_rw_unbounded(std::move(server_socket), false - ).finally([client_socket = std::move(client_socket)] () mutable { - return client_socket->close( - ).finally([cleanup = std::move(client_socket)] {}); - }); + return SocketFactory::dispatch_sockets( + [] (auto cs) { + logger.debug("test_shutdown_propagated() shutdown client socket"); + cs->shutdown(); + return seastar::now(); + }, + [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + ).then([] { + logger.info("test_shutdown_propagated() ok\n"); }).handle_exception([] (auto eptr) { logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr); ceph_abort(); @@ -439,12 +401,11 @@ future<> test_shutdown_propagated() { future<> test_preemptive_down() { logger.info("test_preemptive_down()..."); - return SocketFactory::get_sockets().then([] (auto sockets) { - auto [client_socket, server_socket] = std::move(sockets); - return seastar::when_all_succeed( - Connection::dispatch_rw_unbounded(std::move(client_socket), true, true), - Connection::dispatch_rw_unbounded(std::move(server_socket), false) - ); + return SocketFactory::dispatch_sockets( + [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, + [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + ).then([] { + logger.info("test_preemptive_down() ok\n"); }).handle_exception([] (auto eptr) { logger.error("test_preemptive_down() got unexpeted exception {}", eptr); ceph_abort(); @@ -457,7 +418,9 @@ int main(int argc, char** argv) { seastar::app_template app; return app.run(argc, argv, [] { - return test_refused().then([] { + return seastar::futurize_apply([] { + return test_refused(); + }).then([] { return test_bind_same(); }).then([] { return test_accept();