]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: wait_write_exit() to wait for writer stopped
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 9 Aug 2019 09:24:55 +0000 (17:24 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 12 Aug 2019 09:02:51 +0000 (17:02 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h

index 4e53c7bfbdcd2c76f380b8891bb89e8b7492f313..bb0657f50475f21732dd9066cd483785a07f160e 100644 (file)
@@ -6,6 +6,7 @@
 #include "auth/Auth.h"
 
 #include "crimson/common/log.h"
+#include "Errors.h"
 #include "Socket.h"
 #include "SocketConnection.h"
 
@@ -29,6 +30,7 @@ Protocol::Protocol(proto_t type,
 Protocol::~Protocol()
 {
   ceph_assert(pending_dispatch.is_closed());
+  assert(!exit_open);
 }
 
 bool Protocol::is_connected() const
@@ -101,6 +103,9 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
     size_t num_msgs = conn.out_q.size();
     // we must have something to write...
     ceph_assert(is_queued());
+    assert(!open_write);
+    open_write = true;
+
     MessageRef front_msg;
     if (likely(num_msgs)) {
       front_msg = conn.out_q.front();
@@ -126,19 +131,46 @@ seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
             // the dispatching can ONLY stop now
             ceph_assert(write_dispatching);
             write_dispatching = false;
+            assert(open_write);
+            open_write = false;
             return seastar::make_ready_future<stop_t>(stop_t::yes);
           } else {
             // something is pending to send during flushing
+            assert(open_write);
+            open_write = false;
             return seastar::make_ready_future<stop_t>(stop_t::no);
           }
         });
       } else {
         // messages were enqueued during socket write
+        assert(open_write);
+        open_write = false;
         return seastar::make_ready_future<stop_t>(stop_t::no);
       }
+    }).handle_exception_type([this] (const std::system_error& e) {
+      if (e.code() != error::broken_pipe &&
+          e.code() != error::connection_reset) {
+        logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
+                       conn, e);
+        ceph_abort();
+      }
+
+      logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
+      assert(open_write);
+      open_write = false;
+      if (exit_open) {
+        exit_open->set_value();
+        exit_open = std::nullopt;
+      }
+      socket->shutdown();
+      if (write_state == write_state_t::open) {
+        write_state = write_state_t::delay;
+      }
+      return seastar::make_ready_future<stop_t>(stop_t::no);
     }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
-      close();
+      logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
+                     conn, eptr);
+      ceph_abort();
       return seastar::make_ready_future<stop_t>(stop_t::no);
     });
    }
index fe3598f19d46f978c78ee31bea1ac2c4ed978917..66794ab66a5d71c8475f390902f16894cbe71b82 100644 (file)
@@ -67,6 +67,7 @@ class Protocol {
   seastar::future<> send(MessageRef msg);
   seastar::future<> keepalive();
 
+// TODO: encapsulate a SessionedSender class
  protected:
   // write_state is changed with state atomically, indicating the write
   // behavior of the according state.
@@ -77,11 +78,28 @@ class Protocol {
     drop
   };
   void set_write_state(const write_state_t& state) {
+    if (write_state == write_state_t::open &&
+        state == write_state_t::delay) {
+      if (open_write) {
+        exit_open = seastar::shared_promise<>();
+      }
+    }
+    if (state == write_state_t::drop && exit_open) {
+      exit_open->set_value();
+      exit_open = std::nullopt;
+    }
     write_state = state;
     state_changed.set_value();
     state_changed = seastar::shared_promise<>();
   }
 
+  seastar::future<> wait_write_exit() {
+    if (exit_open) {
+      return exit_open->get_shared_future();
+    }
+    return seastar::now();
+  }
+
   void notify_keepalive_ack(utime_t keepalive_ack);
 
   bool is_queued() const {
@@ -98,6 +116,13 @@ class Protocol {
   bool need_keepalive = false;
   std::optional<utime_t> keepalive_ack = std::nullopt;
   bool write_dispatching = false;
+  // Indicate if we are in the middle of writing.
+  bool open_write = false;
+  // If another continuation is trying to close or replace socket when
+  // open_write is true, it needs to wait for exit_open until writing is
+  // stopped or failed.
+  std::optional<seastar::shared_promise<>> exit_open;
+
   seastar::future<stop_t> do_write_dispatch_sweep();
   void write_event();
 };