From aec42c07b1602f85bbe82ecd570db4f8bafd29be Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 20 Feb 2023 09:41:58 +0800 Subject: [PATCH] crimson/net: cleanups to Socket.h/cc Signed-off-by: Yingxin Cheng --- src/crimson/net/Socket.cc | 351 ++++++++++++++++++++++------- src/crimson/net/Socket.h | 255 +++++---------------- src/crimson/net/SocketMessenger.cc | 2 +- src/test/crimson/test_socket.cc | 8 +- 4 files changed, 335 insertions(+), 281 deletions(-) diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc index 6434a407f22a5..842304c68f4bf 100644 --- a/src/crimson/net/Socket.cc +++ b/src/crimson/net/Socket.cc @@ -19,6 +19,9 @@ seastar::logger& logger() { return crimson::get_logger(ceph_subsys_ms); } +using tmp_buf = Socket::tmp_buf; +using packet = Socket::packet; + // an input_stream consumer that reads buffer segments into a bufferlist up to // the given number of remaining bytes struct bufferlist_consumer { @@ -28,7 +31,6 @@ struct bufferlist_consumer { bufferlist_consumer(bufferlist& bl, size_t& remaining) : bl(bl), remaining(remaining) {} - using tmp_buf = seastar::temporary_buffer; using consumption_result_type = typename seastar::input_stream::consumption_result_type; // consume some or all of a buffer segment @@ -59,9 +61,58 @@ struct bufferlist_consumer { }; }; +seastar::future<> inject_delay() +{ + if (float delay_period = local_conf()->ms_inject_internal_delays; + delay_period) { + logger().debug("Socket::inject_delay: sleep for {}", delay_period); + return seastar::sleep( + std::chrono::milliseconds((int)(delay_period * 1000.0))); + } + return seastar::now(); +} + +void inject_failure() +{ + if (local_conf()->ms_inject_socket_failures) { + uint64_t rand = + ceph::util::generate_random_number(1, RAND_MAX); + if (rand % local_conf()->ms_inject_socket_failures == 0) { + logger().warn("Socket::inject_failure: injecting socket failure"); + throw std::system_error(make_error_code( + error::negotiation_failure)); + } + } +} + } // anonymous namespace -seastar::future Socket::read(size_t bytes) +Socket::Socket( + seastar::connected_socket &&_socket, + side_t _side, + uint16_t e_port, + construct_tag) + : sid{seastar::this_shard_id()}, + socket(std::move(_socket)), + in(socket.input()), + // the default buffer size 8192 is too small that may impact our write + // performance. see seastar::net::connected_socket::output() + out(socket.output(65536)), + socket_is_shutdown(false), + side(_side), + ephemeral_port(e_port) +{ +} + +Socket::~Socket() +{ +#ifndef NDEBUG + assert(closed); +#endif +} + +seastar::future +Socket::read(size_t bytes) { #ifdef UNIT_TESTS_BUILT return try_trap_pre(next_trap_read).then([bytes, this] { @@ -81,9 +132,9 @@ seastar::future Socket::read(size_t bytes) }); }); #ifdef UNIT_TESTS_BUILT - }).then([this] (auto buf) { + }).then([this](auto buf) { return try_trap_post(next_trap_read - ).then([buf = std::move(buf)] () mutable { + ).then([buf = std::move(buf)]() mutable { return std::move(buf); }); }); @@ -104,20 +155,70 @@ Socket::read_exactly(size_t bytes) { } inject_failure(); return inject_delay( - ).then([buf = std::move(buf)] () mutable { + ).then([buf = std::move(buf)]() mutable { return seastar::make_ready_future(std::move(buf)); }); }); #ifdef UNIT_TESTS_BUILT - }).then([this] (auto buf) { + }).then([this](auto buf) { return try_trap_post(next_trap_read - ).then([buf = std::move(buf)] () mutable { + ).then([buf = std::move(buf)]() mutable { return std::move(buf); }); }); #endif } +seastar::future<> +Socket::write(packet &&buf) +{ +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write + ).then([buf = std::move(buf), this]() mutable { +#endif + inject_failure(); + return inject_delay( + ).then([buf = std::move(buf), this]() mutable { + return out.write(std::move(buf)); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif +} + +seastar::future<> +Socket::flush() +{ + inject_failure(); + return inject_delay().then([this] { + return out.flush(); + }); +} + +seastar::future<> +Socket::write_flush(packet &&buf) +{ +#ifdef UNIT_TESTS_BUILT + return try_trap_pre(next_trap_write + ).then([buf = std::move(buf), this]() mutable { +#endif + inject_failure(); + return inject_delay( + ).then([buf = std::move(buf), this]() mutable { + return out.write(std::move(buf) + ).then([this] { + return out.flush(); + }); + }); +#ifdef UNIT_TESTS_BUILT + }).then([this] { + return try_trap_post(next_trap_write); + }); +#endif +} + void Socket::shutdown() { socket_is_shutdown = true; socket.shutdown_input(); @@ -127,17 +228,18 @@ void Socket::shutdown() { static inline seastar::future<> close_and_handle_errors(seastar::output_stream& out) { - return out.close().handle_exception_type([] (const std::system_error& e) { + return out.close().handle_exception_type([](const std::system_error& e) { if (e.code() != std::errc::broken_pipe && e.code() != std::errc::connection_reset) { - logger().error("Socket::close(): unexpected error {}", e); + logger().error("Socket::close(): unexpected error {}", e.what()); ceph_abort(); } // can happen when out is already shutdown, ignore }); } -seastar::future<> Socket::close() { +seastar::future<> +Socket::close() { #ifndef NDEBUG ceph_assert(!closed); closed = true; @@ -148,39 +250,54 @@ seastar::future<> Socket::close() { close_and_handle_errors(out) ).then_unpack([] { return seastar::make_ready_future<>(); - }).handle_exception([] (auto eptr) { - logger().error("Socket::close(): unexpected exception {}", eptr); + }).handle_exception([](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("Socket::close(): unexpected exception {}", e_what); ceph_abort(); }); } -seastar::future<> Socket::inject_delay () { - if (float delay_period = local_conf()->ms_inject_internal_delays; - delay_period) { - logger().debug("Socket::inject_delay: sleep for {}", delay_period); - return seastar::sleep( - std::chrono::milliseconds((int)(delay_period * 1000.0))); - } - return seastar::now(); +seastar::future +Socket::connect(const entity_addr_t &peer_addr) +{ + inject_failure(); + return inject_delay( + ).then([peer_addr] { + return seastar::connect(peer_addr.in4_addr()); + }).then([peer_addr](seastar::connected_socket socket) { + auto ret = std::make_unique( + std::move(socket), side_t::connector, 0, construct_tag{}); + logger().debug("Socket::connect(): connected to {}, socket {}", + peer_addr, fmt::ptr(ret)); + return ret; + }); } -void Socket::inject_failure() -{ - if (local_conf()->ms_inject_socket_failures) { - uint64_t rand = - ceph::util::generate_random_number(1, RAND_MAX); - if (rand % local_conf()->ms_inject_socket_failures == 0) { - if (true) { - logger().warn("Socket::inject_failure: injecting socket failure"); - throw std::system_error(make_error_code( - crimson::net::error::negotiation_failure)); - } +#ifdef UNIT_TESTS_BUILT +void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { + blocker = blocker_; + if (type == bp_type_t::READ) { + ceph_assert(next_trap_read == bp_action_t::CONTINUE); + next_trap_read = action; + } else { // type == bp_type_t::WRITE + if (next_trap_write == bp_action_t::CONTINUE) { + next_trap_write = action; + } else if (next_trap_write == bp_action_t::FAULT) { + // do_sweep_messages() may combine multiple write events into one socket write + ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); + } else { + ceph_abort(); } } } -#ifdef UNIT_TESTS_BUILT -seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { +seastar::future<> +Socket::try_trap_pre(bp_action_t& trap) { auto action = trap; trap = bp_action_t::CONTINUE; switch (action) { @@ -188,7 +305,7 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { break; case bp_action_t::FAULT: logger().info("[Test] got FAULT"); - throw std::system_error(make_error_code(crimson::net::error::negotiation_failure)); + throw std::system_error(make_error_code(error::negotiation_failure)); case bp_action_t::BLOCK: logger().info("[Test] got BLOCK"); return blocker->block(); @@ -201,7 +318,8 @@ seastar::future<> Socket::try_trap_pre(bp_action_t& trap) { return seastar::make_ready_future<>(); } -seastar::future<> Socket::try_trap_post(bp_action_t& trap) { +seastar::future<> +Socket::try_trap_post(bp_action_t& trap) { auto action = trap; trap = bp_action_t::CONTINUE; switch (action) { @@ -216,94 +334,161 @@ seastar::future<> Socket::try_trap_post(bp_action_t& trap) { } return seastar::make_ready_future<>(); } +#endif -void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) { - blocker = blocker_; - if (type == bp_type_t::READ) { - ceph_assert(next_trap_read == bp_action_t::CONTINUE); - next_trap_read = action; - } else { // type == bp_type_t::WRITE - if (next_trap_write == bp_action_t::CONTINUE) { - next_trap_write = action; - } else if (next_trap_write == bp_action_t::FAULT) { - // do_sweep_messages() may combine multiple write events into one socket write - ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE); - } else { - ceph_abort(); - } - } +FixedCPUServerSocket::FixedCPUServerSocket( + seastar::shard_id cpu, + construct_tag) + : fixed_cpu{cpu} +{ } -#endif -crimson::net::listen_ertr::future<> +FixedCPUServerSocket::~FixedCPUServerSocket() +{ + assert(!listener); + // detect whether user have called destroy() properly + ceph_assert(!service); +} + +listen_ertr::future<> FixedCPUServerSocket::listen(entity_addr_t addr) { - assert(seastar::this_shard_id() == cpu); - logger().trace("FixedCPUServerSocket::listen({})...", addr); - return container().invoke_on_all([addr] (auto& ss) { - ss.addr = addr; + assert(seastar::this_shard_id() == fixed_cpu); + logger().debug("FixedCPUServerSocket({})::listen()...", addr); + return container().invoke_on_all([addr](auto& ss) { + ss.listen_addr = addr; seastar::socket_address s_addr(addr.in4_addr()); seastar::listen_options lo; lo.reuse_address = true; - lo.set_fixed_cpu(ss.cpu); + lo.set_fixed_cpu(ss.fixed_cpu); ss.listener = seastar::listen(s_addr, lo); }).then([] { return listen_ertr::now(); }).handle_exception_type( - [addr] (const std::system_error& e) -> listen_ertr::future<> { + [addr](const std::system_error& e) -> listen_ertr::future<> { if (e.code() == std::errc::address_in_use) { - logger().trace("FixedCPUServerSocket::listen({}): address in use", addr); + logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr); return crimson::ct_error::address_in_use::make(); } else if (e.code() == std::errc::address_not_available) { - logger().trace("FixedCPUServerSocket::listen({}): address not available", + logger().debug("FixedCPUServerSocket({})::listen(): address not available", addr); return crimson::ct_error::address_not_available::make(); } - logger().error("FixedCPUServerSocket::listen({}): " - "got unexpeted error {}", addr, e); + logger().error("FixedCPUServerSocket({})::listen(): " + "got unexpeted error {}", addr, e.what()); ceph_abort(); }); } -seastar::future<> FixedCPUServerSocket::shutdown() +seastar::future<> +FixedCPUServerSocket::accept(accept_func_t &&_fn_accept) +{ + assert(seastar::this_shard_id() == fixed_cpu); + logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr); + return container().invoke_on_all([_fn_accept](auto &ss) { + assert(ss.listener); + ss.fn_accept = _fn_accept; + // gate accepting + // FixedCPUServerSocket::shutdown() will drain the continuations in the gate + // so ignore the returned future + std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { + return seastar::keep_doing([&ss] { + return ss.listener->accept( + ).then([&ss](seastar::accept_result accept_result) { + // assert seastar::listen_options::set_fixed_cpu() works + assert(seastar::this_shard_id() == ss.fixed_cpu); + auto [socket, paddr] = std::move(accept_result); + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + peer_addr.set_type(ss.listen_addr.get_type()); + SocketRef _socket = std::make_unique( + std::move(socket), Socket::side_t::acceptor, + peer_addr.get_port(), Socket::construct_tag{}); + logger().debug("FixedCPUServerSocket({})::accept(): " + "accepted peer {}, socket {}", + ss.listen_addr, peer_addr, fmt::ptr(_socket)); + std::ignore = seastar::with_gate( + ss.shutdown_gate, + [socket=std::move(_socket), peer_addr, &ss]() mutable { + return ss.fn_accept(std::move(socket), peer_addr + ).handle_exception([&ss, peer_addr](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("FixedCPUServerSocket({})::accept(): " + "fn_accept(s, {}) got unexpected exception {}", + ss.listen_addr, peer_addr, e_what); + ceph_abort(); + }); + }); + }); + }).handle_exception_type([&ss](const std::system_error& e) { + if (e.code() == std::errc::connection_aborted || + e.code() == std::errc::invalid_argument) { + logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})", + ss.listen_addr, e.what()); + } else { + throw; + } + }).handle_exception([&ss](auto eptr) { + const char *e_what; + try { + std::rethrow_exception(eptr); + } catch (std::exception &e) { + e_what = e.what(); + } + logger().error("FixedCPUServerSocket({})::accept(): " + "got unexpected exception {}", ss.listen_addr, e_what); + ceph_abort(); + }); + }); + }); +} + +seastar::future<> +FixedCPUServerSocket::shutdown_destroy() { - assert(seastar::this_shard_id() == cpu); - logger().trace("FixedCPUServerSocket({})::shutdown()...", addr); - return container().invoke_on_all([] (auto& ss) { + assert(seastar::this_shard_id() == fixed_cpu); + logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr); + // shutdown shards + return container().invoke_on_all([](auto& ss) { if (ss.listener) { ss.listener->abort_accept(); } return ss.shutdown_gate.close(); }).then([this] { - return reset(); - }); -} - -seastar::future<> FixedCPUServerSocket::destroy() -{ - assert(seastar::this_shard_id() == cpu); - return shutdown().then([this] { - // we should only construct/stop shards on #0 - return container().invoke_on(0, [] (auto& ss) { + // destroy shards + return container().invoke_on_all([](auto& ss) { + assert(ss.shutdown_gate.is_closed()); + ss.listen_addr = entity_addr_t(); + ss.listener.reset(); + }); + }).then([this] { + // stop the sharded service: we should only construct/stop shards on #0 + return container().invoke_on(0, [](auto& ss) { assert(ss.service); return ss.service->stop().finally([cleanup = std::move(ss.service)] {}); }); }); } -seastar::future FixedCPUServerSocket::create() +seastar::future +FixedCPUServerSocket::create() { - auto cpu = seastar::this_shard_id(); - // we should only construct/stop shards on #0 - return seastar::smp::submit_to(0, [cpu] { + auto fixed_cpu = seastar::this_shard_id(); + // start the sharded service: we should only construct/stop shards on #0 + return seastar::smp::submit_to(0, [fixed_cpu] { auto service = std::make_unique(); - return service->start(cpu, construct_tag{} - ).then([service = std::move(service)] () mutable { + return service->start(fixed_cpu, construct_tag{} + ).then([service = std::move(service)]() mutable { auto p_shard = service.get(); p_shard->local().service = std::move(service); return p_shard; }); - }).then([] (auto p_shard) { + }).then([](auto p_shard) { return &p_shard->local(); }); } diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 5b8f6517dd2a8..f7393a9f34118 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -23,96 +23,59 @@ namespace crimson::net { class Socket; using SocketRef = std::unique_ptr; -class Socket -{ +class Socket { struct construct_tag {}; - public: +public: // if acceptor side, peer is using a different port (ephemeral_port) // if connector side, I'm using a different port (ephemeral_port) enum class side_t { acceptor, connector }; + Socket(seastar::connected_socket &&, side_t, uint16_t e_port, construct_tag); - Socket(seastar::connected_socket&& _socket, side_t _side, uint16_t e_port, construct_tag) - : sid{seastar::this_shard_id()}, - socket(std::move(_socket)), - in(socket.input()), - // the default buffer size 8192 is too small that may impact our write - // performance. see seastar::net::connected_socket::output() - out(socket.output(65536)), - socket_is_shutdown(false), - side(_side), - ephemeral_port(e_port) {} - - ~Socket() { -#ifndef NDEBUG - assert(closed); -#endif - } + ~Socket(); Socket(Socket&& o) = delete; - static seastar::future - connect(const entity_addr_t& peer_addr) { - inject_failure(); - return inject_delay( - ).then([peer_addr] { - return seastar::connect(peer_addr.in4_addr()); - }).then([] (seastar::connected_socket socket) { - return std::make_unique( - std::move(socket), side_t::connector, 0, construct_tag{}); - }); + side_t get_side() const { + return side; + } + + uint16_t get_ephemeral_port() const { + return ephemeral_port; + } + + seastar::socket_address get_local_address() const { + return socket.local_address(); + } + + bool is_shutdown() const { + return socket_is_shutdown; + } + + // learn my ephemeral_port as connector. + // unfortunately, there's no way to identify which port I'm using as + // connector with current seastar interface. + void learn_ephemeral_port_as_connector(uint16_t port) { + assert(side == side_t::connector && + (ephemeral_port == 0 || ephemeral_port == port)); + ephemeral_port = port; } /// read the requested number of bytes into a bufferlist seastar::future read(size_t bytes); + using tmp_buf = seastar::temporary_buffer; using packet = seastar::net::packet; seastar::future read_exactly(size_t bytes); - seastar::future<> write(packet&& buf) { -#ifdef UNIT_TESTS_BUILT - return try_trap_pre(next_trap_write - ).then([buf = std::move(buf), this] () mutable { -#endif - inject_failure(); - return inject_delay( - ).then([buf = std::move(buf), this] () mutable { - return out.write(std::move(buf)); - }); -#ifdef UNIT_TESTS_BUILT - }).then([this] { - return try_trap_post(next_trap_write); - }); -#endif - } - seastar::future<> flush() { - inject_failure(); - return inject_delay().then([this] { - return out.flush(); - }); - } - seastar::future<> write_flush(packet&& buf) { -#ifdef UNIT_TESTS_BUILT - return try_trap_pre(next_trap_write).then([buf = std::move(buf), this] () mutable { -#endif - inject_failure(); - return inject_delay( - ).then([buf = std::move(buf), this] () mutable { - return out.write(std::move(buf)).then([this] { return out.flush(); }); - }); -#ifdef UNIT_TESTS_BUILT - }).then([this] { - return try_trap_post(next_trap_write); - }); -#endif - } + seastar::future<> write(packet &&buf); - bool is_shutdown() const { - return socket_is_shutdown; - } + seastar::future<> flush(); + + seastar::future<> write_flush(packet &&buf); // preemptively disable further reads or writes, can only be shutdown once. void shutdown(); @@ -120,9 +83,12 @@ class Socket /// Socket can only be closed once. seastar::future<> close(); - static seastar::future<> inject_delay(); + static seastar::future + connect(const entity_addr_t& peer_addr); - static void inject_failure(); + /* + * test interfaces + */ // shutdown for tests void force_shutdown() { @@ -140,28 +106,7 @@ class Socket socket.shutdown_output(); } - side_t get_side() const { - return side; - } - - uint16_t get_ephemeral_port() const { - return ephemeral_port; - } - - // learn my ephemeral_port as connector. - // unfortunately, there's no way to identify which port I'm using as - // connector with current seastar interface. - void learn_ephemeral_port_as_connector(uint16_t port) { - assert(side == side_t::connector && - (ephemeral_port == 0 || ephemeral_port == port)); - ephemeral_port = port; - } - - seastar::socket_address get_local_address() const { - return socket.local_address(); - } - - private: +private: const seastar::shard_id sid; seastar::connected_socket socket; seastar::input_stream in; @@ -181,15 +126,17 @@ class Socket } r; #ifdef UNIT_TESTS_BUILT - public: +public: void set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_); - private: +private: + seastar::future<> try_trap_pre(bp_action_t& trap); + + seastar::future<> try_trap_post(bp_action_t& trap); + bp_action_t next_trap_read = bp_action_t::CONTINUE; bp_action_t next_trap_write = bp_action_t::CONTINUE; socket_blocker* blocker = nullptr; - seastar::future<> try_trap_pre(bp_action_t& trap); - seastar::future<> try_trap_post(bp_action_t& trap); #endif friend class FixedCPUServerSocket; @@ -202,38 +149,12 @@ using listen_ertr = crimson::errorator< class FixedCPUServerSocket : public seastar::peering_sharded_service { - const seastar::shard_id cpu; - entity_addr_t addr; - std::optional listener; - seastar::gate shutdown_gate; - using accept_func_t = - std::function(SocketRef, entity_addr_t)>; - accept_func_t fn_accept; - - using sharded_service_t = seastar::sharded; - std::unique_ptr service; - struct construct_tag {}; - static seastar::logger& logger() { - return crimson::get_logger(ceph_subsys_ms); - } - - seastar::future<> reset() { - return container().invoke_on_all([] (auto& ss) { - assert(ss.shutdown_gate.is_closed()); - ss.addr = entity_addr_t(); - ss.listener.reset(); - }); - } - public: - FixedCPUServerSocket(seastar::shard_id cpu, construct_tag) : cpu{cpu} {} - ~FixedCPUServerSocket() { - assert(!listener); - // detect whether user have called destroy() properly - ceph_assert(!service); - } + FixedCPUServerSocket(seastar::shard_id cpu, construct_tag); + + ~FixedCPUServerSocket(); FixedCPUServerSocket(FixedCPUServerSocket&&) = delete; FixedCPUServerSocket(const FixedCPUServerSocket&) = delete; @@ -241,75 +162,23 @@ public: listen_ertr::future<> listen(entity_addr_t addr); - seastar::future<> accept(accept_func_t &&_fn_accept) { - assert(seastar::this_shard_id() == cpu); - logger().debug("FixedCPUServerSocket({})::accept()...", addr); - return container().invoke_on_all([_fn_accept](auto &ss) { - assert(ss.listener); - ss.fn_accept = _fn_accept; - // gate accepting - // FixedCPUServerSocket::shutdown() will drain the continuations in the gate - // so ignore the returned future - std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] { - return seastar::keep_doing([&ss] { - return ss.listener->accept( - ).then([&ss](seastar::accept_result accept_result) { - // assert seastar::listen_options::set_fixed_cpu() works - assert(seastar::this_shard_id() == ss.cpu); - auto [socket, paddr] = std::move(accept_result); - entity_addr_t peer_addr; - peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - peer_addr.set_type(ss.addr.get_type()); - SocketRef _socket = std::make_unique( - std::move(socket), Socket::side_t::acceptor, - peer_addr.get_port(), Socket::construct_tag{}); - logger().debug("FixedCPUServerSocket({})::accept(): " - "accepted peer {}, socket {}", - ss.addr, peer_addr, fmt::ptr(_socket)); - std::ignore = seastar::with_gate( - ss.shutdown_gate, - [socket=std::move(_socket), peer_addr, &ss]() mutable { - return ss.fn_accept(std::move(socket), peer_addr - ).handle_exception([&ss, peer_addr](auto eptr) { - const char *e_what; - try { - std::rethrow_exception(eptr); - } catch (std::exception &e) { - e_what = e.what(); - } - logger().error("FixedCPUServerSocket({})::accept(): " - "fn_accept(s, {}) got unexpected exception {}", - ss.addr, peer_addr, e_what); - ceph_abort(); - }); - }); - }); - }).handle_exception_type([&ss](const std::system_error& e) { - if (e.code() == std::errc::connection_aborted || - e.code() == std::errc::invalid_argument) { - logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})", - ss.addr, e.what()); - } else { - throw; - } - }).handle_exception([&ss](auto eptr) { - const char *e_what; - try { - std::rethrow_exception(eptr); - } catch (std::exception &e) { - e_what = e.what(); - } - logger().error("FixedCPUServerSocket({})::accept(): " - "got unexpected exception {}", ss.addr, e_what); - ceph_abort(); - }); - }); - }); - } + using accept_func_t = + std::function(SocketRef, entity_addr_t)>; + seastar::future<> accept(accept_func_t &&_fn_accept); + + seastar::future<> shutdown_destroy(); - seastar::future<> shutdown(); - seastar::future<> destroy(); static seastar::future create(); + +private: + const seastar::shard_id fixed_cpu; + entity_addr_t listen_addr; + std::optional listener; + seastar::gate shutdown_gate; + accept_func_t fn_accept; + + using sharded_service_t = seastar::sharded; + std::unique_ptr service; }; } // namespace crimson::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 8a83c1d59ceff..f0607c9a44597 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -255,7 +255,7 @@ seastar::future<> SocketMessenger::shutdown() if (listener) { auto d_listener = listener; listener = nullptr; - return d_listener->destroy(); + return d_listener->shutdown_destroy(); } else { return seastar::now(); } diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 4861f0c91fa78..d43f536db1be9 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -93,7 +93,7 @@ future<> test_bind_same() { // runtime error: member access within null pointer of type 'struct promise_base' return seastar::now(); })).then([pss2] { - return pss2->destroy(); + return pss2->shutdown_destroy(); }); }); }, listen_ertr::all_same_way( @@ -102,7 +102,7 @@ future<> test_bind_same() { saddr); ceph_abort(); })).then([pss1] { - return pss1->destroy(); + return pss1->shutdown_destroy(); }).handle_exception([] (auto eptr) { logger.error("test_bind_same() got unexpeted exception {}", eptr); ceph_abort(); @@ -143,7 +143,7 @@ future<> test_accept() { }).then([] { logger.info("test_accept() ok\n"); }).then([pss] { - return pss->destroy(); + return pss->shutdown_destroy(); }).handle_exception([] (auto eptr) { logger.error("test_accept() got unexpeted exception {}", eptr); ceph_abort(); @@ -199,7 +199,7 @@ class SocketFactory { }).then([psf] { if (psf->pss) { return seastar::smp::submit_to(1u, [psf] { - return psf->pss->destroy(); + return psf->pss->shutdown_destroy(); }); } return seastar::now(); -- 2.39.5