From: Yingxin Cheng Date: Wed, 7 Aug 2019 13:43:59 +0000 (+0800) Subject: crimson/net: next_step_t for explicit decision of next state X-Git-Tag: v15.1.0~1910^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=014a662b208ce761a17764969d90fb36b7897b08;p=ceph.git crimson/net: next_step_t for explicit decision of next state Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index 49aa51549c4f..54fcb588a2c0 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -641,17 +641,19 @@ seastar::future<> ProtocolV2::client_auth(std::vector &allowed_methods } } -seastar::future ProtocolV2::process_wait() +seastar::future +ProtocolV2::process_wait() { return read_frame_payload().then([this] { // handle_wait() logic logger().warn("{} GOT WaitFrame", conn); WaitFrame::Decode(rx_segments_data.back()); - return false; + return next_step_t::wait; }); } -seastar::future ProtocolV2::client_connect() +seastar::future +ProtocolV2::client_connect() { // send_client_ident() logic if (!conn.policy.lossy && !client_cookie) { @@ -692,8 +694,7 @@ seastar::future ProtocolV2::client_connect() " (client does not support all server features)", conn, ident_missing.features()); abort_in_fault(); - // won't be executed - return false; + return next_step_t::none; }); case Tag::WAIT: return process_wait(); @@ -748,18 +749,18 @@ seastar::future ProtocolV2::client_connect() ceph_abort("unexpected exception from ms_handle_connect()"); }); }).then([this] { - return true; + return next_step_t::ready; }); default: { unexpected_tag(tag, conn, "post_client_connect"); - // won't be executed - return seastar::make_ready_future(false); + return seastar::make_ready_future(next_step_t::none); } } }); } -seastar::future ProtocolV2::client_reconnect() +seastar::future +ProtocolV2::client_reconnect() { // send_reconnect() logic auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(), @@ -825,12 +826,11 @@ seastar::future ProtocolV2::client_reconnect() ceph_abort("unexpected exception from ms_handle_connect()"); }); }).then([this] { - return true; + return next_step_t::ready; }); default: { unexpected_tag(tag, conn, "post_client_reconnect"); - // won't be executed - return seastar::make_ready_future(false); + return seastar::make_ready_future(next_step_t::none); } } }); @@ -890,15 +890,23 @@ void ProtocolV2::execute_connecting() ceph_assert(false); return client_reconnect(); } - }).then([this] (bool proceed_or_wait) { - if (proceed_or_wait) { + }).then([this] (next_step_t next) { + switch (next) { + case next_step_t::ready: { logger().info("{} connected: gs={}, pgs={}, cs={}," " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}", conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, conn.in_seq, conn.out_seq); execute_ready(); - } else { + break; + } + case next_step_t::wait: { execute_wait(); + break; + } + default: { + ceph_abort("impossible next step"); + } } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in CONNECTING state @@ -1005,16 +1013,18 @@ seastar::future<> ProtocolV2::server_auth() }); } -seastar::future ProtocolV2::send_wait() +seastar::future +ProtocolV2::send_wait() { auto wait = WaitFrame::Encode(); logger().warn("{} WRITE WaitFrame", conn); return write_frame(wait).then([this] { - return false; + return next_step_t::wait; }); } -seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef existing) +seastar::future +ProtocolV2::handle_existing_connection(SocketConnectionRef existing) { // handle_existing_connection() logic logger().trace("{} {}: {}", conn, __func__, *existing); @@ -1024,9 +1034,7 @@ seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef if (exproto->state == state_t::CLOSING) { logger().warn("{} existing connection {} already closed.", conn, *existing); - return send_server_ident().then([this] { - return true; - }); + return send_server_ident(); } if (exproto->state == state_t::REPLACING) { @@ -1050,16 +1058,15 @@ seastar::future ProtocolV2::handle_existing_connection(SocketConnectionRef " this connection", conn, *existing); exproto->dispatch_reset(); exproto->close(); - return send_server_ident().then([this] { - return true; - }); + return send_server_ident(); } // TODO: lossless policy ceph_assert(false); } -seastar::future ProtocolV2::server_connect() +seastar::future +ProtocolV2::server_connect() { return read_frame_payload().then([this] { // handle_client_ident() logic @@ -1117,7 +1124,7 @@ seastar::future ProtocolV2::server_connect() logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)", conn, feat_missing); return write_frame(ident_missing_features).then([this] { - return false; + return next_step_t::wait; }); } connection_features = @@ -1144,14 +1151,12 @@ seastar::future ProtocolV2::server_connect() } // if everything is OK reply with server identification - return send_server_ident().then([this] { - // goto ready - return true; - }); + return send_server_ident(); }); } -seastar::future ProtocolV2::read_reconnect() +seastar::future +ProtocolV2::read_reconnect() { return read_main_preamble() .then([this] (Tag tag) { @@ -1160,7 +1165,8 @@ seastar::future ProtocolV2::read_reconnect() }); } -seastar::future ProtocolV2::send_retry(uint64_t connect_seq) +seastar::future +ProtocolV2::send_retry(uint64_t connect_seq) { auto retry = RetryFrame::Encode(connect_seq); logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq); @@ -1169,7 +1175,8 @@ seastar::future ProtocolV2::send_retry(uint64_t connect_seq) }); } -seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) +seastar::future +ProtocolV2::send_retry_global(uint64_t global_seq) { auto retry = RetryGlobalFrame::Encode(global_seq); logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq); @@ -1178,7 +1185,8 @@ seastar::future ProtocolV2::send_retry_global(uint64_t global_seq) }); } -seastar::future ProtocolV2::send_reset(bool full) +seastar::future +ProtocolV2::send_reset(bool full) { auto reset = ResetFrame::Encode(full); logger().warn("{} WRITE ResetFrame: full={}", conn, full); @@ -1190,7 +1198,8 @@ seastar::future ProtocolV2::send_reset(bool full) }); } -seastar::future ProtocolV2::server_reconnect() +seastar::future +ProtocolV2::server_reconnect() { return read_frame_payload().then([this] { // handle_reconnect() logic @@ -1350,12 +1359,12 @@ void ProtocolV2::execute_accepting() return server_reconnect(); default: { unexpected_tag(tag, conn, "post_server_auth"); - // won't be executed - return seastar::make_ready_future(false); + return seastar::make_ready_future(next_step_t::none); } } - }).then([this] (bool proceed_or_wait) { - if (proceed_or_wait) { + }).then([this] (next_step_t next) { + switch (next) { + case next_step_t::ready: { messenger.register_conn( seastar::static_pointer_cast( conn.shared_from_this())); @@ -1367,8 +1376,15 @@ void ProtocolV2::execute_accepting() conn, global_seq, peer_global_seq, connect_seq, client_cookie, server_cookie, conn.in_seq, conn.out_seq); execute_ready(); - } else { + break; + } + case next_step_t::wait: { execute_server_wait(); + break; + } + default: { + ceph_abort("impossible next step"); + } } }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in ACCEPTING state @@ -1414,7 +1430,8 @@ seastar::future<> ProtocolV2::finish_auth() // ACCEPTING or REPLACING state -seastar::future<> ProtocolV2::send_server_ident() +seastar::future +ProtocolV2::send_server_ident() { // send_server_ident() logic @@ -1466,6 +1483,8 @@ seastar::future<> ProtocolV2::send_server_ident() }); return write_frame(server_ident); + }).then([] { + return next_step_t::ready; }); } diff --git a/src/crimson/net/ProtocolV2.h b/src/crimson/net/ProtocolV2.h index 52ffaf5c86cc..a6435e088c16 100644 --- a/src/crimson/net/ProtocolV2.h +++ b/src/crimson/net/ProtocolV2.h @@ -101,6 +101,12 @@ class ProtocolV2 final : public Protocol { void reset_session(bool full); seastar::future banner_exchange(); + enum class next_step_t { + ready, + wait, + none, // protocol should have been aborted or failed + }; + // CONNECTING (client) seastar::future<> handle_auth_reply(); inline seastar::future<> client_auth() { @@ -109,9 +115,9 @@ class ProtocolV2 final : public Protocol { } seastar::future<> client_auth(std::vector &allowed_methods); - seastar::future process_wait(); - seastar::future client_connect(); - seastar::future client_reconnect(); + seastar::future process_wait(); + seastar::future client_connect(); + seastar::future client_reconnect(); void execute_connecting(); // ACCEPTING (server) @@ -119,16 +125,16 @@ class ProtocolV2 final : public Protocol { seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more); seastar::future<> server_auth(); - seastar::future send_wait(); + seastar::future send_wait(); - seastar::future handle_existing_connection(SocketConnectionRef existing); - seastar::future server_connect(); + seastar::future handle_existing_connection(SocketConnectionRef existing); + seastar::future server_connect(); - seastar::future read_reconnect(); - seastar::future send_retry(uint64_t connect_seq); - seastar::future send_retry_global(uint64_t global_seq); - seastar::future send_reset(bool full); - seastar::future server_reconnect(); + seastar::future read_reconnect(); + seastar::future send_retry(uint64_t connect_seq); + seastar::future send_retry_global(uint64_t global_seq); + seastar::future send_reset(bool full); + seastar::future server_reconnect(); void execute_accepting(); @@ -136,7 +142,7 @@ class ProtocolV2 final : public Protocol { seastar::future<> finish_auth(); // ACCEPTING/REPLACING (server) - seastar::future<> send_server_ident(); + seastar::future send_server_ident(); // REPLACING (server) seastar::future<> send_reconnect_ok();