logger().info("{} ProtocolV2::start_accept(): target_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+
+ auto cc_seq = crosscore.prepare_submit();
+ gate.dispatch_in_background("set_accepted_sid", conn, [this, cc_seq] {
+ return io_handler.set_accepted_sid(
+ cc_seq,
+ frame_assembler->get_socket_shard_id(),
+ seastar::make_foreign(conn.shared_from_this()));
+ });
+
execute_accepting();
}
});
}
+seastar::future<> IOHandler::set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref)
+{
+ assert(seastar::this_shard_id() == get_shard_id());
+ assert(get_io_state() == io_state_t::none);
+ ceph_assert_always(conn_ref);
+ conn_ref.reset();
+ assert(maybe_prv_shard_states == nullptr);
+ shard_states.reset();
+ shard_states = shard_states_t::create(sid, io_state_t::none);
+ return seastar::smp::submit_to(sid,
+ [this, cc_seq, conn_fref=std::move(conn_fref)]() mutable {
+ // must be the first to proceed
+ ceph_assert_always(crosscore.proceed_or_wait(cc_seq));
+
+ logger().debug("{} set accepted sid", conn);
+ ceph_assert_always(seastar::this_shard_id() == get_shard_id());
+ ceph_assert_always(get_io_state() == io_state_t::none);
+ assert(maybe_prv_shard_states == nullptr);
+ ceph_assert_always(!conn_ref);
+ conn_ref = make_local_shared_foreign(std::move(conn_fref));
+ });
+}
+
void IOHandler::dispatch_reset(bool is_replace)
{
ceph_assert_always(get_io_state() == io_state_t::drop);
};
void print_io_stat(std::ostream &out) const;
+ seastar::future<> set_accepted_sid(
+ crosscore_t::seq_t cc_seq,
+ seastar::shard_id sid,
+ ConnectionFRef conn_fref);
+
/*
* may be called cross-core
*/