]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: adjust the IO path in FrameAssemblerV2 with the foreign socket
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 09:33:58 +0000 (17:33 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Sun, 25 Jun 2023 03:57:19 +0000 (11:57 +0800)
FrameAssemblerV2 and Socket may in different cores during handshake.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/net/FrameAssemblerV2.cc
src/crimson/net/FrameAssemblerV2.h
src/crimson/net/io_handler.cc

index 4608f83da533ed9aee89a3367bed5b2193c48143..a043fcc57171e9fd27225644bcce0ff43675768d 100644 (file)
@@ -53,7 +53,12 @@ void FrameAssemblerV2::intercept_frame(Tag tag, bool is_write)
     auto action = conn.interceptor->intercept(
         conn.get_local_shared_foreign_from_this(),
         Breakpoint{tag, type});
-    socket->set_trap(type, action, &conn.interceptor->blocker);
+    // tolerate leaking future in tests
+    std::ignore = seastar::smp::submit_to(
+        socket->get_shard_id(),
+        [this, type, action] {
+      socket->set_trap(type, action, &conn.interceptor->blocker);
+    });
   }
 }
 #endif
@@ -165,6 +170,7 @@ void FrameAssemblerV2::learn_socket_ephemeral_port_as_connector(uint16_t port)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
+  // Note: may not invoke on the socket core
   socket->learn_ephemeral_port_as_connector(port);
 }
 
@@ -219,74 +225,130 @@ seastar::future<> FrameAssemblerV2::close_shutdown_socket()
   });
 }
 
+template <bool may_cross_core>
 seastar::future<ceph::bufferptr>
 FrameAssemblerV2::read_exactly(std::size_t bytes)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  if (unlikely(record_io)) {
-    return socket->read_exactly(bytes
-    ).then([this](auto bptr) {
-      rxbuf.append(bptr);
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    return seastar::smp::submit_to(
+        socket->get_shard_id(), [this, bytes] {
+      return socket->read_exactly(bytes);
+    }).then([this](auto bptr) {
+      if (record_io) {
+        rxbuf.append(bptr);
+      }
       return bptr;
     });
   } else {
+    assert(socket->get_shard_id() == sid);
     return socket->read_exactly(bytes);
-  };
+  }
 }
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<true>(std::size_t);
+template seastar::future<ceph::bufferptr> FrameAssemblerV2::read_exactly<false>(std::size_t);
 
+template <bool may_cross_core>
 seastar::future<ceph::bufferlist>
 FrameAssemblerV2::read(std::size_t bytes)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  if (unlikely(record_io)) {
-    return socket->read(bytes
-    ).then([this](auto buf) {
-      rxbuf.append(buf);
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    return seastar::smp::submit_to(
+        socket->get_shard_id(), [this, bytes] {
+      return socket->read(bytes);
+    }).then([this](auto buf) {
+      if (record_io) {
+        rxbuf.append(buf);
+      }
       return buf;
     });
   } else {
+    assert(socket->get_shard_id() == sid);
     return socket->read(bytes);
   }
 }
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<true>(std::size_t);
+template seastar::future<ceph::bufferlist> FrameAssemblerV2::read<false>(std::size_t);
 
+template <bool may_cross_core>
 seastar::future<>
 FrameAssemblerV2::write(ceph::bufferlist buf)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    if (record_io) {
+      txbuf.append(buf);
+    }
+    return seastar::smp::submit_to(
+        socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+      return socket->write(std::move(buf));
+    });
+  } else {
+    assert(socket->get_shard_id() == sid);
+    return socket->write(std::move(buf));
   }
-  return socket->write(std::move(buf));
 }
+template seastar::future<> FrameAssemblerV2::write<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write<false>(ceph::bufferlist);
 
+template <bool may_cross_core>
 seastar::future<>
 FrameAssemblerV2::flush()
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  return socket->flush();
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    return seastar::smp::submit_to(
+        socket->get_shard_id(), [this] {
+      return socket->flush();
+    });
+  } else {
+    assert(socket->get_shard_id() == sid);
+    return socket->flush();
+  }
 }
+template seastar::future<> FrameAssemblerV2::flush<true>();
+template seastar::future<> FrameAssemblerV2::flush<false>();
 
