]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: extract do_write_dispatch_sweep() 27428/head
authorYingxin Cheng <yingxincheng@gmail.com>
Mon, 8 Apr 2019 11:51:04 +0000 (19:51 +0800)
committerYingxin Cheng <yingxincheng@gmail.com>
Tue, 9 Apr 2019 14:40:58 +0000 (22:40 +0800)
Signed-off-by: Yingxin Cheng <yingxincheng@gmail.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h

index c2d9b67605a2cc0c629631034fe90636372c93dd..1248bbc9335395666bc0a356f5b3d643b68b6b0b 100644 (file)
@@ -95,6 +95,63 @@ void Protocol::notify_keepalive_ack()
   }
 }
 
+seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
+{
+  switch (write_state) {
+   case write_state_t::open:
+    return seastar::futurize_apply([this] {
+      if (need_keepalive) {
+        return do_keepalive()
+        .then([this] { need_keepalive = false; });
+      }
+      return seastar::now();
+    }).then([this] {
+      if (need_keepalive_ack) {
+        return do_keepalive_ack()
+        .then([this] { need_keepalive_ack = false; });
+      }
+      return seastar::now();
+    }).then([this] {
+      if (!conn.out_q.empty()){
+        MessageRef msg = conn.out_q.front();
+        return write_message(msg)
+        .then([this, msg] {
+          if (msg == conn.out_q.front()) {
+            conn.out_q.pop();
+          }
+          return stop_t::no;
+        });
+      } else {
+        return socket->flush().then([this] {
+          if (!conn.out_q.empty()) {
+            return stop_t::no;
+          } else {
+            // the dispatching can only stop when out_q is empty
+            ceph_assert(write_dispatching);
+            write_dispatching = false;
+            return stop_t::yes;
+          }
+        });
+      }
+    }).handle_exception([this] (std::exception_ptr eptr) {
+      logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
+      close();
+      return stop_t::no;
+    });
+   case write_state_t::delay: {
+    // delay dispatching writes until open
+    return state_changed.get_shared_future()
+    .then([] { return stop_t::no; });
+   }
+   case write_state_t::drop:
+    ceph_assert(write_dispatching);
+    write_dispatching = false;
+    return seastar::make_ready_future<stop_t>(stop_t::yes);
+   default:
+    ceph_assert(false);
+  }
+}
+
 void Protocol::write_event()
 {
   if (write_dispatching) {
@@ -108,56 +165,7 @@ void Protocol::write_event()
    case write_state_t::delay:
     seastar::with_gate(pending_dispatch, [this] {
       return seastar::repeat([this] {
-        switch (write_state) {
-         case write_state_t::open:
-          return seastar::futurize_apply([this] {
-            if (need_keepalive) {
-              return do_keepalive()
-              .then([this] { need_keepalive = false; });
-            }
-            return seastar::now();
-          }).then([this] {
-            if (need_keepalive_ack) {
-              return do_keepalive_ack()
-              .then([this] { need_keepalive_ack = false; });
-            }
-            return seastar::now();
-          }).then([this] {
-            if (!conn.out_q.empty()){
-              MessageRef msg = conn.out_q.front();
-              return write_message(msg)
-              .then([this, msg] {
-                if (msg == conn.out_q.front()) {
-                  conn.out_q.pop();
-                }
-                return stop_t::no;
-              });
-            } else {
-              return socket->flush()
-              .then([this] {
-                if (!conn.out_q.empty()) {
-                  return stop_t::no;
-                } else {
-                  write_dispatching = false;
-                  return stop_t::yes;
-                }
-              });
-            }
-          }).handle_exception([this] (std::exception_ptr eptr) {
-            logger().warn("{} write_event fault: {}", conn, eptr);
-            close();
-            return stop_t::no;
-          });
-         case write_state_t::delay:
-          // delay dispatching writes until open
-          return state_changed.get_shared_future()
-          .then([] { return stop_t::no; });
-         case write_state_t::drop:
-          write_dispatching = false;
-          return seastar::make_ready_future<stop_t>(stop_t::yes);
-         default:
-          ceph_assert(false);
-        }
+        return do_write_dispatch_sweep();
       });
     });
     return;
index 55af5927055059fcc252ee127a0d0873e060306f..73c43b6f695bbd7780b65ad218b6e03f086137b0 100644 (file)
@@ -90,6 +90,7 @@ class Protocol {
   bool need_keepalive = false;
   bool need_keepalive_ack = false;
   bool write_dispatching = false;
+  seastar::future<stop_t> do_write_dispatch_sweep();
   void write_event();
 };