From f368daf5a1ccb57377e598579da7cd7a87a6bae9 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Tue, 16 Oct 2018 04:54:23 +0800 Subject: [PATCH] crimson/net: implement accepting/connecting states - introduce the new accepting/connecting connection states. - return ConnectionRef immediately when connect(). - manage the ownership of the accepting connections. - manage the ownership of the registered connections. - encapsulate a Socket class because it is not created when constructing a SocketConnection, and allow it to be replaced in the future. - refactor related interfaces. Signed-off-by: Yingxin --- src/crimson/CMakeLists.txt | 3 +- src/crimson/mon/MonClient.cc | 15 +- src/crimson/net/Connection.h | 6 +- src/crimson/net/Messenger.h | 7 +- src/crimson/net/Socket.cc | 70 +++++++++ src/crimson/net/Socket.h | 56 ++++++++ src/crimson/net/SocketConnection.cc | 215 +++++++++++----------------- src/crimson/net/SocketConnection.h | 40 ++---- src/crimson/net/SocketMessenger.cc | 87 ++++++----- src/crimson/net/SocketMessenger.h | 9 +- 10 files changed, 295 insertions(+), 213 deletions(-) create mode 100644 src/crimson/net/Socket.cc create mode 100644 src/crimson/net/Socket.h diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 519c1629840..cc17277c7b0 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -118,7 +118,8 @@ set(crimson_net_srcs net/Dispatcher.cc net/Errors.cc net/SocketConnection.cc - net/SocketMessenger.cc) + net/SocketMessenger.cc + net/Socket.cc) set(crimson_thread_srcs thread/ThreadPool.cc thread/Throttle.cc) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index ab8e53ce854..9f1994ba9f5 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -500,14 +500,13 @@ seastar::future<> Client::reopen_session(int rank) return seastar::parallel_for_each(mons, [this](auto rank) { auto peer = monmap.get_addr(rank); logger().info("connecting to mon.{}", rank); - return msgr.connect(peer, CEPH_ENTITY_TYPE_MON).then([this](auto conn) { - auto& mc = pending_conns.emplace_back(conn, &keyring); - return mc.authenticate( - monmap.get_epoch(), entity_name, - auth_methods, want_keys).handle_exception([conn](auto ep) { - return conn->close().then([ep = std::move(ep)] { - std::rethrow_exception(ep); - }); + auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); + auto& mc = pending_conns.emplace_back(conn, &keyring); + return mc.authenticate( + monmap.get_epoch(), entity_name, + auth_methods, want_keys).handle_exception([conn](auto ep) { + return conn->close().then([ep = std::move(ep)] { + std::rethrow_exception(ep); }); }).then([peer, this] { if (!is_hunting()) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 6aa600d3b5e..cc2f4eabf85 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -29,11 +29,11 @@ class Connection : public boost::intrusive_ref_counter start(Dispatcher *dispatcher) = 0; - /// establish a client connection and complete a handshake - virtual seastar::future connect(const entity_addr_t& addr, - entity_type_t peer_type) = 0; + /// either return an existing connection to the peer, + /// or a new pending connection + virtual ConnectionRef connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; /// stop listenening and wait for all connections to close. safe to destruct /// after this future becomes available diff --git a/src/crimson/net/Socket.cc b/src/crimson/net/Socket.cc new file mode 100644 index 00000000000..3c12c61d8cd --- /dev/null +++ b/src/crimson/net/Socket.cc @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "Socket.h" + +#include "Errors.h" + +namespace ceph::net { + +namespace { + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer; + using consumption_result_type = typename seastar::input_stream::consumption_result_type; + + // consume some or all of a buffer segment + seastar::future operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future( + seastar::continue_consuming{}); + } else { + // return an empty buffer to singal that we're done + return seastar::make_ready_future( + consumption_result_type::stop_consuming_type({})); + } + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future( + consumption_result_type::stop_consuming_type{std::move(data)}); + }; +}; + +} // anonymous namespace + +seastar::future Socket::read(size_t bytes) +{ + if (bytes == 0) { + return seastar::make_ready_future(); + } + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}) + .then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future(std::move(r.buffer)); + }); +} + +} // namespace ceph::net diff --git a/src/crimson/net/Socket.h b/src/crimson/net/Socket.h new file mode 100644 index 00000000000..07ab18954d1 --- /dev/null +++ b/src/crimson/net/Socket.h @@ -0,0 +1,56 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include "include/buffer.h" + +namespace ceph::net { + +class Socket +{ + seastar::connected_socket socket; + seastar::input_stream in; + seastar::output_stream out; + + /// buffer state for read() + struct { + bufferlist buffer; + size_t remaining; + } r; + + public: + explicit Socket(seastar::connected_socket&& _socket) + : socket(std::move(_socket)), + in(socket.input()), + out(socket.output()) {} + Socket(Socket&& o) = default; + + /// read the requested number of bytes into a bufferlist + seastar::future read(size_t bytes); + using tmp_buf = seastar::temporary_buffer; + using packet = seastar::net::packet; + seastar::future read_exactly(size_t bytes) { + return in.read_exactly(bytes); + } + + seastar::future<> write(packet&& buf) { + return out.write(std::move(buf)); + } + seastar::future<> flush() { + return out.flush(); + } + seastar::future<> write_flush(packet&& buf) { + return out.write(std::move(buf)).then([this] { return out.flush(); }); + } + + /// Socket can only be closed once. + seastar::future<> close() { + return seastar::when_all(in.close(), out.close()).discard_result(); + } +}; + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 1f0dd74de14..26bd1e491f3 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -43,14 +43,9 @@ namespace { } SocketConnection::SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, - const entity_addr_t& peer_addr, - seastar::connected_socket&& fd) - : Connection(my_addr, peer_addr), + const entity_addr_t& my_addr) + : Connection(my_addr), messenger(messenger), - socket(std::move(fd)), - in(socket.input()), - out(socket.output()), send_ready(h.promise.get_future()) { } @@ -72,67 +67,11 @@ bool SocketConnection::is_connected() return !send_ready.failed(); } -// an input_stream consumer that reads buffer segments into a bufferlist up to -// the given number of remaining bytes -struct bufferlist_consumer { - bufferlist& bl; - size_t& remaining; - - bufferlist_consumer(bufferlist& bl, size_t& remaining) - : bl(bl), remaining(remaining) {} - - using tmp_buf = seastar::temporary_buffer; - using consumption_result_type = typename seastar::input_stream::consumption_result_type; - - // consume some or all of a buffer segment - seastar::future operator()(tmp_buf&& data) { - if (remaining >= data.size()) { - // consume the whole buffer - remaining -= data.size(); - bl.append(buffer::create_foreign(std::move(data))); - if (remaining > 0) { - // return none to request more segments - return seastar::make_ready_future( - seastar::continue_consuming{}); - } else { - // return an empty buffer to singal that we're done - return seastar::make_ready_future( - consumption_result_type::stop_consuming_type({})); - } - } - if (remaining > 0) { - // consume the front - bl.append(buffer::create_foreign(data.share(0, remaining))); - data.trim_front(remaining); - remaining = 0; - } - // give the rest back to signal that we're done - return seastar::make_ready_future( - consumption_result_type::stop_consuming_type{std::move(data)}); - }; -}; - -seastar::future SocketConnection::read(size_t bytes) -{ - if (bytes == 0) { - return seastar::make_ready_future(); - } - r.buffer.clear(); - r.remaining = bytes; - return in.consume(bufferlist_consumer{r.buffer, r.remaining}) - .then([this] { - if (r.remaining) { // throw on short reads - throw std::system_error(make_error_code(error::read_eof)); - } - return seastar::make_ready_future(std::move(r.buffer)); - }); -} - void SocketConnection::read_tags_until_next_message() { seastar::repeat([this] { // read the next tag - return in.read_exactly(1) + return socket->read_exactly(1) .then([this] (auto buf) { if (buf.empty()) { throw std::system_error(make_error_code(error::read_eof)); @@ -172,7 +111,7 @@ void SocketConnection::read_tags_until_next_message() seastar::future SocketConnection::handle_ack() { - return in.read_exactly(sizeof(ceph_le64)) + return socket->read_exactly(sizeof(ceph_le64)) .then([this] (auto buf) { auto seq = reinterpret_cast(buf.get()); discard_up_to(&sent, *seq); @@ -216,7 +155,7 @@ seastar::future SocketConnection::do_read_message() .then([this] { on_message = seastar::promise<>{}; // read header - return read(sizeof(m.header)); + return socket->read(sizeof(m.header)); }).then([this] (bufferlist bl) { // throttle the traffic, maybe auto p = bl.cbegin(); @@ -224,19 +163,19 @@ seastar::future SocketConnection::do_read_message() return maybe_throttle(); }).then([this] { // read front - return read(m.header.front_len); + return socket->read(m.header.front_len); }).then([this] (bufferlist bl) { m.front = std::move(bl); // read middle - return read(m.header.middle_len); + return socket->read(m.header.middle_len); }).then([this] (bufferlist bl) { m.middle = std::move(bl); // read data - return read(m.header.data_len); + return socket->read(m.header.data_len); }).then([this] (bufferlist bl) { m.data = std::move(bl); // read footer - return read(sizeof(m.footer)); + return socket->read(sizeof(m.footer)); }).then([this] (bufferlist bl) { // resume background processing of tags read_tags_until_next_message(); @@ -316,8 +255,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) bl.append((const char*)&old_footer, sizeof(old_footer)); } // write as a seastar::net::packet - return out.write(std::move(bl)) - .then([this] { return out.flush(); }) + return socket->write_flush(std::move(bl)) .then([this, msg = std::move(msg)] { if (!policy.lossy) { sent.push(std::move(msg)); @@ -344,9 +282,7 @@ seastar::future<> SocketConnection::keepalive() seastar::shared_future<> f = send_ready.then([this] { k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( ceph::coarse_real_clock::now()); - return out.write(make_static_packet(k.req)); - }).then([this] { - return out.flush(); + return socket->write_flush(make_static_packet(k.req)); }); send_ready = f.get_future(); return f.get_future(); @@ -354,24 +290,28 @@ seastar::future<> SocketConnection::keepalive() seastar::future<> SocketConnection::close() { - if (state == state_t::closed) { + if (state == state_t::closing) { // already closing assert(close_ready.valid()); return close_ready.get_future(); } - state = state_t::closed; - // unregister_conn() drops a reference, so hold another until completion auto cleanup = [conn = SocketConnectionRef(this)] {}; - messenger.unregister_conn(this); + if (state == state_t::accepting) { + messenger.unaccept_conn(this); + } else if (state >= state_t::connecting && state < state_t::closing) { + messenger.unregister_conn(this); + } else { + // cannot happen + ceph_assert(false); + } + state = state_t::closing; - // close_ready become valid only after state is state_t::closed + // close_ready become valid only after state is state_t::closing assert(!close_ready.valid()); - close_ready = seastar::when_all(in.close(), out.close()) - .discard_result() - .finally(std::move(cleanup)); + close_ready = socket->close().finally(std::move(cleanup)); return close_ready.get_future(); } @@ -487,13 +427,15 @@ uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool conne } } -seastar::future<> SocketConnection::handle_connect() +seastar::future<> +SocketConnection::repeat_handle_connect() { - return read(sizeof(h.connect)) + return socket->read(sizeof(h.connect)) .then([this](bufferlist bl) { auto p = bl.cbegin(); ::decode(h.connect, p); - return read(h.connect.authorizer_len); + peer_type = h.connect.host_type; + return socket->read(h.connect.authorizer_len); }).then([this] (bufferlist authorizer) { if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) { return seastar::make_ready_future( @@ -507,9 +449,9 @@ seastar::future<> SocketConnection::handle_connect() return seastar::make_ready_future( CEPH_MSGR_TAG_FEATURES, bufferlist{}); } - return messenger.verify_authorizer(get_peer_type(), - h.connect.authorizer_protocol, - authorizer); + return messenger.verify_authorizer(peer_type, + h.connect.authorizer_protocol, + authorizer); }).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) { memset(&h.reply, 0, sizeof(h.reply)); if (tag) { @@ -538,11 +480,9 @@ SocketConnection::send_connect_reply(msgr_tag_t tag, policy.features_supported) | policy.features_required); h.reply.authorizer_len = authorizer_reply.length(); - return out.write(make_static_packet(h.reply)) + return socket->write(make_static_packet(h.reply)) .then([this, reply=std::move(authorizer_reply)]() mutable { - return out.write(std::move(reply)); - }).then([this] { - return out.flush(); + return socket->write_flush(std::move(reply)); }); } @@ -560,28 +500,28 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag, h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY; } h.reply.authorizer_len = authorizer_reply.length(); - return out.write(make_static_packet(h.reply)) + return socket->write(make_static_packet(h.reply)) .then([this, reply=std::move(authorizer_reply)]() mutable { if (reply.length()) { - return out.write(std::move(reply)); + return socket->write(std::move(reply)); } else { return seastar::now(); } }).then([this] { if (h.reply.tag == CEPH_MSGR_TAG_SEQ) { - return out.write(make_static_packet(in_seq)) + return socket->write_flush(make_static_packet(in_seq)) .then([this] { - return out.flush(); - }).then([this] { - return in.read_exactly(sizeof(seq_num_t)); + return socket->read_exactly(sizeof(seq_num_t)); }).then([this] (auto buf) { auto acked_seq = reinterpret_cast(buf.get()); discard_up_to(&out_q, *acked_seq); }); } else { - return out.flush(); + return socket->flush(); } }).then([this] { + messenger.register_conn(this); + messenger.unaccept_conn(this); state = state_t::open; }); } @@ -589,20 +529,18 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag, seastar::future<> SocketConnection::handle_keepalive2() { - return in.read_exactly(sizeof(ceph_timespec)) + return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { k.ack.stamp = *reinterpret_cast(buf.get()); std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl; - return out.write(make_static_packet(k.ack)); - }).then([this] { - return out.flush(); + return socket->write_flush(make_static_packet(k.ack)); }); } seastar::future<> SocketConnection::handle_keepalive2_ack() { - return in.read_exactly(sizeof(ceph_timespec)) + return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { auto t = reinterpret_cast(buf.get()); k.ack_stamp = *t; @@ -689,7 +627,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) } h.got_bad_auth = true; // try harder - return messenger.get_authorizer(h.peer_type, true) + return messenger.get_authorizer(peer_type, true) .then([this](auto&& auth) { h.authorizer = std::move(auth); return seastar::now(); @@ -716,13 +654,11 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) return fault(); } if (tag == CEPH_MSGR_TAG_SEQ) { - return in.read_exactly(sizeof(seq_num_t)) + return socket->read_exactly(sizeof(seq_num_t)) .then([this] (auto buf) { auto acked_seq = reinterpret_cast(buf.get()); discard_up_to(&out_q, *acked_seq); - return out.write(make_static_packet(in_seq)); - }).then([this] { - return out.flush(); + return socket->write_flush(make_static_packet(in_seq)); }).then([this] { return handle_connect_reply(CEPH_MSGR_TAG_READY); }); @@ -769,14 +705,12 @@ void SocketConnection::reset_session() } } -seastar::future<> SocketConnection::connect(entity_type_t peer_type, - entity_type_t host_type) +seastar::future<> SocketConnection::repeat_connect() { // encode ceph_msg_connect - h.peer_type = peer_type; memset(&h.connect, 0, sizeof(h.connect)); h.connect.features = policy.features_supported; - h.connect.host_type = host_type; + h.connect.host_type = messenger.get_myname().type(); h.connect.global_seq = h.global_seq; h.connect.connect_seq = h.connect_seq; h.connect.protocol_version = get_proto_version(peer_type, true); @@ -797,17 +731,15 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type, h.connect.authorizer_len = 0; bl.append(create_static(h.connect)); }; - return out.write(std::move(bl)); - }).then([this] { - return out.flush(); + return socket->write_flush(std::move(bl)); }).then([this] { // read the reply - return read(sizeof(h.reply)); + return socket->read(sizeof(h.reply)); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); ::decode(h.reply, p); ceph_assert(p.end()); - return read(h.reply.authorizer_len); + return socket->read(h.reply.authorizer_len); }).then([this] (bufferlist bl) { if (h.authorizer) { auto reply = bl.cbegin(); @@ -820,12 +752,22 @@ seastar::future<> SocketConnection::connect(entity_type_t peer_type, }); } -seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type, - entity_type_t host_type) -{ - // read server's handshake header - return read(server_header_size) - .then([this] (bufferlist headerbl) { +seastar::future<> +SocketConnection::start_connect(const entity_addr_t& _peer_addr, + const entity_type_t& _peer_type) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + peer_type = _peer_type; + messenger.register_conn(this); + state = state_t::connecting; + return seastar::connect(peer_addr.in4_addr()) + .then([this](seastar::connected_socket fd) { + socket.emplace(std::move(fd)); + // read server's handshake header + return socket->read(server_header_size); + }).then([this] (bufferlist headerbl) { auto p = headerbl.cbegin(); validate_banner(p); entity_addr_t saddr, caddr; @@ -844,10 +786,10 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type, bl.append(buffer::create_static(banner_size, banner)); ::encode(my_addr, bl, 0); h.global_seq = messenger.get_global_seq(); - return out.write(std::move(bl)).then([this] { return out.flush(); }); + return socket->write_flush(std::move(bl)); }).then([=] { return seastar::do_until([=] { return state == state_t::open; }, - [=] { return connect(peer_type, host_type); }); + [=] { return repeat_connect(); }); }).then([this] { // start background processing of tags read_tags_until_next_message(); @@ -857,18 +799,25 @@ seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type, }); } -seastar::future<> SocketConnection::server_handshake() -{ +seastar::future<> +SocketConnection::start_accept(seastar::connected_socket&& fd, + const entity_addr_t& _peer_addr) +{ + ceph_assert(state == state_t::none); + ceph_assert(!socket); + peer_addr = _peer_addr; + socket.emplace(std::move(fd)); + messenger.accept_conn(this); + state = state_t::accepting; // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); ::encode(my_addr, bl, 0); ::encode(peer_addr, bl, 0); - return out.write(std::move(bl)) - .then([this] { return out.flush(); }) + return socket->write_flush(std::move(bl)) .then([this] { // read client's handshake header and connect request - return read(client_header_size); + return socket->read(client_header_size); }).then([this] (bufferlist bl) { auto p = bl.cbegin(); validate_banner(p); @@ -880,7 +829,7 @@ seastar::future<> SocketConnection::server_handshake() } }).then([this] { return seastar::do_until([this] { return state == state_t::open; }, - [this] { return handle_connect(); }); + [this] { return repeat_handle_connect(); }); }).then([this] { // start background processing of tags read_tags_until_next_message(); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index b53f7e1a962..87c8e5d1232 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -19,6 +19,7 @@ #include "msg/Policy.h" #include "Connection.h" +#include "Socket.h" #include "crimson/thread/Throttle.h" class AuthAuthorizer; @@ -32,38 +33,28 @@ using SocketConnectionRef = boost::intrusive_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; - seastar::connected_socket socket; - seastar::input_stream in; - seastar::output_stream out; + std::optional socket; enum class state_t { none, + accepting, + connecting, open, standby, - closed, - wait + wait, + closing }; state_t state = state_t::none; - /// become valid only when state is state_t::closed + /// become valid only when state is state_t::closing seastar::shared_future<> close_ready; - /// buffer state for read() - struct Reader { - bufferlist buffer; - size_t remaining; - } r; - - /// read the requested number of bytes into a bufferlist - seastar::future read(size_t bytes); - /// state for handshake struct Handshake { ceph_msg_connect connect; ceph_msg_connect_reply reply; bool got_bad_auth = false; std::unique_ptr authorizer; - peer_type_t peer_type; std::chrono::milliseconds backoff; uint32_t connect_seq = 0; uint32_t peer_global_seq = 0; @@ -72,7 +63,7 @@ class SocketConnection : public Connection { } h; /// server side of handshake negotiation - seastar::future<> handle_connect(); + seastar::future<> repeat_handle_connect(); seastar::future<> handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply); seastar::future<> replace_existing(SocketConnectionRef existing, @@ -89,7 +80,7 @@ class SocketConnection : public Connection { bool require_auth_feature() const; uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; /// client side of handshake negotiation - seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type); + seastar::future<> repeat_connect(); seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag); void reset_session(); @@ -159,15 +150,13 @@ class SocketConnection : public Connection { public: SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, - const entity_addr_t& peer_addr, - seastar::connected_socket&& socket); + const entity_addr_t& my_addr); ~SocketConnection(); Messenger* get_messenger() const override; int get_peer_type() const override { - return h.connect.host_type; + return peer_type; } bool is_connected() override; @@ -180,11 +169,12 @@ class SocketConnection : public Connection { public: /// complete a handshake from the client's perspective - seastar::future<> client_handshake(entity_type_t peer_type, - entity_type_t host_type); + seastar::future<> start_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); /// complete a handshake from the server's perspective - seastar::future<> server_handshake(); + seastar::future<> start_accept(seastar::connected_socket&& socket, + const entity_addr_t& peer_addr); /// read a message from a connection that has completed its handshake seastar::future read_message(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 1fefd4e1dd2..827267f238d 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -42,10 +42,6 @@ void SocketMessenger::bind(const entity_addr_t& addr) seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn) { - auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); - std::ignore = i; - ceph_assert(added); - return seastar::keep_doing([=] { return conn->read_message() .then([=] (MessageRef msg) { @@ -80,11 +76,10 @@ seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, entity_addr_t peer_addr; peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), - peer_addr, std::move(socket)); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); // initiate the handshake - return conn->server_handshake() - .then([=] { + return conn->start_accept(std::move(socket), peer_addr) + .then([this, conn] { // notify the dispatcher and allow them to reject the connection return seastar::with_gate(pending_dispatch, [=] { return dispatcher->ms_handle_accept(conn); @@ -126,35 +121,30 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return seastar::now(); } -seastar::future -SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type) +ceph::net::ConnectionRef +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) { - if (auto found = lookup_conn(addr); found) { - return seastar::make_ready_future(found); + if (auto found = lookup_conn(peer_addr); found) { + return found; } - return seastar::connect(addr.in4_addr()) - .then([=] (seastar::connected_socket socket) { - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr, - std::move(socket)); - // complete the handshake before returning to the caller - return conn->client_handshake(peer_type, get_myname().type()) - .then([=] { - // notify the dispatcher and allow them to reject the connection - return seastar::with_gate(pending_dispatch, [=] { - return dispatcher->ms_handle_connect(conn); - }); - }).handle_exception([conn] (std::exception_ptr eptr) { - // close the connection before returning errors - return seastar::make_exception_future<>(eptr) - .finally([conn] { return conn->close(); }); - // TODO: retry on fault - }).then([=] { - // dispatch replies on this connection - dispatch(conn) - .handle_exception([] (std::exception_ptr eptr) {}); - return ConnectionRef(conn); - }); + SocketConnectionRef conn = new SocketConnection(*this, get_myaddr()); + conn->start_connect(peer_addr, peer_type) + .then([this, conn] { + // notify the dispatcher and allow them to reject the connection + return seastar::with_gate(pending_dispatch, [this, conn] { + return dispatcher->ms_handle_connect(conn); + }); + }).handle_exception([conn] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([conn] { return conn->close(); }); + // TODO: retry on fault + }).then([this, conn] { + // dispatch replies on this connection + dispatch(conn) + .handle_exception([] (std::exception_ptr eptr) {}); }); + return conn; } seastar::future<> SocketMessenger::shutdown() @@ -163,11 +153,15 @@ seastar::future<> SocketMessenger::shutdown() listener->abort_accept(); } // close all connections - return seastar::parallel_for_each(connections.begin(), connections.end(), - [this] (auto conn) { - return conn.second->close(); + return seastar::parallel_for_each(accepting_conns, [] (auto conn) { + return conn->close(); + }).then([this] { + ceph_assert(accepting_conns.empty()); + return seastar::parallel_for_each(connections, [] (auto conn) { + return conn.second->close(); + }); }).finally([this] { - connections.clear(); + ceph_assert(connections.empty()); // closing connections will unblock any dispatchers that were waiting to // send(). wait for any pending calls to finish return pending_dispatch.close(); @@ -202,6 +196,23 @@ ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& } } +void SocketMessenger::accept_conn(SocketConnectionRef conn) +{ + accepting_conns.insert(conn); +} + +void SocketMessenger::unaccept_conn(SocketConnectionRef conn) +{ + accepting_conns.erase(conn); +} + +void SocketMessenger::register_conn(SocketConnectionRef conn) +{ + auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); + std::ignore = i; + ceph_assert(added); +} + void SocketMessenger::unregister_conn(SocketConnectionRef conn) { ceph_assert(conn); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index e025daa733e..d2ef0b6456d 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -16,6 +16,7 @@ #include #include +#include #include #include @@ -32,6 +33,7 @@ class SocketMessenger final : public Messenger { std::optional listener; Dispatcher *dispatcher = nullptr; std::map connections; + std::set accepting_conns; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; seastar::gate pending_dispatch; @@ -48,8 +50,8 @@ class SocketMessenger final : public Messenger { seastar::future<> start(Dispatcher *dispatcher) override; - seastar::future connect(const entity_addr_t& addr, - entity_type_t peer_type) override; + ConnectionRef connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; seastar::future<> shutdown() override; @@ -68,6 +70,9 @@ class SocketMessenger final : public Messenger { void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); SocketConnectionRef lookup_conn(const entity_addr_t& addr); + void accept_conn(SocketConnectionRef); + void unaccept_conn(SocketConnectionRef); + void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); }; -- 2.39.5