]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: next_step_t for explicit decision of next state
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Aug 2019 13:43:59 +0000 (21:43 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 08:34:45 +0000 (16:34 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h

index 49aa51549c4fb87b400e889610a7a89aa160376d..54fcb588a2c0206417c686977871787e4abef8e6 100644 (file)
@@ -641,17 +641,19 @@ seastar::future<> ProtocolV2::client_auth(std::vector<uint32_t> &allowed_methods
   }
 }
 
-seastar::future<bool> ProtocolV2::process_wait()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::process_wait()
 {
   return read_frame_payload().then([this] {
     // handle_wait() logic
     logger().warn("{} GOT WaitFrame", conn);
     WaitFrame::Decode(rx_segments_data.back());
-    return false;
+    return next_step_t::wait;
   });
 }
 
-seastar::future<bool> ProtocolV2::client_connect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_connect()
 {
   // send_client_ident() logic
   if (!conn.policy.lossy && !client_cookie) {
@@ -692,8 +694,7 @@ seastar::future<bool> ProtocolV2::client_connect()
                         " (client does not support all server features)",
                         conn, ident_missing.features());
           abort_in_fault();
-          // won't be executed
-          return false;
+          return next_step_t::none;
         });
       case Tag::WAIT:
         return process_wait();
@@ -748,18 +749,18 @@ seastar::future<bool> ProtocolV2::client_connect()
             ceph_abort("unexpected exception from ms_handle_connect()");
           });
         }).then([this] {
-          return true;
+          return next_step_t::ready;
         });
       default: {
         unexpected_tag(tag, conn, "post_client_connect");
-        // won't be executed
-        return seastar::make_ready_future<bool>(false);
+        return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
   });
 }
 
-seastar::future<bool> ProtocolV2::client_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::client_reconnect()
 {
   // send_reconnect() logic
   auto reconnect = ReconnectFrame::Encode(messenger.get_myaddrs(),
@@ -825,12 +826,11 @@ seastar::future<bool> ProtocolV2::client_reconnect()
             ceph_abort("unexpected exception from ms_handle_connect()");
           });
         }).then([this] {
-          return true;
+          return next_step_t::ready;
         });
       default: {
         unexpected_tag(tag, conn, "post_client_reconnect");
-        // won't be executed
-        return seastar::make_ready_future<bool>(false);
+        return seastar::make_ready_future<next_step_t>(next_step_t::none);
       }
     }
   });
@@ -890,15 +890,23 @@ void ProtocolV2::execute_connecting()
             ceph_assert(false);
             return client_reconnect();
           }
-        }).then([this] (bool proceed_or_wait) {
-          if (proceed_or_wait) {
+        }).then([this] (next_step_t next) {
+          switch (next) {
+           case next_step_t::ready: {
             logger().info("{} connected: gs={}, pgs={}, cs={},"
                           " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie, conn.in_seq, conn.out_seq);
             execute_ready();
-          } else {
+            break;
+           }
+           case next_step_t::wait: {
             execute_wait();
+            break;
+           }
+           default: {
+            ceph_abort("impossible next step");
+           }
           }
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in CONNECTING state
@@ -1005,16 +1013,18 @@ seastar::future<> ProtocolV2::server_auth()
   });
 }
 
-seastar::future<bool> ProtocolV2::send_wait()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_wait()
 {
   auto wait = WaitFrame::Encode();
   logger().warn("{} WRITE WaitFrame", conn);
   return write_frame(wait).then([this] {
-    return false;
+    return next_step_t::wait;
   });
 }
 
-seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::handle_existing_connection(SocketConnectionRef existing)
 {
   // handle_existing_connection() logic
   logger().trace("{} {}: {}", conn, __func__, *existing);
@@ -1024,9 +1034,7 @@ seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef
 
   if (exproto->state == state_t::CLOSING) {
     logger().warn("{} existing connection {} already closed.", conn, *existing);
-    return send_server_ident().then([this] {
-      return true;
-    });
+    return send_server_ident();
   }
 
   if (exproto->state == state_t::REPLACING) {
@@ -1050,16 +1058,15 @@ seastar::future<bool> ProtocolV2::handle_existing_connection(SocketConnectionRef
                   " this connection", conn, *existing);
     exproto->dispatch_reset();
     exproto->close();
-    return send_server_ident().then([this] {
-      return true;
-    });
+    return send_server_ident();
   }
 
   // TODO: lossless policy
   ceph_assert(false);
 }
 
-seastar::future<bool> ProtocolV2::server_connect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_connect()
 {
   return read_frame_payload().then([this] {
     // handle_client_ident() logic
@@ -1117,7 +1124,7 @@ seastar::future<bool> ProtocolV2::server_connect()
       logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
                     conn, feat_missing);
       return write_frame(ident_missing_features).then([this] {
-        return false;
+        return next_step_t::wait;
       });
     }
     connection_features =
@@ -1144,14 +1151,12 @@ seastar::future<bool> ProtocolV2::server_connect()
     }
 
     // if everything is OK reply with server identification
-    return send_server_ident().then([this] {
-      // goto ready
-      return true;
-    });
+    return send_server_ident();
   });
 }
 
-seastar::future<bool> ProtocolV2::read_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::read_reconnect()
 {
   return read_main_preamble()
   .then([this] (Tag tag) {
@@ -1160,7 +1165,8 @@ seastar::future<bool> ProtocolV2::read_reconnect()
   });
 }
 
-seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry(uint64_t connect_seq)
 {
   auto retry = RetryFrame::Encode(connect_seq);
   logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
@@ -1169,7 +1175,8 @@ seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
   });
 }
 
-seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_retry_global(uint64_t global_seq)
 {
   auto retry = RetryGlobalFrame::Encode(global_seq);
   logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
@@ -1178,7 +1185,8 @@ seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
   });
 }
 
-seastar::future<bool> ProtocolV2::send_reset(bool full)
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_reset(bool full)
 {
   auto reset = ResetFrame::Encode(full);
   logger().warn("{} WRITE ResetFrame: full={}", conn, full);
@@ -1190,7 +1198,8 @@ seastar::future<bool> ProtocolV2::send_reset(bool full)
   });
 }
 
-seastar::future<bool> ProtocolV2::server_reconnect()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::server_reconnect()
 {
   return read_frame_payload().then([this] {
     // handle_reconnect() logic
@@ -1350,12 +1359,12 @@ void ProtocolV2::execute_accepting()
               return server_reconnect();
             default: {
               unexpected_tag(tag, conn, "post_server_auth");
-              // won't be executed
-              return seastar::make_ready_future<bool>(false);
+              return seastar::make_ready_future<next_step_t>(next_step_t::none);
             }
           }
-        }).then([this] (bool proceed_or_wait) {
-          if (proceed_or_wait) {
+        }).then([this] (next_step_t next) {
+          switch (next) {
+           case next_step_t::ready: {
             messenger.register_conn(
               seastar::static_pointer_cast<SocketConnection>(
                 conn.shared_from_this()));
@@ -1367,8 +1376,15 @@ void ProtocolV2::execute_accepting()
                           conn, global_seq, peer_global_seq, connect_seq,
                           client_cookie, server_cookie, conn.in_seq, conn.out_seq);
             execute_ready();
-          } else {
+            break;
+           }
+           case next_step_t::wait: {
             execute_server_wait();
+            break;
+           }
+           default: {
+            ceph_abort("impossible next step");
+           }
           }
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in ACCEPTING state
@@ -1414,7 +1430,8 @@ seastar::future<> ProtocolV2::finish_auth()
 
 // ACCEPTING or REPLACING state
 
-seastar::future<> ProtocolV2::send_server_ident()
+seastar::future<ProtocolV2::next_step_t>
+ProtocolV2::send_server_ident()
 {
   // send_server_ident() logic
 
@@ -1466,6 +1483,8 @@ seastar::future<> ProtocolV2::send_server_ident()
     });
 
     return write_frame(server_ident);
+  }).then([] {
+    return next_step_t::ready;
   });
 }
 
index 52ffaf5c86ccc17dc924cce3a30bf53b020db2ab..a6435e088c16a8f68419e7d916f1eebe37c28259 100644 (file)
@@ -101,6 +101,12 @@ class ProtocolV2 final : public Protocol {
   void reset_session(bool full);
   seastar::future<entity_type_t, entity_addr_t> banner_exchange();
 
+  enum class next_step_t {
+    ready,
+    wait,
+    none,       // protocol should have been aborted or failed
+  };
+
   // CONNECTING (client)
   seastar::future<> handle_auth_reply();
   inline seastar::future<> client_auth() {
@@ -109,9 +115,9 @@ class ProtocolV2 final : public Protocol {
   }
   seastar::future<> client_auth(std::vector<uint32_t> &allowed_methods);
 
-  seastar::future<bool> process_wait();
-  seastar::future<bool> client_connect();
-  seastar::future<bool> client_reconnect();
+  seastar::future<next_step_t> process_wait();
+  seastar::future<next_step_t> client_connect();
+  seastar::future<next_step_t> client_reconnect();
   void execute_connecting();
 
   // ACCEPTING (server)
@@ -119,16 +125,16 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
   seastar::future<> server_auth();
 
-  seastar::future<bool> send_wait();
+  seastar::future<next_step_t> send_wait();
 
-  seastar::future<bool> handle_existing_connection(SocketConnectionRef existing);
-  seastar::future<bool> server_connect();
+  seastar::future<next_step_t> handle_existing_connection(SocketConnectionRef existing);
+  seastar::future<next_step_t> server_connect();
 
-  seastar::future<bool> read_reconnect();
-  seastar::future<bool> send_retry(uint64_t connect_seq);
-  seastar::future<bool> send_retry_global(uint64_t global_seq);
-  seastar::future<bool> send_reset(bool full);
-  seastar::future<bool> server_reconnect();
+  seastar::future<next_step_t> read_reconnect();
+  seastar::future<next_step_t> send_retry(uint64_t connect_seq);
+  seastar::future<next_step_t> send_retry_global(uint64_t global_seq);
+  seastar::future<next_step_t> send_reset(bool full);
+  seastar::future<next_step_t> server_reconnect();
 
   void execute_accepting();
 
@@ -136,7 +142,7 @@ class ProtocolV2 final : public Protocol {
   seastar::future<> finish_auth();
 
   // ACCEPTING/REPLACING (server)
-  seastar::future<> send_server_ident();
+  seastar::future<next_step_t> send_server_ident();
 
   // REPLACING (server)
   seastar::future<> send_reconnect_ok();