From 32ad076cebfa982b5b3f38d0fdc67f4c54b0f120 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 15 Feb 2019 11:34:16 +0800 Subject: [PATCH] crimson/net: implement factory method of Socket Signed-off-by: Yingxin Cheng --- src/crimson/net/ProtocolV1.cc | 14 ++++++++------ src/crimson/net/Socket.h | 28 +++++++++++++++++++++++++++- src/crimson/net/SocketMessenger.cc | 14 +++++--------- 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/src/crimson/net/ProtocolV1.cc b/src/crimson/net/ProtocolV1.cc index 672e04c0f14..03fa8b7ef5b 100644 --- a/src/crimson/net/ProtocolV1.cc +++ b/src/crimson/net/ProtocolV1.cc @@ -296,14 +296,16 @@ void ProtocolV1::start_connect(const entity_addr_t& _peer_addr, messenger.register_conn( seastar::static_pointer_cast(conn.shared_from_this())); seastar::with_gate(pending_dispatch, [this] { - return seastar::connect(conn.peer_addr.in4_addr()) - .then([this](seastar::connected_socket fd) { + return Socket::connect(conn.peer_addr) + .then([this](SocketFRef sock) { + socket = std::move(sock); if (state == state_t::closing) { - fd.shutdown_input(); - fd.shutdown_output(); - throw std::system_error(make_error_code(error::connection_aborted)); + return socket->close().then([] { + throw std::system_error(make_error_code(error::connection_aborted)); + }); } - socket = seastar::make_foreign(std::make_unique(std::move(fd))); + return seastar::now(); + }).then([this] { // read server's handshake header return socket->read(server_header_size); }).then([this] (bufferlist headerbl) { diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h index 95fc78fbebe..5191a2b5207 100644 --- a/src/crimson/net/Socket.h +++ b/src/crimson/net/Socket.h @@ -8,6 +8,7 @@ #include #include "include/buffer.h" +#include "msg/msg_types.h" namespace ceph::net { @@ -27,14 +28,39 @@ class Socket size_t remaining; } r; + struct construct_tag {}; + public: - explicit Socket(seastar::connected_socket&& _socket) + Socket(seastar::connected_socket&& _socket, construct_tag) : sid{seastar::engine().cpu_id()}, socket(std::move(_socket)), in(socket.input()), out(socket.output()) {} + Socket(Socket&& o) = delete; + static seastar::future + connect(const entity_addr_t& peer_addr) { + return seastar::connect(peer_addr.in4_addr()) + .then([] (seastar::connected_socket socket) { + return seastar::make_foreign(std::make_unique(std::move(socket), + construct_tag{})); + }); + } + + static seastar::future + accept(seastar::server_socket& listener) { + return listener.accept().then([] (seastar::connected_socket socket, + seastar::socket_address paddr) { + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + return seastar::make_ready_future( + seastar::make_foreign(std::make_unique(std::move(socket), + construct_tag{})), + peer_addr); + }); + } + /// read the requested number of bytes into a bufferlist seastar::future read(size_t bytes); using tmp_buf = seastar::temporary_buffer; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 13aa5c56337..6b33e01ffd3 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -146,18 +146,14 @@ seastar::future<> SocketMessenger::do_start(Dispatcher *disp) // start listening if bind() was called if (listener) { seastar::keep_doing([this] { - return listener->accept() - .then([this] (seastar::connected_socket socket, - seastar::socket_address paddr) { - // allocate the connection - entity_addr_t peer_addr; - peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + return Socket::accept(*listener) + .then([this] (SocketFRef socket, + entity_addr_t peer_addr) { auto shard = locate_shard(peer_addr); + // don't wait before accepting another #warning fixme // we currently do dangerous i/o from a Connection core, different from the Socket core. - auto sock = seastar::make_foreign(std::make_unique(std::move(socket))); - // don't wait before accepting another - container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { + container().invoke_on(shard, [sock = std::move(socket), peer_addr, this](auto& msgr) mutable { SocketConnectionRef conn = seastar::make_shared(msgr, *msgr.dispatcher); conn->start_accept(std::move(sock), peer_addr); }); -- 2.39.5