]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: misc cleanups with logs around cross-core
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 5 Jun 2023 02:57:37 +0000 (10:57 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:32 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit 77268679631adaad2408eac62d3d599e2acf6059)

src/crimson/net/ProtocolV2.cc
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index e1ffef29088942c95d72edbad8cbbaf906da406c..15d3d565dd7784688cc1a994014870e4db16c1cb 100644 (file)
@@ -155,6 +155,7 @@ ProtocolV2::~ProtocolV2() {}
 void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
                                const entity_name_t& _peer_name)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   ceph_assert(state == state_t::NONE);
   ceph_assert(!gate.is_closed());
   conn.peer_addr = _peer_addr;
@@ -175,6 +176,7 @@ void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
 void ProtocolV2::start_accept(SocketFRef&& new_socket,
                               const entity_addr_t& _peer_addr)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   ceph_assert(state == state_t::NONE);
   // until we know better
   conn.target_addr = _peer_addr;
@@ -813,7 +815,10 @@ ProtocolV2::client_reconnect()
           // handle_session_reset() logic
           auto reset = ResetFrame::Decode(payload->back());
           logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
+
           reset_session(reset.full());
+          // user can make changes
+
           return client_connect();
         });
       case Tag::WAIT:
@@ -1960,9 +1965,12 @@ void ProtocolV2::trigger_replacing(bool reconnect,
              new_connect_seq, new_msg_seq] () mutable {
       if (state == state_t::REPLACING && do_reset) {
         reset_session(true);
+        // user can make changes
       }
 
       if (unlikely(state != state_t::REPLACING)) {
+        logger().debug("{} triggered {} in the middle of trigger_replacing(), abort",
+                       conn, get_state_name(state));
         ceph_assert_always(state == state_t::CLOSING);
         return mover.socket->close(
         ).then([sock = std::move(mover.socket)] {
@@ -2038,7 +2046,9 @@ void ProtocolV2::notify_out_fault(
     std::exception_ptr eptr,
     io_handler_state _io_states)
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
   io_states = _io_states;
+  logger().debug("{} got notify_out_fault(): io_states={}", conn, io_states);
   fault(state_t::READY, where, eptr);
 }
 
@@ -2062,6 +2072,8 @@ void ProtocolV2::execute_standby()
 
 void ProtocolV2::notify_out()
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  logger().debug("{} got notify_out(): at {}", conn, get_state_name(state));
   io_states.is_out_queued = true;
   if (unlikely(state == state_t::STANDBY && !conn.policy.server)) {
     logger().info("{} notify_out(): at {}, going to CONNECTING",
@@ -2137,6 +2149,8 @@ void ProtocolV2::execute_server_wait()
 
 void ProtocolV2::notify_mark_down()
 {
+  assert(seastar::this_shard_id() == conn.get_messenger_shard_id());
+  logger().debug("{} got notify_mark_down()", conn);
   do_close(false);
 }
 
index 7072c4d0b2ef845b633e54ab054122c65da207c2..f04ffff3a006577472d58e5ff298c00b59a07c0b 100644 (file)
@@ -367,6 +367,8 @@ IOHandler::wait_io_exit_dispatching()
 
 void IOHandler::reset_session(bool full)
 {
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().debug("{} got reset_session({})", conn, full);
   assert(get_io_state() != io_state_t::open);
   reset_in();
   if (full) {
@@ -377,6 +379,8 @@ void IOHandler::reset_session(bool full)
 
 void IOHandler::reset_peer_state()
 {
+  assert(seastar::this_shard_id() == get_shard_id());
+  logger().debug("{} got reset_peer_state()", conn);
   assert(get_io_state() != io_state_t::open);
   reset_in();
   requeue_out_sent_up_to(0);
@@ -385,6 +389,7 @@ void IOHandler::reset_peer_state()
 
 void IOHandler::requeue_out_sent()
 {
+  assert(seastar::this_shard_id() == get_shard_id());
   assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty()) {
     return;
@@ -407,6 +412,7 @@ void IOHandler::requeue_out_sent()
 
 void IOHandler::requeue_out_sent_up_to(seq_num_t seq)
 {
+  assert(seastar::this_shard_id() == get_shard_id());
   assert(get_io_state() != io_state_t::open);
   if (out_sent_msgs.empty() && out_pending_msgs.empty()) {
     logger().debug("{} nothing to requeue, reset out_seq from {} to seq {}",
@@ -470,7 +476,10 @@ IOHandler::dispatch_accept(
   ceph_assert_always(conn_ref);
   auto _conn_ref = conn_ref;
   auto fut = to_new_sid(new_sid, std::move(conn_fref));
+
   dispatchers.ms_handle_accept(_conn_ref, new_sid);
+  // user can make changes
+
   return fut;
 }
 
@@ -493,7 +502,10 @@ IOHandler::dispatch_connect(
   ceph_assert_always(conn_ref);
   auto _conn_ref = conn_ref;
   auto fut = to_new_sid(new_sid, std::move(conn_fref));
+
   dispatchers.ms_handle_connect(_conn_ref, new_sid);
+  // user can make changes
+
   return fut;
 }
 
@@ -581,7 +593,9 @@ void IOHandler::dispatch_reset(bool is_replace)
   }
   need_dispatch_reset = false;
   ceph_assert_always(conn_ref);
+
   dispatchers.ms_handle_reset(conn_ref, is_replace);
+  // user can make changes
 }
 
 void IOHandler::dispatch_remote_reset()
@@ -590,7 +604,9 @@ void IOHandler::dispatch_remote_reset()
     return;
   }
   ceph_assert_always(conn_ref);
+
   dispatchers.ms_handle_remote_reset(conn_ref);
+  // user can make changes
 }
 
 void IOHandler::ack_out_sent(seq_num_t seq)
@@ -712,6 +728,7 @@ IOHandler::do_out_dispatch(shard_states_t &ctx)
 
 void IOHandler::maybe_notify_out_dispatch()
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   if (is_out_queued()) {
     notify_out_dispatch();
   }
@@ -719,6 +736,7 @@ void IOHandler::maybe_notify_out_dispatch()
 
 void IOHandler::notify_out_dispatch()
 {
+  ceph_assert_always(seastar::this_shard_id() == get_shard_id());
   assert(is_out_queued());
   if (need_notify_out) {
     logger().debug("{} send notify_out()", conn);
@@ -853,8 +871,10 @@ IOHandler::read_message(
     assert(ctx.get_io_state() == io_state_t::open);
     assert(get_io_state() == io_state_t::open);
     ceph_assert_always(conn_ref);
+
     // throttle the reading process by the returned future
     return dispatchers.ms_dispatch(conn_ref, std::move(msg_ref));
+    // user can make changes
   });
 }
 
index ecdd02f1573355e33c5ac88ee722a8708f60d93d..f478f14296300bfd221c161d0f71b5edc5526579 100644 (file)
@@ -151,11 +151,13 @@ public:
    */
 
   void set_handshake_listener(HandshakeListener &hl) {
+    assert(seastar::this_shard_id() == get_shard_id());
     ceph_assert_always(handshake_listener == nullptr);
     handshake_listener = &hl;
   }
 
   io_handler_state get_states() const {
+    assert(seastar::this_shard_id() == get_shard_id());
     return {in_seq, is_out_queued(), has_out_sent()};
   }