});
}
-seastar::future<>
-SocketConnection::start_connect()
-{
- return seastar::connect(peer_addr.in4_addr())
- .then([this](seastar::connected_socket fd) {
- if (state == state_t::closing) {
- fd.shutdown_input();
- fd.shutdown_output();
- throw std::system_error(make_error_code(error::connection_aborted));
- }
- socket.emplace(std::move(fd));
- // read server's handshake header
- return socket->read(server_header_size);
- }).then([this] (bufferlist headerbl) {
- auto p = headerbl.cbegin();
- validate_banner(p);
- entity_addr_t saddr, caddr;
- ::decode(saddr, p);
- ::decode(caddr, p);
- ceph_assert(p.end());
- validate_peer_addr(saddr, peer_addr);
-
- if (my_addr != caddr) {
- // take peer's address for me, but preserve my nonce
- caddr.nonce = my_addr.nonce;
- my_addr = caddr;
- }
- // encode/send client's handshake header
- bufferlist bl;
- bl.append(buffer::create_static(banner_size, banner));
- ::encode(my_addr, bl, 0);
- h.global_seq = messenger.get_global_seq();
- return socket->write_flush(std::move(bl));
- }).then([=] {
- return seastar::repeat([this] {
- return repeat_connect();
- });
- }).then_wrapped([this] (auto fut) {
- // satisfy the handshake's promise
- fut.forward_to(std::move(h.promise));
- });
-}
-
void
-SocketConnection::connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+ const entity_type_t& _peer_type)
{
ceph_assert(state == state_t::none);
ceph_assert(!socket);
messenger.register_conn(this);
state = state_t::connecting;
seastar::with_gate(pending_dispatch, [this] {
- return start_connect()
- .then([this] {
+ return seastar::connect(peer_addr.in4_addr())
+ .then([this](seastar::connected_socket fd) {
+ if (state == state_t::closing) {
+ fd.shutdown_input();
+ fd.shutdown_output();
+ throw std::system_error(make_error_code(error::connection_aborted));
+ }
+ socket.emplace(std::move(fd));
+ // read server's handshake header
+ return socket->read(server_header_size);
+ }).then([this] (bufferlist headerbl) {
+ auto p = headerbl.cbegin();
+ validate_banner(p);
+ entity_addr_t saddr, caddr;
+ ::decode(saddr, p);
+ ::decode(caddr, p);
+ ceph_assert(p.end());
+ validate_peer_addr(saddr, peer_addr);
+
+ if (my_addr != caddr) {
+ // take peer's address for me, but preserve my nonce
+ caddr.nonce = my_addr.nonce;
+ my_addr = caddr;
+ }
+ // encode/send client's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(my_addr, bl, 0);
+ h.global_seq = messenger.get_global_seq();
+ return socket->write_flush(std::move(bl));
+ }).then([=] {
+ return seastar::repeat([this] {
+ return repeat_connect();
+ });
+ }).then_wrapped([this] (auto fut) {
+ // TODO: do not forward the exception
+ // and let the reconnect happen transparently inside connection
+ // satisfy the handshake's promise
+ fut.forward_to(std::move(h.promise));
+ }).then([this] {
// notify the dispatcher and allow them to reject the connection
return seastar::with_gate(messenger.pending_dispatch, [this] {
return dispatcher.ms_handle_connect(this);
});
}
-seastar::future<>
-SocketConnection::start_accept()
-{
- // encode/send server's handshake header
- bufferlist bl;
- bl.append(buffer::create_static(banner_size, banner));
- ::encode(my_addr, bl, 0);
- ::encode(peer_addr, bl, 0);
- return socket->write_flush(std::move(bl))
- .then([this] {
- // read client's handshake header and connect request
- return socket->read(client_header_size);
- }).then([this] (bufferlist bl) {
- auto p = bl.cbegin();
- validate_banner(p);
- entity_addr_t addr;
- ::decode(addr, p);
- ceph_assert(p.end());
- if (!addr.is_blank_ip()) {
- peer_addr = addr;
- }
- }).then([this] {
- return seastar::repeat([this] {
- return repeat_handle_connect();
- });
- }).then_wrapped([this] (auto fut) {
- // satisfy the handshake's promise
- fut.forward_to(std::move(h.promise));
- });
-}
-
void
-SocketConnection::accept(seastar::connected_socket&& fd,
- const entity_addr_t& _peer_addr)
+SocketConnection::start_accept(seastar::connected_socket&& fd,
+ const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::none);
ceph_assert(!socket);
messenger.accept_conn(this);
state = state_t::accepting;
seastar::with_gate(pending_dispatch, [this] {
- return start_accept()
+ // encode/send server's handshake header
+ bufferlist bl;
+ bl.append(buffer::create_static(banner_size, banner));
+ ::encode(my_addr, bl, 0);
+ ::encode(peer_addr, bl, 0);
+ return socket->write_flush(std::move(bl))
.then([this] {
+ // read client's handshake header and connect request
+ return socket->read(client_header_size);
+ }).then([this] (bufferlist bl) {
+ auto p = bl.cbegin();
+ validate_banner(p);
+ entity_addr_t addr;
+ ::decode(addr, p);
+ ceph_assert(p.end());
+ if (!addr.is_blank_ip()) {
+ peer_addr = addr;
+ }
+ }).then([this] {
+ return seastar::repeat([this] {
+ return repeat_handle_connect();
+ });
+ }).then_wrapped([this] (auto fut) {
+ // TODO: do not forward the exception
+ // and let the reconnect happen transparently inside connection
+ // satisfy the handshake's promise
+ fut.forward_to(std::move(h.promise));
+ }).then([this] {
// notify the dispatcher and allow them to reject the connection
return seastar::with_gate(messenger.pending_dispatch, [=] {
return dispatcher.ms_handle_accept(this);
void execute_open();
- /// start a handshake from the client's perspective,
- /// only call when SocketConnection first construct
- seastar::future<> start_connect();
- /// start a handshake from the server's perspective,
- /// only call when SocketConnection first construct
- seastar::future<> start_accept();
-
public:
SocketConnection(SocketMessenger& messenger,
const entity_addr_t& my_addr,
seastar::future<> close() override;
public:
- void connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type);
- void accept(seastar::connected_socket&& socket,
- const entity_addr_t& peer_addr);
+ /// start a handshake from the client's perspective,
+ /// only call when SocketConnection first construct
+ void start_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ /// start a handshake from the server's perspective,
+ /// only call when SocketConnection first construct
+ void start_accept(seastar::connected_socket&& socket,
+ const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();