]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: check that FrameAssemblerV2 is working in the expected core
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 09:14:06 +0000 (17:14 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:31 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit c47dc1d0e46add1d9f284d41112e9b772b0b36b9)

src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index b621279353e5a700623c571a2726c20fb598b890..4a56829493a07f67878c300e1bd15cae882dad73 100644 (file)
@@ -27,11 +27,15 @@ seastar::logger& logger() {
 namespace crimson::net {
 
 FrameAssemblerV2::FrameAssemblerV2(SocketConnection &_conn)
-  : conn{_conn}
-{}
+  : conn{_conn}, sid{seastar::this_shard_id()}
+{
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+}
 
 FrameAssemblerV2::~FrameAssemblerV2()
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  assert(seastar::this_shard_id() == sid);
   if (has_socket()) {
     std::ignore = move_socket();
   }
@@ -41,6 +45,7 @@ FrameAssemblerV2::~FrameAssemblerV2()
 // should be consistent to intercept() in ProtocolV2.cc
 void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   if (conn.interceptor) {
     auto type = is_write ? bp_type_t::WRITE : bp_type_t::READ;
@@ -55,6 +60,7 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
 
 void FrameAssemblerV2::set_is_rev1(bool _is_rev1)
 {
+  assert(seastar::this_shard_id() == sid);
   is_rev1 = _is_rev1;
   tx_frame_asm.set_is_rev1(_is_rev1);
   rx_frame_asm.set_is_rev1(_is_rev1);
@@ -64,12 +70,14 @@ void FrameAssemblerV2::create_session_stream_handlers(
   const AuthConnectionMeta &auth_meta,
   bool crossed)
 {
+  assert(seastar::this_shard_id() == sid);
   session_stream_handlers = ceph::crypto::onwire::rxtx_t::create_handler_pair(
       nullptr, auth_meta, is_rev1, crossed);
 }
 
 void FrameAssemblerV2::reset_handlers()
 {
+  assert(seastar::this_shard_id() == sid);
   session_stream_handlers = { nullptr, nullptr };
   session_comp_handlers = { nullptr, nullptr };
 }
@@ -77,6 +85,7 @@ void FrameAssemblerV2::reset_handlers()
 FrameAssemblerV2::mover_t
 FrameAssemblerV2::to_replace()
 {
+  assert(seastar::this_shard_id() == sid);
   assert(is_socket_valid());
   return mover_t{
       move_socket(),
@@ -86,6 +95,7 @@ FrameAssemblerV2::to_replace()
 
 seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover)
 {
+  assert(seastar::this_shard_id() == sid);
   record_io = false;
   rxbuf.clear();
   txbuf.clear();
@@ -101,6 +111,7 @@ seastar::future<> FrameAssemblerV2::replace_by(FrameAssemblerV2::mover_t &&mover
 
 void FrameAssemblerV2::start_recording()
 {
+  assert(seastar::this_shard_id() == sid);
   record_io = true;
   rxbuf.clear();
   txbuf.clear();
@@ -109,6 +120,7 @@ void FrameAssemblerV2::start_recording()
 FrameAssemblerV2::record_bufs_t
 FrameAssemblerV2::stop_recording()
 {
+  assert(seastar::this_shard_id() == sid);
   ceph_assert_always(record_io == true);
   record_io = false;
   return record_bufs_t{std::move(rxbuf), std::move(txbuf)};
@@ -122,6 +134,7 @@ bool FrameAssemblerV2::has_socket() const
 
 bool FrameAssemblerV2::is_socket_valid() const
 {
+  assert(seastar::this_shard_id() == sid);
   return has_socket() && !socket->is_shutdown();
 }
 
@@ -134,6 +147,7 @@ SocketFRef FrameAssemblerV2::move_socket()
 
 void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(!has_socket());
   assert(new_socket);
   socket = std::move(new_socket);
@@ -143,18 +157,21 @@ void FrameAssemblerV2::set_socket(SocketFRef &&new_socket)
 
 void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   socket->learn_ephemeral_port_as_connector(port);
 }
 
 void FrameAssemblerV2::shutdown_socket()
 {
+  assert(seastar::this_shard_id() == sid);
   assert(is_socket_valid());
   socket->shutdown();
 }
 
 seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_socket)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   assert(socket->is_shutdown());
   auto old_socket = move_socket();
@@ -165,6 +182,7 @@ seastar::future<> FrameAssemblerV2::replace_shutdown_socket(SocketFRef &&new_soc
 
 seastar::future<> FrameAssemblerV2::close_shutdown_socket()
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   assert(socket->is_shutdown());
   return socket->close();
@@ -173,6 +191,7 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket()
 seastar::future<ceph::bufferptr>
 FrameAssemblerV2::read_exactly(std::size_t bytes)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   if (unlikely(record_io)) {
     return socket->read_exactly(bytes
@@ -188,6 +207,7 @@ FrameAssemblerV2::read_exactly(std::size_t bytes)
 seastar::future<ceph::bufferlist>
 FrameAssemblerV2::read(std::size_t bytes)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   if (unlikely(record_io)) {
     return socket->read(bytes
@@ -203,6 +223,7 @@ FrameAssemblerV2::read(std::size_t bytes)
 seastar::future<>
 FrameAssemblerV2::write(ceph::bufferlist buf)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   if (unlikely(record_io)) {
     txbuf.append(buf);
@@ -213,6 +234,7 @@ FrameAssemblerV2::write(ceph::bufferlist buf)
 seastar::future<>
 FrameAssemblerV2::flush()
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   return socket->flush();
 }
@@ -220,6 +242,7 @@ FrameAssemblerV2::flush()
 seastar::future<>
 FrameAssemblerV2::write_flush(ceph::bufferlist buf)
 {
+  assert(seastar::this_shard_id() == sid);
   assert(has_socket());
   if (unlikely(record_io)) {
     txbuf.append(buf);
@@ -230,6 +253,7 @@ FrameAssemblerV2::write_flush(ceph::bufferlist buf)
 seastar::future<FrameAssemblerV2::read_main_t>
 FrameAssemblerV2::read_main_preamble()
 {
+  assert(seastar::this_shard_id() == sid);
   rx_preamble.clear();
   return read_exactly(rx_frame_asm.get_preamble_onwire_len()
   ).then([this](auto bptr) {
@@ -250,6 +274,7 @@ FrameAssemblerV2::read_main_preamble()
 seastar::future<FrameAssemblerV2::read_payload_t*>
 FrameAssemblerV2::read_frame_payload()
 {
+  assert(seastar::this_shard_id() == sid);
   rx_segments_data.clear();
   return seastar::do_until(
     [this] {
index 0ba5f53cc9f84c067564bd61d414a441a7e00ae6..d31e9c4b78d86e10673fda574a2a91c29e273db8 100644 (file)
@@ -108,6 +108,7 @@ public:
 
   template <class F>
   ceph::bufferlist get_buffer(F &tx_frame) {
+    assert(seastar::this_shard_id() == sid);
 #ifdef UNIT_TESTS_BUILT
     intercept_frame(F::tag, true);
 #endif
@@ -118,6 +119,7 @@ public:
 
   template <class F>
   seastar::future<> write_flush_frame(F &tx_frame) {
+    assert(seastar::this_shard_id() == sid);
     auto bl = get_buffer(tx_frame);
     return write_flush(std::move(bl));
   }
@@ -139,6 +141,8 @@ private:
 
   SocketFRef socket;
 
+  seastar::shard_id sid;
+
   /*
    * auth signature
    */
index 9f3740a5c77701059368ad1ef7500db392d7c96d..ad96c7b6688a5a97798804db3b30df7de6395a44 100644 (file)
@@ -152,6 +152,12 @@ SocketConnection::get_messenger() const
   return messenger;
 }
 
+seastar::shard_id
+SocketConnection::get_messenger_shard_id() const
+{
+  return msgr_sid;
+}
+
 void SocketConnection::set_peer_type(entity_type_t peer_type) {
   assert(seastar::this_shard_id() == msgr_sid);
   // it is not allowed to assign an unknown value when the current
index f54ac7fd9a47464390a3d3022505a2297d0a8307..5fd70049aabd3a8c7c60e3763b0279504f0e3f2b 100644 (file)
@@ -141,6 +141,8 @@ class SocketConnection : public Connection {
 
   seastar::socket_address get_local_address() const;
 
+  seastar::shard_id get_messenger_shard_id() const;
+
   SocketMessenger &get_messenger() const;
 
   ConnectionRef get_local_shared_foreign_from_this();