if (!reentrant && _state == state) {
logger().error("{} is not allowed to re-trigger state {}",
conn, get_state_name(state));
- ceph_assert(false);
+ ceph_abort();
+ }
+ if (state == state_t::CLOSING) {
+ logger().error("{} CLOSING is not allowed to trigger state {}",
+ conn, get_state_name(_state));
+ ceph_abort();
}
logger().debug("{} TRIGGER {}, was {}",
conn, get_state_name(_state), get_state_name(state));
case Tag::SESSION_RESET:
return frame_assembler.read_frame_payload(
).then([this](auto payload) {
+ if (unlikely(state != state_t::CONNECTING)) {
+ logger().debug("{} triggered {} before reset_session()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
// handle_session_reset() logic
auto reset = ResetFrame::Decode(payload->back());
logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
ProtocolV2* existing_proto, bool do_reset,
bool reconnect, uint64_t conn_seq, uint64_t msg_seq)
{
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before trigger_replacing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
existing_proto->trigger_replacing(reconnect,
do_reset,
frame_assembler.to_replace(),
logger().warn("{} server_connect:"
" existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing_conn);
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
execute_establishing(existing_conn);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
if (existing_conn) {
return handle_existing_connection(existing_conn);
} else {
+ if (unlikely(state != state_t::ACCEPTING)) {
+ logger().debug("{} triggered {} before execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
execute_establishing(nullptr);
return seastar::make_ready_future<next_step_t>(next_step_t::ready);
}
// ESTABLISHING
void ProtocolV2::execute_establishing(SocketConnectionRef existing_conn) {
- if (unlikely(state != state_t::ACCEPTING)) {
- logger().debug("{} triggered {} before execute_establishing()",
- conn, get_state_name(state));
- abort_protocol();
- }
-
auto accept_me = [this] {
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ if (unlikely(state != state_t::ESTABLISHING)) {
+ logger().debug("{} triggered {} after ms_handle_accept() during execute_establishing()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
gated_execute("execute_establishing", [this] {
return seastar::futurize_invoke([this] {
{
return frame_assembler.read_frame_payload(
).then([this, throttle_stamp, msg_size](auto payload) {
+ if (unlikely(state != state_t::READY)) {
+ logger().debug("{} triggered {} during read_message()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
+
utime_t recv_stamp{seastar::lowres_system_clock::now()};
// we need to get the size before std::moving segments data
// TODO: change MessageRef with seastar::shared_ptr
auto msg_ref = MessageRef{message, false};
+ assert(state == state_t::READY);
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
});
trigger_state(state_t::READY, out_state_t::open, false);
if (dispatch_connect) {
dispatchers.ms_handle_connect(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+ if (unlikely(state != state_t::READY)) {
+ logger().debug("{} triggered {} after ms_handle_connect() during execute_ready()",
+ conn, get_state_name(state));
+ abort_protocol();
+ }
}
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
{
if (closed) {
// already closing
+ assert(state == state_t::CLOSING);
return;
}