From 761110982331ee56c9986e98098a26f966f5c62c Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 20 Feb 2023 14:37:11 +0800 Subject: [PATCH] test/crimson: cleanup test_socket.cc Signed-off-by: Yingxin Cheng --- src/test/crimson/test_socket.cc | 304 +++++++++++++++++--------------- 1 file changed, 157 insertions(+), 147 deletions(-) diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index d43f536db1be9..6d082c101bfbf 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -32,61 +32,62 @@ using crimson::net::stop_t; using SocketFRef = seastar::foreign_ptr; -static seastar::logger logger{"crimsontest"}; -static entity_addr_t get_server_addr() { - static int port = 9020; - ++port; - ceph_assert(port < 9030 && "socket and messenger test ports should not overlap"); +seastar::logger &logger() { + return crimson::get_logger(ceph_subsys_test); +} + +entity_addr_t get_server_addr() { entity_addr_t saddr; saddr.parse("127.0.0.1", nullptr); - saddr.set_port(port); + saddr.set_port(9020); return saddr; } future socket_connect(const entity_addr_t& saddr) { - logger.debug("socket_connect() to {} ...", saddr); - return Socket::connect(saddr).then([] (auto socket) { - logger.debug("socket_connect() connected"); + logger().debug("socket_connect() to {} ...", saddr); + return Socket::connect(saddr).then([](auto socket) { + logger().debug("socket_connect() connected"); return socket; }); } future<> test_refused() { - logger.info("test_refused()..."); + logger().info("test_refused()..."); auto saddr = get_server_addr(); return socket_connect(saddr).discard_result().then([saddr] { - logger.error("test_refused(): connection to {} is not refused", saddr); + logger().error("test_refused(): connection to {} is not refused", saddr); ceph_abort(); - }).handle_exception_type([] (const std::system_error& e) { + }).handle_exception_type([](const std::system_error& e) { if (e.code() != std::errc::connection_refused) { - logger.error("test_refused() got unexpeted error {}", e); + logger().error("test_refused() got unexpeted error {}", e); ceph_abort(); } else { - logger.info("test_refused() ok\n"); + logger().info("test_refused() ok\n"); } - }).handle_exception([] (auto eptr) { - logger.error("test_refused() got unexpeted exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("test_refused() got unexpeted exception {}", eptr); ceph_abort(); }); } future<> test_bind_same() { - logger.info("test_bind_same()..."); - return FixedCPUServerSocket::create().then([] (auto pss1) { + logger().info("test_bind_same()..."); + return FixedCPUServerSocket::create().then([](auto pss1) { auto saddr = get_server_addr(); return pss1->listen(saddr).safe_then([saddr] { // try to bind the same address - return FixedCPUServerSocket::create().then([saddr] (auto pss2) { + return FixedCPUServerSocket::create( + ).then([saddr](auto pss2) { return pss2->listen(saddr).safe_then([] { - logger.error("test_bind_same() should raise address_in_use"); + logger().error("test_bind_same() should raise address_in_use"); ceph_abort(); }, listen_ertr::all_same_way( - [] (const std::error_code& e) { + [](const std::error_code& e) { if (e == std::errc::address_in_use) { // successful! - logger.info("test_bind_same() ok\n"); + logger().info("test_bind_same() ok\n"); } else { - logger.error("test_bind_same() got unexpected error {}", e); + logger().error("test_bind_same() got unexpected error {}", e); ceph_abort(); } // Note: need to return a explicit ready future, or there will be a @@ -97,25 +98,28 @@ future<> test_bind_same() { }); }); }, listen_ertr::all_same_way( - [saddr] (const std::error_code& e) { - logger.error("test_bind_same(): there is another instance running at {}", - saddr); + [saddr](const std::error_code& e) { + logger().error("test_bind_same(): there is another instance running at {}", + saddr); ceph_abort(); })).then([pss1] { return pss1->shutdown_destroy(); - }).handle_exception([] (auto eptr) { - logger.error("test_bind_same() got unexpeted exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("test_bind_same() got unexpeted exception {}", eptr); ceph_abort(); }); }); } future<> test_accept() { - logger.info("test_accept()"); - return FixedCPUServerSocket::create().then([] (auto pss) { + logger().info("test_accept()"); + return FixedCPUServerSocket::create( + ).then([](auto pss) { auto saddr = get_server_addr(); - return pss->listen(saddr).safe_then([pss] { + return pss->listen(saddr + ).safe_then([pss] { return pss->accept([](auto socket, auto paddr) { + logger().info("test_accept(): accepted at shard {}", seastar::this_shard_id()); // simple accept return seastar::sleep(100ms ).then([socket = std::move(socket)]() mutable { @@ -124,69 +128,76 @@ future<> test_accept() { }); }); }, listen_ertr::all_same_way( - [saddr] (const std::error_code& e) { - logger.error("test_accept(): there is another instance running at {}", - saddr); + [saddr](const std::error_code& e) { + logger().error("test_accept(): there is another instance running at {}", + saddr); ceph_abort(); })).then([saddr] { return seastar::when_all( - socket_connect(saddr).then([] (auto socket) { + socket_connect(saddr).then([](auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }), - socket_connect(saddr).then([] (auto socket) { + socket_connect(saddr).then([](auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }), - socket_connect(saddr).then([] (auto socket) { + socket_connect(saddr).then([](auto socket) { return socket->close().finally([cleanup = std::move(socket)] {}); }) ).discard_result(); }).then([] { // should be enough to be connected locally return seastar::sleep(50ms); }).then([] { - logger.info("test_accept() ok\n"); + logger().info("test_accept() ok\n"); }).then([pss] { return pss->shutdown_destroy(); - }).handle_exception([] (auto eptr) { - logger.error("test_accept() got unexpeted exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("test_accept() got unexpeted exception {}", eptr); ceph_abort(); }); }); } class SocketFactory { + static constexpr seastar::shard_id CLIENT_CPU = 0u; SocketRef client_socket; - SocketFRef server_socket; - FixedCPUServerSocket *pss = nullptr; seastar::promise<> server_connected; + static constexpr seastar::shard_id SERVER_CPU = 1u; + FixedCPUServerSocket *pss = nullptr; + SocketRef server_socket; + public: - // cb_client() on CPU#0, cb_server() on CPU#1 template static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) { - assert(seastar::this_shard_id() == 0u); + ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU); auto owner = std::make_unique(); auto psf = owner.get(); auto saddr = get_server_addr(); - return seastar::smp::submit_to(1u, [psf, saddr] { - return FixedCPUServerSocket::create().then([psf, saddr] (auto pss) { + return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] { + return FixedCPUServerSocket::create( + ).then([psf, saddr](auto pss) { psf->pss = pss; return pss->listen(saddr - ).safe_then([]{}, listen_ertr::all_same_way( - [saddr] (const std::error_code& e) { - logger.error("dispatch_sockets(): there is another instance running at {}", - saddr); + ).safe_then([] { + }, listen_ertr::all_same_way([saddr](const std::error_code& e) { + logger().error("dispatch_sockets(): there is another instance running at {}", + saddr); ceph_abort(); })); }); }).then([psf, saddr] { return seastar::when_all_succeed( - seastar::smp::submit_to(0u, [psf, saddr] { - return socket_connect(saddr).then([psf] (auto socket) { + seastar::smp::submit_to(CLIENT_CPU, [psf, saddr] { + return socket_connect(saddr).then([psf](auto socket) { + ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU); psf->client_socket = std::move(socket); }); }), - seastar::smp::submit_to(1u, [psf] { - return psf->pss->accept([psf] (auto socket, auto paddr) { - psf->server_socket = seastar::make_foreign(std::move(socket)); - return seastar::smp::submit_to(0u, [psf] { + seastar::smp::submit_to(SERVER_CPU, [psf] { + return psf->pss->accept([psf](auto socket, auto paddr) { + logger().info("dispatch_sockets(): accepted at shard {}", + seastar::this_shard_id()); + ceph_assert_always(SERVER_CPU == seastar::this_shard_id()); + psf->server_socket = std::move(socket); + return seastar::smp::submit_to(CLIENT_CPU, [psf] { psf->server_connected.set_value(); }); }); @@ -198,35 +209,35 @@ class SocketFactory { return psf->server_connected.get_future(); }).then([psf] { if (psf->pss) { - return seastar::smp::submit_to(1u, [psf] { + return seastar::smp::submit_to(SERVER_CPU, [psf] { return psf->pss->shutdown_destroy(); }); } return seastar::now(); }).then([psf, cb_client = std::move(cb_client), - cb_server = std::move(cb_server)] () mutable { - logger.debug("dispatch_sockets(): client/server socket are ready"); + cb_server = std::move(cb_server)]() mutable { + logger().debug("dispatch_sockets(): client/server socket are ready"); return seastar::when_all_succeed( - seastar::smp::submit_to(0u, [socket = psf->client_socket.get(), - cb_client = std::move(cb_client)] { + seastar::smp::submit_to(CLIENT_CPU, + [socket = psf->client_socket.get(), cb_client = std::move(cb_client)] { return cb_client(socket).then([socket] { - logger.debug("closing client socket..."); + logger().debug("closing client socket..."); return socket->close(); - }).handle_exception([] (auto eptr) { - logger.error("dispatch_sockets():" - " cb_client() got unexpeted exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("dispatch_sockets():" + " cb_client() got unexpeted exception {}", eptr); ceph_abort(); }); }), - seastar::smp::submit_to(1u, [socket = psf->server_socket.get(), - cb_server = std::move(cb_server)] { + seastar::smp::submit_to(SERVER_CPU, + [socket = psf->server_socket.get(), cb_server = std::move(cb_server)] { return cb_server(socket).then([socket] { - logger.debug("closing server socket..."); + logger().debug("closing server socket..."); return socket->close(); - }).handle_exception([] (auto eptr) { - logger.error("dispatch_sockets():" - " cb_server() got unexpeted exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("dispatch_sockets():" + " cb_server() got unexpeted exception {}", eptr); ceph_abort(); }); }) @@ -257,15 +268,15 @@ class Connection { } future<> dispatch_write(unsigned round = 0, bool force_shut = false) { - logger.debug("dispatch_write(round={}, force_shut={})...", round, force_shut); + logger().debug("dispatch_write(round={}, force_shut={})...", round, force_shut); return seastar::repeat([this, round, force_shut] { if (round != 0 && round <= write_count) { return seastar::futurize_invoke([this, force_shut] { if (force_shut) { - logger.debug("dispatch_write() done, force shutdown output"); + logger().debug("dispatch_write() done, force shutdown output"); socket->force_shutdown_out(); } else { - logger.debug("dispatch_write() done"); + logger().debug("dispatch_write() done"); } }).then([] { return seastar::make_ready_future(stop_t::yes); @@ -288,30 +299,30 @@ class Connection { return dispatch_write( ).then([] { ceph_abort(); - }).handle_exception_type([this] (const std::system_error& e) { + }).handle_exception_type([this](const std::system_error& e) { if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset) { - logger.error("dispatch_write_unbounded(): " - "unexpected error {}", e); + logger().error("dispatch_write_unbounded(): " + "unexpected error {}", e); throw; } // successful - logger.debug("dispatch_write_unbounded(): " - "expected error {}", e); + logger().debug("dispatch_write_unbounded(): " + "expected error {}", e); shutdown(); }); } future<> dispatch_read(unsigned round = 0, bool force_shut = false) { - logger.debug("dispatch_read(round={}, force_shut={})...", round, force_shut); + logger().debug("dispatch_read(round={}, force_shut={})...", round, force_shut); return seastar::repeat([this, round, force_shut] { if (round != 0 && round <= read_count) { return seastar::futurize_invoke([this, force_shut] { if (force_shut) { - logger.debug("dispatch_read() done, force shutdown input"); + logger().debug("dispatch_read() done, force shutdown input"); socket->force_shutdown_in(); } else { - logger.debug("dispatch_read() done"); + logger().debug("dispatch_read() done"); } }).then([] { return seastar::make_ready_future(stop_t::yes); @@ -321,7 +332,7 @@ class Connection { // we want to test both Socket::read() and Socket::read_exactly() if (read_count % 2) { return socket->read(DATA_SIZE * sizeof(uint64_t) - ).then([this] (ceph::bufferlist bl) { + ).then([this](ceph::bufferlist bl) { uint64_t read_data[DATA_SIZE]; auto p = bl.cbegin(); ::ceph::decode_raw(read_data, p); @@ -329,7 +340,7 @@ class Connection { }); } else { return socket->read_exactly(DATA_SIZE * sizeof(uint64_t) - ).then([this] (auto buf) { + ).then([this](auto buf) { auto read_data = reinterpret_cast(buf.get()); verify_data_read(read_data); }); @@ -346,16 +357,16 @@ class Connection { return dispatch_read( ).then([] { ceph_abort(); - }).handle_exception_type([this] (const std::system_error& e) { + }).handle_exception_type([this](const std::system_error& e) { if (e.code() != error::read_eof && e.code() != std::errc::connection_reset) { - logger.error("dispatch_read_unbounded(): " - "unexpected error {}", e); + logger().error("dispatch_read_unbounded(): " + "unexpected error {}", e); throw; } // successful - logger.debug("dispatch_read_unbounded(): " - "expected error {}", e); + logger().debug("dispatch_read_unbounded(): " + "expected error {}", e); shutdown(); }); } @@ -367,10 +378,10 @@ class Connection { public: static future<> dispatch_rw_bounded(Socket* socket, unsigned round, bool force_shut = false) { - logger.debug("dispatch_rw_bounded(round={}, force_shut={})...", - round, force_shut); + logger().debug("dispatch_rw_bounded(round={}, force_shut={})...", + round, force_shut); return seastar::do_with(Connection{socket}, - [round, force_shut] (auto& conn) { + [round, force_shut](auto& conn) { ceph_assert(round != 0); return seastar::when_all_succeed( conn.dispatch_write(round, force_shut), @@ -382,15 +393,15 @@ class Connection { } static future<> dispatch_rw_unbounded(Socket* socket, bool preemptive_shut = false) { - logger.debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut); - return seastar::do_with(Connection{socket}, [preemptive_shut] (auto& conn) { + logger().debug("dispatch_rw_unbounded(preemptive_shut={})...", preemptive_shut); + return seastar::do_with(Connection{socket}, [preemptive_shut](auto& conn) { return seastar::when_all_succeed( conn.dispatch_write_unbounded(), conn.dispatch_read_unbounded(), seastar::futurize_invoke([&conn, preemptive_shut] { if (preemptive_shut) { return seastar::sleep(100ms).then([&conn] { - logger.debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)"); + logger().debug("dispatch_rw_unbounded() shutdown socket preemptively(100ms)"); conn.shutdown(); }); } else { @@ -405,63 +416,63 @@ class Connection { }; future<> test_read_write() { - logger.info("test_read_write()..."); + logger().info("test_read_write()..."); return SocketFactory::dispatch_sockets( - [] (auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, - [] (auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } + [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, + [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } ).then([] { - logger.info("test_read_write() ok\n"); - }).handle_exception([] (auto eptr) { - logger.error("test_read_write() got unexpeted exception {}", eptr); + logger().info("test_read_write() ok\n"); + }).handle_exception([](auto eptr) { + logger().error("test_read_write() got unexpeted exception {}", eptr); ceph_abort(); }); } future<> test_unexpected_down() { - logger.info("test_unexpected_down()..."); + logger().info("test_unexpected_down()..."); return SocketFactory::dispatch_sockets( - [] (auto cs) { + [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true - ).handle_exception_type([] (const std::system_error& e) { - logger.debug("test_unexpected_down(): client get error {}", e); + ).handle_exception_type([](const std::system_error& e) { + logger().debug("test_unexpected_down(): client get error {}", e); ceph_assert(e.code() == error::read_eof); }); }, - [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + [](auto ss) { return Connection::dispatch_rw_unbounded(ss); } ).then([] { - logger.info("test_unexpected_down() ok\n"); - }).handle_exception([] (auto eptr) { - logger.error("test_unexpected_down() got unexpeted exception {}", eptr); + logger().info("test_unexpected_down() ok\n"); + }).handle_exception([](auto eptr) { + logger().error("test_unexpected_down() got unexpeted exception {}", eptr); ceph_abort(); }); } future<> test_shutdown_propagated() { - logger.info("test_shutdown_propagated()..."); + logger().info("test_shutdown_propagated()..."); return SocketFactory::dispatch_sockets( - [] (auto cs) { - logger.debug("test_shutdown_propagated() shutdown client socket"); + [](auto cs) { + logger().debug("test_shutdown_propagated() shutdown client socket"); cs->shutdown(); return seastar::now(); }, - [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + [](auto ss) { return Connection::dispatch_rw_unbounded(ss); } ).then([] { - logger.info("test_shutdown_propagated() ok\n"); - }).handle_exception([] (auto eptr) { - logger.error("test_shutdown_propagated() got unexpeted exception {}", eptr); + logger().info("test_shutdown_propagated() ok\n"); + }).handle_exception([](auto eptr) { + logger().error("test_shutdown_propagated() got unexpeted exception {}", eptr); ceph_abort(); }); } future<> test_preemptive_down() { - logger.info("test_preemptive_down()..."); + logger().info("test_preemptive_down()..."); return SocketFactory::dispatch_sockets( - [] (auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, - [] (auto ss) { return Connection::dispatch_rw_unbounded(ss); } + [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, + [](auto ss) { return Connection::dispatch_rw_unbounded(ss); } ).then([] { - logger.info("test_preemptive_down() ok\n"); - }).handle_exception([] (auto eptr) { - logger.error("test_preemptive_down() got unexpeted exception {}", eptr); + logger().info("test_preemptive_down() ok\n"); + }).handle_exception([](auto eptr) { + logger().error("test_preemptive_down() got unexpeted exception {}", eptr); ceph_abort(); }); } @@ -477,37 +488,36 @@ seastar::future do_test(seastar::app_template& app) CEPH_ENTITY_TYPE_CLIENT, &cluster, &conf_file_list); - return crimson::common::sharded_conf().start(init_params.name, cluster) - .then([conf_file_list] { + return crimson::common::sharded_conf().start(init_params.name, cluster + ).then([conf_file_list] { return local_conf().parse_config_files(conf_file_list); }).then([] { - return local_conf().set_val("ms_inject_internal_delays", "0") - .then([] { - return test_refused(); - }).then([] { - return test_bind_same(); - }).then([] { - return test_accept(); - }).then([] { - return test_read_write(); - }).then([] { - return test_unexpected_down(); - }).then([] { - return test_shutdown_propagated(); - }).then([] { - return test_preemptive_down(); - }).then([] { - logger.info("All tests succeeded"); - // Seastar has bugs to have events undispatched during shutdown, - // which will result in memory leak and thus fail LeakSanitizer. - return seastar::sleep(100ms); - }); + return local_conf().set_val("ms_inject_internal_delays", "0"); + }).then([] { + return test_refused(); + }).then([] { + return test_bind_same(); + }).then([] { + return test_accept(); + }).then([] { + return test_read_write(); + }).then([] { + return test_unexpected_down(); + }).then([] { + return test_shutdown_propagated(); + }).then([] { + return test_preemptive_down(); + }).then([] { + logger().info("All tests succeeded"); + // Seastar has bugs to have events undispatched during shutdown, + // which will result in memory leak and thus fail LeakSanitizer. + return seastar::sleep(100ms); }).then([] { return crimson::common::sharded_conf().stop(); }).then([] { return 0; - }).handle_exception([] (auto eptr) { - logger.error("Test failed: got exception {}", eptr); + }).handle_exception([](auto eptr) { + logger().error("Test failed: got exception {}", eptr); return 1; }); } -- 2.39.5