From: Yingxin Cheng Date: Wed, 26 Apr 2023 08:13:48 +0000 (+0800) Subject: crimson/net: change the static IS_FIXED_CPU to runtime X-Git-Tag: v19.0.0~951^2~26 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=58491826d54bbbfeeab1543337c8ff327218db74;p=ceph.git crimson/net: change the static IS_FIXED_CPU to runtime Otherwise the messenger implementation needs to be templated. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 342053a3615db..1b8e863f7209a 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -340,27 +340,23 @@ Socket::try_trap_post(bp_action_t& trap) { } #endif -#define SERVER_SOCKET ShardedServerSocket - -template -SERVER_SOCKET::ShardedServerSocket( +ShardedServerSocket::ShardedServerSocket( seastar::shard_id sid, + bool is_fixed_cpu, construct_tag) - : primary_sid{sid} + : primary_sid{sid}, is_fixed_cpu{is_fixed_cpu} { } -template -SERVER_SOCKET::~ShardedServerSocket() +ShardedServerSocket::~ShardedServerSocket() { assert(!listener); // detect whether user have called destroy() properly ceph_assert_always(!service); } -template listen_ertr::future<> -SERVER_SOCKET::listen(entity_addr_t addr) +ShardedServerSocket::listen(entity_addr_t addr) { ceph_assert_always(seastar::this_shard_id() == primary_sid); logger().debug("ShardedServerSocket({})::listen()...", addr); @@ -369,7 +365,7 @@ SERVER_SOCKET::listen(entity_addr_t addr) seastar::socket_address s_addr(addr.in4_addr()); seastar::listen_options lo; lo.reuse_address = true; - if constexpr (IS_FIXED_CPU) { + if (ss.is_fixed_cpu) { lo.set_fixed_cpu(ss.primary_sid); } ss.listener = seastar::listen(s_addr, lo); @@ -391,9 +387,8 @@ SERVER_SOCKET::listen(entity_addr_t addr) }); } -template seastar::future<> -SERVER_SOCKET::accept(accept_func_t &&_fn_accept) +ShardedServerSocket::accept(accept_func_t &&_fn_accept) { ceph_assert_always(seastar::this_shard_id() == primary_sid); logger().debug("ShardedServerSocket({})::accept()...", listen_addr); @@ -407,10 +402,12 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept) return seastar::keep_doing([&ss] { return ss.listener->accept( ).then([&ss](seastar::accept_result accept_result) { - if constexpr (IS_FIXED_CPU) { +#ifndef NDEBUG + if (ss.is_fixed_cpu) { // see seastar::listen_options::set_fixed_cpu() ceph_assert_always(seastar::this_shard_id() == ss.primary_sid); } +#endif auto [socket, paddr] = std::move(accept_result); entity_addr_t peer_addr; peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); @@ -419,8 +416,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept) std::move(socket), Socket::side_t::acceptor, peer_addr.get_port(), Socket::construct_tag{}); logger().debug("ShardedServerSocket({})::accept(): " - "accepted peer {}, socket {}", - ss.listen_addr, peer_addr, fmt::ptr(_socket)); + "accepted peer {}, socket {}, is_fixed = {}", + ss.listen_addr, peer_addr, fmt::ptr(_socket), ss.is_fixed_cpu); std::ignore = seastar::with_gate( ss.shutdown_gate, [socket=std::move(_socket), peer_addr, &ss]() mutable { @@ -462,9 +459,8 @@ SERVER_SOCKET::accept(accept_func_t &&_fn_accept) }); } -template seastar::future<> -SERVER_SOCKET::shutdown_destroy() +ShardedServerSocket::shutdown_destroy() { assert(seastar::this_shard_id() == primary_sid); logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr); @@ -490,15 +486,14 @@ SERVER_SOCKET::shutdown_destroy() }); } -template -seastar::future -SERVER_SOCKET::create() +seastar::future +ShardedServerSocket::create(bool is_fixed_cpu) { auto primary_sid = seastar::this_shard_id(); // start the sharded service: we should only construct/stop shards on #0 - return seastar::smp::submit_to(0, [primary_sid] { + return seastar::smp::submit_to(0, [primary_sid, is_fixed_cpu] { auto service = std::make_unique(); - return service->start(primary_sid, construct_tag{} + return service->start(primary_sid, is_fixed_cpu, construct_tag{} ).then([service = std::move(service)]() mutable { auto p_shard = service.get(); p_shard->local().service = std::move(service); @@ -509,7 +504,4 @@ SERVER_SOCKET::create() }); } -template class ShardedServerSocket; -template class ShardedServerSocket; - } // namespace crimson::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 58a4484aa87c1..6241c9fbad265 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -144,20 +144,22 @@ using listen_ertr = crimson::errorator< crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/ >; -template class ShardedServerSocket - : public seastar::peering_sharded_service > { + : public seastar::peering_sharded_service { struct construct_tag {}; public: - ShardedServerSocket(seastar::shard_id sid, construct_tag); + ShardedServerSocket(seastar::shard_id sid, bool is_fixed_cpu, construct_tag); ~ShardedServerSocket(); ShardedServerSocket(ShardedServerSocket&&) = delete; ShardedServerSocket(const ShardedServerSocket&) = delete; + ShardedServerSocket& operator=(ShardedServerSocket&&) = delete; ShardedServerSocket& operator=(const ShardedServerSocket&) = delete; + bool is_fixed() const { return is_fixed_cpu; } + listen_ertr::future<> listen(entity_addr_t addr); using accept_func_t = @@ -166,11 +168,12 @@ public: seastar::future<> shutdown_destroy(); - static seastar::future create(); + static seastar::future create(bool is_fixed_cpu); private: - // the fixed CPU if IS_FIXED_CPU is true + // the fixed CPU if is_fixed_cpu is true const seastar::shard_id primary_sid; + const bool is_fixed_cpu; entity_addr_t listen_addr; std::optional listener; seastar::gate shutdown_gate; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index b3856e6f9a352..97795652388f9 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -92,7 +92,7 @@ SocketMessenger::do_listen(const entity_addrvec_t& addrs) set_myaddrs(addrs); return seastar::futurize_invoke([this] { if (!listener) { - return ShardedServerSocket::create( + return ShardedServerSocket::create(true ).then([this] (auto _listener) { listener = _listener; }); @@ -218,6 +218,7 @@ seastar::future<> SocketMessenger::start( ceph_assert(get_myaddr().get_port() > 0); return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) { + assert(listener->is_fixed()); assert(seastar::this_shard_id() == sid); assert(get_myaddr().is_msgr2()); SocketConnectionRef conn = diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 940894ce1b3d1..6e749abac76ac 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -29,7 +29,6 @@ namespace crimson::net { -template class ShardedServerSocket; class SocketMessenger final : public Messenger { @@ -169,7 +168,7 @@ private: crimson::auth::AuthClient* auth_client = nullptr; crimson::auth::AuthServer* auth_server = nullptr; - ShardedServerSocket *listener = nullptr; + ShardedServerSocket *listener = nullptr; ChainedDispatchers dispatchers; std::map connections; std::set accepting_conns; diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 4ca75c6e96191..8df0f1be747ea 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -70,14 +70,14 @@ future<> test_refused() { }); } -template -future<> test_bind_same() { +future<> test_bind_same(bool is_fixed_cpu) { logger().info("test_bind_same()..."); - return ShardedServerSocket::create().then([](auto pss1) { + return ShardedServerSocket::create(is_fixed_cpu + ).then([is_fixed_cpu](auto pss1) { auto saddr = get_server_addr(); - return pss1->listen(saddr).safe_then([saddr] { + return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] { // try to bind the same address - return ShardedServerSocket::create( + return ShardedServerSocket::create(is_fixed_cpu ).then([saddr](auto pss2) { return pss2->listen(saddr).safe_then([] { logger().error("test_bind_same() should raise address_in_use"); @@ -112,10 +112,9 @@ future<> test_bind_same() { }); } -template -future<> test_accept() { +future<> test_accept(bool is_fixed_cpu) { logger().info("test_accept()"); - return ShardedServerSocket::create( + return ShardedServerSocket::create(is_fixed_cpu ).then([](auto pss) { auto saddr = get_server_addr(); return pss->listen(saddr @@ -157,27 +156,29 @@ future<> test_accept() { }); } -template class SocketFactory { static constexpr seastar::shard_id CLIENT_CPU = 0u; SocketRef client_socket; seastar::promise<> server_connected; static constexpr seastar::shard_id SERVER_CPU = 1u; - ShardedServerSocket *pss = nullptr; + ShardedServerSocket *pss = nullptr; seastar::shard_id server_socket_CPU; SocketRef server_socket; public: template - static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) { + static future<> dispatch_sockets( + bool is_fixed_cpu, + FuncC&& cb_client, + FuncS&& cb_server) { ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU); auto owner = std::make_unique(); auto psf = owner.get(); auto saddr = get_server_addr(); - return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] { - return ShardedServerSocket::create( + return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] { + return ShardedServerSocket::create(is_fixed_cpu ).then([psf, saddr](auto pss) { psf->pss = pss; return pss->listen(saddr @@ -201,7 +202,7 @@ class SocketFactory { logger().info("dispatch_sockets(): accepted at shard {}", seastar::this_shard_id()); psf->server_socket_CPU = seastar::this_shard_id(); - if constexpr (IS_FIXED_CPU) { + if (psf->pss->is_fixed()) { ceph_assert_always(SERVER_CPU == seastar::this_shard_id()); } psf->server_socket = std::move(socket); @@ -426,10 +427,10 @@ class Connection { } }; -template -future<> test_read_write() { +future<> test_read_write(bool is_fixed_cpu) { logger().info("test_read_write()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( + is_fixed_cpu, [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); }, [](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); } ).then([] { @@ -440,10 +441,10 @@ future<> test_read_write() { }); } -template -future<> test_unexpected_down() { +future<> test_unexpected_down(bool is_fixed_cpu) { logger().info("test_unexpected_down()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( + is_fixed_cpu, [](auto cs) { return Connection::dispatch_rw_bounded(cs, 128, true ).handle_exception_type([](const std::system_error& e) { @@ -460,10 +461,10 @@ future<> test_unexpected_down() { }); } -template -future<> test_shutdown_propagated() { +future<> test_shutdown_propagated(bool is_fixed_cpu) { logger().info("test_shutdown_propagated()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( + is_fixed_cpu, [](auto cs) { logger().debug("test_shutdown_propagated() shutdown client socket"); cs->shutdown(); @@ -478,10 +479,10 @@ future<> test_shutdown_propagated() { }); } -template -future<> test_preemptive_down() { +future<> test_preemptive_down(bool is_fixed_cpu) { logger().info("test_preemptive_down()..."); - return SocketFactory::dispatch_sockets( + return SocketFactory::dispatch_sockets( + is_fixed_cpu, [](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); }, [](auto ss) { return Connection::dispatch_rw_unbounded(ss); } ).then([] { @@ -492,19 +493,18 @@ future<> test_preemptive_down() { }); } -template -future<> do_test_with_type() { - return test_bind_same( - ).then([] { - return test_accept(); - }).then([] { - return test_read_write(); - }).then([] { - return test_unexpected_down(); - }).then([] { - return test_shutdown_propagated(); - }).then([] { - return test_preemptive_down(); +future<> do_test_with_type(bool is_fixed_cpu) { + return test_bind_same(is_fixed_cpu + ).then([is_fixed_cpu] { + return test_accept(is_fixed_cpu); + }).then([is_fixed_cpu] { + return test_read_write(is_fixed_cpu); + }).then([is_fixed_cpu] { + return test_unexpected_down(is_fixed_cpu); + }).then([is_fixed_cpu] { + return test_shutdown_propagated(is_fixed_cpu); + }).then([is_fixed_cpu] { + return test_preemptive_down(is_fixed_cpu); }); } @@ -527,9 +527,9 @@ seastar::future do_test(seastar::app_template& app) }).then([] { return test_refused(); }).then([] { - return do_test_with_type(); + return do_test_with_type(true); }).then([] { - return do_test_with_type(); + return do_test_with_type(false); }).then([] { logger().info("All tests succeeded"); // Seastar has bugs to have events undispatched during shutdown,