}
}
-seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
+seastar::future<> Protocol::do_write_dispatch_sweep()
{
- switch (write_state) {
- case write_state_t::open: {
- size_t num_msgs = conn.out_q.size();
- // we must have something to write...
- ceph_assert(is_queued());
- assert(!open_write);
- open_write = true;
+ return seastar::repeat([this] {
+ switch (write_state) {
+ case write_state_t::open: {
+ size_t num_msgs = conn.out_q.size();
+ // we must have something to write...
+ ceph_assert(is_queued());
+ assert(!open_write);
+ open_write = true;
- conn.pending_q.clear();
- conn.pending_q.swap(conn.out_q);
- if (!conn.policy.lossy) {
- conn.sent.insert(conn.sent.end(),
- conn.pending_q.begin(),
- conn.pending_q.end());
- }
- auto acked = ack_left;
- assert(acked == 0 || conn.in_seq > 0);
- // sweep all pending writes with the concrete Protocol
- return socket->write(do_sweep_messages(
- conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
- ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
- need_keepalive = false;
- if (keepalive_ack == prv_keepalive_ack) {
- keepalive_ack = std::nullopt;
- }
- assert(ack_left >= acked);
- ack_left -= acked;
- if (!is_queued()) {
- // good, we have nothing pending to send now.
- return socket->flush().then([this] {
- if (!is_queued()) {
- // still nothing pending to send after flush,
- // the dispatching can ONLY stop now
- ceph_assert(write_dispatching);
- write_dispatching = false;
- assert(open_write);
- open_write = false;
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
- // something is pending to send during flushing
- assert(open_write);
- open_write = false;
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }
- });
- } else {
- // messages were enqueued during socket write
- assert(open_write);
- open_write = false;
- return seastar::make_ready_future<stop_t>(stop_t::no);
+ conn.pending_q.clear();
+ conn.pending_q.swap(conn.out_q);
+ if (!conn.policy.lossy) {
+ conn.sent.insert(conn.sent.end(),
+ conn.pending_q.begin(),
+ conn.pending_q.end());
}
- }).handle_exception_type([this] (const std::system_error& e) {
- if (e.code() != error::broken_pipe &&
- e.code() != error::connection_reset) {
- logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
- conn, e);
- ceph_abort();
- }
-
- logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
- assert(open_write);
- open_write = false;
- if (exit_open) {
- exit_open->set_value();
- exit_open = std::nullopt;
- }
- socket->shutdown();
- if (write_state == write_state_t::open) {
- write_state = write_state_t::delay;
- }
- return seastar::make_ready_future<stop_t>(stop_t::no);
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
- conn, eptr);
+ auto acked = ack_left;
+ assert(acked == 0 || conn.in_seq > 0);
+ // sweep all pending writes with the concrete Protocol
+ return socket->write(do_sweep_messages(
+ conn.pending_q, num_msgs, need_keepalive, keepalive_ack, acked > 0)
+ ).then([this, prv_keepalive_ack=keepalive_ack, acked] {
+ need_keepalive = false;
+ if (keepalive_ack == prv_keepalive_ack) {
+ keepalive_ack = std::nullopt;
+ }
+ assert(ack_left >= acked);
+ ack_left -= acked;
+ if (!is_queued()) {
+ // good, we have nothing pending to send now.
+ return socket->flush().then([this] {
+ if (!is_queued()) {
+ // still nothing pending to send after flush,
+ // the dispatching can ONLY stop now
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ assert(open_write);
+ open_write = false;
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ } else {
+ // something is pending to send during flushing
+ assert(open_write);
+ open_write = false;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+ } else {
+ // messages were enqueued during socket write
+ assert(open_write);
+ open_write = false;
+ return seastar::make_ready_future<stop_t>(stop_t::no);
+ }
+ });
+ }
+ case write_state_t::delay:
+ // delay dispatching writes until open
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ case write_state_t::drop:
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != error::broken_pipe &&
+ e.code() != error::connection_reset) {
+ logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
+ conn, e);
ceph_abort();
- return seastar::make_ready_future<stop_t>(stop_t::no);
- });
- }
- case write_state_t::delay: {
- // delay dispatching writes until open
- return state_changed.get_shared_future()
- .then([] { return stop_t::no; });
- }
- case write_state_t::drop:
- ceph_assert(write_dispatching);
- write_dispatching = false;
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- default:
- ceph_assert(false);
- }
+ }
+ logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
+ assert(open_write);
+ open_write = false;
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ }
+ socket->shutdown();
+ if (write_state == write_state_t::open) {
+ write_state = write_state_t::delay;
+ }
+ return do_write_dispatch_sweep();
+ });
}
void Protocol::write_event()
[[fallthrough]];
case write_state_t::delay:
seastar::with_gate(pending_dispatch, [this] {
- return seastar::repeat([this] {
- return do_write_dispatch_sweep();
+ return do_write_dispatch_sweep(
+ ).handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
});
});
return;