From: Yingxin Cheng Date: Wed, 26 Apr 2023 09:06:06 +0000 (+0800) Subject: crimson/net: use SocketFRef X-Git-Tag: v19.0.0~951^2~23 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=23c7a85084d9934e252c0c94a6bb39bd3534a861;p=ceph.git crimson/net: use SocketFRef Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index c952bb1bd5693..b621279353e5a 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -117,7 +117,7 @@ FrameAssemblerV2::stop_recording() bool FrameAssemblerV2::has_socket() const { assert((socket && conn.socket) || (!socket && !conn.socket)); - return socket != nullptr; + return bool(socket); } bool FrameAssemblerV2::is_socket_valid() const @@ -125,16 +125,17 @@ bool FrameAssemblerV2::is_socket_valid() const return has_socket() && !socket->is_shutdown(); } -SocketRef FrameAssemblerV2::move_socket() +SocketFRef FrameAssemblerV2::move_socket() { assert(has_socket()); conn.set_socket(nullptr); return std::move(socket); } -void FrameAssemblerV2::set_socket(SocketRef &&new_socket) +void FrameAssemblerV2::set_socket(SocketFRef &&new_socket) { assert(!has_socket()); + assert(new_socket); socket = std::move(new_socket); conn.set_socket(socket.get()); assert(is_socket_valid()); @@ -152,7 +153,7 @@ void FrameAssemblerV2::shutdown_socket() socket->shutdown(); } -seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketRef &&new_socket) +seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket) { assert(has_socket()); assert(socket->is_shutdown()); diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index a99b5fce14b50..0ba5f53cc9f84 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -38,7 +38,7 @@ public: */ struct mover_t { - SocketRef socket; + SocketFRef socket; ceph::crypto::onwire::rxtx_t session_stream_handlers; ceph::compression::onwire::rxtx_t session_comp_handlers; }; @@ -66,13 +66,13 @@ public: // the socket exists and not shutdown bool is_socket_valid() const; - void set_socket(SocketRef &&); + void set_socket(SocketFRef &&); void learn_socket_ephemeral_port_as_connector(uint16_t port); void shutdown_socket(); - seastar::future<> replace_shutdown_socket(SocketRef &&); + seastar::future<> replace_shutdown_socket(SocketFRef &&); seastar::future<> close_shutdown_socket(); @@ -127,7 +127,7 @@ public: private: bool has_socket() const; - SocketRef move_socket(); + SocketFRef move_socket(); void log_main_preamble(const ceph::bufferlist &bl); @@ -137,7 +137,7 @@ private: SocketConnection &conn; - SocketRef socket; + SocketFRef socket; /* * auth signature diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 9d1956c3c1117..ee8f1a99906f0 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -172,7 +172,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr, execute_connecting(); } -void ProtocolV2::start_accept(SocketRef&& new_socket, +void ProtocolV2::start_accept(SocketFRef&& new_socket, const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::NONE); @@ -813,15 +813,16 @@ void ProtocolV2::execute_connecting() abort_protocol(); } return Socket::connect(conn.peer_addr); - }).then([this](SocketRef new_socket) { + }).then([this](SocketRef _new_socket) { logger().debug("{} socket connected", conn); if (unlikely(state != state_t::CONNECTING)) { logger().debug("{} triggered {} during Socket::connect()", conn, get_state_name(state)); - return new_socket->close().then([sock=std::move(new_socket)] { + return _new_socket->close().then([sock=std::move(_new_socket)] { abort_protocol(); }); } + SocketFRef new_socket = seastar::make_foreign(std::move(_new_socket)); if (!has_socket) { frame_assembler->set_socket(std::move(new_socket)); has_socket = true; diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index b6f67b566510e..f81ffdbfbc69a 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -41,7 +41,7 @@ public: void start_connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name); - void start_accept(SocketRef&& socket, + void start_accept(SocketFRef&& socket, const entity_addr_t& peer_addr); seastar::future<> close_clean_yielded(); diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 789a83a6c03a8..3d42abdb44b89 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -21,6 +21,7 @@ namespace crimson::net { class Socket; using SocketRef = std::unique_ptr; +using SocketFRef = seastar::foreign_ptr; class Socket { struct construct_tag {}; diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 70abd6fc62fe4..9f3740a5c7770 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -118,7 +118,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, } void -SocketConnection::start_accept(SocketRef&& sock, +SocketConnection::start_accept(SocketFRef&& sock, const entity_addr_t& _peer_addr) { assert(seastar::this_shard_id() == msgr_sid); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index d1d4ae8a71201..f54ac7fd9a474 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -134,7 +134,7 @@ class SocketConnection : public Connection { /// start a handshake from the server's perspective, /// only call when SocketConnection first construct - void start_accept(SocketRef&& socket, + void start_accept(SocketFRef&& socket, const entity_addr_t& peer_addr); seastar::future<> close_clean_yielded(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 97795652388f9..11cec9b974fff 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -207,6 +207,16 @@ SocketMessenger::bind(const entity_addrvec_t& addrs) }); } +seastar::future<> SocketMessenger::accept( + SocketFRef &&socket, const entity_addr_t &peer_addr) +{ + assert(seastar::this_shard_id() == sid); + SocketConnectionRef conn = + seastar::make_shared(*this, dispatchers); + conn->start_accept(std::move(socket), peer_addr); + return seastar::now(); +} + seastar::future<> SocketMessenger::start( const dispatchers_t& _dispatchers) { assert(seastar::this_shard_id() == sid); @@ -217,14 +227,17 @@ seastar::future<> SocketMessenger::start( ceph_assert(get_myaddr().is_msgr2()); ceph_assert(get_myaddr().get_port() > 0); - return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) { - assert(listener->is_fixed()); - assert(seastar::this_shard_id() == sid); + return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) { assert(get_myaddr().is_msgr2()); - SocketConnectionRef conn = - seastar::make_shared(*this, dispatchers); - conn->start_accept(std::move(socket), peer_addr); - return seastar::now(); + SocketFRef socket = seastar::make_foreign(std::move(_socket)); + if (listener->is_fixed()) { + return accept(std::move(socket), peer_addr); + } else { + return seastar::smp::submit_to(sid, + [this, peer_addr, socket = std::move(socket)]() mutable { + return accept(std::move(socket), peer_addr); + }); + } }); } return seastar::now(); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 6e749abac76ac..36d814379a611 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -152,6 +152,8 @@ public: #endif private: + seastar::future<> accept(SocketFRef &&, const entity_addr_t &); + listen_ertr::future<> do_listen(const entity_addrvec_t& addr); /// try to bind to the first unused port of given address diff --git a/src/test/crimson/test_socket.cc b/src/test/crimson/test_socket.cc index 8df0f1be747ea..1fa029c2ca2b4 100644 --- a/src/test/crimson/test_socket.cc +++ b/src/test/crimson/test_socket.cc @@ -165,7 +165,7 @@ class SocketFactory { ShardedServerSocket *pss = nullptr; seastar::shard_id server_socket_CPU; - SocketRef server_socket; + SocketFRef server_socket; public: template @@ -198,13 +198,14 @@ class SocketFactory { }); }), seastar::smp::submit_to(SERVER_CPU, [psf] { - return psf->pss->accept([psf](auto socket, auto paddr) { + return psf->pss->accept([psf](auto _socket, auto paddr) { logger().info("dispatch_sockets(): accepted at shard {}", seastar::this_shard_id()); psf->server_socket_CPU = seastar::this_shard_id(); if (psf->pss->is_fixed()) { ceph_assert_always(SERVER_CPU == seastar::this_shard_id()); } + SocketFRef socket = seastar::make_foreign(std::move(_socket)); psf->server_socket = std::move(socket); return seastar::smp::submit_to(CLIENT_CPU, [psf] { psf->server_connected.set_value();