]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: move write exception handling to seastar::repeat()
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 26 Aug 2019 04:43:34 +0000 (12:43 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Wed, 28 Aug 2019 02:16:39 +0000 (10:16 +0800)
* prevent exception leak in do_write_dispatch_sweep()
* no exception handling logic inside each repeat()

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h

index 35861b040c4247af6e296c56b0f509d607444290..9c2c9a67ba1d5ee597f9fb4a13a19d7b22ea9706 100644 (file)
@@ -170,98 +170,93 @@ void Protocol::ack_writes(seq_num_t seq)
   }
 }
 
-seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
+seastar::future<> Protocol::do_write_dispatch_sweep()
 {
-  switch (write_state) {
-   case write_state_t::open: {
-    size_t num_msgs = conn.out_q.size();
-    // we must have something to write...
-    ceph_assert(is_queued());
-    assert(!open_write);
-    open_write = true;
+  return seastar::repeat([this] {
+    switch (write_state) {
+     case write_state_t::open: {
+      size_t num_msgs = conn.out_q.size();
+      // we must have something to write...
+      ceph_assert(is_queued());
+      assert(!open_write);
+      open_write = true;
 
-    conn.pending_q.clear();
-    conn.pending_q.swap(conn.out_q);
-    if (!conn.policy.lossy) {
-      conn.sent.insert(conn.sent.end(),
-                       conn.pending_q.begin(),
-                       conn.pending_q.end());
-    }
-    auto acked = ack_left;
-    assert(acked == 0 || conn.in_seq > 0);
-    // sweep all pending writes with the concrete Protocol
-    return socket->write(do_sweep_messages(
-        conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
-    ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
-      need_keepalive = false;
-      if (keepalive_ack == prv_keepalive_ack) {
-        keepalive_ack = std::nullopt;
-      }
-      assert(ack_left >= acked);
-      ack_left -= acked;
-      if (!is_queued()) {
-        // good, we have nothing pending to send now.
-        return socket->flush().then([this] {
-          if (!is_queued()) {
-            // still nothing pending to send after flush,
-            // the dispatching can ONLY stop now
-            ceph_assert(write_dispatching);
-            write_dispatching = false;
-            assert(open_write);
-            open_write = false;
-            return seastar::make_ready_future<stop_t>(stop_t::yes);
-          } else {
-            // something is pending to send during flushing
-            assert(open_write);
-            open_write = false;
-            return seastar::make_ready_future<stop_t>(stop_t::no);
-          }
-        });
-      } else {
-        // messages were enqueued during socket write
-        assert(open_write);
-        open_write = false;
-        return seastar::make_ready_future<stop_t>(stop_t::no);
+      conn.pending_q.clear();
+      conn.pending_q.swap(conn.out_q);
+      if (!conn.policy.lossy) {
+        conn.sent.insert(conn.sent.end(),
+                         conn.pending_q.begin(),
+                         conn.pending_q.end());
       }
-    }).handle_exception_type([this] (const std::system_error& e) {
-      if (e.code() != error::broken_pipe &&
-          e.code() != error::connection_reset) {
-        logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
-                       conn, e);
-        ceph_abort();
-      }
-
-      logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
-      assert(open_write);
-      open_write = false;
-      if (exit_open) {
-        exit_open->set_value();
-        exit_open = std::nullopt;
-      }
-      socket->shutdown();
-      if (write_state == write_state_t::open) {
-        write_state = write_state_t::delay;
-      }
-      return seastar::make_ready_future<stop_t>(stop_t::no);
-    }).handle_exception([this] (std::exception_ptr eptr) {
-      logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
-                     conn, eptr);
+      auto acked = ack_left;
+      assert(acked == 0 || conn.in_seq > 0);
+      // sweep all pending writes with the concrete Protocol
+      return socket->write(do_sweep_messages(
+          conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+      ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+        need_keepalive = false;
+        if (keepalive_ack == prv_keepalive_ack) {
+          keepalive_ack = std::nullopt;
+        }
+        assert(ack_left >= acked);
+        ack_left -= acked;
+        if (!is_queued()) {
+          // good, we have nothing pending to send now.
+          return socket->flush().then([this] {
+            if (!is_queued()) {
+              // still nothing pending to send after flush,
+              // the dispatching can ONLY stop now
+              ceph_assert(write_dispatching);
+              write_dispatching = false;
+              assert(open_write);
+              open_write = false;
+              return seastar::make_ready_future<stop_t>(stop_t::yes);
+            } else {
+              // something is pending to send during flushing
+              assert(open_write);
+              open_write = false;
+              return seastar::make_ready_future<stop_t>(stop_t::no);
+            }
+          });
+        } else {
+          // messages were enqueued during socket write
+          assert(open_write);
+          open_write = false;
+          return seastar::make_ready_future<stop_t>(stop_t::no);
+        }
+      });
+     }
+     case write_state_t::delay:
+      // delay dispatching writes until open
+      return state_changed.get_shared_future()
+      .then([] { return stop_t::no; });
+     case write_state_t::drop:
+      ceph_assert(write_dispatching);
+      write_dispatching = false;
+      return seastar::make_ready_future<stop_t>(stop_t::yes);
+     default:
+      ceph_assert(false);
+    }
+  }).handle_exception_type([this] (const std::system_error& e) {
+    if (e.code() != error::broken_pipe &&
+        e.code() != error::connection_reset) {
+      logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
+                     conn, e);
       ceph_abort();
-      return seastar::make_ready_future<stop_t>(stop_t::no);
-    });
-   }
-   case write_state_t::delay: {
-    // delay dispatching writes until open
-    return state_changed.get_shared_future()
-    .then([] { return stop_t::no; });
-   }
-   case write_state_t::drop:
-    ceph_assert(write_dispatching);
-    write_dispatching = false;
-    return seastar::make_ready_future<stop_t>(stop_t::yes);
-   default:
-    ceph_assert(false);
-  }
+    }
+    logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
+    assert(open_write);
+    open_write = false;
+    if (exit_open) {
+      exit_open->set_value();
+      exit_open = std::nullopt;
+    }
+    socket->shutdown();
+    if (write_state == write_state_t::open) {
+      write_state = write_state_t::delay;
+    }
+    return do_write_dispatch_sweep();
+  });
 }
 
 void Protocol::write_event()
@@ -277,8 +272,11 @@ void Protocol::write_event()
      [[fallthrough]];
    case write_state_t::delay:
     seastar::with_gate(pending_dispatch, [this] {
-      return seastar::repeat([this] {
-        return do_write_dispatch_sweep();
+      return do_write_dispatch_sweep(
+      ).handle_exception([this] (std::exception_ptr eptr) {
+        logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
+                       conn, eptr);
+        ceph_abort();
       });
     });
     return;
index 84c5f1f470c673653b753855239d4aece3628420..031fa13e27ef1759c39e817f4ea4d8fa9f26e9ac 100644 (file)
@@ -140,7 +140,7 @@ class Protocol {
   // stopped or failed.
   std::optional<seastar::shared_promise<>> exit_open;
 
-  seastar::future<stop_t> do_write_dispatch_sweep();
+  seastar::future<> do_write_dispatch_sweep();
   void write_event();
 };