From: Yingxin Date: Wed, 21 Nov 2018 22:00:08 +0000 (+0800) Subject: crimson/net: encapsulate protocol implementations with accept/connect X-Git-Tag: v14.1.0~574^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f56fcd095a68653e2eaa899cd64515beadd5be6e;p=ceph-ci.git crimson/net: encapsulate protocol implementations with accept/connect Signed-off-by: Yingxin --- diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e0d4c36b915..e09d2b408e5 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -769,52 +769,9 @@ SocketConnection::repeat_connect() }); } -seastar::future<> -SocketConnection::start_connect() -{ - return seastar::connect(peer_addr.in4_addr()) - .then([this](seastar::connected_socket fd) { - if (state == state_t::closing) { - fd.shutdown_input(); - fd.shutdown_output(); - throw std::system_error(make_error_code(error::connection_aborted)); - } - socket.emplace(std::move(fd)); - // read server's handshake header - return socket->read(server_header_size); - }).then([this] (bufferlist headerbl) { - auto p = headerbl.cbegin(); - validate_banner(p); - entity_addr_t saddr, caddr; - ::decode(saddr, p); - ::decode(caddr, p); - ceph_assert(p.end()); - validate_peer_addr(saddr, peer_addr); - - if (my_addr != caddr) { - // take peer's address for me, but preserve my nonce - caddr.nonce = my_addr.nonce; - my_addr = caddr; - } - // encode/send client's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - h.global_seq = messenger.get_global_seq(); - return socket->write_flush(std::move(bl)); - }).then([=] { - return seastar::repeat([this] { - return repeat_connect(); - }); - }).then_wrapped([this] (auto fut) { - // satisfy the handshake's promise - fut.forward_to(std::move(h.promise)); - }); -} - void -SocketConnection::connect(const entity_addr_t& _peer_addr, - const entity_type_t& _peer_type) +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) { ceph_assert(state == state_t::none); ceph_assert(!socket); @@ -823,8 +780,46 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, messenger.register_conn(this); state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { - return start_connect() - .then([this] { + return seastar::connect(peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + if (state == state_t::closing) { + fd.shutdown_input(); + fd.shutdown_output(); + throw std::system_error(make_error_code(error::connection_aborted)); + } + socket.emplace(std::move(fd)); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { + auto p = headerbl.cbegin(); + validate_banner(p); + entity_addr_t saddr, caddr; + ::decode(saddr, p); + ::decode(caddr, p); + ceph_assert(p.end()); + validate_peer_addr(saddr, peer_addr); + + if (my_addr != caddr) { + // take peer's address for me, but preserve my nonce + caddr.nonce = my_addr.nonce; + my_addr = caddr; + } + // encode/send client's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + h.global_seq = messenger.get_global_seq(); + return socket->write_flush(std::move(bl)); + }).then([=] { + return seastar::repeat([this] { + return repeat_connect(); + }); + }).then_wrapped([this] (auto fut) { + // TODO: do not forward the exception + // and let the reconnect happen transparently inside connection + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }).then([this] { // notify the dispatcher and allow them to reject the connection return seastar::with_gate(messenger.pending_dispatch, [this] { return dispatcher.ms_handle_connect(this); @@ -838,40 +833,9 @@ SocketConnection::connect(const entity_addr_t& _peer_addr, }); } -seastar::future<> -SocketConnection::start_accept() -{ - // encode/send server's handshake header - bufferlist bl; - bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - ::encode(peer_addr, bl, 0); - return socket->write_flush(std::move(bl)) - .then([this] { - // read client's handshake header and connect request - return socket->read(client_header_size); - }).then([this] (bufferlist bl) { - auto p = bl.cbegin(); - validate_banner(p); - entity_addr_t addr; - ::decode(addr, p); - ceph_assert(p.end()); - if (!addr.is_blank_ip()) { - peer_addr = addr; - } - }).then([this] { - return seastar::repeat([this] { - return repeat_handle_connect(); - }); - }).then_wrapped([this] (auto fut) { - // satisfy the handshake's promise - fut.forward_to(std::move(h.promise)); - }); -} - void -SocketConnection::accept(seastar::connected_socket&& fd, - const entity_addr_t& _peer_addr) +SocketConnection::start_accept(seastar::connected_socket&& fd, + const entity_addr_t& _peer_addr) { ceph_assert(state == state_t::none); ceph_assert(!socket); @@ -880,8 +844,34 @@ SocketConnection::accept(seastar::connected_socket&& fd, messenger.accept_conn(this); state = state_t::accepting; seastar::with_gate(pending_dispatch, [this] { - return start_accept() + // encode/send server's handshake header + bufferlist bl; + bl.append(buffer::create_static(banner_size, banner)); + ::encode(my_addr, bl, 0); + ::encode(peer_addr, bl, 0); + return socket->write_flush(std::move(bl)) .then([this] { + // read client's handshake header and connect request + return socket->read(client_header_size); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + validate_banner(p); + entity_addr_t addr; + ::decode(addr, p); + ceph_assert(p.end()); + if (!addr.is_blank_ip()) { + peer_addr = addr; + } + }).then([this] { + return seastar::repeat([this] { + return repeat_handle_connect(); + }); + }).then_wrapped([this] (auto fut) { + // TODO: do not forward the exception + // and let the reconnect happen transparently inside connection + // satisfy the handshake's promise + fut.forward_to(std::move(h.promise)); + }).then([this] { // notify the dispatcher and allow them to reject the connection return seastar::with_gate(messenger.pending_dispatch, [=] { return dispatcher.ms_handle_accept(this); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index b69d78d3c63..effb594c14f 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -155,13 +155,6 @@ class SocketConnection : public Connection { void execute_open(); - /// start a handshake from the client's perspective, - /// only call when SocketConnection first construct - seastar::future<> start_connect(); - /// start a handshake from the server's perspective, - /// only call when SocketConnection first construct - seastar::future<> start_accept(); - public: SocketConnection(SocketMessenger& messenger, const entity_addr_t& my_addr, @@ -183,10 +176,14 @@ class SocketConnection : public Connection { seastar::future<> close() override; public: - void connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type); - void accept(seastar::connected_socket&& socket, - const entity_addr_t& peer_addr); + /// start a handshake from the client's perspective, + /// only call when SocketConnection first construct + void start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + /// start a handshake from the server's perspective, + /// only call when SocketConnection first construct + void start_accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 6ecf9d3ddef..8ec2db7b293 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -56,7 +56,7 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); // don't wait before accepting another - conn->accept(std::move(socket), peer_addr); + conn->start_accept(std::move(socket), peer_addr); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted @@ -76,7 +76,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe return found; } SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); - conn->connect(peer_addr, peer_type); + conn->start_connect(peer_addr, peer_type); return conn; }