size_t num_msgs = conn.out_q.size();
// we must have something to write...
ceph_assert(is_queued());
- assert(!open_write);
- open_write = true;
-
conn.pending_q.clear();
conn.pending_q.swap(conn.out_q);
if (!conn.policy.lossy) {
// the dispatching can ONLY stop now
ceph_assert(write_dispatching);
write_dispatching = false;
- assert(open_write);
- open_write = false;
+ if (unlikely(exit_open.has_value())) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: nothing queued at {},"
+ " set exit_open",
+ conn, get_state_name(write_state));
+ }
return seastar::make_ready_future<stop_t>(stop_t::yes);
} else {
// something is pending to send during flushing
- assert(open_write);
- open_write = false;
return seastar::make_ready_future<stop_t>(stop_t::no);
}
});
} else {
// messages were enqueued during socket write
- assert(open_write);
- open_write = false;
return seastar::make_ready_future<stop_t>(stop_t::no);
}
});
}
case write_state_t::delay:
// delay dispatching writes until open
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: delay and set exit_open ...", conn);
+ } else {
+ logger().info("{} write_event: delay ...", conn);
+ }
return state_changed.get_shared_future()
.then([] { return stop_t::no; });
case write_state_t::drop:
ceph_assert(write_dispatching);
write_dispatching = false;
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ logger().info("{} write_event: dropped and set exit_open", conn);
+ } else {
+ logger().info("{} write_event: dropped", conn);
+ }
return seastar::make_ready_future<stop_t>(stop_t::yes);
default:
ceph_assert(false);
if (e.code() != error::broken_pipe &&
e.code() != error::connection_reset &&
e.code() != error::negotiation_failure) {
- logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
- conn, e);
+ logger().error("{} write_event(): unexpected error at {} -- {}",
+ conn, get_state_name(write_state), e);
ceph_abort();
}
- logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
- assert(open_write);
- open_write = false;
- if (exit_open) {
- exit_open->set_value();
- exit_open = std::nullopt;
- }
socket->shutdown();
if (write_state == write_state_t::open) {
+ logger().info("{} write_event(): fault at {}, going to delay -- {}",
+ conn, get_state_name(write_state), e);
write_state = write_state_t::delay;
+ } else {
+ logger().info("{} write_event(): fault at {} -- {}",
+ conn, get_state_name(write_state), e);
}
return do_write_dispatch_sweep();
});
open,
drop
};
+
+ static const char* get_state_name(write_state_t state) {
+ static const char *const state_names[] = {"none",
+ "delay",
+ "open",
+ "drop"};
+ assert(static_cast<int>(state) < std::size(state_names));
+ return state_names[static_cast<int>(state)];
+ }
+
void set_write_state(const write_state_t& state) {
if (write_state == write_state_t::open &&
- state == write_state_t::delay) {
- if (open_write) {
- exit_open = seastar::shared_promise<>();
- }
- }
- if (state == write_state_t::drop && exit_open) {
- exit_open->set_value();
- exit_open = std::nullopt;
+ state != write_state_t::open &&
+ write_dispatching) {
+ exit_open = seastar::shared_promise<>();
}
write_state = state;
state_changed.set_value();
std::optional<utime_t> keepalive_ack = std::nullopt;
uint64_t ack_left = 0;
bool write_dispatching = false;
- // Indicate if we are in the middle of writing.
- bool open_write = false;
// If another continuation is trying to close or replace socket when
- // open_write is true, it needs to wait for exit_open until writing is
- // stopped or failed.
+ // write_dispatching is true and write_state is open,
+ // it needs to wait for exit_open until writing is stopped or failed.
std::optional<seastar::shared_promise<>> exit_open;
seastar::future<> do_write_dispatch_sweep();