const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!frame_assembler.has_socket());
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
execute_connecting();
}
-void ProtocolV2::start_accept(SocketRef&& sock,
+void ProtocolV2::start_accept(SocketRef&& new_socket,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::NONE);
- ceph_assert(!frame_assembler.has_socket());
// until we know better
conn.target_addr = _peer_addr;
- frame_assembler.set_socket(std::move(sock));
+ frame_assembler.set_socket(std::move(new_socket));
+ has_socket = true;
+ is_socket_valid = true;
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
set_out_state(_out_state);
}
-void ProtocolV2::fault(bool backoff, const char* func_name, std::exception_ptr eptr)
+void ProtocolV2::fault(
+ state_t expected_state,
+ const char *where,
+ std::exception_ptr eptr)
{
- if (conn.policy.lossy) {
- logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
- conn, func_name, get_state_name(state), eptr);
+ assert(expected_state == state_t::CONNECTING ||
+ expected_state == state_t::ESTABLISHING ||
+ expected_state == state_t::REPLACING ||
+ expected_state == state_t::READY);
+ const char *e_what;
+ try {
+ std::rethrow_exception(eptr);
+ } catch (std::exception &e) {
+ e_what = e.what();
+ }
+
+ if (state != expected_state) {
+ logger().info("{} protocol {} {} is aborted at inconsistent {} -- {}",
+ conn,
+ get_state_name(expected_state),
+ where,
+ get_state_name(state),
+ e_what);
+#ifndef NDEBUG
+ if (expected_state == state_t::REPLACING) {
+ assert(state == state_t::CLOSING);
+ } else if (expected_state == state_t::READY) {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING ||
+ state == state_t::CONNECTING ||
+ state == state_t::STANDBY);
+ } else {
+ assert(state == state_t::CLOSING ||
+ state == state_t::REPLACING);
+ }
+#endif
+ return;
+ }
+ assert(state == expected_state);
+
+ if (state != state_t::CONNECTING && conn.policy.lossy) {
+ // socket will be shutdown in do_close()
+ logger().info("{} protocol {} {} fault on lossy channel, going to CLOSING -- {}",
+ conn, get_state_name(state), where, e_what);
do_close(true);
- } else if (conn.policy.server ||
- (conn.policy.standby && !is_out_queued_or_sent())) {
- logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
- conn, func_name, get_state_name(state), eptr);
+ return;
+ }
+
+ if (likely(has_socket)) {
+ if (likely(is_socket_valid)) {
+ frame_assembler.shutdown_socket();
+ is_socket_valid = false;
+ } else {
+ ceph_assert_always(state == state_t::CONNECTING ||
+ state == state_t::REPLACING);
+ }
+ } else { // !has_socket
+ ceph_assert_always(state == state_t::CONNECTING);
+ assert(!is_socket_valid);
+ }
+
+ if (conn.policy.server ||
+ (conn.policy.standby && !is_out_queued_or_sent())) {
+ if (conn.policy.server) {
+ logger().info("{} protocol {} {} fault as server, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{*this},
+ e_what);
+ } else {
+ logger().info("{} protocol {} {} fault with nothing to send, going to STANDBY {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{*this},
+ e_what);
+ }
execute_standby();
- } else if (backoff) {
- logger().info("{} {}: fault at {}, going to WAIT -- {}",
- conn, func_name, get_state_name(state), eptr);
+ } else if (state == state_t::CONNECTING ||
+ state == state_t::REPLACING) {
+ logger().info("{} protocol {} {} fault, going to WAIT {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{*this},
+ e_what);
execute_wait(false);
} else {
- logger().info("{} {}: fault at {}, going to CONNECTING -- {}",
- conn, func_name, get_state_name(state), eptr);
+ assert(state == state_t::READY ||
+ state == state_t::ESTABLISHING);
+ logger().info("{} protocol {} {} fault, going to CONNECTING {} -- {}",
+ conn,
+ get_state_name(state),
+ where,
+ io_stat_printer{*this},
+ e_what);
execute_connecting();
}
}
void ProtocolV2::execute_connecting()
{
+ ceph_assert_always(!is_socket_valid);
trigger_state(state_t::CONNECTING, out_state_t::delay, false);
- frame_assembler.shutdown_socket();
gated_execute("execute_connecting", [this] {
global_seq = messenger.get_global_seq();
assert(client_cookie != 0);
assert(server_cookie == 0);
logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
}
- return wait_out_exit_dispatching().then([this] {
+ return seastar::when_all(
+ wait_out_exit_dispatching(),
+ wait_in_exit_dispatching()
+ ).discard_result().then([this] {
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before Socket::connect()",
conn, get_state_name(state));
abort_protocol();
}
- gate.dispatch_in_background(
- "reset_close_socket_connecting",
- *this,
- [this] { return frame_assembler.reset_and_close_socket(); });
INTERCEPT_N_RW(custom_bp_t::SOCKET_CONNECTING);
return Socket::connect(conn.peer_addr);
- }).then([this](SocketRef sock) {
+ }).then([this](SocketRef new_socket) {
logger().debug("{} socket connected", conn);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during Socket::connect()",
conn, get_state_name(state));
- return sock->close().then([sock = std::move(sock)] {
+ return new_socket->close().then([sock=std::move(new_socket)] {
abort_protocol();
});
}
- frame_assembler.set_socket(std::move(sock));
+ if (!has_socket) {
+ frame_assembler.set_socket(std::move(new_socket));
+ has_socket = true;
+ } else {
+ gate.dispatch_in_background(
+ "replace_socket_connecting",
+ *this,
+ [this, new_socket=std::move(new_socket)]() mutable {
+ return frame_assembler.replace_shutdown_socket(std::move(new_socket));
+ }
+ );
+ }
+ is_socket_valid = true;
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
}
case next_step_t::wait: {
logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
+ ceph_assert_always(is_socket_valid);
+ frame_assembler.shutdown_socket();
+ is_socket_valid = false;
execute_wait(true);
break;
}
ceph_abort("impossible next step");
}
}
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::CONNECTING) {
- logger().info("{} execute_connecting(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING ||
- state == state_t::REPLACING);
- return;
- }
-
- if (conn.policy.server ||
- (conn.policy.standby && !is_out_queued_or_sent())) {
- logger().info("{} execute_connecting(): fault at {} with nothing to send,"
- " going to STANDBY -- {}",
- conn, get_state_name(state), eptr);
- execute_standby();
- } else {
- logger().info("{} execute_connecting(): fault at {}, going to WAIT -- {}",
- conn, get_state_name(state), eptr);
- execute_wait(false);
- }
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::CONNECTING, "execute_connecting", eptr);
});
});
}
peer_supported_features,
conn_seq,
msg_seq);
+ ceph_assert_always(has_socket && is_socket_valid);
+ is_socket_valid = false;
+ has_socket = false;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_replaced(conn);
void ProtocolV2::execute_accepting()
{
+ assert(is_socket_valid);
trigger_state(state_t::ACCEPTING, out_state_t::none, false);
gate.dispatch_in_background("execute_accepting", *this, [this] {
return seastar::futurize_invoke([this] {
conn.shared_from_this()));
};
+ ceph_assert_always(is_socket_valid);
trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
if (existing_conn) {
static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
client_cookie, server_cookie,
io_stat_printer{*this});
execute_ready(false);
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::ESTABLISHING) {
- logger().info("{} execute_establishing() protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING ||
- state == state_t::REPLACING);
- return;
- }
- fault(false, "execute_establishing()", eptr);
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::ESTABLISHING, "execute_establishing", eptr);
});
});
}
uint64_t new_msg_seq)
{
trigger_state(state_t::REPLACING, out_state_t::delay, false);
- frame_assembler.shutdown_socket();
+ ceph_assert_always(has_socket);
+ ceph_assert_always(!mover.socket->is_shutdown());
+ if (is_socket_valid) {
+ frame_assembler.shutdown_socket();
+ is_socket_valid = false;
+ }
gate.dispatch_in_background("trigger_replacing", *this,
[this,
reconnect,
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
// state may become CLOSING, close mover.socket and abort later
- return wait_out_exit_dispatching(
- ).then([this] {
+ return seastar::when_all(
+ wait_out_exit_dispatching(),
+ wait_in_exit_dispatching()
+ ).discard_result().then([this] {
protocol_timer.cancel();
auto done = std::move(execution_done);
execution_done = seastar::now();
});
}
- gate.dispatch_in_background(
- "reset_close_socket_replacing",
- *this,
- [this] { return frame_assembler.reset_and_close_socket(); });
auth_meta = std::move(new_auth_meta);
peer_global_seq = new_peer_global_seq;
- frame_assembler.replace_by(std::move(mover));
+ gate.dispatch_in_background(
+ "replace_frame_assembler",
+ *this,
+ [this, mover=std::move(mover)]() mutable {
+ return frame_assembler.replace_by(std::move(mover));
+ }
+ );
+ is_socket_valid = true;
if (reconnect) {
connect_seq = new_connect_seq;
client_cookie, server_cookie,
io_stat_printer{*this});
execute_ready(false);
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::REPLACING) {
- logger().info("{} trigger_replacing(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::CLOSING);
- return;
- }
- fault(true, "trigger_replacing()", eptr);
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::REPLACING, "trigger_replacing", eptr);
});
});
}
return bl;
}
+void ProtocolV2::notify_out_fault(std::exception_ptr eptr)
+{
+ fault(state_t::READY, "notify_out_fault", eptr);
+}
+
seastar::future<> ProtocolV2::read_message(utime_t throttle_stamp, std::size_t msg_size)
{
return frame_assembler.read_frame_payload(
void ProtocolV2::execute_ready(bool dispatch_connect)
{
+ ceph_assert_always(is_socket_valid);
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
trigger_state(state_t::READY, out_state_t::open, false);
if (dispatch_connect) {
conn.interceptor->register_conn_ready(conn);
}
#endif
- gated_execute("execute_ready", [this] {
+ ceph_assert_always(!in_exit_dispatching.has_value());
+ in_exit_dispatching = seastar::shared_promise<>();
+ gate.dispatch_in_background("execute_ready", *this, [this] {
protocol_timer.cancel();
return seastar::keep_doing([this] {
return read_main_preamble(
}
}
});
- }).handle_exception([this] (std::exception_ptr eptr) {
- if (state != state_t::READY) {
- logger().info("{} execute_ready(): protocol aborted at {} -- {}",
- conn, get_state_name(state), eptr);
- assert(state == state_t::REPLACING ||
- state == state_t::CLOSING);
- return;
- }
- fault(false, "execute_ready()", eptr);
+ }).handle_exception([this](std::exception_ptr eptr) {
+ fault(state_t::READY, "execute_ready", eptr);
+ }).finally([this] {
+ ceph_assert_always(in_exit_dispatching.has_value());
+ in_exit_dispatching->set_value();
+ in_exit_dispatching = std::nullopt;
});
});
}
void ProtocolV2::execute_standby()
{
+ ceph_assert_always(!is_socket_valid);
trigger_state(state_t::STANDBY, out_state_t::delay, false);
- frame_assembler.shutdown_socket();
}
void ProtocolV2::notify_out()
void ProtocolV2::execute_wait(bool max_backoff)
{
+ ceph_assert_always(!is_socket_valid);
trigger_state(state_t::WAIT, out_state_t::delay, false);
- frame_assembler.shutdown_socket();
gated_execute("execute_wait", [this, max_backoff] {
double backoff = protocol_timer.last_dur();
if (max_backoff) {
void ProtocolV2::execute_server_wait()
{
+ ceph_assert_always(is_socket_valid);
trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
gated_execute("execute_server_wait", [this] {
return frame_assembler.read_exactly(1).then([this] (auto bl) {
if (f_accept_new) {
(*f_accept_new)();
}
- frame_assembler.shutdown_socket();
+ if (is_socket_valid) {
+ frame_assembler.shutdown_socket();
+ is_socket_valid = false;
+ }
assert(!gate.is_closed());
auto gate_closed = gate.close();
auto out_closed = close_out();
closed_clean_fut = seastar::when_all(
std::move(gate_closed), std::move(out_closed)
).discard_result().then([this] {
- return frame_assembler.reset_and_close_socket(false);
+ if (has_socket) {
+ return frame_assembler.close_shutdown_socket();
+ } else {
+ return seastar::now();
+ }
}).then([this] {
logger().debug("{} closed!", conn);
messenger.closed_conn(