Protocol::Protocol(ChainedDispatchers& dispatchers,
SocketConnection& conn)
: dispatchers(dispatchers),
- conn(conn),
- frame_assembler(conn)
+ conn(conn)
{}
Protocol::~Protocol()
if (unlikely(require_keepalive)) {
auto keepalive_frame = KeepAliveFrame::Encode();
- bl.append(frame_assembler.get_buffer(keepalive_frame));
+ bl.append(frame_assembler->get_buffer(keepalive_frame));
}
if (unlikely(maybe_keepalive_ack.has_value())) {
auto keepalive_ack_frame = KeepAliveFrameAck::Encode(*maybe_keepalive_ack);
- bl.append(frame_assembler.get_buffer(keepalive_ack_frame));
+ bl.append(frame_assembler->get_buffer(keepalive_ack_frame));
}
if (require_ack && num_msgs == 0u) {
auto ack_frame = AckFrame::Encode(get_in_seq());
- bl.append(frame_assembler.get_buffer(ack_frame));
+ bl.append(frame_assembler->get_buffer(ack_frame));
}
std::for_each(
msg->get_payload(), msg->get_middle(), msg->get_data());
logger().debug("{} --> #{} === {} ({})",
conn, msg->get_seq(), *msg, msg->get_type());
- bl.append(frame_assembler.get_buffer(message));
+ bl.append(frame_assembler->get_buffer(message));
});
if (!conn.policy.lossy) {
}
void Protocol::set_out_state(
- const Protocol::out_state_t &new_state)
+ const Protocol::out_state_t &new_state,
+ FrameAssemblerV2Ref fa)
{
ceph_assert_always(!(
(new_state == out_state_t::none && out_state != out_state_t::none) ||
));
bool dispatch_in = false;
- if (out_state != out_state_t::open &&
- new_state == out_state_t::open) {
+ if (new_state == out_state_t::open) {
// to open
- ceph_assert_always(frame_assembler.is_socket_valid());
+ assert(fa != nullptr);
+ ceph_assert_always(frame_assembler == nullptr);
+ frame_assembler = std::move(fa);
+ ceph_assert_always(frame_assembler->is_socket_valid());
dispatch_in = true;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
conn.interceptor->register_conn_ready(conn);
}
#endif
- } else if (out_state == out_state_t::open &&
- new_state != out_state_t::open) {
+ } else if (out_state == out_state_t::open) {
// from open
- ceph_assert_always(frame_assembler.is_socket_valid());
- frame_assembler.shutdown_socket();
+ assert(fa == nullptr);
+ ceph_assert_always(frame_assembler->is_socket_valid());
+ frame_assembler->shutdown_socket();
if (out_dispatching) {
ceph_assert_always(!out_exit_dispatching.has_value());
- out_exit_dispatching = seastar::shared_promise<>();
+ out_exit_dispatching = seastar::promise<>();
}
+ } else {
+ assert(fa == nullptr);
}
if (out_state != new_state) {
out_state_changed = seastar::promise<>();
}
- // The above needs to be atomic
+ /*
+ * not atomic below
+ */
+
if (dispatch_in) {
do_in_dispatch();
}
}
-seastar::future<> Protocol::wait_io_exit_dispatching()
+seastar::future<FrameAssemblerV2Ref> Protocol::wait_io_exit_dispatching()
{
ceph_assert_always(out_state != out_state_t::open);
- ceph_assert_always(!frame_assembler.is_socket_valid());
+ ceph_assert_always(frame_assembler != nullptr);
+ ceph_assert_always(!frame_assembler->is_socket_valid());
return seastar::when_all(
[this] {
if (out_exit_dispatching) {
- return out_exit_dispatching->get_shared_future();
+ return out_exit_dispatching->get_future();
} else {
return seastar::now();
}
}(),
[this] {
if (in_exit_dispatching) {
- return in_exit_dispatching->get_shared_future();
+ return in_exit_dispatching->get_future();
} else {
return seastar::now();
}
}()
- ).discard_result();
+ ).discard_result().then([this] {
+ return std::move(frame_assembler);
+ });
}
void Protocol::requeue_out_sent()
seastar::future<stop_t> Protocol::try_exit_out_dispatch() {
assert(!is_out_queued());
- return frame_assembler.flush().then([this] {
+ return frame_assembler->flush(
+ ).then([this] {
if (!is_out_queued()) {
// still nothing pending to send after flush,
// the dispatching can ONLY stop now
auto to_ack = ack_left;
assert(to_ack == 0 || in_seq > 0);
// sweep all pending out with the concrete Protocol
- return frame_assembler.write(
+ return frame_assembler->write(
sweep_out_pending_msgs_to_sent(
need_keepalive, next_keepalive_ack, to_ack > 0)
).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
seastar::future<>
Protocol::read_message(utime_t throttle_stamp, std::size_t msg_size)
{
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this, throttle_stamp, msg_size](auto payload) {
if (unlikely(out_state != out_state_t::open)) {
logger().debug("{} triggered {} during read_message()",
void Protocol::do_in_dispatch()
{
ceph_assert_always(!in_exit_dispatching.has_value());
- in_exit_dispatching = seastar::shared_promise<>();
+ in_exit_dispatching = seastar::promise<>();
gate.dispatch_in_background("do_in_dispatch", *this, [this] {
return seastar::keep_doing([this] {
- return frame_assembler.read_main_preamble(
+ return frame_assembler->read_main_preamble(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::MESSAGE: {
});
}
case Tag::ACK:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_message_ack() logic
auto ack = AckFrame::Decode(payload->back());
ack_out_sent(ack.seq());
});
case Tag::KEEPALIVE2:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
last_keepalive = seastar::lowres_system_clock::now();
});
case Tag::KEEPALIVE2_ACK:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_keepalive2_ack() logic
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());
SocketMessenger& messenger)
: Protocol(dispatchers, conn),
messenger{messenger},
+ frame_assembler{FrameAssemblerV2::create(conn)},
auth_meta{seastar::make_lw_shared<AuthConnectionMeta>()},
protocol_timer{conn}
{}
ceph_assert(state == state_t::NONE);
// until we know better
conn.target_addr = _peer_addr;
- frame_assembler.set_socket(std::move(new_socket));
+ frame_assembler->set_socket(std::move(new_socket));
has_socket = true;
is_socket_valid = true;
logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
execute_accepting();
}
-void ProtocolV2::trigger_state(state_t _state, out_state_t _out_state, bool reentrant)
+void ProtocolV2::trigger_state(state_t new_state, out_state_t _out_state, bool reentrant)
{
- if (!reentrant && _state == state) {
+ if (!reentrant && new_state == state) {
logger().error("{} is not allowed to re-trigger state {}",
conn, get_state_name(state));
ceph_abort();
}
if (state == state_t::CLOSING) {
logger().error("{} CLOSING is not allowed to trigger state {}",
- conn, get_state_name(_state));
+ conn, get_state_name(new_state));
ceph_abort();
}
logger().debug("{} TRIGGER {}, was {}",
- conn, get_state_name(_state), get_state_name(state));
- state = _state;
- set_out_state(_out_state);
+ conn, get_state_name(new_state), get_state_name(state));
+ auto pre_state = state;
+ if (pre_state == state_t::READY) {
+ assert(!gate.is_closed());
+ ceph_assert_always(!exit_io.has_value());
+ exit_io = seastar::shared_promise<>();
+ }
+ state = new_state;
+ if (new_state == state_t::READY) {
+ // I'm not responsible to shutdown the socket at READY
+ is_socket_valid = false;
+ set_out_state(_out_state, std::move(frame_assembler));
+ } else {
+ set_out_state(_out_state, nullptr);
+ }
+
+ /*
+ * not atomic below
+ */
+
+ if (pre_state == state_t::READY) {
+ gate.dispatch_in_background("exit_io", *this, [this] {
+ return wait_io_exit_dispatching(
+ ).then([this](FrameAssemblerV2Ref fa) {
+ frame_assembler = std::move(fa);
+ exit_io->set_value();
+ exit_io = std::nullopt;
+ });
+ });
+ }
}
void ProtocolV2::fault(
if (likely(has_socket)) {
if (likely(is_socket_valid)) {
ceph_assert_always(state != state_t::READY);
- frame_assembler.shutdown_socket();
+ frame_assembler->shutdown_socket();
is_socket_valid = false;
} else {
ceph_assert_always(state != state_t::ESTABLISHING);
CEPH_MSGR2_REQUIRED_FEATURES,
CEPH_BANNER_V2_PREFIX);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_WRITE, bp_type_t::WRITE);
- return frame_assembler.write_flush(std::move(bl)).then([this] {
+ return frame_assembler->write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(ceph_le16);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_READ, bp_type_t::READ);
- return frame_assembler.read_exactly(banner_len); // or read exactly?
+ return frame_assembler->read_exactly(banner_len); // or read exactly?
}).then([this] (auto bl) {
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
}
logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
INTERCEPT_CUSTOM(custom_bp_t::BANNER_PAYLOAD_READ, bp_type_t::READ);
- return frame_assembler.read(payload_len);
+ return frame_assembler->read(payload_len);
}).then([this, is_connect] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
auto p = bl.cbegin();
}
peer_supported_features = _peer_supported_features;
bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- frame_assembler.set_is_rev1(is_rev1);
+ frame_assembler->set_is_rev1(is_rev1);
auto hello = HelloFrame::Encode(messenger.get_mytype(),
conn.target_addr);
logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
conn, ceph_entity_type_name(messenger.get_mytype()),
conn.target_addr);
- return frame_assembler.write_flush_frame(hello);
+ return frame_assembler->write_flush_frame(hello);
}).then([this] {
//5. read peer HelloFrame
- return frame_assembler.read_main_preamble();
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::HELLO, ret.tag, conn, __func__);
- return frame_assembler.read_frame_payload();
+ return frame_assembler->read_frame_payload();
}).then([this](auto payload) {
// 6. process peer HelloFrame
auto hello = HelloFrame::Decode(payload->back());
seastar::future<> ProtocolV2::handle_auth_reply()
{
- return frame_assembler.read_main_preamble(
+ return frame_assembler->read_main_preamble(
).then([this](auto ret) {
switch (ret.tag) {
case Tag::AUTH_BAD_METHOD:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_auth_bad_method() logic
auto bad_method = AuthBadMethodFrame::Decode(payload->back());
return client_auth(bad_method.allowed_methods());
});
case Tag::AUTH_REPLY_MORE:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_auth_reply_more() logic
auto auth_more = AuthReplyMoreFrame::Decode(payload->back());
auto more_reply = AuthRequestMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
conn, reply.length());
- return frame_assembler.write_flush_frame(more_reply);
+ return frame_assembler->write_flush_frame(more_reply);
}).then([this] {
return handle_auth_reply();
});
case Tag::AUTH_DONE:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_auth_done() logic
auto auth_done = AuthDoneFrame::Decode(payload->back());
abort_in_fault();
}
auth_meta->con_mode = auth_done.con_mode();
- frame_assembler.create_session_stream_handlers(*auth_meta, false);
+ frame_assembler->create_session_stream_handlers(*auth_meta, false);
return finish_auth();
});
default: {
logger().debug("{} WRITE AuthRequestFrame: method={},"
" preferred_modes={}, payload_len={}",
conn, auth_method, preferred_modes, bl.length());
- return frame_assembler.write_flush_frame(frame).then([this] {
+ return frame_assembler->write_flush_frame(frame
+ ).then([this] {
return handle_auth_reply();
});
} catch (const crimson::auth::error& e) {
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::process_wait()
{
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_wait() logic
logger().debug("{} GOT WaitFrame", conn);
conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
flags, client_cookie);
- return frame_assembler.write_flush_frame(client_ident).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->write_flush_frame(client_ident
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::IDENT_MISSING_FEATURES:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_ident_missing_features() logic
auto ident_missing = IdentMissingFeaturesFrame::Decode(payload->back());
case Tag::WAIT:
return process_wait();
case Tag::SERVER_IDENT:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_server_ident() logic
requeue_out_sent();
conn, messenger.get_myaddrs(),
client_cookie, server_cookie,
global_seq, connect_seq, get_in_seq());
- return frame_assembler.write_flush_frame(reconnect).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->write_flush_frame(reconnect).then([this] {
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::SESSION_RETRY_GLOBAL:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_session_retry_global() logic
auto retry = RetryGlobalFrame::Decode(payload->back());
return client_reconnect();
});
case Tag::SESSION_RETRY:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_session_retry() logic
auto retry = RetryFrame::Decode(payload->back());
return client_reconnect();
});
case Tag::SESSION_RESET:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before reset_session()",
case Tag::WAIT:
return process_wait();
case Tag::SESSION_RECONNECT_OK:
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(payload->back());
assert(server_cookie == 0);
logger().debug("{} UPDATE: gs={} for connect", conn, global_seq);
}
- return wait_io_exit_dispatching().then([this] {
+ return wait_exit_io().then([this] {
+ ceph_assert_always(frame_assembler);
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} before Socket::connect()",
conn, get_state_name(state));
});
}
if (!has_socket) {
- frame_assembler.set_socket(std::move(new_socket));
+ frame_assembler->set_socket(std::move(new_socket));
has_socket = true;
} else {
gate.dispatch_in_background(
"replace_socket_connecting",
*this,
[this, new_socket=std::move(new_socket)]() mutable {
- return frame_assembler.replace_shutdown_socket(std::move(new_socket));
+ return frame_assembler->replace_shutdown_socket(std::move(new_socket));
}
);
}
return seastar::now();
}).then([this] {
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler.reset_handlers();
- frame_assembler.start_recording();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
return banner_exchange(true);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
conn, get_state_name(state));
abort_protocol();
}
- frame_assembler.learn_socket_ephemeral_port_as_connector(
+ frame_assembler->learn_socket_ephemeral_port_as_connector(
_my_addr_from_peer.get_port());
if (unlikely(_my_addr_from_peer.is_legacy())) {
logger().warn("{} peer sent a legacy address for me: {}",
case next_step_t::wait: {
logger().info("{} execute_connecting(): going to WAIT(max-backoff)", conn);
ceph_assert_always(is_socket_valid);
- frame_assembler.shutdown_socket();
+ frame_assembler->shutdown_socket();
is_socket_valid = false;
execute_wait(true);
break;
"allowed_methods={}, allowed_modes={})",
conn, auth_meta->auth_method, cpp_strerror(r),
allowed_methods, allowed_modes);
- return frame_assembler.write_flush_frame(bad_method).then([this] {
+ return frame_assembler->write_flush_frame(bad_method
+ ).then([this] {
return server_auth();
});
}
logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
conn, conn.peer_global_id,
ceph_con_mode_name(auth_meta->con_mode), reply.length());
- return frame_assembler.write_flush_frame(auth_done).then([this] {
+ return frame_assembler->write_flush_frame(auth_done
+ ).then([this] {
ceph_assert(auth_meta);
- frame_assembler.create_session_stream_handlers(*auth_meta, true);
+ frame_assembler->create_session_stream_handlers(*auth_meta, true);
return finish_auth();
});
}
auto more = AuthReplyMoreFrame::Encode(reply);
logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
conn, reply.length());
- return frame_assembler.write_flush_frame(more).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->write_flush_frame(more
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::AUTH_REQUEST_MORE, ret.tag, conn, __func__);
- return frame_assembler.read_frame_payload();
+ return frame_assembler->read_frame_payload();
}).then([this](auto payload) {
auto auth_more = AuthRequestMoreFrame::Decode(payload->back());
logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
seastar::future<> ProtocolV2::server_auth()
{
- return frame_assembler.read_main_preamble(
+ return frame_assembler->read_main_preamble(
).then([this](auto ret) {
expect_tag(Tag::AUTH_REQUEST, ret.tag, conn, __func__);
- return frame_assembler.read_frame_payload();
+ return frame_assembler->read_frame_payload();
}).then([this](auto payload) {
// handle_auth_request() logic
auto request = AuthRequestFrame::Decode(payload->back());
{
auto wait = WaitFrame::Encode();
logger().debug("{} WRITE WaitFrame", conn);
- return frame_assembler.write_flush_frame(wait).then([] {
+ return frame_assembler->write_flush_frame(wait
+ ).then([] {
return next_step_t::wait;
});
}
existing_proto->trigger_replacing(reconnect,
do_reset,
- frame_assembler.to_replace(),
+ frame_assembler->to_replace(),
std::move(auth_meta),
peer_global_seq,
client_cookie,
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_connect()
{
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_client_ident() logic
auto client_ident = ClientIdentFrame::Decode(payload->back());
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
conn, feat_missing);
- return frame_assembler.write_flush_frame(ident_missing_features).then([] {
+ return frame_assembler->write_flush_frame(ident_missing_features
+ ).then([] {
return next_step_t::wait;
});
}
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::read_reconnect()
{
- return frame_assembler.read_main_preamble(
+ return frame_assembler->read_main_preamble(
).then([this](auto ret) {
expect_tag(Tag::SESSION_RECONNECT, ret.tag, conn, "read_reconnect");
return server_reconnect();
{
auto retry = RetryFrame::Encode(connect_seq);
logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
- return frame_assembler.write_flush_frame(retry).then([this] {
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
return read_reconnect();
});
}
{
auto retry = RetryGlobalFrame::Encode(global_seq);
logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
- return frame_assembler.write_flush_frame(retry).then([this] {
+ return frame_assembler->write_flush_frame(retry
+ ).then([this] {
return read_reconnect();
});
}
{
auto reset = ResetFrame::Encode(full);
logger().warn("{} WRITE ResetFrame: full={}", conn, full);
- return frame_assembler.write_flush_frame(reset).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->write_flush_frame(reset
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::CLIENT_IDENT, ret.tag, conn, "post_send_reset");
return server_connect();
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::server_reconnect()
{
- return frame_assembler.read_frame_payload(
+ return frame_assembler->read_frame_payload(
).then([this](auto payload) {
// handle_reconnect() logic
auto reconnect = ReconnectFrame::Decode(payload->back());
return seastar::futurize_invoke([this] {
INTERCEPT_N_RW(custom_bp_t::SOCKET_ACCEPTED);
auth_meta = seastar::make_lw_shared<AuthConnectionMeta>();
- frame_assembler.reset_handlers();
- frame_assembler.start_recording();
+ frame_assembler->reset_handlers();
+ frame_assembler->start_recording();
return banner_exchange(false);
}).then([this] (auto&& ret) {
auto [_peer_type, _my_addr_from_peer] = std::move(ret);
messenger.learned_addr(_my_addr_from_peer, conn);
return server_auth();
}).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
switch (ret.tag) {
case Tag::CLIENT_IDENT:
{
ceph_assert(auth_meta);
- auto records = frame_assembler.stop_recording();
+ auto records = frame_assembler->stop_recording();
const auto sig = auth_meta->session_key.empty() ? sha256_digest_t() :
auth_meta->session_key.hmac_sha256(nullptr, records.rxbuf);
auto sig_frame = AuthSignatureFrame::Encode(sig);
logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
- return frame_assembler.write_flush_frame(sig_frame).then([this] {
- return frame_assembler.read_main_preamble();
+ return frame_assembler->write_flush_frame(sig_frame
+ ).then([this] {
+ return frame_assembler->read_main_preamble();
}).then([this](auto ret) {
expect_tag(Tag::AUTH_SIGNATURE, ret.tag, conn, "post_finish_auth");
- return frame_assembler.read_frame_payload();
+ return frame_assembler->read_frame_payload();
}).then([this, txbuf=std::move(records.txbuf)](auto payload) {
// handle_auth_signature() logic
auto sig_frame = AuthSignatureFrame::Decode(payload->back());
conn.policy.features_required | msgr2_required,
flags, server_cookie);
- return frame_assembler.write_flush_frame(server_ident);
+ return frame_assembler->write_flush_frame(server_ident);
}
// REPLACING state
ceph_assert_always(has_socket);
ceph_assert_always(!mover.socket->is_shutdown());
if (is_socket_valid) {
- frame_assembler.shutdown_socket();
+ frame_assembler->shutdown_socket();
is_socket_valid = false;
}
gate.dispatch_in_background("trigger_replacing", *this,
dispatchers.ms_handle_accept(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
// state may become CLOSING, close mover.socket and abort later
- return wait_io_exit_dispatching(
+ return wait_exit_io(
).then([this] {
+ ceph_assert_always(frame_assembler);
protocol_timer.cancel();
auto done = std::move(execution_done);
execution_done = seastar::now();
"replace_frame_assembler",
*this,
[this, mover=std::move(mover)]() mutable {
- return frame_assembler.replace_by(std::move(mover));
+ return frame_assembler->replace_by(std::move(mover));
}
);
is_socket_valid = true;
requeue_out_sent_up_to(new_msg_seq);
auto reconnect_ok = ReconnectOkFrame::Encode(get_in_seq());
logger().debug("{} WRITE ReconnectOkFrame: msg_seq={}", conn, get_in_seq());
- return frame_assembler.write_flush_frame(reconnect_ok);
+ return frame_assembler->write_flush_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
assert(conn.get_peer_type() == new_peer_name.type());
conn.set_features(new_conn_features);
peer_supported_features = new_peer_supported_features;
bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
- frame_assembler.set_is_rev1(is_rev1);
+ frame_assembler->set_is_rev1(is_rev1);
return send_server_ident();
}
}).then([this, reconnect] {
assert(conn.policy.lossy || (client_cookie != 0 && server_cookie != 0));
protocol_timer.cancel();
ceph_assert_always(is_socket_valid);
- // I'm not responsible to shutdown the socket at READY
- is_socket_valid = false;
trigger_state(state_t::READY, out_state_t::open, false);
}
ceph_assert_always(is_socket_valid);
trigger_state(state_t::SERVER_WAIT, out_state_t::none, false);
gated_execute("execute_server_wait", [this] {
- return frame_assembler.read_exactly(1).then([this] (auto bl) {
+ return frame_assembler->read_exactly(1
+ ).then([this](auto bl) {
logger().warn("{} SERVER_WAIT got read, abort", conn);
abort_in_fault();
}).handle_exception([this] (std::exception_ptr eptr) {
(*f_accept_new)();
}
if (is_socket_valid) {
- frame_assembler.shutdown_socket();
+ frame_assembler->shutdown_socket();
is_socket_valid = false;
}
assert(!gate.is_closed());
closed_clean_fut = seastar::when_all(
std::move(gate_closed), std::move(out_closed)
).discard_result().then([this] {
+ ceph_assert_always(!exit_io.has_value());
if (has_socket) {
- return frame_assembler.close_shutdown_socket();
+ ceph_assert_always(frame_assembler);
+ return frame_assembler->close_shutdown_socket();
} else {
return seastar::now();
}