From 25e75d4b1b92433f57a1157f41ca115705423fa6 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 8 Apr 2019 19:51:04 +0800 Subject: [PATCH] crimson/net: extract do_write_dispatch_sweep() Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 108 +++++++++++++++++++----------------- src/crimson/net/Protocol.h | 1 + 2 files changed, 59 insertions(+), 50 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index c2d9b67605a2c..1248bbc933539 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -95,6 +95,63 @@ void Protocol::notify_keepalive_ack() } } +seastar::future Protocol::do_write_dispatch_sweep() +{ + switch (write_state) { + case write_state_t::open: + return seastar::futurize_apply([this] { + if (need_keepalive) { + return do_keepalive() + .then([this] { need_keepalive = false; }); + } + return seastar::now(); + }).then([this] { + if (need_keepalive_ack) { + return do_keepalive_ack() + .then([this] { need_keepalive_ack = false; }); + } + return seastar::now(); + }).then([this] { + if (!conn.out_q.empty()){ + MessageRef msg = conn.out_q.front(); + return write_message(msg) + .then([this, msg] { + if (msg == conn.out_q.front()) { + conn.out_q.pop(); + } + return stop_t::no; + }); + } else { + return socket->flush().then([this] { + if (!conn.out_q.empty()) { + return stop_t::no; + } else { + // the dispatching can only stop when out_q is empty + ceph_assert(write_dispatching); + write_dispatching = false; + return stop_t::yes; + } + }); + } + }).handle_exception([this] (std::exception_ptr eptr) { + logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr); + close(); + return stop_t::no; + }); + case write_state_t::delay: { + // delay dispatching writes until open + return state_changed.get_shared_future() + .then([] { return stop_t::no; }); + } + case write_state_t::drop: + ceph_assert(write_dispatching); + write_dispatching = false; + return seastar::make_ready_future(stop_t::yes); + default: + ceph_assert(false); + } +} + void Protocol::write_event() { if (write_dispatching) { @@ -108,56 +165,7 @@ void Protocol::write_event() case write_state_t::delay: seastar::with_gate(pending_dispatch, [this] { return seastar::repeat([this] { - switch (write_state) { - case write_state_t::open: - return seastar::futurize_apply([this] { - if (need_keepalive) { - return do_keepalive() - .then([this] { need_keepalive = false; }); - } - return seastar::now(); - }).then([this] { - if (need_keepalive_ack) { - return do_keepalive_ack() - .then([this] { need_keepalive_ack = false; }); - } - return seastar::now(); - }).then([this] { - if (!conn.out_q.empty()){ - MessageRef msg = conn.out_q.front(); - return write_message(msg) - .then([this, msg] { - if (msg == conn.out_q.front()) { - conn.out_q.pop(); - } - return stop_t::no; - }); - } else { - return socket->flush() - .then([this] { - if (!conn.out_q.empty()) { - return stop_t::no; - } else { - write_dispatching = false; - return stop_t::yes; - } - }); - } - }).handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} write_event fault: {}", conn, eptr); - close(); - return stop_t::no; - }); - case write_state_t::delay: - // delay dispatching writes until open - return state_changed.get_shared_future() - .then([] { return stop_t::no; }); - case write_state_t::drop: - write_dispatching = false; - return seastar::make_ready_future(stop_t::yes); - default: - ceph_assert(false); - } + return do_write_dispatch_sweep(); }); }); return; diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 55af592705505..73c43b6f695bb 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -90,6 +90,7 @@ class Protocol { bool need_keepalive = false; bool need_keepalive_ack = false; bool write_dispatching = false; + seastar::future do_write_dispatch_sweep(); void write_event(); }; -- 2.39.5