From: Yingxin Cheng Date: Mon, 26 Aug 2019 04:43:34 +0000 (+0800) Subject: crimson/net: move write exception handling to seastar::repeat() X-Git-Tag: v15.1.0~1675^2~9 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=1f842bd06d0d60b339d544c815f019b5cc6bc614;p=ceph-ci.git crimson/net: move write exception handling to seastar::repeat() * prevent exception leak in do_write_dispatch_sweep() * no exception handling logic inside each repeat() Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 35861b040c4..9c2c9a67ba1 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -170,98 +170,93 @@ void Protocol::ack_writes(seq_num_t seq) } } -seastar::future Protocol::do_write_dispatch_sweep() +seastar::future<> Protocol::do_write_dispatch_sweep() { - switch (write_state) { - case write_state_t::open: { - size_t num_msgs = conn.out_q.size(); - // we must have something to write... - ceph_assert(is_queued()); - assert(!open_write); - open_write = true; + return seastar::repeat([this] { + switch (write_state) { + case write_state_t::open: { + size_t num_msgs = conn.out_q.size(); + // we must have something to write... + ceph_assert(is_queued()); + assert(!open_write); + open_write = true; - conn.pending_q.clear(); - conn.pending_q.swap(conn.out_q); - if (!conn.policy.lossy) { - conn.sent.insert(conn.sent.end(), - conn.pending_q.begin(), - conn.pending_q.end()); - } - auto acked = ack_left; - assert(acked == 0 || conn.in_seq > 0); - // sweep all pending writes with the concrete Protocol - return socket->write(do_sweep_messages( - conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) - ).then([this, prv_keepalive_ack=keepalive_ack, acked] { - need_keepalive = false; - if (keepalive_ack == prv_keepalive_ack) { - keepalive_ack = std::nullopt; - } - assert(ack_left >= acked); - ack_left -= acked; - if (!is_queued()) { - // good, we have nothing pending to send now. - return socket->flush().then([this] { - if (!is_queued()) { - // still nothing pending to send after flush, - // the dispatching can ONLY stop now - ceph_assert(write_dispatching); - write_dispatching = false; - assert(open_write); - open_write = false; - return seastar::make_ready_future(stop_t::yes); - } else { - // something is pending to send during flushing - assert(open_write); - open_write = false; - return seastar::make_ready_future(stop_t::no); - } - }); - } else { - // messages were enqueued during socket write - assert(open_write); - open_write = false; - return seastar::make_ready_future(stop_t::no); + conn.pending_q.clear(); + conn.pending_q.swap(conn.out_q); + if (!conn.policy.lossy) { + conn.sent.insert(conn.sent.end(), + conn.pending_q.begin(), + conn.pending_q.end()); } - }).handle_exception_type([this] (const std::system_error& e) { - if (e.code() != error::broken_pipe && - e.code() != error::connection_reset) { - logger().error("{} do_write_dispatch_sweep(): unexpected error {}", - conn, e); - ceph_abort(); - } - - logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e); - assert(open_write); - open_write = false; - if (exit_open) { - exit_open->set_value(); - exit_open = std::nullopt; - } - socket->shutdown(); - if (write_state == write_state_t::open) { - write_state = write_state_t::delay; - } - return seastar::make_ready_future(stop_t::no); - }).handle_exception([this] (std::exception_ptr eptr) { - logger().error("{} do_write_dispatch_sweep(): unexpected exception {}", - conn, eptr); + auto acked = ack_left; + assert(acked == 0 || conn.in_seq > 0); + // sweep all pending writes with the concrete Protocol + return socket->write(do_sweep_messages( + conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0) + ).then([this, prv_keepalive_ack=keepalive_ack, acked] { + need_keepalive = false; + if (keepalive_ack == prv_keepalive_ack) { + keepalive_ack = std::nullopt; + } + assert(ack_left >= acked); + ack_left -= acked; + if (!is_queued()) { + // good, we have nothing pending to send now. + return socket->flush().then([this] { + if (!is_queued()) { + // still nothing pending to send after flush, + // the dispatching can ONLY stop now + ceph_assert(write_dispatching); + write_dispatching = false; + assert(open_write); + open_write = false; + return seastar::make_ready_future(stop_t::yes); + } else { + // something is pending to send during flushing + assert(open_write); + open_write = false; + return seastar::make_ready_future(stop_t::no); + } + }); + } else { + // messages were enqueued during socket write + assert(open_write); + open_write = false; + return seastar::make_ready_future(stop_t::no); + } + }); + } + case write_state_t::delay: + // delay dispatching writes until open + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + case write_state_t::drop: + ceph_assert(write_dispatching); + write_dispatching = false; + return seastar::make_ready_future(stop_t::yes); + default: + ceph_assert(false); + } + }).handle_exception_type([this] (const std::system_error& e) { + if (e.code() != error::broken_pipe && + e.code() != error::connection_reset) { + logger().error("{} do_write_dispatch_sweep(): unexpected error {}", + conn, e); ceph_abort(); - return seastar::make_ready_future(stop_t::no); - }); - } - case write_state_t::delay: { - // delay dispatching writes until open - return state_changed.get_shared_future() - .then([] { return stop_t::no; }); - } - case write_state_t::drop: - ceph_assert(write_dispatching); - write_dispatching = false; - return seastar::make_ready_future(stop_t::yes); - default: - ceph_assert(false); - } + } + logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e); + assert(open_write); + open_write = false; + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + } + socket->shutdown(); + if (write_state == write_state_t::open) { + write_state = write_state_t::delay; + } + return do_write_dispatch_sweep(); + }); } void Protocol::write_event() @@ -277,8 +272,11 @@ void Protocol::write_event() [[fallthrough]]; case write_state_t::delay: seastar::with_gate(pending_dispatch, [this] { - return seastar::repeat([this] { - return do_write_dispatch_sweep(); + return do_write_dispatch_sweep( + ).handle_exception([this] (std::exception_ptr eptr) { + logger().error("{} do_write_dispatch_sweep(): unexpected exception {}", + conn, eptr); + ceph_abort(); }); }); return; diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 84c5f1f470c..031fa13e27e 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -140,7 +140,7 @@ class Protocol { // stopped or failed. std::optional> exit_open; - seastar::future do_write_dispatch_sweep(); + seastar::future<> do_write_dispatch_sweep(); void write_event(); };