From c47dc1d0e46add1d9f284d41112e9b772b0b36b9 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 26 Apr 2023 17:14:06 +0800 Subject: [PATCH] crimson/net: check that FrameAssemblerV2 is working in the expected core Signed-off-by: Yingxin Cheng --- src/crimson/net/FrameAssemblerV2.cc | 29 +++++++++++++++++++++++++++-- src/crimson/net/FrameAssemblerV2.h | 4 ++++ src/crimson/net/SocketConnection.cc | 6 ++++++ src/crimson/net/SocketConnection.h | 2 ++ 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/crimson/net/FrameAssemblerV2.cc b/src/crimson/net/FrameAssemblerV2.cc index b621279353e..4a56829493a 100644 --- a/src/crimson/net/FrameAssemblerV2.cc +++ b/src/crimson/net/FrameAssemblerV2.cc @@ -27,11 +27,15 @@ seastar::logger& logger() { namespace crimson::net { FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn) - : conn{_conn} -{} + : conn{_conn}, sid{seastar::this_shard_id()} +{ + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); +} FrameAssemblerV2::~FrameAssemblerV2() { + assert(seastar::this_shard_id() == conn.get_messenger_shard_id()); + assert(seastar::this_shard_id() == sid); if (has_socket()) { std::ignore = move_socket(); } @@ -41,6 +45,7 @@ FrameAssemblerV2::~FrameAssemblerV2() // should be consistent to intercept() in ProtocolV2.cc void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); if (conn.interceptor) { auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ; @@ -55,6 +60,7 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write) void FrameAssemblerV2::set_is_rev1(bool _is_rev1) { + assert(seastar::this_shard_id() == sid); is_rev1 = _is_rev1; tx_frame_asm.set_is_rev1(_is_rev1); rx_frame_asm.set_is_rev1(_is_rev1); @@ -64,12 +70,14 @@ void FrameAssemblerV2::create_session_stream_handlers( const AuthConnectionMeta &auth_meta, bool crossed) { + assert(seastar::this_shard_id() == sid); session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair( nullptr, auth_meta, is_rev1, crossed); } void FrameAssemblerV2::reset_handlers() { + assert(seastar::this_shard_id() == sid); session_stream_handlers = { nullptr, nullptr }; session_comp_handlers = { nullptr, nullptr }; } @@ -77,6 +85,7 @@ void FrameAssemblerV2::reset_handlers() FrameAssemblerV2::mover_t FrameAssemblerV2::to_replace() { + assert(seastar::this_shard_id() == sid); assert(is_socket_valid()); return mover_t{ move_socket(), @@ -86,6 +95,7 @@ FrameAssemblerV2::to_replace() seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover) { + assert(seastar::this_shard_id() == sid); record_io = false; rxbuf.clear(); txbuf.clear(); @@ -101,6 +111,7 @@ seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover void FrameAssemblerV2::start_recording() { + assert(seastar::this_shard_id() == sid); record_io = true; rxbuf.clear(); txbuf.clear(); @@ -109,6 +120,7 @@ void FrameAssemblerV2::start_recording() FrameAssemblerV2::record_bufs_t FrameAssemblerV2::stop_recording() { + assert(seastar::this_shard_id() == sid); ceph_assert_always(record_io == true); record_io = false; return record_bufs_t{std::move(rxbuf), std::move(txbuf)}; @@ -122,6 +134,7 @@ bool FrameAssemblerV2::has_socket() const bool FrameAssemblerV2::is_socket_valid() const { + assert(seastar::this_shard_id() == sid); return has_socket() && !socket->is_shutdown(); } @@ -134,6 +147,7 @@ SocketFRef FrameAssemblerV2::move_socket() void FrameAssemblerV2::set_socket(SocketFRef &&new_socket) { + assert(seastar::this_shard_id() == sid); assert(!has_socket()); assert(new_socket); socket = std::move(new_socket); @@ -143,18 +157,21 @@ void FrameAssemblerV2::set_socket(SocketFRef &&new_socket) void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); socket->learn_ephemeral_port_as_connector(port); } void FrameAssemblerV2::shutdown_socket() { + assert(seastar::this_shard_id() == sid); assert(is_socket_valid()); socket->shutdown(); } seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); assert(socket->is_shutdown()); auto old_socket = move_socket(); @@ -165,6 +182,7 @@ seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_soc seastar::future<> FrameAssemblerV2::close_shutdown_socket() { + assert(seastar::this_shard_id() == sid); assert(has_socket()); assert(socket->is_shutdown()); return socket->close(); @@ -173,6 +191,7 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket() seastar::future FrameAssemblerV2::read_exactly(std::size_t bytes) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); if (unlikely(record_io)) { return socket->read_exactly(bytes @@ -188,6 +207,7 @@ FrameAssemblerV2::read_exactly(std::size_t bytes) seastar::future FrameAssemblerV2::read(std::size_t bytes) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); if (unlikely(record_io)) { return socket->read(bytes @@ -203,6 +223,7 @@ FrameAssemblerV2::read(std::size_t bytes) seastar::future<> FrameAssemblerV2::write(ceph::bufferlist buf) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); if (unlikely(record_io)) { txbuf.append(buf); @@ -213,6 +234,7 @@ FrameAssemblerV2::write(ceph::bufferlist buf) seastar::future<> FrameAssemblerV2::flush() { + assert(seastar::this_shard_id() == sid); assert(has_socket()); return socket->flush(); } @@ -220,6 +242,7 @@ FrameAssemblerV2::flush() seastar::future<> FrameAssemblerV2::write_flush(ceph::bufferlist buf) { + assert(seastar::this_shard_id() == sid); assert(has_socket()); if (unlikely(record_io)) { txbuf.append(buf); @@ -230,6 +253,7 @@ FrameAssemblerV2::write_flush(ceph::bufferlist buf) seastar::future FrameAssemblerV2::read_main_preamble() { + assert(seastar::this_shard_id() == sid); rx_preamble.clear(); return read_exactly(rx_frame_asm.get_preamble_onwire_len() ).then([this](auto bptr) { @@ -250,6 +274,7 @@ FrameAssemblerV2::read_main_preamble() seastar::future FrameAssemblerV2::read_frame_payload() { + assert(seastar::this_shard_id() == sid); rx_segments_data.clear(); return seastar::do_until( [this] { diff --git a/src/crimson/net/FrameAssemblerV2.h b/src/crimson/net/FrameAssemblerV2.h index 0ba5f53cc9f..d31e9c4b78d 100644 --- a/src/crimson/net/FrameAssemblerV2.h +++ b/src/crimson/net/FrameAssemblerV2.h @@ -108,6 +108,7 @@ public: template ceph::bufferlist get_buffer(F &tx_frame) { + assert(seastar::this_shard_id() == sid); #ifdef UNIT_TESTS_BUILT intercept_frame(F::tag, true); #endif @@ -118,6 +119,7 @@ public: template seastar::future<> write_flush_frame(F &tx_frame) { + assert(seastar::this_shard_id() == sid); auto bl = get_buffer(tx_frame); return write_flush(std::move(bl)); } @@ -139,6 +141,8 @@ private: SocketFRef socket; + seastar::shard_id sid; + /* * auth signature */ diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 9f3740a5c77..ad96c7b6688 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -152,6 +152,12 @@ SocketConnection::get_messenger() const return messenger; } +seastar::shard_id +SocketConnection::get_messenger_shard_id() const +{ + return msgr_sid; +} + void SocketConnection::set_peer_type(entity_type_t peer_type) { assert(seastar::this_shard_id() == msgr_sid); // it is not allowed to assign an unknown value when the current diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index f54ac7fd9a4..5fd70049aab 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -141,6 +141,8 @@ class SocketConnection : public Connection { seastar::socket_address get_local_address() const; + seastar::shard_id get_messenger_shard_id() const; + SocketMessenger &get_messenger() const; ConnectionRef get_local_shared_foreign_from_this(); -- 2.39.5