]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: wrap message with foreign-ptr in the send path
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 26 Apr 2023 08:33:28 +0000 (16:33 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 11 Oct 2023 11:38:31 +0000 (11:38 +0000)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit db07ae9d5cf8e6ac8031eec0689c74a9d5f55aff)

src/crimson/net/Fwd.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h

index 1dc4809810de27e2174b350c2e2c82d0041a5512..2b159514193c7f23b7c0faff3c872d0cac9267a8 100644 (file)
@@ -47,4 +47,6 @@ using dispatchers_t = boost::container::small_vector<Dispatcher*, NUM_DISPATCHER
 class Messenger;
 using MessengerRef = seastar::shared_ptr<Messenger>;
 
+using MessageFRef = seastar::foreign_ptr<MessageURef>;
+
 } // namespace crimson::net
index 1cbc3cf3fbadc56892ae123589cc3c233a74a9a3..70abd6fc62fe42f33f63517f88a805576f4ce62a 100644 (file)
@@ -68,8 +68,9 @@ bool SocketConnection::peer_wins() const
   return (messenger.get_myaddr() > peer_addr || policy.server);
 }
 
-seastar::future<> SocketConnection::send(MessageURef msg)
+seastar::future<> SocketConnection::send(MessageURef _msg)
 {
+  MessageFRef msg = seastar::make_foreign(std::move(_msg));
   return seastar::smp::submit_to(
     io_handler->get_shard_id(),
     [this, msg=std::move(msg)]() mutable {
index c986370fc015f59171c19f49303b286c2e697808..d1d4ae8a71201a028b60ac5332a4b9f6f9da468a 100644 (file)
@@ -52,7 +52,7 @@ public:
 
   virtual bool is_connected() const = 0;
 
-  virtual seastar::future<> send(MessageURef) = 0;
+  virtual seastar::future<> send(MessageFRef) = 0;
 
   virtual seastar::future<> send_keepalive() = 0;
 
index 63e0c93c9fd9cb88aca5f9267900bbea27ce5dcc..70e6650c699cb44110509b356dbb9e77045a409b 100644 (file)
@@ -85,7 +85,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
   std::for_each(
       out_pending_msgs.begin(),
       out_pending_msgs.begin()+num_msgs,
-      [this, &bl](const MessageURef& msg) {
+      [this, &bl](const MessageFRef& msg) {
     // set priority
     msg->get_header().src = conn.messenger.get_myname();
 
@@ -122,7 +122,7 @@ ceph::bufferlist IOHandler::sweep_out_pending_msgs_to_sent(
   return bl;
 }
 
-seastar::future<> IOHandler::send(MessageURef msg)
+seastar::future<> IOHandler::send(MessageFRef msg)
 {
   ceph_assert_always(seastar::this_shard_id() == sid);
   if (io_state != io_state_t::drop) {
@@ -272,7 +272,7 @@ void IOHandler::requeue_out_sent()
   out_seq -= out_sent_msgs.size();
   logger().debug("{} requeue {} items, revert out_seq to {}",
                  conn, out_sent_msgs.size(), out_seq);
-  for (MessageURef& msg : out_sent_msgs) {
+  for (MessageFRef& msg : out_sent_msgs) {
     msg->clear_payload();
     msg->set_seq(0);
   }
index 3f2d6f9a453a6f9385124d1d9daf65ae787917f6..4fbe33960a368802a1cd619c413cec5117ab4c51 100644 (file)
@@ -70,7 +70,7 @@ private:
     return protocol_is_connected;
   }
 
-  seastar::future<> send(MessageURef msg) final;
+  seastar::future<> send(MessageFRef msg) final;
 
   seastar::future<> send_keepalive() final;
 
@@ -227,10 +227,10 @@ private:
   seq_num_t out_seq = 0;
 
   // messages to be resent after connection gets reset
-  std::deque<MessageURef> out_pending_msgs;
+  std::deque<MessageFRef> out_pending_msgs;
 
   // messages sent, but not yet acked by peer
-  std::deque<MessageURef> out_sent_msgs;
+  std::deque<MessageFRef> out_sent_msgs;
 
   bool need_keepalive = false;