From c67d0ce9c909bb8a9612ffb5fdf2d495a639da57 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 3 Apr 2023 10:00:11 +0800 Subject: [PATCH] crimson/net: change ConnectionRef to be a local_shared_foreign_ptr Make it possible for connections and messages to be dispatched in arbitrary core without asynchronous foreign copy. The local_shared_foreign_ptr conn cannot be moved to another core implicitly, maintain it outside Message independently. Do asynchronous foreign copy to the new ConnectionRef only in with_remote_shard_state_and_op(). Signed-off-by: Yingxin Cheng (cherry picked from commit b54153ecaddda890401d12d6c5ceadb0030df4f1) --- src/crimson/net/Fwd.h | 8 +- src/crimson/net/Interceptor.h | 10 +- src/crimson/net/ProtocolV2.cc | 2 +- src/crimson/net/SocketConnection.cc | 13 +- src/crimson/net/SocketConnection.h | 3 +- src/crimson/net/SocketMessenger.cc | 4 +- src/crimson/net/io_handler.cc | 20 +-- src/crimson/net/io_handler.h | 7 + src/crimson/osd/ops_executer.cc | 159 +++++++++--------- src/crimson/osd/ops_executer.h | 8 +- .../osd/osd_operations/client_request.cc | 2 +- .../osd/osd_operations/client_request.h | 17 +- .../osd/osd_operations/logmissing_request.cc | 2 +- .../osd/osd_operations/logmissing_request.h | 17 +- .../osd_operations/logmissing_request_reply.h | 17 +- .../osd/osd_operations/peering_event.h | 17 +- .../osd/osd_operations/recovery_subrequest.cc | 2 +- .../osd/osd_operations/recovery_subrequest.h | 17 +- .../osd/osd_operations/replicated_request.h | 17 +- src/crimson/osd/pg.cc | 18 +- src/crimson/osd/pg.h | 12 +- src/crimson/osd/pg_shard_manager.h | 51 ++++-- src/crimson/osd/recovery_backend.cc | 29 ++-- src/crimson/osd/recovery_backend.h | 16 +- .../osd/replicated_recovery_backend.cc | 6 +- src/crimson/osd/replicated_recovery_backend.h | 3 +- src/crimson/osd/watch.cc | 4 +- src/crimson/osd/watch.h | 12 +- src/msg/Message.cc | 4 + src/msg/Message.h | 16 +- src/test/crimson/test_messenger.cc | 35 ++-- src/test/crimson/test_messenger_thrash.cc | 21 +-- 32 files changed, 372 insertions(+), 197 deletions(-) diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 7ccd3fe35dd03..3eb57ef978130 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -25,6 +25,7 @@ #include "msg/msg_types.h" #include "crimson/common/errorator.h" +#include "crimson/common/local_shared_foreign_ptr.h" class AuthConnectionMeta; @@ -34,8 +35,11 @@ using msgr_tag_t = uint8_t; using stop_t = seastar::stop_iteration; class Connection; -using ConnectionRef = seastar::shared_ptr; -using ConnectionFRef = seastar::foreign_ptr; +using ConnectionLRef = seastar::shared_ptr; +using ConnectionFRef = seastar::foreign_ptr; +using ConnectionRef = ::crimson::local_shared_foreign_ptr; + +class SocketConnection; class Dispatcher; class ChainedDispatchers; diff --git a/src/crimson/net/Interceptor.h b/src/crimson/net/Interceptor.h index cc4dc77d6c51c..41ec31f375576 100644 --- a/src/crimson/net/Interceptor.h +++ b/src/crimson/net/Interceptor.h @@ -116,11 +116,11 @@ struct Breakpoint { struct Interceptor { socket_blocker blocker; virtual ~Interceptor() {} - virtual void register_conn(Connection& conn) = 0; - virtual void register_conn_ready(Connection& conn) = 0; - virtual void register_conn_closed(Connection& conn) = 0; - virtual void register_conn_replaced(Connection& conn) = 0; - virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0; + virtual void register_conn(SocketConnection& conn) = 0; + virtual void register_conn_ready(SocketConnection& conn) = 0; + virtual void register_conn_closed(SocketConnection& conn) = 0; + virtual void register_conn_replaced(SocketConnection& conn) = 0; + virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0; }; } // namespace crimson::net diff --git a/src/crimson/net/ProtocolV2.cc b/src/crimson/net/ProtocolV2.cc index edaee1075df4c..95b756637f4e8 100644 --- a/src/crimson/net/ProtocolV2.cc +++ b/src/crimson/net/ProtocolV2.cc @@ -107,7 +107,7 @@ namespace crimson::net { // should be consistent to intercept_frame() in FrameAssemblerV2.cc void intercept(Breakpoint bp, bp_type_t type, - Connection& conn, + SocketConnection& conn, Interceptor *interceptor, SocketRef& socket) { if (interceptor) { diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index aa7fcc027790d..38e2748738f7b 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -22,9 +22,10 @@ #endif using std::ostream; -using namespace crimson::net; using crimson::common::local_conf; +namespace crimson::net { + SocketConnection::SocketConnection(SocketMessenger& messenger, ChainedDispatchers& dispatchers) : core(messenger.shard_id()), @@ -137,6 +138,14 @@ seastar::socket_address SocketConnection::get_local_address() const { return socket->get_local_address(); } +ConnectionRef +SocketConnection::get_local_shared_foreign_from_this() +{ + assert(seastar::this_shard_id() == shard_id()); + return make_local_shared_foreign( + seastar::make_foreign(shared_from_this())); +} + void SocketConnection::print(ostream& out) const { out << (void*)this << " "; messenger.print(out); @@ -150,3 +159,5 @@ void SocketConnection::print(ostream& out) const { << " >> " << get_peer_name() << " " << peer_addr; } } + +} // namespace crimson::net diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index f6ef6f4975331..aa791b6e17016 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -25,7 +25,6 @@ namespace crimson::net { class ProtocolV2; class SocketMessenger; -class SocketConnection; using SocketConnectionRef = seastar::shared_ptr; #ifdef UNIT_TESTS_BUILT @@ -166,6 +165,8 @@ class SocketConnection : public Connection { return messenger; } + ConnectionRef get_local_shared_foreign_from_this(); + private: seastar::shard_id shard_id() const; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index da89424831522..a112b50800d4a 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -239,12 +239,12 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe if (auto found = lookup_conn(peer_addr); found) { logger().debug("{} connect to existing", *found); - return found->shared_from_this(); + return found->get_local_shared_foreign_from_this(); } SocketConnectionRef conn = seastar::make_shared(*this, dispatchers); conn->start_connect(peer_addr, peer_name); - return conn->shared_from_this(); + return conn->get_local_shared_foreign_from_this(); } seastar::future<> SocketMessenger::shutdown() diff --git a/src/crimson/net/io_handler.cc b/src/crimson/net/io_handler.cc index de296e64e23a3..80d578363282e 100644 --- a/src/crimson/net/io_handler.cc +++ b/src/crimson/net/io_handler.cc @@ -48,7 +48,8 @@ namespace crimson::net { IOHandler::IOHandler(ChainedDispatchers &dispatchers, SocketConnection &conn) : dispatchers(dispatchers), - conn(conn) + conn(conn), + conn_ref(conn.get_local_shared_foreign_from_this()) {} IOHandler::~IOHandler() @@ -318,8 +319,7 @@ void IOHandler::dispatch_accept() // protocol_is_connected can be from true to true here if the replacing is // happening to a connected connection. protocol_is_connected = true; - dispatchers.ms_handle_accept( - seastar::static_pointer_cast(conn.shared_from_this())); + dispatchers.ms_handle_accept(conn_ref); } void IOHandler::dispatch_connect() @@ -329,8 +329,7 @@ void IOHandler::dispatch_connect() } ceph_assert_always(protocol_is_connected == false); protocol_is_connected = true; - dispatchers.ms_handle_connect( - seastar::static_pointer_cast(conn.shared_from_this())); + dispatchers.ms_handle_connect(conn_ref); } void IOHandler::dispatch_reset(bool is_replace) @@ -340,9 +339,7 @@ void IOHandler::dispatch_reset(bool is_replace) return; } need_dispatch_reset = false; - dispatchers.ms_handle_reset( - seastar::static_pointer_cast(conn.shared_from_this()), - is_replace); + dispatchers.ms_handle_reset(conn_ref, is_replace); } void IOHandler::dispatch_remote_reset() @@ -350,8 +347,7 @@ void IOHandler::dispatch_remote_reset() if (io_state == io_state_t::drop) { return; } - dispatchers.ms_handle_remote_reset( - seastar::static_pointer_cast(conn.shared_from_this())); + dispatchers.ms_handle_remote_reset(conn_ref); } void IOHandler::ack_out_sent(seq_num_t seq) @@ -545,10 +541,8 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size) ceph_msg_footer footer{ceph_le32(0), ceph_le32(0), ceph_le32(0), ceph_le64(0), current_header.flags}; - auto conn_ref = seastar::static_pointer_cast( - conn.shared_from_this()); Message *message = decode_message(nullptr, 0, header, footer, - msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref); + msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr); if (!message) { logger().warn("{} decode message failed", conn); abort_in_fault(); diff --git a/src/crimson/net/io_handler.h b/src/crimson/net/io_handler.h index e86667a04e5ee..e04b6356e8674 100644 --- a/src/crimson/net/io_handler.h +++ b/src/crimson/net/io_handler.h @@ -107,6 +107,10 @@ public: if (is_dispatch_reset) { dispatch_reset(is_replace); } + + ceph_assert_always(conn_ref); + conn_ref.reset(); + assert(!gate.is_closed()); return gate.close(); } @@ -183,6 +187,9 @@ private: SocketConnection &conn; + // core local reference for dispatching, valid until reset/close + ConnectionRef conn_ref; + HandshakeListener *handshake_listener = nullptr; crimson::common::Gated gate; diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index e5bfd839f7a0e..040870203bd95 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -138,7 +138,8 @@ OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op) } static watch_info_t create_watch_info(const OSDOp& osd_op, - const OpsExecuter::ExecutableMessage& msg) + const OpsExecuter::ExecutableMessage& msg, + entity_addr_t peer_addr) { using crimson::common::local_conf; const uint32_t timeout = @@ -147,7 +148,7 @@ static watch_info_t create_watch_info(const OSDOp& osd_op, return { osd_op.op.watch.cookie, timeout, - msg.get_connection()->get_peer_addr() + peer_addr }; } @@ -159,48 +160,47 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch( logger().debug("{}", __func__); struct connect_ctx_t { ObjectContext::watch_key_t key; - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; watch_info_t info; connect_ctx_t( const OSDOp& osd_op, const ExecutableMessage& msg, - crimson::net::ConnectionFRef conn) + crimson::net::ConnectionRef conn) : key(osd_op.op.watch.cookie, msg.get_reqid().name), - conn(std::move(conn)), - info(create_watch_info(osd_op, msg)) { + conn(conn), + info(create_watch_info(osd_op, msg, conn->get_peer_addr())) { } }; - return get_message().get_connection().copy( - ).then([&, this](auto &&conn) { - return with_effect_on_obc( - connect_ctx_t{ osd_op, get_message(), std::move(conn) }, - [&] (auto& ctx) { - const auto& entity = ctx.key.second; - auto [it, emplaced] = - os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info)); - if (emplaced) { - logger().info("registered new watch {} by {}", it->second, entity); - txn.nop(); - } else { - logger().info("found existing watch {} by {}", it->second, entity); - } - return seastar::now(); - }, - [] (auto&& ctx, ObjectContextRef obc, Ref pg) { - assert(pg); - auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); - if (emplaced) { - const auto& [cookie, entity] = ctx.key; - it->second = crimson::osd::Watch::create( - obc, ctx.info, entity, std::move(pg)); - logger().info("op_effect: added new watcher: {}", ctx.key); - } else { - logger().info("op_effect: found existing watcher: {}", ctx.key); - } - return it->second->connect(std::move(ctx.conn), true /* will_ping */); - }); - }); + + return with_effect_on_obc( + connect_ctx_t{ osd_op, get_message(), conn }, + [&](auto& ctx) { + const auto& entity = ctx.key.second; + auto [it, emplaced] = + os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info)); + if (emplaced) { + logger().info("registered new watch {} by {}", it->second, entity); + txn.nop(); + } else { + logger().info("found existing watch {} by {}", it->second, entity); + } + return seastar::now(); + }, + [](auto&& ctx, ObjectContextRef obc, Ref pg) { + assert(pg); + auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); + if (emplaced) { + const auto& [cookie, entity] = ctx.key; + it->second = crimson::osd::Watch::create( + obc, ctx.info, entity, std::move(pg)); + logger().info("op_effect: added new watcher: {}", ctx.key); + } else { + logger().info("op_effect: found existing watcher: {}", ctx.key); + } + return it->second->connect(std::move(ctx.conn), true /* will_ping */); + } + ); } OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect( @@ -323,57 +323,56 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify( return crimson::ct_error::enoent::make(); } struct notify_ctx_t { - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; notify_info_t ninfo; const uint64_t client_gid; const epoch_t epoch; - notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn) - : conn(std::move(conn)), + notify_ctx_t(const ExecutableMessage& msg, + crimson::net::ConnectionRef conn) + : conn(conn), client_gid(msg.get_reqid().name.num()), epoch(msg.get_map_epoch()) { } }; - return get_message().get_connection().copy( - ).then([&, this](auto &&conn) { - return with_effect_on_obc( - notify_ctx_t{ get_message(), std::move(conn) }, - [&] (auto& ctx) { - try { - auto bp = osd_op.indata.cbegin(); - uint32_t ver; // obsolete - ceph::decode(ver, bp); - ceph::decode(ctx.ninfo.timeout, bp); - ceph::decode(ctx.ninfo.bl, bp); - } catch (const buffer::error&) { - ctx.ninfo.timeout = 0; - } - if (!ctx.ninfo.timeout) { - using crimson::common::local_conf; - ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout; - } - ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch); - ctx.ninfo.cookie = osd_op.op.notify.cookie; - // return our unique notify id to the client - ceph::encode(ctx.ninfo.notify_id, osd_op.outdata); - return seastar::now(); - }, - [] (auto&& ctx, ObjectContextRef obc, Ref) { - auto alive_watchers = obc->watchers | boost::adaptors::map_values - | boost::adaptors::filtered( - [] (const auto& w) { - // FIXME: filter as for the `is_ping` in `Watch::start_notify` - return w->is_alive(); - }); - return crimson::osd::Notify::create_n_propagate( - std::begin(alive_watchers), - std::end(alive_watchers), - std::move(ctx.conn), - ctx.ninfo, - ctx.client_gid, - obc->obs.oi.user_version); - }); - }); + return with_effect_on_obc( + notify_ctx_t{ get_message(), conn }, + [&](auto& ctx) { + try { + auto bp = osd_op.indata.cbegin(); + uint32_t ver; // obsolete + ceph::decode(ver, bp); + ceph::decode(ctx.ninfo.timeout, bp); + ceph::decode(ctx.ninfo.bl, bp); + } catch (const buffer::error&) { + ctx.ninfo.timeout = 0; + } + if (!ctx.ninfo.timeout) { + using crimson::common::local_conf; + ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout; + } + ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch); + ctx.ninfo.cookie = osd_op.op.notify.cookie; + // return our unique notify id to the client + ceph::encode(ctx.ninfo.notify_id, osd_op.outdata); + return seastar::now(); + }, + [](auto&& ctx, ObjectContextRef obc, Ref) { + auto alive_watchers = obc->watchers | boost::adaptors::map_values + | boost::adaptors::filtered( + [] (const auto& w) { + // FIXME: filter as for the `is_ping` in `Watch::start_notify` + return w->is_alive(); + }); + return crimson::osd::Notify::create_n_propagate( + std::begin(alive_watchers), + std::end(alive_watchers), + std::move(ctx.conn), + ctx.ninfo, + ctx.client_gid, + obc->obs.oi.user_version); + } + ); } OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers( @@ -1046,11 +1045,13 @@ OpsExecuter::OpsExecuter(Ref pg, ObjectContextRef _obc, const OpInfo& op_info, abstracted_msg_t&& msg, + crimson::net::ConnectionRef conn, const SnapContext& _snapc) : pg(std::move(pg)), obc(std::move(_obc)), op_info(op_info), msg(std::move(msg)), + conn(conn), snapc(_snapc) { if (op_info.may_write() && should_clone(*obc, snapc)) { diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 14b2881d695f8..697adffdb2184 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -100,7 +100,6 @@ public: // with other message types than just the `MOSDOp`. The type erasure // happens in the ctor of `OpsExecuter`. struct ExecutableMessage { - virtual const crimson::net::ConnectionFRef &get_connection() const = 0; virtual osd_reqid_t get_reqid() const = 0; virtual utime_t get_mtime() const = 0; virtual epoch_t get_map_epoch() const = 0; @@ -115,9 +114,6 @@ public: public: ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) { } - const crimson::net::ConnectionFRef &get_connection() const final { - return pimpl->get_connection(); - } osd_reqid_t get_reqid() const final { return pimpl->get_reqid(); } @@ -177,6 +173,7 @@ private: ceph::static_ptr)>; abstracted_msg_t msg; + crimson::net::ConnectionRef conn; std::optional osd_op_params; bool user_modify = false; ceph::os::Transaction txn; @@ -363,6 +360,7 @@ private: ObjectContextRef obc, const OpInfo& op_info, abstracted_msg_t&& msg, + crimson::net::ConnectionRef conn, const SnapContext& snapc); public: @@ -371,6 +369,7 @@ public: ObjectContextRef obc, const OpInfo& op_info, const MsgT& msg, + crimson::net::ConnectionRef conn, const SnapContext& snapc) : OpsExecuter( std::move(pg), @@ -379,6 +378,7 @@ public: abstracted_msg_t{ std::in_place_type_t>{}, &msg}, + conn, snapc) { } diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 298eac501dffd..d23a82ab05243 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -308,7 +308,7 @@ ClientRequest::do_process( __func__, m->get_hobj()); } } - return pg->do_osd_ops(m, obc, op_info, snapc).safe_then_unpack_interruptible( + return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible( [this, pg, &ihref](auto submitted, auto all_completed) mutable { return submitted.then_interruptible([this, pg, &ihref] { return ihref.enter_stage(pp(*pg).wait_repop, *this); diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index e7620b6af96af..4338ac4169759 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -32,7 +32,7 @@ class ClientRequest final : public PhasedOperationT, // used by put_historic ShardServices *put_historic_shard_services = nullptr; - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; // must be after conn due to ConnectionPipeline's life-time Ref m; OpInfo op_info; @@ -202,10 +202,23 @@ public: spg_t get_pgid() const { return m->get_spg(); } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return instance_handle->handle; } epoch_t get_epoch() const { return m->get_min_epoch(); } + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + seastar::future<> with_pg_int( ShardServices &shard_services, Ref pg); diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc index eb89acdd00e6e..5dfb290f945c3 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.cc +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -61,7 +61,7 @@ seastar::future<> LogMissingRequest::with_pg( IRef ref = this; return interruptor::with_interruption([this, pg] { - return pg->do_update_log_missing(req); + return pg->do_update_log_missing(req, conn); }, [ref](std::exception_ptr) { return seastar::now(); }, pg); } diff --git a/src/crimson/osd/osd_operations/logmissing_request.h b/src/crimson/osd/osd_operations/logmissing_request.h index dc3cdfb01e4a9..4ab87996f3afb 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.h +++ b/src/crimson/osd/osd_operations/logmissing_request.h @@ -34,10 +34,23 @@ public: spg_t get_pgid() const { return req->get_spg(); } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return req->get_min_epoch(); } + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + seastar::future<> with_pg( ShardServices &shard_services, Ref pg); @@ -53,7 +66,7 @@ public: private: ClientRequest::PGPipeline &pp(PG &pg); - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref req; diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.h b/src/crimson/osd/osd_operations/logmissing_request_reply.h index dbbce3294d9af..cb39e9f6c2b42 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.h +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.h @@ -34,10 +34,23 @@ public: spg_t get_pgid() const { return req->get_spg(); } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return req->get_min_epoch(); } + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + seastar::future<> with_pg( ShardServices &shard_services, Ref pg); @@ -53,7 +66,7 @@ public: private: ClientRequest::PGPipeline &pp(PG &pg); - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref req; diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 44442cc01eede..d9c9da58a17fc 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -107,7 +107,7 @@ public: class RemotePeeringEvent : public PeeringEvent { protected: - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; // must be after conn due to ConnectionPipeline's life-time PipelineHandle handle; @@ -153,9 +153,22 @@ public: spg_t get_pgid() const { return pgid; } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return evt.get_epoch_sent(); } + + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } }; class LocalPeeringEvent final : public PeeringEvent { diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc index 7e5dd76e0a8a7..68655b8da5171 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.cc +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -30,7 +30,7 @@ seastar::future<> RecoverySubRequest::with_pg( track_event(); IRef opref = this; return interruptor::with_interruption([this, pgref] { - return pgref->get_recovery_backend()->handle_recovery_op(m); + return pgref->get_recovery_backend()->handle_recovery_op(m, conn); }, [](std::exception_ptr) { return seastar::now(); }, pgref).finally([this, opref, pgref] { diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.h b/src/crimson/osd/osd_operations/recovery_subrequest.h index c7b7efb5182d9..07c7c95b5e0fe 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.h +++ b/src/crimson/osd/osd_operations/recovery_subrequest.h @@ -37,10 +37,23 @@ public: spg_t get_pgid() const { return m->get_spg(); } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return m->get_min_epoch(); } + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + seastar::future<> with_pg( ShardServices &shard_services, Ref pg); @@ -55,7 +68,7 @@ public: > tracking_events; private: - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; // must be after `conn` to ensure the ConnectionPipeline's is alive PipelineHandle handle; Ref m; diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h index 2716fb0b9293b..78d97ecf439fa 100644 --- a/src/crimson/osd/osd_operations/replicated_request.h +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -34,10 +34,23 @@ public: spg_t get_pgid() const { return req->get_spg(); } - ConnectionPipeline &get_connection_pipeline(); PipelineHandle &get_handle() { return handle; } epoch_t get_epoch() const { return req->get_min_epoch(); } + ConnectionPipeline &get_connection_pipeline(); + seastar::future prepare_remote_submission() { + assert(conn); + return conn.get_foreign( + ).then([this](auto f_conn) { + conn.reset(); + return f_conn; + }); + } + void finish_remote_submission(crimson::net::ConnectionFRef _conn) { + assert(!conn); + conn = make_local_shared_foreign(std::move(_conn)); + } + seastar::future<> with_pg( ShardServices &shard_services, Ref pg); @@ -55,7 +68,7 @@ public: private: ClientRequest::PGPipeline &pp(PG &pg); - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; PipelineHandle handle; Ref req; }; diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 997ad0953fd26..2b4a9da79946f 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -972,6 +972,7 @@ seastar::future<> PG::submit_error_log( PG::do_osd_ops_iertr::future>> PG::do_osd_ops( Ref m, + crimson::net::ConnectionRef conn, ObjectContextRef obc, const OpInfo &op_info, const SnapContext& snapc) @@ -981,7 +982,7 @@ PG::do_osd_ops( } return do_osd_ops_execute>( seastar::make_lw_shared( - Ref{this}, obc, op_info, *m, snapc), + Ref{this}, obc, op_info, *m, conn, snapc), m->ops, [this, m, obc, may_write = op_info.may_write(), may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] { @@ -1103,7 +1104,13 @@ PG::do_osd_ops( [=, this, &ops, &op_info](auto &msg_params) { return do_osd_ops_execute( seastar::make_lw_shared( - Ref{this}, std::move(obc), op_info, msg_params, SnapContext{}), + Ref{this}, + std::move(obc), + op_info, + msg_params, + msg_params.get_connection(), + SnapContext{} + ), ops, std::move(success_func), std::move(failure_func)); @@ -1300,7 +1307,8 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m) } PG::interruptible_future<> PG::do_update_log_missing( - Ref m) + Ref m, + crimson::net::ConnectionRef conn) { if (__builtin_expect(stopping, false)) { return seastar::make_exception_future<>( @@ -1322,7 +1330,7 @@ PG::interruptible_future<> PG::do_update_log_missing( return interruptor::make_interruptible(shard_services.get_store().do_transaction( coll_ref, std::move(t))).then_interruptible( - [m, lcod=peering_state.get_info().last_complete, this] { + [m, conn, lcod=peering_state.get_info().last_complete, this] { if (!peering_state.pg_has_reset_since(m->get_epoch())) { peering_state.update_last_complete_ondisk(lcod); auto reply = @@ -1334,7 +1342,7 @@ PG::interruptible_future<> PG::do_update_log_missing( m->get_tid(), lcod); reply->set_priority(CEPH_MSG_PRIO_HIGH); - return m->get_connection()->send(std::move(reply)); + return conn->send(std::move(reply)); } return seastar::now(); }); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index aecd295087526..f401151bdb71f 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -534,7 +534,9 @@ public: void replica_clear_repop_obc( const std::vector &logv); void handle_rep_op_reply(const MOSDRepOpReply& m); - interruptible_future<> do_update_log_missing(Ref m); + interruptible_future<> do_update_log_missing( + Ref m, + crimson::net::ConnectionRef conn); interruptible_future<> do_update_log_missing_reply( Ref m); @@ -562,6 +564,7 @@ private: do_osd_ops_iertr::future>; do_osd_ops_iertr::future>> do_osd_ops( Ref m, + crimson::net::ConnectionRef conn, ObjectContextRef obc, const OpInfo &op_info, const SnapContext& snapc); @@ -770,7 +773,7 @@ private: }; struct PG::do_osd_ops_params_t { - crimson::net::ConnectionFRef &get_connection() const { + crimson::net::ConnectionRef &get_connection() const { return conn; } osd_reqid_t get_reqid() const { @@ -791,8 +794,9 @@ struct PG::do_osd_ops_params_t { // Only used by InternalClientRequest, no op flags bool has_flag(uint32_t flag) const { return false; - } - crimson::net::ConnectionFRef &conn; + } + + crimson::net::ConnectionRef &conn; osd_reqid_t reqid; utime_t mtime; epoch_t map_epoch; diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index fc56e64eb1b97..14d9c56b874c0 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -148,6 +148,37 @@ public: }); } + template + auto with_remote_shard_state_and_op( + core_id_t core, + typename T::IRef &&op, + F &&f) { + ceph_assert(seastar::this_shard_id() == PRIMARY_CORE); + if (seastar::this_shard_id() == core) { + auto &target_shard_services = shard_services.local(); + return std::invoke( + std::move(f), + target_shard_services.local_state, + target_shard_services, + std::move(op)); + } + return op->prepare_remote_submission( + ).then([op=std::move(op), f=std::move(f), this, core + ](auto f_conn) mutable { + return shard_services.invoke_on( + core, + [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn) + ](auto &target_shard_services) mutable { + op->finish_remote_submission(std::move(f_conn)); + return std::invoke( + std::move(f), + target_shard_services.local_state, + target_shard_services, + std::move(op)); + }); + }); + } + /// Runs opref on the appropriate core, creating the pg as necessary. template seastar::future<> run_with_pg_maybe_create( @@ -163,11 +194,11 @@ public: op->get_pgid()); get_local_state().registry.remove_from_registry(*op); - return with_remote_shard_state( - core, - [op=std::move(op)]( - PerShardState &per_shard_state, - ShardServices &shard_services) mutable { + return with_remote_shard_state_and_op( + core, std::move(op), + [](PerShardState &per_shard_state, + ShardServices &shard_services, + typename T::IRef op) { per_shard_state.registry.add_to_registry(*op); auto &logger = crimson::get_logger(ceph_subsys_osd); auto &opref = *op; @@ -207,11 +238,11 @@ public: op->get_pgid()); get_local_state().registry.remove_from_registry(*op); - return with_remote_shard_state( - core, - [op=std::move(op)]( - PerShardState &per_shard_state, - ShardServices &shard_services) mutable { + return with_remote_shard_state_and_op( + core, std::move(op), + [](PerShardState &per_shard_state, + ShardServices &shard_services, + typename T::IRef op) { per_shard_state.registry.add_to_registry(*op); auto &logger = crimson::get_logger(ceph_subsys_osd); auto &opref = *op; diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 7d6570bcf5709..b5394bfdc485d 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -69,7 +69,8 @@ void RecoveryBackend::WaitForObjectRecovery::stop() { } void RecoveryBackend::handle_backfill_finish( - MOSDPGBackfill& m) + MOSDPGBackfill& m, + crimson::net::ConnectionRef conn) { logger().debug("{}", __func__); ceph_assert(!pg.is_primary()); @@ -80,7 +81,7 @@ void RecoveryBackend::handle_backfill_finish( m.query_epoch, spg_t(pg.get_pgid().pgid, pg.get_primary().shard)); reply->set_priority(pg.get_recovery_op_priority()); - std::ignore = m.get_connection()->send(std::move(reply)); + std::ignore = conn->send(std::move(reply)); shard_services.start_operation( static_cast(&pg), pg.get_pg_whoami(), @@ -123,7 +124,8 @@ RecoveryBackend::handle_backfill_finish_ack( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_backfill( - MOSDPGBackfill& m) + MOSDPGBackfill& m, + crimson::net::ConnectionRef conn) { logger().debug("{}", __func__); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { @@ -132,7 +134,7 @@ RecoveryBackend::handle_backfill( } switch (m.op) { case MOSDPGBackfill::OP_BACKFILL_FINISH: - handle_backfill_finish(m); + handle_backfill_finish(m, conn); [[fallthrough]]; case MOSDPGBackfill::OP_BACKFILL_PROGRESS: return handle_backfill_progress(m); @@ -224,7 +226,8 @@ RecoveryBackend::scan_for_backfill( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_scan_get_digest( - MOSDPGScan& m) + MOSDPGScan& m, + crimson::net::ConnectionRef conn) { logger().debug("{}", __func__); if (false /* FIXME: check for backfill too full */) { @@ -243,7 +246,7 @@ RecoveryBackend::handle_scan_get_digest( crimson::common::local_conf().get_val("osd_backfill_scan_min"), crimson::common::local_conf().get_val("osd_backfill_scan_max") ).then_interruptible( - [this, query_epoch=m.query_epoch, &conn=*(m.get_connection()) + [this, query_epoch=m.query_epoch, conn ](auto backfill_interval) { auto reply = crimson::make_message( MOSDPGScan::OP_SCAN_DIGEST, @@ -254,7 +257,7 @@ RecoveryBackend::handle_scan_get_digest( backfill_interval.begin, backfill_interval.end); encode(backfill_interval.objects, reply->get_data()); - return conn.send(std::move(reply)); + return conn->send(std::move(reply)); }); } @@ -285,7 +288,8 @@ RecoveryBackend::handle_scan_digest( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_scan( - MOSDPGScan& m) + MOSDPGScan& m, + crimson::net::ConnectionRef conn) { logger().debug("{}", __func__); if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) { @@ -294,7 +298,7 @@ RecoveryBackend::handle_scan( } switch (m.op) { case MOSDPGScan::OP_SCAN_GET_DIGEST: - return handle_scan_get_digest(m); + return handle_scan_get_digest(m, conn); case MOSDPGScan::OP_SCAN_DIGEST: return handle_scan_digest(m); default: @@ -306,15 +310,16 @@ RecoveryBackend::handle_scan( RecoveryBackend::interruptible_future<> RecoveryBackend::handle_recovery_op( - Ref m) + Ref m, + crimson::net::ConnectionRef conn) { switch (m->get_header().type) { case MSG_OSD_PG_BACKFILL: - return handle_backfill(*boost::static_pointer_cast(m)); + return handle_backfill(*boost::static_pointer_cast(m), conn); case MSG_OSD_PG_BACKFILL_REMOVE: return handle_backfill_remove(*boost::static_pointer_cast(m)); case MSG_OSD_PG_SCAN: - return handle_scan(*boost::static_pointer_cast(m)); + return handle_scan(*boost::static_pointer_cast(m), conn); default: return seastar::make_exception_future<>( std::invalid_argument(fmt::format("invalid request type: {}", diff --git a/src/crimson/osd/recovery_backend.h b/src/crimson/osd/recovery_backend.h index 64be1862074d8..3eed892c22582 100644 --- a/src/crimson/osd/recovery_backend.h +++ b/src/crimson/osd/recovery_backend.h @@ -65,7 +65,8 @@ public: } virtual interruptible_future<> handle_recovery_op( - Ref m); + Ref m, + crimson::net::ConnectionRef conn); virtual interruptible_future<> recover_object( const hobject_t& soid, @@ -210,18 +211,23 @@ protected: virtual seastar::future<> on_stop() = 0; private: void handle_backfill_finish( - MOSDPGBackfill& m); + MOSDPGBackfill& m, + crimson::net::ConnectionRef conn); interruptible_future<> handle_backfill_progress( MOSDPGBackfill& m); interruptible_future<> handle_backfill_finish_ack( MOSDPGBackfill& m); - interruptible_future<> handle_backfill(MOSDPGBackfill& m); + interruptible_future<> handle_backfill( + MOSDPGBackfill& m, + crimson::net::ConnectionRef conn); interruptible_future<> handle_scan_get_digest( - MOSDPGScan& m); + MOSDPGScan& m, + crimson::net::ConnectionRef conn); interruptible_future<> handle_scan_digest( MOSDPGScan& m); interruptible_future<> handle_scan( - MOSDPGScan& m); + MOSDPGScan& m, + crimson::net::ConnectionRef conn); interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m); }; diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index b2e3dfbd63826..24b990d78b757 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -1156,7 +1156,9 @@ ReplicatedRecoveryBackend::handle_recovery_delete_reply( } RecoveryBackend::interruptible_future<> -ReplicatedRecoveryBackend::handle_recovery_op(Ref m) +ReplicatedRecoveryBackend::handle_recovery_op( + Ref m, + crimson::net::ConnectionRef conn) { switch (m->get_header().type) { case MSG_OSD_PG_PULL: @@ -1174,7 +1176,7 @@ ReplicatedRecoveryBackend::handle_recovery_op(Ref m) boost::static_pointer_cast(m)); default: // delegate to parent class for handling backend-agnostic recovery ops. - return RecoveryBackend::handle_recovery_op(std::move(m)); + return RecoveryBackend::handle_recovery_op(std::move(m), conn); } } diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index 1ada19b18a079..b023b7417e5fb 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -23,7 +23,8 @@ public: : RecoveryBackend(pg, shard_services, coll, backend) {} interruptible_future<> handle_recovery_op( - Ref m) final; + Ref m, + crimson::net::ConnectionRef conn) final; interruptible_future<> recover_object( const hobject_t& soid, diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc index ffdb17827063b..f71d915bb9d7a 100644 --- a/src/crimson/osd/watch.cc +++ b/src/crimson/osd/watch.cc @@ -78,7 +78,7 @@ Watch::~Watch() logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie()); } -seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool) +seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool) { if (this->conn == conn) { logger().debug("conn={} already connected", conn); @@ -218,7 +218,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs) return out; } -Notify::Notify(crimson::net::ConnectionFRef conn, +Notify::Notify(crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index a37703569c7ef..0f7c9df544ac6 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this { struct private_ctag_t{}; std::set> in_progress_notifies; - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; crimson::osd::ObjectContextRef obc; watch_info_t winfo; @@ -67,7 +67,7 @@ public: } ~Watch(); - seastar::future<> connect(crimson::net::ConnectionFRef, bool); + seastar::future<> connect(crimson::net::ConnectionRef, bool); void disconnect(); bool is_alive() const { return true; @@ -131,7 +131,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs); class Notify : public seastar::enable_shared_from_this { std::set watchers; const notify_info_t ninfo; - crimson::net::ConnectionFRef conn; + crimson::net::ConnectionRef conn; const uint64_t client_gid; const uint64_t user_version; bool complete{false}; @@ -152,14 +152,14 @@ class Notify : public seastar::enable_shared_from_this { /// Called on Notify timeout void do_notify_timeout(); - Notify(crimson::net::ConnectionFRef conn, + Notify(crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); template Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionFRef conn, + crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); @@ -205,7 +205,7 @@ public: template Notify::Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionFRef conn, + crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) diff --git a/src/msg/Message.cc b/src/msg/Message.cc index e10981794b4e8..fdf32b5e09f2b 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -314,6 +314,10 @@ Message *decode_message(CephContext *cct, ceph::bufferlist& data, Message::ConnectionRef conn) { +#ifdef WITH_SEASTAR + // In crimson, conn is independently maintained outside Message. + ceph_assert(conn == nullptr); +#endif // verify crc if (crcflags & MSG_CRC_HEADER) { __u32 front_crc = front.crc32c(0); diff --git a/src/msg/Message.h b/src/msg/Message.h index 6ebf06346c3f0..9eec1c5bb8385 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -252,10 +252,8 @@ class Message : public RefCountedObject { public: #ifdef WITH_SEASTAR using ConnectionRef = crimson::net::ConnectionRef; - using ConnectionFRef = crimson::net::ConnectionFRef; #else using ConnectionRef = ::ConnectionRef; - using ConnectionFRef = ::ConnectionRef; #endif // WITH_SEASTAR protected: @@ -276,7 +274,7 @@ protected: /* time at which message was fully read */ utime_t recv_complete_stamp; - ConnectionFRef connection; + ConnectionRef connection; uint32_t magic = 0; @@ -351,8 +349,18 @@ protected: completion_hook->complete(0); } public: - const ConnectionFRef& get_connection() const { return connection; } + const ConnectionRef& get_connection() const { +#ifdef WITH_SEASTAR + // In crimson, conn is independently maintained outside Message. + ceph_abort(); +#endif + return connection; + } void set_connection(ConnectionRef c) { +#ifdef WITH_SEASTAR + // In crimson, conn is independently maintained outside Message. + ceph_assert(c == nullptr); +#endif connection = std::move(c); } CompletionHook* get_completion_hook() { return completion_hook; } diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index 8f14bafb65225..6fc9c1d7750c8 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -14,6 +14,7 @@ #include "crimson/net/Dispatcher.h" #include "crimson/net/Messenger.h" #include "crimson/net/Interceptor.h" +#include "crimson/net/SocketConnection.h" #include #include @@ -495,6 +496,7 @@ using crimson::net::Dispatcher; using crimson::net::Interceptor; using crimson::net::Messenger; using crimson::net::MessengerRef; +using crimson::net::SocketConnection; using crimson::net::SocketPolicy; using crimson::net::tag_bp_t; using namespace ceph::net::test; @@ -550,8 +552,8 @@ struct ConnResult { unsigned cnt_reset_dispatched = 0; unsigned cnt_remote_reset_dispatched = 0; - ConnResult(Connection& conn, unsigned index) - : conn(conn.shared_from_this()), index(index) {} + ConnResult(ConnectionRef conn, unsigned index) + : conn(conn), index(index) {} template void _assert_eq(const char* expr_actual, T actual, @@ -697,16 +699,23 @@ struct TestInterceptor : public Interceptor { } private: - void register_conn(Connection& conn) override { + void register_conn(SocketConnection& _conn) override { + auto conn = _conn.get_local_shared_foreign_from_this(); + auto result = find_result(conn); + if (result != nullptr) { + logger().error("The connection [{}] {} already exists when register {}", + result->index, *result->conn, _conn); + ceph_abort(); + } unsigned index = results.size(); results.emplace_back(conn, index); - conns[conn.shared_from_this()] = index; + conns[conn] = index; notify(); - logger().info("[{}] {} new connection registered", index, conn); + logger().info("[{}] {} new connection registered", index, _conn); } - void register_conn_closed(Connection& conn) override { - auto result = find_result(conn.shared_from_this()); + void register_conn_closed(SocketConnection& conn) override { + auto result = find_result(conn.get_local_shared_foreign_from_this()); if (result == nullptr) { logger().error("Untracked closed connection: {}", conn); ceph_abort(); @@ -719,8 +728,8 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} closed({})", result->index, conn, result->state); } - void register_conn_ready(Connection& conn) override { - auto result = find_result(conn.shared_from_this()); + void register_conn_ready(SocketConnection& conn) override { + auto result = find_result(conn.get_local_shared_foreign_from_this()); if (result == nullptr) { logger().error("Untracked ready connection: {}", conn); ceph_abort(); @@ -731,8 +740,8 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} ready", result->index, conn); } - void register_conn_replaced(Connection& conn) override { - auto result = find_result(conn.shared_from_this()); + void register_conn_replaced(SocketConnection& conn) override { + auto result = find_result(conn.get_local_shared_foreign_from_this()); if (result == nullptr) { logger().error("Untracked replaced connection: {}", conn); ceph_abort(); @@ -742,10 +751,10 @@ struct TestInterceptor : public Interceptor { logger().info("[{}] {} {}", result->index, conn, result->state); } - bp_action_t intercept(Connection& conn, Breakpoint bp) override { + bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override { ++breakpoints_counter[bp].counter; - auto result = find_result(conn.shared_from_this()); + auto result = find_result(conn.get_local_shared_foreign_from_this()); if (result == nullptr) { logger().error("Untracked intercepted connection: {}, at breakpoint {}({})", conn, bp, breakpoints_counter[bp].counter); diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc index 1dd910be9a60d..b3a1d910b5dba 100644 --- a/src/test/crimson/test_messenger_thrash.cc +++ b/src/test/crimson/test_messenger_thrash.cc @@ -118,17 +118,15 @@ class SyntheticDispatcher final auto p = m->get_data().cbegin(); decode(pl, p); if (pl.who == Payload::PING) { - logger().info(" {} conn= {} {}", __func__, - *m->get_connection(), pl); - return reply_message(m, pl); + logger().info(" {} conn= {} {}", __func__, *con, pl); + return reply_message(m, con, pl); } else { ceph_assert(pl.who == Payload::PONG); if (sent.count(pl.seq)) { - logger().info(" {} conn= {} {}", __func__, - m->get_connection(), pl); - ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq); + logger().info(" {} conn= {} {}", __func__, *con, pl); + ceph_assert(conn_sent[&*con].front() == pl.seq); ceph_assert(pl.data.contents_equal(sent[pl.seq])); - conn_sent[&*m->get_connection()].pop_front(); + conn_sent[&*con].pop_front(); sent.erase(pl.seq); } @@ -150,7 +148,10 @@ class SyntheticDispatcher final clear_pending(con); } - std::optional> reply_message(const MessageRef m, Payload& pl) { + std::optional> reply_message( + const MessageRef m, + crimson::net::ConnectionRef con, + Payload& pl) { pl.who = Payload::PONG; bufferlist bl; encode(pl, bl); @@ -158,9 +159,9 @@ class SyntheticDispatcher final rm->set_data(bl); if (verbose) { logger().info("{} conn= {} reply i= {}", - __func__, m->get_connection(), pl.seq); + __func__, *con, pl.seq); } - return m->get_connection()->send(std::move(rm)); + return con->send(std::move(rm)); } seastar::future<> send_message_wrap(crimson::net::ConnectionRef con, -- 2.39.5