+template <bool may_cross_core>
 seastar::future<>
 FrameAssemblerV2::write_flush(ceph::bufferlist buf)
 {
   assert(seastar::this_shard_id() == sid);
   assert(has_socket());
-  if (unlikely(record_io)) {
-    txbuf.append(buf);
+  if constexpr (may_cross_core) {
+    assert(conn.get_messenger_shard_id() == sid);
+    if (unlikely(record_io)) {
+      txbuf.append(buf);
+    }
+    return seastar::smp::submit_to(
+        socket->get_shard_id(), [this, buf = std::move(buf)]() mutable {
+      return socket->write_flush(std::move(buf));
+    });
+  } else {
+    assert(socket->get_shard_id() == sid);
+    return socket->write_flush(std::move(buf));
   }
-  return socket->write_flush(std::move(buf));
 }
+template seastar::future<> FrameAssemblerV2::write_flush<true>(ceph::bufferlist);
+template seastar::future<> FrameAssemblerV2::write_flush<false>(ceph::bufferlist);
 
+template <bool may_cross_core>
 seastar::future<FrameAssemblerV2::read_main_t>
 FrameAssemblerV2::read_main_preamble()
 {
   assert(seastar::this_shard_id() == sid);
   rx_preamble.clear();
-  return read_exactly(rx_frame_asm.get_preamble_onwire_len()
+  return read_exactly<may_cross_core>(
+    rx_frame_asm.get_preamble_onwire_len()
   ).then([this](auto bptr) {
     try {
       rx_preamble.append(std::move(bptr));
@@ -301,7 +363,10 @@ FrameAssemblerV2::read_main_preamble()
     }
   });
 }
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<true>();
+template seastar::future<FrameAssemblerV2::read_main_t> FrameAssemblerV2::read_main_preamble<false>();
 
+template <bool may_cross_core>
 seastar::future<FrameAssemblerV2::read_payload_t*>
 FrameAssemblerV2::read_frame_payload()
 {
@@ -321,7 +386,7 @@ FrameAssemblerV2::read_frame_payload()
       }
       uint32_t onwire_len = rx_frame_asm.get_segment_onwire_len(seg_idx);
       // TODO: create aligned and contiguous buffer from socket
-      return read_exactly(onwire_len
+      return read_exactly<may_cross_core>(onwire_len
       ).then([this](auto bptr) {
         logger().trace("{} RECV({}) frame segment[{}]",
                        conn, bptr.length(), rx_segments_data.size());
@@ -331,7 +396,7 @@ FrameAssemblerV2::read_frame_payload()
       });
     }
   ).then([this] {
-    return read_exactly(rx_frame_asm.get_epilogue_onwire_len());
+    return read_exactly<may_cross_core>(rx_frame_asm.get_epilogue_onwire_len());
   }).then([this](auto bptr) {
     logger().trace("{} RECV({}) frame epilogue", conn, bptr.length());
     bool ok = false;
@@ -355,6 +420,8 @@ FrameAssemblerV2::read_frame_payload()
     return &rx_segments_data;
   });
 }
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<true>();
+template seastar::future<FrameAssemblerV2::read_payload_t*> FrameAssemblerV2::read_frame_payload<false>();
 
 void FrameAssemblerV2::log_main_preamble(const ceph::bufferlist &bl)
 {
index 0cc495574c2fe7ca1e4bfcdaaa15d506be1746b3..c2d3318f87d6243e96fb410cf2ab97af00e619f1 100644 (file)
@@ -83,14 +83,19 @@ public:
    * socket read and write interfaces
    */
 
+  template <bool may_cross_core = true>
   seastar::future<ceph::bufferptr> read_exactly(std::size_t bytes);
 
+  template <bool may_cross_core = true>
   seastar::future<ceph::bufferlist> read(std::size_t bytes);
 
+  template <bool may_cross_core = true>
   seastar::future<> write(ceph::bufferlist);
 
+  template <bool may_cross_core = true>
   seastar::future<> flush();
 
+  template <bool may_cross_core = true>
   seastar::future<> write_flush(ceph::bufferlist);
 
   /*
@@ -102,11 +107,13 @@ public:
     ceph::msgr::v2::Tag tag;
     const ceph::msgr::v2::FrameAssembler *rx_frame_asm;
   };
+  template <bool may_cross_core = true>
   seastar::future<read_main_t> read_main_preamble();
 
   /// may throw negotiation_failure as fault
   using read_payload_t = ceph::msgr::v2::segment_bls_t;
   // FIXME: read_payload_t cannot be no-throw move constructible
+  template <bool may_cross_core = true>
   seastar::future<read_payload_t*> read_frame_payload();
 
   template <class F>
@@ -120,11 +127,11 @@ public:
     return bl;
   }
 
-  template <class F>
+  template <class F, bool may_cross_core = true>
   seastar::future<> write_flush_frame(F &tx_frame) {
     assert(seastar::this_shard_id() == sid);
     auto bl = get_buffer(tx_frame);
-    return write_flush(std::move(bl));
+    return write_flush<may_cross_core>(std::move(bl));
   }
 
   static FrameAssemblerV2Ref create(SocketConnection &conn);
@@ -148,10 +155,14 @@ private:
   // different from the socket sid.
   bool is_socket_shutdown = false;
 
+  // the current working shard, can be messenger or socket shard.
+  // if is messenger shard, should call interfaces with may_cross_core = true.
   seastar::shard_id sid;
 
   /*
    * auth signature
+   *
+   * only in the messenger core
    */
 
   bool record_io = false;
@@ -179,6 +190,10 @@ private:
     &session_stream_handlers, is_rev1, common::local_conf()->ms_crc_data,
     &session_comp_handlers};
 
