void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
ceph_assert(state == state_t::NONE);
ceph_assert(!gate.is_closed());
conn.peer_addr = _peer_addr;
void ProtocolV2::start_accept(SocketFRef&& new_socket,
const entity_addr_t& _peer_addr)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
ceph_assert(state == state_t::NONE);
// until we know better
conn.target_addr = _peer_addr;
// handle_session_reset() logic
auto reset = ResetFrame::Decode(payload->back());
logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
reset_session(reset.full());
+ // user can make changes
+
return client_connect();
});
case Tag::WAIT:
new_connect_seq, new_msg_seq] () mutable {
if (state == state_t::REPLACING && do_reset) {
reset_session(true);
+ // user can make changes
}
if (unlikely(state != state_t::REPLACING)) {
+ logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+ conn, get_state_name(state));
ceph_assert_always(state == state_t::CLOSING);
return mover.socket->close(
).then([sock = std::move(mover.socket)] {
std::exception_ptr eptr,
io_handler_state _io_states)
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
io_states = _io_states;
+ logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states);
fault(state_t::READY, where, eptr);
}
void ProtocolV2::notify_out()
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ logger().debug("{} got notify_out(): at {}", conn, get_state_name(state));
io_states.is_out_queued = true;
if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
logger().info("{} notify_out(): at {}, going to CONNECTING",
void ProtocolV2::notify_mark_down()
{
+ assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+ logger().debug("{} got notify_mark_down()", conn);
do_close(false);
}
void IOHandler::reset_session(bool full)
{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().debug("{} got reset_session({})", conn, full);
assert(get_io_state() != io_state_t::open);
reset_in();
if (full) {
void IOHandler::reset_peer_state()
{
+ assert(seastar::this_shard_id() == get_shard_id());
+ logger().debug("{} got reset_peer_state()", conn);
assert(get_io_state() != io_state_t::open);
reset_in();
requeue_out_sent_up_to(0);
void IOHandler::requeue_out_sent()
{
+ assert(seastar::this_shard_id() == get_shard_id());
assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty()) {
return;
void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
{
+ assert(seastar::this_shard_id() == get_shard_id());
assert(get_io_state() != io_state_t::open);
if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
ceph_assert_always(conn_ref);
auto _conn_ref = conn_ref;
auto fut = to_new_sid(new_sid, std::move(conn_fref));
+
dispatchers.ms_handle_accept(_conn_ref, new_sid);
+ // user can make changes
+
return fut;
}
ceph_assert_always(conn_ref);
auto _conn_ref = conn_ref;
auto fut = to_new_sid(new_sid, std::move(conn_fref));
+
dispatchers.ms_handle_connect(_conn_ref, new_sid);
+ // user can make changes
+
return fut;
}
}
need_dispatch_reset = false;
ceph_assert_always(conn_ref);
+
dispatchers.ms_handle_reset(conn_ref, is_replace);
+ // user can make changes
}
void IOHandler::dispatch_remote_reset()
return;
}
ceph_assert_always(conn_ref);
+
dispatchers.ms_handle_remote_reset(conn_ref);
+ // user can make changes
}
void IOHandler::ack_out_sent(seq_num_t seq)
void IOHandler::maybe_notify_out_dispatch()
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
if (is_out_queued()) {
notify_out_dispatch();
}
void IOHandler::notify_out_dispatch()
{
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
assert(is_out_queued());
if (need_notify_out) {
logger().debug("{} send notify_out()", conn);
assert(ctx.get_io_state() == io_state_t::open);
assert(get_io_state() == io_state_t::open);
ceph_assert_always(conn_ref);
+
// throttle the reading process by the returned future
return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+ // user can make changes
});
}