From 4fa1c4c07d7b369a4d511ec388f4e15ad4af7001 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Fri, 9 Aug 2019 17:24:55 +0800 Subject: [PATCH] crimson/net: wait_write_exit() to wait for writer stopped Signed-off-by: Yingxin Cheng --- src/crimson/net/Protocol.cc | 36 ++++++++++++++++++++++++++++++++++-- src/crimson/net/Protocol.h | 25 +++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index 4e53c7bfbdcd2..bb0657f50475f 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -6,6 +6,7 @@ #include "auth/Auth.h" #include "crimson/common/log.h" +#include "Errors.h" #include "Socket.h" #include "SocketConnection.h" @@ -29,6 +30,7 @@ Protocol::Protocol(proto_t type, Protocol::~Protocol() { ceph_assert(pending_dispatch.is_closed()); + assert(!exit_open); } bool Protocol::is_connected() const @@ -101,6 +103,9 @@ seastar::future Protocol::do_write_dispatch_sweep() size_t num_msgs = conn.out_q.size(); // we must have something to write... ceph_assert(is_queued()); + assert(!open_write); + open_write = true; + MessageRef front_msg; if (likely(num_msgs)) { front_msg = conn.out_q.front(); @@ -126,19 +131,46 @@ seastar::future Protocol::do_write_dispatch_sweep() // the dispatching can ONLY stop now ceph_assert(write_dispatching); write_dispatching = false; + assert(open_write); + open_write = false; return seastar::make_ready_future(stop_t::yes); } else { // something is pending to send during flushing + assert(open_write); + open_write = false; return seastar::make_ready_future(stop_t::no); } }); } else { // messages were enqueued during socket write + assert(open_write); + open_write = false; return seastar::make_ready_future(stop_t::no); } + }).handle_exception_type([this] (const std::system_error& e) { + if (e.code() != error::broken_pipe && + e.code() != error::connection_reset) { + logger().error("{} do_write_dispatch_sweep(): unexpected error {}", + conn, e); + ceph_abort(); + } + + logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e); + assert(open_write); + open_write = false; + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + } + socket->shutdown(); + if (write_state == write_state_t::open) { + write_state = write_state_t::delay; + } + return seastar::make_ready_future(stop_t::no); }).handle_exception([this] (std::exception_ptr eptr) { - logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr); - close(); + logger().error("{} do_write_dispatch_sweep(): unexpected exception {}", + conn, eptr); + ceph_abort(); return seastar::make_ready_future(stop_t::no); }); } diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index fe3598f19d46f..66794ab66a5d7 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -67,6 +67,7 @@ class Protocol { seastar::future<> send(MessageRef msg); seastar::future<> keepalive(); +// TODO: encapsulate a SessionedSender class protected: // write_state is changed with state atomically, indicating the write // behavior of the according state. @@ -77,11 +78,28 @@ class Protocol { drop }; void set_write_state(const write_state_t& state) { + if (write_state == write_state_t::open && + state == write_state_t::delay) { + if (open_write) { + exit_open = seastar::shared_promise<>(); + } + } + if (state == write_state_t::drop && exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + } write_state = state; state_changed.set_value(); state_changed = seastar::shared_promise<>(); } + seastar::future<> wait_write_exit() { + if (exit_open) { + return exit_open->get_shared_future(); + } + return seastar::now(); + } + void notify_keepalive_ack(utime_t keepalive_ack); bool is_queued() const { @@ -98,6 +116,13 @@ class Protocol { bool need_keepalive = false; std::optional keepalive_ack = std::nullopt; bool write_dispatching = false; + // Indicate if we are in the middle of writing. + bool open_write = false; + // If another continuation is trying to close or replace socket when + // open_write is true, it needs to wait for exit_open until writing is + // stopped or failed. + std::optional> exit_open; + seastar::future do_write_dispatch_sweep(); void write_event(); }; -- 2.39.5