+  // in the messenger core during handshake,
+  // and in the socket core during open,
+  // must be cleaned before switching cores.
+
   ceph::bufferlist rx_preamble;
 
   read_payload_t rx_segments_data;
index 0c0cdc76a2cdac54e6ae677f72a6b5252c911030..9fcccde53c4ce038bc14f3a4eb17e2a67b3c1b47 100644 (file)
@@ -372,7 +372,7 @@ void IOHandler::ack_out_sent(seq_num_t seq)
 
 seastar::future<stop_t> IOHandler::try_exit_out_dispatch() {
   assert(!is_out_queued());
-  return frame_assembler->flush(
+  return frame_assembler->flush<false>(
   ).then([this] {
     if (!is_out_queued()) {
       // still nothing pending to send after flush,
@@ -405,7 +405,7 @@ seastar::future<> IOHandler::do_out_dispatch()
       }
       auto to_ack = ack_left;
       assert(to_ack == 0 || in_seq > 0);
-      return frame_assembler->write(
+      return frame_assembler->write<false>(
         sweep_out_pending_msgs_to_sent(
           need_keepalive, next_keepalive_ack, to_ack > 0)
       ).then([this, prv_keepalive_ack=next_keepalive_ack, to_ack] {
@@ -505,7 +505,7 @@ void IOHandler::notify_out_dispatch()
 seastar::future<>
 IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
 {
-  return frame_assembler->read_frame_payload(
+  return frame_assembler->read_frame_payload<false>(
   ).then([this, throttle_stamp, msg_size](auto payload) {
     if (unlikely(io_state != io_state_t::open)) {
       logger().debug("{} triggered {} during read_message()",
@@ -623,7 +623,7 @@ void IOHandler::do_in_dispatch()
   in_exit_dispatching = seastar::promise<>();
   gate.dispatch_in_background("do_in_dispatch", conn, [this] {
     return seastar::keep_doing([this] {
-      return frame_assembler->read_main_preamble(
+      return frame_assembler->read_main_preamble<false>(
       ).then([this](auto ret) {
         switch (ret.tag) {
           case Tag::MESSAGE: {
@@ -656,7 +656,7 @@ void IOHandler::do_in_dispatch()
             });
           }
           case Tag::ACK:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_message_ack() logic
               auto ack = AckFrame::Decode(payload->back());
@@ -664,7 +664,7 @@ void IOHandler::do_in_dispatch()
               ack_out_sent(ack.seq());
             });
           case Tag::KEEPALIVE2:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_keepalive2() logic
               auto keepalive_frame = KeepAliveFrame::Decode(payload->back());
@@ -677,7 +677,7 @@ void IOHandler::do_in_dispatch()
               last_keepalive = seastar::lowres_system_clock::now();
             });
           case Tag::KEEPALIVE2_ACK:
-            return frame_assembler->read_frame_payload(
+            return frame_assembler->read_frame_payload<false>(
             ).then([this](auto payload) {
               // handle_keepalive2_ack() logic
               auto keepalive_ack_frame = KeepAliveFrameAck::Decode(payload->back());