]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: set exit_open atomically with write_dispatching
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 16 Sep 2019 03:54:39 +0000 (11:54 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 18 Sep 2019 04:32:31 +0000 (12:32 +0800)
exit_open should be set atomically with state checks and
write_dispatching changes, or there would be chances to be blocked
forever by exit_open.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h

index c7350fa445e8cdadda432174528444cf70a2e865..8b3883337115442590e0437113360e1b7e4bd34f 100644 (file)
@@ -178,9 +178,6 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
       size_t num_msgs = conn.out_q.size();
       // we must have something to write...
       ceph_assert(is_queued());
-      assert(!open_write);
-      open_write = true;
-
       conn.pending_q.clear();
       conn.pending_q.swap(conn.out_q);
       if (!conn.policy.lossy) {
@@ -208,31 +205,46 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
               // the dispatching can ONLY stop now
               ceph_assert(write_dispatching);
               write_dispatching = false;
-              assert(open_write);
-              open_write = false;
+              if (unlikely(exit_open.has_value())) {
+                exit_open->set_value();
+                exit_open = std::nullopt;
+                logger().info("{} write_event: nothing queued at {},"
+                              " set exit_open",
+                              conn, get_state_name(write_state));
+              }
               return seastar::make_ready_future<stop_t>(stop_t::yes);
             } else {
               // something is pending to send during flushing
-              assert(open_write);
-              open_write = false;
               return seastar::make_ready_future<stop_t>(stop_t::no);
             }
           });
         } else {
           // messages were enqueued during socket write
-          assert(open_write);
-          open_write = false;
           return seastar::make_ready_future<stop_t>(stop_t::no);
         }
       });
      }
      case write_state_t::delay:
       // delay dispatching writes until open
+      if (exit_open) {
+        exit_open->set_value();
+        exit_open = std::nullopt;
+        logger().info("{} write_event: delay and set exit_open ...", conn);
+      } else {
+        logger().info("{} write_event: delay ...", conn);
+      }
       return state_changed.get_shared_future()
       .then([] { return stop_t::no; });
      case write_state_t::drop:
       ceph_assert(write_dispatching);
       write_dispatching = false;
+      if (exit_open) {
+        exit_open->set_value();
+        exit_open = std::nullopt;
+        logger().info("{} write_event: dropped and set exit_open", conn);
+      } else {
+        logger().info("{} write_event: dropped", conn);
+      }
       return seastar::make_ready_future<stop_t>(stop_t::yes);
      default:
       ceph_assert(false);
@@ -241,20 +253,18 @@ seastar::future<> Protocol::do_write_dispatch_sweep()
     if (e.code() != error::broken_pipe &&
         e.code() != error::connection_reset &&
         e.code() != error::negotiation_failure) {
-      logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
-                     conn, e);
+      logger().error("{} write_event(): unexpected error at {} -- {}",
+                     conn, get_state_name(write_state), e);
       ceph_abort();
     }
-    logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
-    assert(open_write);
-    open_write = false;
-    if (exit_open) {
-      exit_open->set_value();
-      exit_open = std::nullopt;
-    }
     socket->shutdown();
     if (write_state == write_state_t::open) {
+      logger().info("{} write_event(): fault at {}, going to delay -- {}",
+                    conn, get_state_name(write_state), e);
       write_state = write_state_t::delay;
+    } else {
+      logger().info("{} write_event(): fault at {} -- {}",
+                    conn, get_state_name(write_state), e);
     }
     return do_write_dispatch_sweep();
   });
index 031fa13e27ef1759c39e817f4ea4d8fa9f26e9ac..6287d198cfdcc6e20d70e368d35b34a3de7f6c81 100644 (file)
@@ -82,16 +82,21 @@ class Protocol {
     open,
     drop
   };
+
+  static const char* get_state_name(write_state_t state) {
+    static const char *const state_names[] = {"none",
+                                              "delay",
+                                              "open",
+                                              "drop"};
+    assert(static_cast<int>(state) < std::size(state_names));
+    return state_names[static_cast<int>(state)];
+  }
+
   void set_write_state(const write_state_t& state) {
     if (write_state == write_state_t::open &&
-        state == write_state_t::delay) {
-      if (open_write) {
-        exit_open = seastar::shared_promise<>();
-      }
-    }
-    if (state == write_state_t::drop && exit_open) {
-      exit_open->set_value();
-      exit_open = std::nullopt;
+        state != write_state_t::open &&
+        write_dispatching) {
+      exit_open = seastar::shared_promise<>();
     }
     write_state = state;
     state_changed.set_value();
@@ -133,11 +138,9 @@ class Protocol {
   std::optional<utime_t> keepalive_ack = std::nullopt;
   uint64_t ack_left = 0;
   bool write_dispatching = false;
-  // Indicate if we are in the middle of writing.
-  bool open_write = false;
   // If another continuation is trying to close or replace socket when
-  // open_write is true, it needs to wait for exit_open until writing is
-  // stopped or failed.
+  // write_dispatching is true and write_state is open,
+  // it needs to wait for exit_open until writing is stopped or failed.
   std::optional<seastar::shared_promise<>> exit_open;
 
   seastar::future<> do_write_dispatch_sweep();