From e411f89ca6a9ab004396642c74875fb432ad1792 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 19 Nov 2018 16:09:53 +0800 Subject: [PATCH] crimson/net: extract state transition out of repeat_connect() and extract state transition out of repeat_handle_connect() in this change, the connect/handle-connect loop is restructured, to avoid ad-hoc state changes in helper functions. this pave the road to explicit state transtion using named states. also, exception is thrown instead in handle_connect_reply(), we should not proceed in case of failures. and we need do error handling in the named state in future. currentl, `state` is set to `state_t::open` in `start_connect()` and `start_accept()`, the next step is to set it in a named state. Signed-off-by: Kefu Chai --- src/crimson/net/SocketConnection.cc | 59 ++++++++++++++++++----------- src/crimson/net/SocketConnection.h | 24 ++++++------ 2 files changed, 48 insertions(+), 35 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index b98fb6b7545..5ebb9232be3 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -431,7 +431,7 @@ uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool conne } } -seastar::future<> +seastar::future SocketConnection::repeat_handle_connect() { return socket->read(sizeof(h.connect)) @@ -475,7 +475,7 @@ SocketConnection::repeat_handle_connect() }); } -seastar::future<> +seastar::future SocketConnection::send_connect_reply(msgr_tag_t tag, bufferlist&& authorizer_reply) { @@ -487,10 +487,12 @@ SocketConnection::send_connect_reply(msgr_tag_t tag, return socket->write(make_static_packet(h.reply)) .then([this, reply=std::move(authorizer_reply)]() mutable { return socket->write_flush(std::move(reply)); + }).then([] { + return stop_t::no; }); } -seastar::future<> +seastar::future SocketConnection::send_connect_reply_ready(msgr_tag_t tag, bufferlist&& authorizer_reply) { @@ -526,7 +528,7 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag, }).then([this] { messenger.register_conn(this); messenger.unaccept_conn(this); - state = state_t::open; + return stop_t::yes; }); } @@ -552,7 +554,7 @@ SocketConnection::handle_keepalive2_ack() }); } -seastar::future<> +seastar::future SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply) { if (h.connect.global_seq < existing->peer_global_seq()) { @@ -594,9 +596,10 @@ SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, buf } } -seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer) +seastar::future +SocketConnection::replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer) { msgr_tag_t reply_tag; if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) && @@ -617,13 +620,16 @@ seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existin return send_connect_reply_ready(reply_tag, std::move(authorizer_reply)); } -seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) +seastar::future +SocketConnection::handle_connect_reply(msgr_tag_t tag) { switch (tag) { case CEPH_MSGR_TAG_FEATURES: - return fault(); + logger().error("{} connect protocol feature mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_BADPROTOVER: - return fault(); + logger().error("{} connect protocol version mispatch", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_BADAUTHORIZER: if (h.got_bad_auth) { logger().error("{} got bad authorizer", __func__); @@ -634,20 +640,21 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) return messenger.get_authorizer(peer_type, true) .then([this](auto&& auth) { h.authorizer = std::move(auth); - return seastar::now(); + return stop_t::no; }); case CEPH_MSGR_TAG_RESETSESSION: reset_session(); - return seastar::now(); + return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_RETRY_GLOBAL: h.global_seq = messenger.get_global_seq(h.reply.global_seq); - return seastar::now(); + return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_RETRY_SESSION: ceph_assert(h.reply.connect_seq > h.connect_seq); h.connect_seq = h.reply.connect_seq; - return seastar::now(); + return seastar::make_ready_future(stop_t::no); case CEPH_MSGR_TAG_WAIT: - return fault(); + // TODO: state wait + throw std::system_error(make_error_code(error::negotiation_failure)); case CEPH_MSGR_TAG_SEQ: break; case CEPH_MSGR_TAG_READY: @@ -655,7 +662,8 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) } if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features); missing) { - return fault(); + logger().error("{} missing required features", __func__); + throw std::system_error(make_error_code(error::negotiation_failure)); } if (tag == CEPH_MSGR_TAG_SEQ) { return socket->read_exactly(sizeof(seq_num_t)) @@ -683,7 +691,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag) features)); } h.authorizer.reset(); - return seastar::now(); + return seastar::make_ready_future(stop_t::yes); } else { // unknown tag logger().error("{} got unknown tag", __func__, int(tag)); @@ -709,7 +717,8 @@ void SocketConnection::reset_session() } } -seastar::future<> SocketConnection::repeat_connect() +seastar::future +SocketConnection::repeat_connect() { // encode ceph_msg_connect memset(&h.connect, 0, sizeof(h.connect)); @@ -797,9 +806,11 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, h.global_seq = messenger.get_global_seq(); return socket->write_flush(std::move(bl)); }).then([=] { - return seastar::do_until([=] { return state == state_t::open; }, - [=] { return repeat_connect(); }); + return seastar::repeat([this] { + return repeat_connect(); + }); }).then([this] { + state = state_t::open; // start background processing of tags read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { @@ -837,9 +848,11 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, peer_addr = addr; } }).then([this] { - return seastar::do_until([this] { return state == state_t::open; }, - [this] { return repeat_handle_connect(); }); + return seastar::repeat([this] { + return repeat_handle_connect(); + }); }).then([this] { + state = state_t::open; // start background processing of tags read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index c9d3e3c0e9a..3054744450d 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -65,16 +65,16 @@ class SocketConnection : public Connection { } h; /// server side of handshake negotiation - seastar::future<> repeat_handle_connect(); - seastar::future<> handle_connect_with_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply); - seastar::future<> replace_existing(SocketConnectionRef existing, - bufferlist&& authorizer_reply, - bool is_reset_from_peer = false); - seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply = {}); - seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag, - bufferlist&& authorizer_reply); + seastar::future repeat_handle_connect(); + seastar::future handle_connect_with_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply); + seastar::future replace_existing(SocketConnectionRef existing, + bufferlist&& authorizer_reply, + bool is_reset_from_peer = false); + seastar::future send_connect_reply(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply = {}); + seastar::future send_connect_reply_ready(ceph::net::msgr_tag_t tag, + bufferlist&& authorizer_reply); seastar::future<> handle_keepalive2(); seastar::future<> handle_keepalive2_ack(); @@ -82,8 +82,8 @@ class SocketConnection : public Connection { bool require_auth_feature() const; uint32_t get_proto_version(entity_type_t peer_type, bool connec) const; /// client side of handshake negotiation - seastar::future<> repeat_connect(); - seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag); + seastar::future repeat_connect(); + seastar::future handle_connect_reply(ceph::net::msgr_tag_t tag); void reset_session(); /// state for an incoming message -- 2.39.5