execute_accepting();
}
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
+void ProtocolV2::trigger_state_phase1(state_t new_state)
{
ceph_assert_always(!gate.is_closed());
if (new_state == state) {
if (state == state_t::READY) {
// from READY
+ ceph_assert_always(!need_exit_io);
ceph_assert_always(!pr_exit_io.has_value());
+ need_exit_io = true;
pr_exit_io = seastar::shared_promise<>();
}
- bool need_notify_out;
if (new_state == state_t::STANDBY && !conn.policy.server) {
need_notify_out = true;
} else {
need_notify_out = false;
}
- auto pre_state = state;
state = new_state;
+}
+
+void ProtocolV2::trigger_state_phase2(
+ state_t new_state, io_state_t new_io_state)
+{
+ ceph_assert_always(new_state == state);
+ ceph_assert_always(!gate.is_closed());
+ ceph_assert_always(!pr_switch_io_shard.has_value());
FrameAssemblerV2Ref fa;
if (new_state == state_t::READY) {
}
io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
- if (pre_state == state_t::READY) {
+ if (need_exit_io) {
+ // from READY
logger().debug("{} IOHandler::wait_io_exit_dispatching() start ...", conn);
+ assert(pr_exit_io.has_value());
assert(new_io_state != io_state_t::open);
+ need_exit_io = false;
gate.dispatch_in_background("exit_io", conn, [this] {
return seastar::smp::submit_to(
io_handler.get_shard_id(), [this] {
ceph_assert_always(
seastar::this_shard_id() == frame_assembler->get_shard_id());
ceph_assert_always(!frame_assembler->is_socket_valid());
+ assert(!need_exit_io);
io_states = ret.io_states;
pr_exit_io->set_value();
pr_exit_io = std::nullopt;
conn, reconnect ? "reconnected" : "connected",
peer_global_seq, connect_seq, client_cookie,
io_states, mover.socket->get_shard_id());
- trigger_state(state_t::REPLACING, io_state_t::delay);
if (is_socket_valid) {
frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
+ trigger_state_phase1(state_t::REPLACING);
gate.dispatch_in_background(
"trigger_replacing",
conn,
// state may become CLOSING below, but we cannot abort the chain until
// mover.socket is correctly handled (closed or replaced).
- return wait_exit_io(
+ // this is preemptive
+ return wait_switch_io_shard(
).then([this] {
if (unlikely(state != state_t::REPLACING)) {
ceph_assert_always(state == state_t::CLOSING);
return seastar::now();
}
+ trigger_state_phase2(state_t::REPLACING, io_state_t::delay);
+ return wait_exit_io();
+ }).then([this] {
+ if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
+ return seastar::now();
+ }
+
ceph_assert_always(frame_assembler);
protocol_timer.cancel();
auto done = std::move(execution_done);
is_socket_valid = false;
}
- trigger_state(state_t::CLOSING, io_state_t::drop);
+ trigger_state_phase1(state_t::CLOSING);
gate.dispatch_in_background(
"close_io", conn, [this, is_dispatch_reset, is_replace] {
- return io_handler.close_io(is_dispatch_reset, is_replace);
- });
-
- std::ignore = gate.close(
- ).then([this] {
- ceph_assert_always(!pr_exit_io.has_value());
- if (has_socket) {
- ceph_assert_always(frame_assembler);
- return frame_assembler->close_shutdown_socket();
- } else {
- return seastar::now();
- }
- }).then([this] {
- logger().debug("{} closed!", conn);
- messenger.closed_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
- pr_closed_clean.set_value();
+ // this is preemptive
+ return wait_switch_io_shard(
+ ).then([this, is_dispatch_reset, is_replace] {
+ trigger_state_phase2(state_t::CLOSING, io_state_t::drop);
+ logger().debug("{} IOHandler::close_io(reset={}, replace={})",
+ conn, is_dispatch_reset, is_replace);
+
+ std::ignore = gate.close(
+ ).then([this] {
+ ceph_assert_always(!need_exit_io);
+ ceph_assert_always(!pr_exit_io.has_value());
+ if (has_socket) {
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+ pr_closed_clean.set_value();
#ifdef UNIT_TESTS_BUILT
- closed_clean = true;
- if (conn.interceptor) {
- conn.interceptor->register_conn_closed(
- conn.get_local_shared_foreign_from_this());
- }
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(
+ conn.get_local_shared_foreign_from_this());
+ }
#endif
- }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
- logger().error("{} closing got unexpected exception {}",
- conn, eptr);
- ceph_abort();
+ // connection is unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
+
+ return seastar::smp::submit_to(
+ io_handler.get_shard_id(),
+ [this, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(is_dispatch_reset, is_replace);
+ });
+ // user can make changes
+ });
});
}
private:
using io_state_t = IOHandler::io_state_t;
+ seastar::future<> wait_switch_io_shard() {
+ if (pr_switch_io_shard.has_value()) {
+ return pr_switch_io_shard->get_shared_future();
+ } else {
+ return seastar::now();
+ }
+ }
+
seastar::future<> wait_exit_io() {
if (pr_exit_io.has_value()) {
return pr_exit_io->get_shared_future();
} else {
+ assert(!need_exit_io);
return seastar::now();
}
}
return statenames[static_cast<int>(state)];
}
- void trigger_state(state_t new_state, io_state_t new_io_state);
+ void trigger_state_phase1(state_t new_state);
+
+ void trigger_state_phase2(state_t new_state, io_state_t new_io_state);
+
+ void trigger_state(state_t new_state, io_state_t new_io_state) {
+ ceph_assert_always(!pr_switch_io_shard.has_value());
+ trigger_state_phase1(new_state);
+ trigger_state_phase2(new_state, new_io_state);
+ }
template <typename Func, typename T>
void gated_execute(const char *what, T &who, Func &&func) {
FrameAssemblerV2Ref frame_assembler;
+ bool need_notify_out = false;
+
std::optional<seastar::shared_promise<>> pr_switch_io_shard;
+ bool need_exit_io = false;
+
std::optional<seastar::shared_promise<>> pr_exit_io;
AuthConnectionMetaRef auth_meta;