From 9eda402ff4345b347f5da2b050e8f6ed87db7f68 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 10 Mar 2023 10:22:56 +0800 Subject: [PATCH] crimson/net/socket: extend the sockets to multiple shards Signed-off-by: Yingxin Cheng --- src/crimson/net/Socket.cc | 91 +++++++++++++++++------------- src/crimson/net/Socket.h | 24 ++++---- src/crimson/net/SocketMessenger.cc | 3 +- src/crimson/net/SocketMessenger.h | 5 +- src/test/crimson/test_socket.cc | 64 +++++++++++++-------- 5 files changed, 113 insertions(+), 74 deletions(-) diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 842304c68f4..a28211911c8 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -241,7 +241,7 @@ close_and_handle_errors(seastar::output_stream& out) seastar::future<> Socket::close() { #ifndef NDEBUG - ceph_assert(!closed); + ceph_assert_always(!closed); closed = true; #endif return seastar::when_all_succeed( @@ -282,14 +282,14 @@ Socket::connect(const entity_addr_t &peer_addr) void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { blocker = blocker_; if (type == bp_type_t::READ) { - ceph_assert(next_trap_read == bp_action_t::CONTINUE); + ceph_assert_always(next_trap_read == bp_action_t::CONTINUE); next_trap_read = action; } else { // type == bp_type_t::WRITE if (next_trap_write == bp_action_t::CONTINUE) { next_trap_write = action; } else if (next_trap_write == bp_action_t::FAULT) { // do_sweep_messages() may combine multiple write events into one socket write - ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); + ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); } else { ceph_abort(); } @@ -336,67 +336,77 @@ Socket::try_trap_post(bp_action_t& trap) { } #endif -FixedCPUServerSocket::FixedCPUServerSocket( - seastar::shard_id cpu, +#define SERVER_SOCKET ShardedServerSocket + +template +SERVER_SOCKET::ShardedServerSocket( + seastar::shard_id sid, construct_tag) - : fixed_cpu{cpu} + : primary_sid{sid} { } -FixedCPUServerSocket::~FixedCPUServerSocket() +template +SERVER_SOCKET::~ShardedServerSocket() { assert(!listener); // detect whether user have called destroy() properly - ceph_assert(!service); + ceph_assert_always(!service); } +template listen_ertr::future<> -FixedCPUServerSocket::listen(entity_addr_t addr) +SERVER_SOCKET::listen(entity_addr_t addr) { - assert(seastar::this_shard_id() == fixed_cpu); - logger().debug("FixedCPUServerSocket({})::listen()...", addr); - return container().invoke_on_all([addr](auto& ss) { + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::listen()...", addr); + return this->container().invoke_on_all([addr](auto& ss) { ss.listen_addr = addr; seastar::socket_address s_addr(addr.in4_addr()); seastar::listen_options lo; lo.reuse_address = true; - lo.set_fixed_cpu(ss.fixed_cpu); + if constexpr (IS_FIXED_CPU) { + lo.set_fixed_cpu(ss.primary_sid); + } ss.listener = seastar::listen(s_addr, lo); }).then([] { return listen_ertr::now(); }).handle_exception_type( [addr](const std::system_error& e) -> listen_ertr::future<> { if (e.code() == std::errc::address_in_use) { - logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr); + logger().debug("ShardedServerSocket({})::listen(): address in use", addr); return crimson::ct_error::address_in_use::make(); } else if (e.code() == std::errc::address_not_available) { - logger().debug("FixedCPUServerSocket({})::listen(): address not available", + logger().debug("ShardedServerSocket({})::listen(): address not available", addr); return crimson::ct_error::address_not_available::make(); } - logger().error("FixedCPUServerSocket({})::listen(): " + logger().error("ShardedServerSocket({})::listen(): " "got unexpeted error {}", addr, e.what()); ceph_abort(); }); } +template seastar::future<> -FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) +SERVER_SOCKET::accept(accept_func_t &&_fn_accept) { - assert(seastar::this_shard_id() == fixed_cpu); - logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr); - return container().invoke_on_all([_fn_accept](auto &ss) { + ceph_assert_always(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::accept()...", listen_addr); + return this->container().invoke_on_all([_fn_accept](auto &ss) { assert(ss.listener); ss.fn_accept = _fn_accept; // gate accepting - // FixedCPUServerSocket::shutdown() will drain the continuations in the gate + // ShardedServerSocket::shutdown() will drain the continuations in the gate // so ignore the returned future std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { return seastar::keep_doing([&ss] { return ss.listener->accept( ).then([&ss](seastar::accept_result accept_result) { - // assert seastar::listen_options::set_fixed_cpu() works - assert(seastar::this_shard_id() == ss.fixed_cpu); + if constexpr (IS_FIXED_CPU) { + // see seastar::listen_options::set_fixed_cpu() + ceph_assert_always(seastar::this_shard_id() == ss.primary_sid); + } auto [socket, paddr] = std::move(accept_result); entity_addr_t peer_addr; peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); @@ -404,7 +414,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) SocketRef _socket = std::make_unique( std::move(socket), Socket::side_t::acceptor, peer_addr.get_port(), Socket::construct_tag{}); - logger().debug("FixedCPUServerSocket({})::accept(): " + logger().debug("ShardedServerSocket({})::accept(): " "accepted peer {}, socket {}", ss.listen_addr, peer_addr, fmt::ptr(_socket)); std::ignore = seastar::with_gate( @@ -418,7 +428,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) } catch (std::exception &e) { e_what = e.what(); } - logger().error("FixedCPUServerSocket({})::accept(): " + logger().error("ShardedServerSocket({})::accept(): " "fn_accept(s, {}) got unexpected exception {}", ss.listen_addr, peer_addr, e_what); ceph_abort(); @@ -428,7 +438,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) }).handle_exception_type([&ss](const std::system_error& e) { if (e.code() == std::errc::connection_aborted || e.code() == std::errc::invalid_argument) { - logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})", + logger().debug("ShardedServerSocket({})::accept(): stopped ({})", ss.listen_addr, e.what()); } else { throw; @@ -440,7 +450,7 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) } catch (std::exception &e) { e_what = e.what(); } - logger().error("FixedCPUServerSocket({})::accept(): " + logger().error("ShardedServerSocket({})::accept(): " "got unexpected exception {}", ss.listen_addr, e_what); ceph_abort(); }); @@ -448,41 +458,43 @@ FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) }); } +template seastar::future<> -FixedCPUServerSocket::shutdown_destroy() +SERVER_SOCKET::shutdown_destroy() { - assert(seastar::this_shard_id() == fixed_cpu); - logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr); + assert(seastar::this_shard_id() == primary_sid); + logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr); // shutdown shards - return container().invoke_on_all([](auto& ss) { + return this->container().invoke_on_all([](auto& ss) { if (ss.listener) { ss.listener->abort_accept(); } return ss.shutdown_gate.close(); }).then([this] { // destroy shards - return container().invoke_on_all([](auto& ss) { + return this->container().invoke_on_all([](auto& ss) { assert(ss.shutdown_gate.is_closed()); ss.listen_addr = entity_addr_t(); ss.listener.reset(); }); }).then([this] { // stop the sharded service: we should only construct/stop shards on #0 - return container().invoke_on(0, [](auto& ss) { + return this->container().invoke_on(0, [](auto& ss) { assert(ss.service); return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); }); }); } -seastar::future -FixedCPUServerSocket::create() +template +seastar::future +SERVER_SOCKET::create() { - auto fixed_cpu = seastar::this_shard_id(); + auto primary_sid = seastar::this_shard_id(); // start the sharded service: we should only construct/stop shards on #0 - return seastar::smp::submit_to(0, [fixed_cpu] { + return seastar::smp::submit_to(0, [primary_sid] { auto service = std::make_unique(); - return service->start(fixed_cpu, construct_tag{} + return service->start(primary_sid, construct_tag{} ).then([service = std::move(service)]() mutable { auto p_shard = service.get(); p_shard->local().service = std::move(service); @@ -493,4 +505,7 @@ FixedCPUServerSocket::create() }); } +template class ShardedServerSocket; +template class ShardedServerSocket; + } // namespace crimson::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index f7393a9f341..beb66c08bfb 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -139,7 +139,7 @@ private: socket_blocker* blocker = nullptr; #endif - friend class FixedCPUServerSocket; + friend class ShardedServerSocket; }; using listen_ertr = crimson::errorator< @@ -147,18 +147,19 @@ using listen_ertr = crimson::errorator< crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/ >; -class FixedCPUServerSocket - : public seastar::peering_sharded_service { +template +class ShardedServerSocket + : public seastar::peering_sharded_service > { struct construct_tag {}; public: - FixedCPUServerSocket(seastar::shard_id cpu, construct_tag); + ShardedServerSocket(seastar::shard_id sid, construct_tag); - ~FixedCPUServerSocket(); + ~ShardedServerSocket(); - FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; - FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; - FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete; + ShardedServerSocket(ShardedServerSocket&&) = delete; + ShardedServerSocket(const ShardedServerSocket&) = delete; + ShardedServerSocket& operator=(const ShardedServerSocket&) = delete; listen_ertr::future<> listen(entity_addr_t addr); @@ -168,16 +169,17 @@ public: seastar::future<> shutdown_destroy(); - static seastar::future create(); + static seastar::future create(); private: - const seastar::shard_id fixed_cpu; + // the fixed CPU if IS_FIXED_CPU is true + const seastar::shard_id primary_sid; entity_addr_t listen_addr; std::optional listener; seastar::gate shutdown_gate; accept_func_t fn_accept; - using sharded_service_t = seastar::sharded; + using sharded_service_t = seastar::sharded; std::unique_ptr service; }; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index f0607c9a445..8bc7ebbbedc 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -91,7 +91,8 @@ SocketMessenger::do_listen(const entity_addrvec_t& addrs) set_myaddrs(addrs); return seastar::futurize_invoke([this] { if (!listener) { - return FixedCPUServerSocket::create().then([this] (auto _listener) { + return ShardedServerSocket::create( + ).then([this] (auto _listener) { listener = _listener; }); } else { diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 4eebaab3080..60510666a12 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -29,7 +29,8 @@ namespace crimson::net { -class FixedCPUServerSocket; +template +class ShardedServerSocket; class SocketMessenger final : public Messenger { const seastar::shard_id master_sid; @@ -42,7 +43,7 @@ class SocketMessenger final : public Messenger { crimson::auth::AuthClient* auth_client = nullptr; crimson::auth::AuthServer* auth_server = nullptr; - FixedCPUServerSocket* listener = nullptr; + ShardedServerSocket *listener = nullptr; ChainedDispatchers dispatchers; std::map connections; std::set accepting_conns; diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 916583cc5c5..423ae0cf255 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -24,8 +24,8 @@ using namespace std::chrono_literals; using seastar::engine; using seastar::future; using crimson::net::error; -using crimson::net::FixedCPUServerSocket; using crimson::net::listen_ertr; +using crimson::net::ShardedServerSocket; using crimson::net::Socket; using crimson::net::SocketRef; using crimson::net::stop_t; @@ -70,13 +70,14 @@ future<> test_refused() { }); } +template future<> test_bind_same() { logger().info("test_bind_same()..."); - return FixedCPUServerSocket::create().then([](auto pss1) { + return ShardedServerSocket::create().then([](auto pss1) { auto saddr = get_server_addr(); return pss1->listen(saddr).safe_then([saddr] { // try to bind the same address - return FixedCPUServerSocket::create( + return ShardedServerSocket::create( ).then([saddr](auto pss2) { return pss2->listen(saddr).safe_then([] { logger().error("test_bind_same() should raise address_in_use"); @@ -111,9 +112,10 @@ future<> test_bind_same() { }); } +template future<> test_accept() { logger().info("test_accept()"); - return FixedCPUServerSocket::create( + return ShardedServerSocket::create( ).then([](auto pss) { auto saddr = get_server_addr(); return pss->listen(saddr @@ -155,13 +157,16 @@ future<> test_accept() { }); } +template class SocketFactory { static constexpr seastar::shard_id CLIENT_CPU = 0u; SocketRef client_socket; seastar::promise<> server_connected; static constexpr seastar::shard_id SERVER_CPU = 1u; - FixedCPUServerSocket *pss = nullptr; + ShardedServerSocket *pss = nullptr; + + seastar::shard_id server_socket_CPU; SocketRef server_socket; public: @@ -172,7 +177,7 @@ class SocketFactory { auto psf = owner.get(); auto saddr = get_server_addr(); return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] { - return FixedCPUServerSocket::create( + return ShardedServerSocket::create( ).then([psf, saddr](auto pss) { psf->pss = pss; return pss->listen(saddr @@ -195,7 +200,10 @@ class SocketFactory { return psf->pss->accept([psf](auto socket, auto paddr) { logger().info("dispatch_sockets(): accepted at shard {}", seastar::this_shard_id()); - ceph_assert_always(SERVER_CPU == seastar::this_shard_id()); + psf->server_socket_CPU = seastar::this_shard_id(); + if constexpr (IS_FIXED_CPU) { + ceph_assert_always(SERVER_CPU == seastar::this_shard_id()); + } psf->server_socket = std::move(socket); return seastar::smp::submit_to(CLIENT_CPU, [psf] { psf->server_connected.set_value(); @@ -230,7 +238,7 @@ class SocketFactory { ceph_abort(); }); }), - seastar::smp::submit_to(SERVER_CPU, + seastar::smp::submit_to(psf->server_socket_CPU, [socket = psf->server_socket.get(), cb_server = std::move(cb_server)] { return cb_server(socket).then([socket] { logger().debug("closing server socket..."); @@ -416,9 +424,10 @@ class Connection { } }; +template future<> test_read_write() { logger().info("test_read_write()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } ).then([] { @@ -429,9 +438,10 @@ future<> test_read_write() { }); } +template future<> test_unexpected_down() { logger().info("test_unexpected_down()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true ).handle_exception_type([](const std::system_error& e) { @@ -448,9 +458,10 @@ future<> test_unexpected_down() { }); } +template future<> test_shutdown_propagated() { logger().info("test_shutdown_propagated()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( [](auto cs) { logger().debug("test_shutdown_propagated() shutdown client socket"); cs->shutdown(); @@ -465,9 +476,10 @@ future<> test_shutdown_propagated() { }); } +template future<> test_preemptive_down() { logger().info("test_preemptive_down()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, [](auto ss) { return Connection::dispatch_rw_unbounded(ss); } ).then([] { @@ -478,6 +490,22 @@ future<> test_preemptive_down() { }); } +template +future<> do_test_with_type() { + return test_bind_same( + ).then([] { + return test_accept(); + }).then([] { + return test_read_write(); + }).then([] { + return test_unexpected_down(); + }).then([] { + return test_shutdown_propagated(); + }).then([] { + return test_preemptive_down(); + }); +} + } seastar::future do_test(seastar::app_template& app) @@ -497,17 +525,9 @@ seastar::future do_test(seastar::app_template& app) }).then([] { return test_refused(); }).then([] { - return test_bind_same(); - }).then([] { - return test_accept(); - }).then([] { - return test_read_write(); - }).then([] { - return test_unexpected_down(); - }).then([] { - return test_shutdown_propagated(); + return do_test_with_type(); }).then([] { - return test_preemptive_down(); + return do_test_with_type(); }).then([] { logger().info("All tests succeeded"); // Seastar has bugs to have events undispatched during shutdown, -- 2.39.5