}
}
+seastar::future<stop_t> Protocol::try_exit_sweep() {
+ assert(!is_queued());
+ return socket->flush().then([this] {
+ if (!is_queued()) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ if (unlikely(exit_open.has_value())) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: nothing queued at {},"
+ " set exit_open",
+ conn, get_state_name(write_state));
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+}
+
seastar::future<> Protocol::do_write_dispatch_sweep()
{
return seastar::repeat([this] {
switch (write_state) {
case write_state_t::open: {
size_t num_msgs = conn.out_q.size();
- // we must have something to write...
- ceph_assert(is_queued());
+ bool still_queued = is_queued();
+ if (unlikely(!still_queued)) {
+ return try_exit_sweep();
+ }
conn.pending_q.clear();
conn.pending_q.swap(conn.out_q);
if (!conn.policy.lossy) {
assert(ack_left >= acked);
ack_left -= acked;
if (!is_queued()) {
- // good, we have nothing pending to send now.
- return socket->flush().then([this] {
- if (!is_queued()) {
- // still nothing pending to send after flush,
- // the dispatching can ONLY stop now
- ceph_assert(write_dispatching);
- write_dispatching = false;
- if (unlikely(exit_open.has_value())) {
- exit_open->set_value();
- exit_open = std::nullopt;
- logger().info("{} write_event: nothing queued at {},"
- " set exit_open",
- conn, get_state_name(write_state));
- }
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
- // something is pending to send during flushing
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
+ return try_exit_sweep();
} else {
// messages were enqueued during socket write
return seastar::make_ready_future<stop_t>(stop_t::no);