From: Yingxin Cheng Date: Mon, 16 Sep 2019 03:54:39 +0000 (+0800) Subject: crimson/net: set exit_open atomically with write_dispatching X-Git-Tag: v15.1.0~1515^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=76b902b3ff0f63705d09eb63a9da5606f91cdfff;p=ceph.git crimson/net: set exit_open atomically with write_dispatching exit_open should be set atomically with state checks and write_dispatching changes, or there would be chances to be blocked forever by exit_open. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/net/Protocol.cc b/src/crimson/net/Protocol.cc index c7350fa445e8..8b3883337115 100644 --- a/src/crimson/net/Protocol.cc +++ b/src/crimson/net/Protocol.cc @@ -178,9 +178,6 @@ seastar::future<> Protocol::do_write_dispatch_sweep() size_t num_msgs = conn.out_q.size(); // we must have something to write... ceph_assert(is_queued()); - assert(!open_write); - open_write = true; - conn.pending_q.clear(); conn.pending_q.swap(conn.out_q); if (!conn.policy.lossy) { @@ -208,31 +205,46 @@ seastar::future<> Protocol::do_write_dispatch_sweep() // the dispatching can ONLY stop now ceph_assert(write_dispatching); write_dispatching = false; - assert(open_write); - open_write = false; + if (unlikely(exit_open.has_value())) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: nothing queued at {}," + " set exit_open", + conn, get_state_name(write_state)); + } return seastar::make_ready_future(stop_t::yes); } else { // something is pending to send during flushing - assert(open_write); - open_write = false; return seastar::make_ready_future(stop_t::no); } }); } else { // messages were enqueued during socket write - assert(open_write); - open_write = false; return seastar::make_ready_future(stop_t::no); } }); } case write_state_t::delay: // delay dispatching writes until open + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: delay and set exit_open ...", conn); + } else { + logger().info("{} write_event: delay ...", conn); + } return state_changed.get_shared_future() .then([] { return stop_t::no; }); case write_state_t::drop: ceph_assert(write_dispatching); write_dispatching = false; + if (exit_open) { + exit_open->set_value(); + exit_open = std::nullopt; + logger().info("{} write_event: dropped and set exit_open", conn); + } else { + logger().info("{} write_event: dropped", conn); + } return seastar::make_ready_future(stop_t::yes); default: ceph_assert(false); @@ -241,20 +253,18 @@ seastar::future<> Protocol::do_write_dispatch_sweep() if (e.code() != error::broken_pipe && e.code() != error::connection_reset && e.code() != error::negotiation_failure) { - logger().error("{} do_write_dispatch_sweep(): unexpected error {}", - conn, e); + logger().error("{} write_event(): unexpected error at {} -- {}", + conn, get_state_name(write_state), e); ceph_abort(); } - logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e); - assert(open_write); - open_write = false; - if (exit_open) { - exit_open->set_value(); - exit_open = std::nullopt; - } socket->shutdown(); if (write_state == write_state_t::open) { + logger().info("{} write_event(): fault at {}, going to delay -- {}", + conn, get_state_name(write_state), e); write_state = write_state_t::delay; + } else { + logger().info("{} write_event(): fault at {} -- {}", + conn, get_state_name(write_state), e); } return do_write_dispatch_sweep(); }); diff --git a/src/crimson/net/Protocol.h b/src/crimson/net/Protocol.h index 031fa13e27ef..6287d198cfdc 100644 --- a/src/crimson/net/Protocol.h +++ b/src/crimson/net/Protocol.h @@ -82,16 +82,21 @@ class Protocol { open, drop }; + + static const char* get_state_name(write_state_t state) { + static const char *const state_names[] = {"none", + "delay", + "open", + "drop"}; + assert(static_cast(state) < std::size(state_names)); + return state_names[static_cast(state)]; + } + void set_write_state(const write_state_t& state) { if (write_state == write_state_t::open && - state == write_state_t::delay) { - if (open_write) { - exit_open = seastar::shared_promise<>(); - } - } - if (state == write_state_t::drop && exit_open) { - exit_open->set_value(); - exit_open = std::nullopt; + state != write_state_t::open && + write_dispatching) { + exit_open = seastar::shared_promise<>(); } write_state = state; state_changed.set_value(); @@ -133,11 +138,9 @@ class Protocol { std::optional keepalive_ack = std::nullopt; uint64_t ack_left = 0; bool write_dispatching = false; - // Indicate if we are in the middle of writing. - bool open_write = false; // If another continuation is trying to close or replace socket when - // open_write is true, it needs to wait for exit_open until writing is - // stopped or failed. + // write_dispatching is true and write_state is open, + // it needs to wait for exit_open until writing is stopped or failed. std::optional> exit_open; seastar::future<> do_write_dispatch_sweep();