]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: extract state transition out of repeat_connect()
authorKefu Chai <kchai@redhat.com>
Mon, 19 Nov 2018 08:09:53 +0000 (16:09 +0800)
committerYingxin <yingxin.cheng@intel.com>
Tue, 20 Nov 2018 14:00:08 +0000 (22:00 +0800)
and extract state transition out of repeat_handle_connect()

in this change, the connect/handle-connect loop is restructured, to
avoid ad-hoc state changes in helper functions. this pave the road to
explicit state transtion using named states.

also, exception is thrown instead in handle_connect_reply(), we should
not proceed in case of failures. and we need do error handling in the
named state in future.

currentl, `state` is set to `state_t::open` in `start_connect()` and
`start_accept()`, the next step is to set it in a named state.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index b98fb6b7545db372753588f15e67b73d65948e64..5ebb9232be3ee20827d0a9a0dd1b5bfe97c2e973 100644 (file)
@@ -431,7 +431,7 @@ uint32_t SocketConnection::get_proto_version(entity_type_t peer_type, bool conne
   }
 }
 
-seastar::future<>
+seastar::future<seastar::stop_iteration>
 SocketConnection::repeat_handle_connect()
 {
   return socket->read(sizeof(h.connect))
@@ -475,7 +475,7 @@ SocketConnection::repeat_handle_connect()
     });
 }
 
-seastar::future<>
+seastar::future<seastar::stop_iteration>
 SocketConnection::send_connect_reply(msgr_tag_t tag,
                                      bufferlist&& authorizer_reply)
 {
@@ -487,10 +487,12 @@ SocketConnection::send_connect_reply(msgr_tag_t tag,
   return socket->write(make_static_packet(h.reply))
     .then([this, reply=std::move(authorizer_reply)]() mutable {
       return socket->write_flush(std::move(reply));
+    }).then([] {
+      return stop_t::no;
     });
 }
 
-seastar::future<>
+seastar::future<seastar::stop_iteration>
 SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
                                            bufferlist&& authorizer_reply)
 {
@@ -526,7 +528,7 @@ SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
     }).then([this] {
       messenger.register_conn(this);
       messenger.unaccept_conn(this);
-      state = state_t::open;
+      return stop_t::yes;
     });
 }
 
@@ -552,7 +554,7 @@ SocketConnection::handle_keepalive2_ack()
     });
 }
 
-seastar::future<>
+seastar::future<seastar::stop_iteration>
 SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
 {
   if (h.connect.global_seq < existing->peer_global_seq()) {
@@ -594,9 +596,10 @@ SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, buf
   }
 }
 
-seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
-                                                     bufferlist&& authorizer_reply,
-                                                    bool is_reset_from_peer)
+seastar::future<seastar::stop_iteration>
+SocketConnection::replace_existing(SocketConnectionRef existing,
+                                   bufferlist&& authorizer_reply,
+                                   bool is_reset_from_peer)
 {
   msgr_tag_t reply_tag;
   if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
@@ -617,13 +620,16 @@ seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existin
   return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
 }
 
-seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
+seastar::future<seastar::stop_iteration>
+SocketConnection::handle_connect_reply(msgr_tag_t tag)
 {
   switch (tag) {
   case CEPH_MSGR_TAG_FEATURES:
-    return fault();
+    logger().error("{} connect protocol feature mispatch", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
   case CEPH_MSGR_TAG_BADPROTOVER:
-    return fault();
+    logger().error("{} connect protocol version mispatch", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
   case CEPH_MSGR_TAG_BADAUTHORIZER:
     if (h.got_bad_auth) {
       logger().error("{} got bad authorizer", __func__);
@@ -634,20 +640,21 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
     return messenger.get_authorizer(peer_type, true)
       .then([this](auto&& auth) {
         h.authorizer = std::move(auth);
-       return seastar::now();
+        return stop_t::no;
       });
   case CEPH_MSGR_TAG_RESETSESSION:
     reset_session();
-    return seastar::now();
+    return seastar::make_ready_future<stop_t>(stop_t::no);
   case CEPH_MSGR_TAG_RETRY_GLOBAL:
     h.global_seq = messenger.get_global_seq(h.reply.global_seq);
-    return seastar::now();
+    return seastar::make_ready_future<stop_t>(stop_t::no);
   case CEPH_MSGR_TAG_RETRY_SESSION:
     ceph_assert(h.reply.connect_seq > h.connect_seq);
     h.connect_seq = h.reply.connect_seq;
-    return seastar::now();
+    return seastar::make_ready_future<stop_t>(stop_t::no);
   case CEPH_MSGR_TAG_WAIT:
-    return fault();
+    // TODO: state wait
+    throw std::system_error(make_error_code(error::negotiation_failure));
   case CEPH_MSGR_TAG_SEQ:
     break;
   case CEPH_MSGR_TAG_READY:
@@ -655,7 +662,8 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
   }
   if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
       missing) {
-    return fault();
+    logger().error("{} missing required features", __func__);
+    throw std::system_error(make_error_code(error::negotiation_failure));
   }
   if (tag == CEPH_MSGR_TAG_SEQ) {
     return socket->read_exactly(sizeof(seq_num_t))
@@ -683,7 +691,7 @@ seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
                                    features));
     }
     h.authorizer.reset();
-    return seastar::now();
+    return seastar::make_ready_future<stop_t>(stop_t::yes);
   } else {
     // unknown tag
     logger().error("{} got unknown tag", __func__, int(tag));
@@ -709,7 +717,8 @@ void SocketConnection::reset_session()
   }
 }
 
-seastar::future<> SocketConnection::repeat_connect()
+seastar::future<seastar::stop_iteration>
+SocketConnection::repeat_connect()
 {
   // encode ceph_msg_connect
   memset(&h.connect, 0, sizeof(h.connect));
@@ -797,9 +806,11 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr,
       h.global_seq = messenger.get_global_seq();
       return socket->write_flush(std::move(bl));
     }).then([=] {
-      return seastar::do_until([=] { return state == state_t::open; },
-                               [=] { return repeat_connect(); });
+      return seastar::repeat([this] {
+        return repeat_connect();
+      });
     }).then([this] {
+      state = state_t::open;
       // start background processing of tags
       read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
@@ -837,9 +848,11 @@ SocketConnection::start_accept(seastar::connected_socket&& fd,
         peer_addr = addr;
       }
     }).then([this] {
-      return seastar::do_until([this] { return state == state_t::open; },
-                               [this] { return repeat_handle_connect(); });
+      return seastar::repeat([this] {
+        return repeat_handle_connect();
+      });
     }).then([this] {
+      state = state_t::open;
       // start background processing of tags
       read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
index c9d3e3c0e9a23ca1ab741c4cea7bfd82891204d3..3054744450d687d1b61df1adfb53fa787a488e3e 100644 (file)
@@ -65,16 +65,16 @@ class SocketConnection : public Connection {
   } h;
 
   /// server side of handshake negotiation
-  seastar::future<> repeat_handle_connect();
-  seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
-                                                bufferlist&& authorizer_reply);
-  seastar::future<> replace_existing(SocketConnectionRef existing,
-                                    bufferlist&& authorizer_reply,
-                                    bool is_reset_from_peer = false);
-  seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
-                                      bufferlist&& authorizer_reply = {});
-  seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
-                                            bufferlist&& authorizer_reply);
+  seastar::future<stop_t> repeat_handle_connect();
+  seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
+                                                        bufferlist&& authorizer_reply);
+  seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
+                                            bufferlist&& authorizer_reply,
+                                            bool is_reset_from_peer = false);
+  seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
+                                              bufferlist&& authorizer_reply = {});
+  seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+                                                    bufferlist&& authorizer_reply);
 
   seastar::future<> handle_keepalive2();
   seastar::future<> handle_keepalive2_ack();
@@ -82,8 +82,8 @@ class SocketConnection : public Connection {
   bool require_auth_feature() const;
   uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
   /// client side of handshake negotiation
-  seastar::future<> repeat_connect();
-  seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
+  seastar::future<stop_t> repeat_connect();
+  seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
   void reset_session();
 
   /// state for an incoming message