]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: simplify logics and centralize fault handling in execute_open() 25716/head
authorYingxin <yingxin.cheng@intel.com>
Fri, 28 Dec 2018 02:59:45 +0000 (10:59 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Fri, 4 Jan 2019 06:38:37 +0000 (14:38 +0800)
Signed-off-by: Yingxin <yingxin.cheng@intel.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 23765345369141cef21ef870115ffbcca146c798..0d3d1181fa0950dfdf870a9079e9c186d1bed5a3 100644 (file)
@@ -92,50 +92,40 @@ seastar::future<> SocketConnection::keepalive()
     });
 }
 
-void SocketConnection::read_tags_until_next_message()
+seastar::future<> SocketConnection::handle_tags()
 {
-  seastar::repeat([this] {
+  return seastar::keep_doing([this] {
       // read the next tag
       return socket->read_exactly(1)
         .then([this] (auto buf) {
           switch (buf[0]) {
           case CEPH_MSGR_TAG_MSG:
-            // stop looping and notify read_header()
-            return seastar::make_ready_future<stop_t>(stop_t::yes);
+            return read_message();
           case CEPH_MSGR_TAG_ACK:
             return handle_ack();
           case CEPH_MSGR_TAG_KEEPALIVE:
-            break;
+            return seastar::now();
           case CEPH_MSGR_TAG_KEEPALIVE2:
-            return handle_keepalive2()
-              .then([this] { return stop_t::no; });
+            return handle_keepalive2();
           case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
-            return handle_keepalive2_ack()
-              .then([this] { return stop_t::no; });
+            return handle_keepalive2_ack();
           case CEPH_MSGR_TAG_CLOSE:
             logger().info("{} got tag close", *this);
-            break;
+            throw std::system_error(make_error_code(error::connection_aborted));
+          default:
+            logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0]));
+            throw std::system_error(make_error_code(error::read_eof));
           }
-          return seastar::make_ready_future<stop_t>(stop_t::no);
         });
-    }).handle_exception_type([this] (const std::system_error& e) {
-      if (e.code() == error::read_eof) {
-        close();
-      }
-      throw e;
-    }).then_wrapped([this] (auto fut) {
-      // satisfy the message promise
-      fut.forward_to(std::move(on_message));
     });
 }
 
-seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
+seastar::future<> SocketConnection::handle_ack()
 {
   return socket->read_exactly(sizeof(ceph_le64))
     .then([this] (auto buf) {
       auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
       discard_up_to(&sent, *seq);
-      return stop_t::no;
     });
 }
 
@@ -169,14 +159,10 @@ seastar::future<> SocketConnection::maybe_throttle()
   return policy.throttler_bytes->get(to_read);
 }
 
-seastar::future<MessageRef> SocketConnection::do_read_message()
+seastar::future<> SocketConnection::read_message()
 {
-  return on_message.get_future()
-    .then([this] {
-      on_message = seastar::promise<>{};
-      // read header
-      return socket->read(sizeof(m.header));
-    }).then([this] (bufferlist bl) {
+  return socket->read(sizeof(m.header))
+    .then([this] (bufferlist bl) {
       // throttle the traffic, maybe
       auto p = bl.cbegin();
       ::decode(m.header, p);
@@ -197,30 +183,27 @@ seastar::future<MessageRef> SocketConnection::do_read_message()
       // read footer
       return socket->read(sizeof(m.footer));
     }).then([this] (bufferlist bl) {
-      // resume background processing of tags
-      read_tags_until_next_message();
-
       auto p = bl.cbegin();
       ::decode(m.footer, p);
       auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
                                   m.front, m.middle, m.data, nullptr);
       // TODO: set time stamps
       msg->set_byte_throttler(policy.throttler_bytes);
-      constexpr bool add_ref = false; // Message starts with 1 ref
-      return MessageRef{msg, add_ref};
-    });
-}
 
-seastar::future<MessageRef> SocketConnection::read_message()
-{
-  return seastar::repeat_until_value([this] {
-      return do_read_message()
-        .then([this] (MessageRef msg) -> std::optional<MessageRef> {
-          if (!update_rx_seq(msg->get_seq())) {
-            // skip this request and read the next
-            return {};
-          }
-          return msg;
+      if (!update_rx_seq(msg->get_seq())) {
+        // skip this message
+        return;
+      }
+
+      constexpr bool add_ref = false; // Message starts with 1 ref
+      auto msg_ref = MessageRef{msg, add_ref};
+      // start dispatch, ignoring exceptions from the application layer
+      seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+          return dispatcher.ms_dispatch(this, std::move(msg))
+            .handle_exception([this] (std::exception_ptr eptr) {
+              logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
+              ceph_assert(false);
+            });
         });
     });
 }
@@ -914,30 +897,27 @@ SocketConnection::execute_open()
   h.promise.set_value();
   seastar::with_gate(pending_dispatch, [this] {
       // start background processing of tags
-      read_tags_until_next_message();
-      return seastar::keep_doing([this] {
-          return read_message()
-            .then([this] (MessageRef msg) {
-              // start dispatch, ignoring exceptions from the application layer
-              seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] {
-                  return dispatcher.ms_dispatch(this, std::move(msg))
-                    .handle_exception([] (std::exception_ptr eptr) {});
-                });
-              // return immediately to start on the next message
-              return seastar::now();
-            });
-        }).handle_exception_type([this] (const std::system_error& e) {
+      return handle_tags()
+        .handle_exception_type([this] (const std::system_error& e) {
+          logger().warn("{} open fault: {}", *this, e);
           if (e.code() == error::connection_aborted ||
               e.code() == error::connection_reset) {
-            return dispatcher.ms_handle_reset(this);
+            return dispatcher.ms_handle_reset(this)
+              .then([this] {
+                close();
+              });
           } else if (e.code() == error::read_eof) {
-            return dispatcher.ms_handle_remote_reset(this);
+            return dispatcher.ms_handle_remote_reset(this)
+              .then([this] {
+                close();
+              });
           } else {
             throw e;
           }
         }).handle_exception([this] (std::exception_ptr eptr) {
           // TODO: handle fault in the open state
           logger().warn("{} open fault: {}", *this, eptr);
+          close();
         });
     });
 }
index ecb2a9df595b38492c9d82885d5f2a5b07e13216..8cbe575f69abd58429bd9ba626c2af596159ca9e 100644 (file)
@@ -108,13 +108,9 @@ class SocketConnection : public Connection {
     bufferlist data;
   } m;
 
-  /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
-  /// header will follow
-  seastar::promise<> on_message;
-
   seastar::future<> maybe_throttle();
-  void read_tags_until_next_message();
-  seastar::future<stop_t> handle_ack();
+  seastar::future<> handle_tags();
+  seastar::future<> handle_ack();
 
   /// becomes available when handshake completes, and when all previous messages
   /// have been sent to the output stream. send() chains new messages as
@@ -139,7 +135,7 @@ class SocketConnection : public Connection {
   ///          false otherwise.
   bool update_rx_seq(seq_num_t seq);
 
-  seastar::future<MessageRef> do_read_message();
+  seastar::future<read_message();
 
   std::unique_ptr<AuthSessionHandler> session_security;
 
@@ -199,9 +195,6 @@ class SocketConnection : public Connection {
   void start_accept(seastar::connected_socket&& socket,
                     const entity_addr_t& peer_addr);
 
-  /// read a message from a connection that has completed its handshake
-  seastar::future<MessageRef> read_message();
-
   /// the number of connections initiated in this session, increment when a
   /// new connection is established
   uint32_t connect_seq() const {