execute_accepting();
}
-void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, io_state_t new_io_state)
{
- if (!reentrant && new_state == state) {
+ ceph_assert_always(!gate.is_closed());
+ if (new_state == state) {
logger().error("{} is not allowed to re-trigger state {}",
conn, get_state_name(state));
ceph_abort();
}
logger().debug("{} TRIGGER {}, was {}",
conn, get_state_name(new_state), get_state_name(state));
- auto pre_state = state;
- if (pre_state == state_t::READY) {
- assert(!gate.is_closed());
- ceph_assert_always(!exit_io.has_value());
- exit_io = seastar::shared_promise<>();
+
+ if (state == state_t::READY) {
+ // from READY
+ ceph_assert_always(!pr_exit_io.has_value());
+ pr_exit_io = seastar::shared_promise<>();
}
bool need_notify_out;
need_notify_out = false;
}
+ auto pre_state = state;
state = new_state;
+
+ FrameAssemblerV2Ref fa;
if (new_state == state_t::READY) {
- // I'm not responsible to shutdown the socket at READY
- is_socket_valid = false;
- io_handler.set_io_state(new_io_state, std::move(frame_assembler), need_notify_out);
+ assert(new_io_state == io_state_t::open);
+ fa = std::move(frame_assembler);
} else {
- io_handler.set_io_state(new_io_state, nullptr, need_notify_out);
+ assert(new_io_state != io_state_t::open);
}
-
- /*
- * not atomic below
- */
+ io_handler.set_io_state(new_io_state, std::move(fa), need_notify_out);
if (pre_state == state_t::READY) {
+ assert(new_io_state != io_state_t::open);
gate.dispatch_in_background("exit_io", conn, [this] {
return io_handler.wait_io_exit_dispatching(
).then([this](auto ret) {
frame_assembler = std::move(ret.frame_assembler);
ceph_assert_always(!frame_assembler->is_socket_valid());
io_states = ret.io_states;
- exit_io->set_value();
- exit_io = std::nullopt;
+ pr_exit_io->set_value();
+ pr_exit_io = std::nullopt;
});
});
}
void ProtocolV2::execute_connecting()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::CONNECTING, io_state_t::delay, false);
+ trigger_state(state_t::CONNECTING, io_state_t::delay);
gated_execute("execute_connecting", conn, [this] {
global_seq = messenger.get_global_seq();
assert(client_cookie != 0);
void ProtocolV2::execute_accepting()
{
assert(is_socket_valid);
- trigger_state(state_t::ACCEPTING, io_state_t::none, false);
+ trigger_state(state_t::ACCEPTING, io_state_t::none);
gate.dispatch_in_background("execute_accepting", conn, [this] {
return seastar::futurize_invoke([this] {
#ifdef UNIT_TESTS_BUILT
};
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::ESTABLISHING, io_state_t::delay, false);
+ trigger_state(state_t::ESTABLISHING, io_state_t::delay);
if (existing_conn) {
- static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
- true /* is_dispatch_reset */, std::move(accept_me));
+ ProtocolV2 *existing_proto = dynamic_cast<ProtocolV2*>(
+ existing_conn->protocol.get());
+ existing_proto->do_close(
+ true, // is_dispatch_reset
+ std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
"the accept event will not be delivered!",
}
gated_execute("execute_establishing", conn, [this] {
+ ceph_assert_always(state == state_t::ESTABLISHING);
return seastar::futurize_invoke([this] {
return send_server_ident();
}).then([this] {
seastar::future<>
ProtocolV2::send_server_ident()
{
+ ceph_assert_always(state == state_t::ESTABLISHING ||
+ state == state_t::REPLACING);
// send_server_ident() logic
// refered to async-conn v2: not assign gs to global_seq
uint64_t new_connect_seq,
uint64_t new_msg_seq)
{
+ ceph_assert_always(state >= state_t::ESTABLISHING);
+ ceph_assert_always(state <= state_t::WAIT);
ceph_assert_always(has_socket || state == state_t::CONNECTING);
ceph_assert_always(!mover.socket->is_shutdown());
- trigger_state(state_t::REPLACING, io_state_t::delay, false);
+ trigger_state(state_t::REPLACING, io_state_t::delay);
if (is_socket_valid) {
frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
if (unlikely(state != state_t::REPLACING)) {
+ ceph_assert_always(state == state_t::CLOSING);
return mover.socket->close(
).then([sock = std::move(mover.socket)] {
abort_protocol();
}
}).then([this, reconnect] {
if (unlikely(state != state_t::REPLACING)) {
- logger().debug("{} triggered {} at the end of trigger_replacing()",
+ logger().debug("{} triggered {} at the end of trigger_replacing(), abort",
conn, get_state_name(state));
+ ceph_assert_always(state == state_t::CLOSING);
abort_protocol();
}
- logger().info("{} replaced ({}): gs={}, pgs={}, cs={}, "
+ logger().info("{} replaced ({}), going to ready: "
+ "gs={}, pgs={}, cs={}, "
"client_cookie={}, server_cookie={}, {}",
conn, reconnect ? "reconnected" : "connected",
global_seq, peer_global_seq, connect_seq,
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
protocol_timer.cancel();
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::READY, io_state_t::open, false);
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ trigger_state(state_t::READY, io_state_t::open);
}
// STANDBY state
void ProtocolV2::execute_standby()
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::STANDBY, io_state_t::delay, false);
+ trigger_state(state_t::STANDBY, io_state_t::delay);
}
void ProtocolV2::notify_out()
void ProtocolV2::execute_wait(bool max_backoff)
{
ceph_assert_always(!is_socket_valid);
- trigger_state(state_t::WAIT, io_state_t::delay, false);
+ trigger_state(state_t::WAIT, io_state_t::delay);
gated_execute("execute_wait", conn, [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
void ProtocolV2::execute_server_wait()
{
ceph_assert_always(is_socket_valid);
- trigger_state(state_t::SERVER_WAIT, io_state_t::none, false);
+ trigger_state(state_t::SERVER_WAIT, io_state_t::none);
gated_execute("execute_server_wait", conn, [this] {
return frame_assembler->read_exactly(1
).then([this](auto bptr) {
// the container when seastar::parallel_for_each() is still iterating in it.
// that'd lead to a segfault.
return seastar::yield(
- ).then([this, conn_ref = conn.shared_from_this()] {
+ ).then([this] {
do_close(false);
- // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
- // which will otherwise result in deadlock
- assert(closed_clean_fut.valid());
- return closed_clean_fut.get_future();
- });
+ return pr_closed_clean.get_shared_future();
+
+ // connection may be unreferenced from the messenger,
+ // so need to hold the additional reference.
+ }).finally([conn_ref = conn.shared_from_this()] {});;
}
void ProtocolV2::do_close(
bool is_dispatch_reset,
std::optional<std::function<void()>> f_accept_new)
{
- if (closed) {
+ if (state == state_t::CLOSING) {
// already closing
- assert(state == state_t::CLOSING);
return;
}
* atomic operations
*/
- closed = true;
+ ceph_assert_always(!gate.is_closed());
- // trigger close
+ // messenger registrations, must before user events
messenger.closing_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
// cannot happen
ceph_assert(false);
}
- protocol_timer.cancel();
- trigger_state(state_t::CLOSING, io_state_t::drop, false);
-
if (f_accept_new) {
+ // the replacing connection must be registerred after the replaced
+ // connection is unreigsterred.
(*f_accept_new)();
}
+
+ protocol_timer.cancel();
if (is_socket_valid) {
frame_assembler->shutdown_socket<true>(&gate);
is_socket_valid = false;
}
- assert(!gate.is_closed());
- auto handshake_closed = gate.close();
- auto io_closed = io_handler.close_io(
- is_dispatch_reset, is_replace);
-
- // asynchronous operations
- assert(!closed_clean_fut.valid());
- closed_clean_fut = seastar::when_all(
- std::move(handshake_closed), std::move(io_closed)
- ).discard_result().then([this] {
- ceph_assert_always(!exit_io.has_value());
+
+ trigger_state(state_t::CLOSING, io_state_t::drop);
+ gate.dispatch_in_background(
+ "close_io", conn, [this, is_dispatch_reset, is_replace] {
+ return io_handler.close_io(is_dispatch_reset, is_replace);
+ });
+
+ std::ignore = gate.close(
+ ).then([this] {
+ ceph_assert_always(!pr_exit_io.has_value());
if (has_socket) {
ceph_assert_always(frame_assembler);
return frame_assembler->close_shutdown_socket();
messenger.closed_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
+ pr_closed_clean.set_value();
#ifdef UNIT_TESTS_BUILT
closed_clean = true;
if (conn.interceptor) {
}
#endif
}).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
- logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+ logger().error("{} closing got unexpected exception {}",
conn, eptr);
ceph_abort();
});