From: Yingxin Cheng Date: Fri, 17 Feb 2023 09:03:02 +0000 (+0800) Subject: crimson/net: drop the template from FixedCPUServerSocket::accept() X-Git-Tag: v18.2.1~166^2~36 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=d05fbd05e7a660453535a1bae33eb0ccd69a1e8f;p=ceph-ci.git crimson/net: drop the template from FixedCPUServerSocket::accept() Signed-off-by: Yingxin Cheng (cherry picked from commit 91c5ddc7ef06b1eaabddf06efc6ab8ed4859e27d) --- diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index b6125eb8a02..5b8f6517dd2 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -206,6 +206,9 @@ class FixedCPUServerSocket entity_addr_t addr; std::optional listener; seastar::gate shutdown_gate; + using accept_func_t = + std::function(SocketRef, entity_addr_t)>; + accept_func_t fn_accept; using sharded_service_t = seastar::sharded; std::unique_ptr service; @@ -238,24 +241,19 @@ public: listen_ertr::future<> listen(entity_addr_t addr); - // fn_accept should be a nothrow function of type - // seastar::future<>(SocketRef, entity_addr_t) - template - seastar::future<> accept(Func&& fn_accept) { + seastar::future<> accept(accept_func_t &&_fn_accept) { assert(seastar::this_shard_id() == cpu); - logger().trace("FixedCPUServerSocket({})::accept()...", addr); - return container().invoke_on_all( - [fn_accept = std::move(fn_accept)] (auto& ss) mutable { + logger().debug("FixedCPUServerSocket({})::accept()...", addr); + return container().invoke_on_all([_fn_accept](auto &ss) { assert(ss.listener); + ss.fn_accept = _fn_accept; // gate accepting // FixedCPUServerSocket::shutdown() will drain the continuations in the gate // so ignore the returned future - std::ignore = seastar::with_gate(ss.shutdown_gate, - [&ss, fn_accept = std::move(fn_accept)] () mutable { - return seastar::keep_doing([&ss, fn_accept = std::move(fn_accept)] () mutable { - return ss.listener->accept().then( - [&ss, fn_accept = std::move(fn_accept)] - (seastar::accept_result accept_result) mutable { + std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { + return seastar::keep_doing([&ss] { + return ss.listener->accept( + ).then([&ss](seastar::accept_result accept_result) { // assert seastar::listen_options::set_fixed_cpu() works assert(seastar::this_shard_id() == ss.cpu); auto [socket, paddr] = std::move(accept_result); @@ -265,31 +263,44 @@ public: SocketRef _socket = std::make_unique( std::move(socket), Socket::side_t::acceptor, peer_addr.get_port(), Socket::construct_tag{}); - std::ignore = seastar::with_gate(ss.shutdown_gate, - [socket = std::move(_socket), peer_addr, - &ss, fn_accept = std::move(fn_accept)] () mutable { - logger().trace("FixedCPUServerSocket({})::accept(): " - "accepted peer {}", ss.addr, peer_addr); - return fn_accept(std::move(socket), peer_addr - ).handle_exception([&ss, peer_addr] (auto eptr) { + logger().debug("FixedCPUServerSocket({})::accept(): " + "accepted peer {}, socket {}", + ss.addr, peer_addr, fmt::ptr(_socket)); + std::ignore = seastar::with_gate( + ss.shutdown_gate, + [socket=std::move(_socket), peer_addr, &ss]() mutable { + return ss.fn_accept(std::move(socket), peer_addr + ).handle_exception([&ss, peer_addr](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } logger().error("FixedCPUServerSocket({})::accept(): " "fn_accept(s, {}) got unexpected exception {}", - ss.addr, peer_addr, eptr); + ss.addr, peer_addr, e_what); ceph_abort(); }); }); }); - }).handle_exception_type([&ss] (const std::system_error& e) { + }).handle_exception_type([&ss](const std::system_error& e) { if (e.code() == std::errc::connection_aborted || e.code() == std::errc::invalid_argument) { - logger().trace("FixedCPUServerSocket({})::accept(): stopped ({})", - ss.addr, e); + logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})", + ss.addr, e.what()); } else { throw; } - }).handle_exception([&ss] (auto eptr) { + }).handle_exception([&ss](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } logger().error("FixedCPUServerSocket({})::accept(): " - "got unexpected exception {}", ss.addr, eptr); + "got unexpected exception {}", ss.addr, e_what); ceph_abort(); }); }); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index a112b50800d..8a83c1d59ce 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -214,7 +214,7 @@ seastar::future<> SocketMessenger::start( ceph_assert(get_myaddr().is_msgr2()); ceph_assert(get_myaddr().get_port() > 0); - return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { + return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) { assert(seastar::this_shard_id() == master_sid); assert(get_myaddr().is_msgr2()); SocketConnectionRef conn = diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 5fd596a0928..4861f0c91fa 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -115,10 +115,12 @@ future<> test_accept() { return FixedCPUServerSocket::create().then([] (auto pss) { auto saddr = get_server_addr(); return pss->listen(saddr).safe_then([pss] { - return pss->accept([] (auto socket, auto paddr) { + return pss->accept([](auto socket, auto paddr) { // simple accept - return seastar::sleep(100ms).then([socket = std::move(socket)] () mutable { - return socket->close().finally([cleanup = std::move(socket)] {}); + return seastar::sleep(100ms + ).then([socket = std::move(socket)]() mutable { + return socket->close( + ).finally([cleanup = std::move(socket)] {}); }); }); }, listen_ertr::all_same_way(