]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/net: change ConnectionRef to be a local_shared_foreign_ptr
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 3 Apr 2023 02:00:11 +0000 (10:00 +0800)
committerMatan Breizman <mbreizma@redhat.com>
Tue, 23 May 2023 13:20:51 +0000 (13:20 +0000)
Make it possible for connections and messages to be dispatched in
arbitrary core without asynchronous foreign copy.

The local_shared_foreign_ptr conn cannot be moved to another core
implicitly, maintain it outside Message independently.

Do asynchronous foreign copy to the new ConnectionRef only in
with_remote_shard_state_and_op().

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
(cherry picked from commit b54153ecaddda890401d12d6c5ceadb0030df4f1)

32 files changed:
src/crimson/net/Fwd.h
src/crimson/net/Interceptor.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc
src/crimson/net/io_handler.cc
src/crimson/net/io_handler.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/logmissing_request.cc
src/crimson/osd/osd_operations/logmissing_request.h
src/crimson/osd/osd_operations/logmissing_request_reply.h
src/crimson/osd/osd_operations/peering_event.h
src/crimson/osd/osd_operations/recovery_subrequest.cc
src/crimson/osd/osd_operations/recovery_subrequest.h
src/crimson/osd/osd_operations/replicated_request.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/recovery_backend.h
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h
src/crimson/osd/watch.cc
src/crimson/osd/watch.h
src/msg/Message.cc
src/msg/Message.h
src/test/crimson/test_messenger.cc
src/test/crimson/test_messenger_thrash.cc

index 7ccd3fe35dd034b6ca5f92bb7fc875cc2ff45cee..3eb57ef9781307d83ca4dd7efdded71f24116362 100644 (file)
@@ -25,6 +25,7 @@
 #include "msg/msg_types.h"
 
 #include "crimson/common/errorator.h"
+#include "crimson/common/local_shared_foreign_ptr.h"
 
 class AuthConnectionMeta;
 
@@ -34,8 +35,11 @@ using msgr_tag_t = uint8_t;
 using stop_t = seastar::stop_iteration;
 
 class Connection;
-using ConnectionRef = seastar::shared_ptr<Connection>;
-using ConnectionFRef = seastar::foreign_ptr<ConnectionRef>;
+using ConnectionLRef = seastar::shared_ptr<Connection>;
+using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
+using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
+
+class SocketConnection;
 
 class Dispatcher;
 class ChainedDispatchers;
index cc4dc77d6c51cc75b4b5e3c54e51091064d6364c..41ec31f3755768d538ea8c66c0689c518d3d8551 100644 (file)
@@ -116,11 +116,11 @@ struct Breakpoint {
 struct Interceptor {
   socket_blocker blocker;
   virtual ~Interceptor() {}
-  virtual void register_conn(Connection& conn) = 0;
-  virtual void register_conn_ready(Connection& conn) = 0;
-  virtual void register_conn_closed(Connection& conn) = 0;
-  virtual void register_conn_replaced(Connection& conn) = 0;
-  virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
+  virtual void register_conn(SocketConnection& conn) = 0;
+  virtual void register_conn_ready(SocketConnection& conn) = 0;
+  virtual void register_conn_closed(SocketConnection& conn) = 0;
+  virtual void register_conn_replaced(SocketConnection& conn) = 0;
+  virtual bp_action_t intercept(SocketConnection& conn, Breakpoint bp) = 0;
 };
 
 } // namespace crimson::net
index edaee1075df4cf5ad8b2869b3b76b6c16dbc11dd..95b756637f4e8a8de6ab2e2d49d3a2c9e65aa2f6 100644 (file)
@@ -107,7 +107,7 @@ namespace crimson::net {
 // should be consistent to intercept_frame() in FrameAssemblerV2.cc
 void intercept(Breakpoint bp,
                bp_type_t type,
-               Connection& conn,
+               SocketConnection& conn,
                Interceptor *interceptor,
                SocketRef& socket) {
   if (interceptor) {
index aa7fcc027790d7f4aa113bc1589bca1d82ed8434..38e2748738f7b2b39980a92f7167c98c8c71bcad 100644 (file)
 #endif
 
 using std::ostream;
-using namespace crimson::net;
 using crimson::common::local_conf;
 
+namespace crimson::net {
+
 SocketConnection::SocketConnection(SocketMessenger& messenger,
                                    ChainedDispatchers& dispatchers)
   : core(messenger.shard_id()),
@@ -137,6 +138,14 @@ seastar::socket_address SocketConnection::get_local_address() const {
   return socket->get_local_address();
 }
 
+ConnectionRef
+SocketConnection::get_local_shared_foreign_from_this()
+{
+  assert(seastar::this_shard_id() == shard_id());
+  return make_local_shared_foreign(
+      seastar::make_foreign(shared_from_this()));
+}
+
 void SocketConnection::print(ostream& out) const {
     out << (void*)this << " ";
     messenger.print(out);
@@ -150,3 +159,5 @@ void SocketConnection::print(ostream& out) const {
           << " >> " << get_peer_name() << " " << peer_addr;
     }
 }
+
+} // namespace crimson::net
index f6ef6f49753315b26323e6e8c63bb71e84c039bd..aa791b6e1701686c26006ae312a822070abe59ca 100644 (file)
@@ -25,7 +25,6 @@ namespace crimson::net {
 
 class ProtocolV2;
 class SocketMessenger;
-class SocketConnection;
 using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
 
 #ifdef UNIT_TESTS_BUILT
@@ -166,6 +165,8 @@ class SocketConnection : public Connection {
     return messenger;
   }
 
+  ConnectionRef get_local_shared_foreign_from_this();
+
 private:
   seastar::shard_id shard_id() const;
 
index da8942483152271d32b682a75bd144df297b848e..a112b50800d4a0f6c6daca0127aa209496485112 100644 (file)
@@ -239,12 +239,12 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& pe
 
   if (auto found = lookup_conn(peer_addr); found) {
     logger().debug("{} connect to existing", *found);
-    return found->shared_from_this();
+    return found->get_local_shared_foreign_from_this();
   }
   SocketConnectionRef conn =
     seastar::make_shared<SocketConnection>(*this, dispatchers);
   conn->start_connect(peer_addr, peer_name);
-  return conn->shared_from_this();
+  return conn->get_local_shared_foreign_from_this();
 }
 
 seastar::future<> SocketMessenger::shutdown()
index de296e64e23a3985f80b40a78054621b43aa7845..80d578363282e8c4b364cfd7d694328e5be5c10c 100644 (file)
@@ -48,7 +48,8 @@ namespace crimson::net {
 IOHandler::IOHandler(ChainedDispatchers &dispatchers,
                      SocketConnection &conn)
   : dispatchers(dispatchers),
-    conn(conn)
+    conn(conn),
+    conn_ref(conn.get_local_shared_foreign_from_this())
 {}
 
 IOHandler::~IOHandler()
@@ -318,8 +319,7 @@ void IOHandler::dispatch_accept()
   // protocol_is_connected can be from true to true here if the replacing is
   // happening to a connected connection.
   protocol_is_connected = true;
-  dispatchers.ms_handle_accept(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  dispatchers.ms_handle_accept(conn_ref);
 }
 
 void IOHandler::dispatch_connect()
@@ -329,8 +329,7 @@ void IOHandler::dispatch_connect()
   }
   ceph_assert_always(protocol_is_connected == false);
   protocol_is_connected = true;
-  dispatchers.ms_handle_connect(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  dispatchers.ms_handle_connect(conn_ref);
 }
 
 void IOHandler::dispatch_reset(bool is_replace)
@@ -340,9 +339,7 @@ void IOHandler::dispatch_reset(bool is_replace)
     return;
   }
   need_dispatch_reset = false;
-  dispatchers.ms_handle_reset(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
-    is_replace);
+  dispatchers.ms_handle_reset(conn_ref, is_replace);
 }
 
 void IOHandler::dispatch_remote_reset()
@@ -350,8 +347,7 @@ void IOHandler::dispatch_remote_reset()
   if (io_state == io_state_t::drop) {
     return;
   }
-  dispatchers.ms_handle_remote_reset(
-    seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
+  dispatchers.ms_handle_remote_reset(conn_ref);
 }
 
 void IOHandler::ack_out_sent(seq_num_t seq)
@@ -545,10 +541,8 @@ IOHandler::read_message(utime_t throttle_stamp, std::size_t msg_size)
     ceph_msg_footer footer{ceph_le32(0), ceph_le32(0),
                            ceph_le32(0), ceph_le64(0), current_header.flags};
 
-    auto conn_ref = seastar::static_pointer_cast<SocketConnection>(
-        conn.shared_from_this());
     Message *message = decode_message(nullptr, 0, header, footer,
-        msg_frame.front(), msg_frame.middle(), msg_frame.data(), conn_ref);
+        msg_frame.front(), msg_frame.middle(), msg_frame.data(), nullptr);
     if (!message) {
       logger().warn("{} decode message failed", conn);
       abort_in_fault();
index e86667a04e5ee2c796abe713c8247bcf590d932d..e04b6356e8674ea9d95854616ae99426d8276e80 100644 (file)
@@ -107,6 +107,10 @@ public:
     if (is_dispatch_reset) {
       dispatch_reset(is_replace);
     }
+
+    ceph_assert_always(conn_ref);
+    conn_ref.reset();
+
     assert(!gate.is_closed());
     return gate.close();
   }
@@ -183,6 +187,9 @@ private:
 
   SocketConnection &conn;
 
+  // core local reference for dispatching, valid until reset/close
+  ConnectionRef conn_ref;
+
   HandshakeListener *handshake_listener = nullptr;
 
   crimson::common::Gated gate;
index e5bfd839f7a0e59db724169f4607434c433dc5fb..040870203bd950ea0d4785a0eeb127a6224d0c09 100644 (file)
@@ -138,7 +138,8 @@ OpsExecuter::call_ierrorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
 }
 
 static watch_info_t create_watch_info(const OSDOp& osd_op,
-                                      const OpsExecuter::ExecutableMessage& msg)
+                                      const OpsExecuter::ExecutableMessage& msg,
+                                      entity_addr_t peer_addr)
 {
   using crimson::common::local_conf;
   const uint32_t timeout =
@@ -147,7 +148,7 @@ static watch_info_t create_watch_info(const OSDOp& osd_op,
   return {
     osd_op.op.watch.cookie,
     timeout,
-    msg.get_connection()->get_peer_addr()
+    peer_addr
   };
 }
 
@@ -159,48 +160,47 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
   logger().debug("{}", __func__);
   struct connect_ctx_t {
     ObjectContext::watch_key_t key;
-    crimson::net::ConnectionFRef conn;
+    crimson::net::ConnectionRef conn;
     watch_info_t info;
 
     connect_ctx_t(
       const OSDOp& osd_op,
       const ExecutableMessage& msg,
-      crimson::net::ConnectionFRef conn)
+      crimson::net::ConnectionRef conn)
       : key(osd_op.op.watch.cookie, msg.get_reqid().name),
-        conn(std::move(conn)),
-        info(create_watch_info(osd_op, msg)) {
+        conn(conn),
+        info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
     }
   };
-  return get_message().get_connection().copy(
-  ).then([&, this](auto &&conn) {
-    return with_effect_on_obc(
-      connect_ctx_t{ osd_op, get_message(), std::move(conn) },
-      [&] (auto& ctx) {
-       const auto& entity = ctx.key.second;
-       auto [it, emplaced] =
-         os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
-       if (emplaced) {
-         logger().info("registered new watch {} by {}", it->second, entity);
-         txn.nop();
-       } else {
-         logger().info("found existing watch {} by {}", it->second, entity);
-       }
-       return seastar::now();
-      },
-      [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
-       assert(pg);
-       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
-       if (emplaced) {
-         const auto& [cookie, entity] = ctx.key;
-         it->second = crimson::osd::Watch::create(
-           obc, ctx.info, entity, std::move(pg));
-         logger().info("op_effect: added new watcher: {}", ctx.key);
-       } else {
-         logger().info("op_effect: found existing watcher: {}", ctx.key);
-       }
-       return it->second->connect(std::move(ctx.conn), true /* will_ping */);
-      });
-  });
+
+  return with_effect_on_obc(
+    connect_ctx_t{ osd_op, get_message(), conn },
+    [&](auto& ctx) {
+      const auto& entity = ctx.key.second;
+      auto [it, emplaced] =
+        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+      if (emplaced) {
+        logger().info("registered new watch {} by {}", it->second, entity);
+        txn.nop();
+      } else {
+        logger().info("found existing watch {} by {}", it->second, entity);
+      }
+      return seastar::now();
+    },
+    [](auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+      assert(pg);
+      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+      if (emplaced) {
+        const auto& [cookie, entity] = ctx.key;
+        it->second = crimson::osd::Watch::create(
+          obc, ctx.info, entity, std::move(pg));
+        logger().info("op_effect: added new watcher: {}", ctx.key);
+      } else {
+        logger().info("op_effect: found existing watcher: {}", ctx.key);
+      }
+      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
+    }
+  );
 }
 
 OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
@@ -323,57 +323,56 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
     return crimson::ct_error::enoent::make();
   }
   struct notify_ctx_t {
-    crimson::net::ConnectionFRef conn;
+    crimson::net::ConnectionRef conn;
     notify_info_t ninfo;
     const uint64_t client_gid;
     const epoch_t epoch;
 
-    notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn)
-      : conn(std::move(conn)),
+    notify_ctx_t(const ExecutableMessage& msg,
+                 crimson::net::ConnectionRef conn)
+      : conn(conn),
         client_gid(msg.get_reqid().name.num()),
         epoch(msg.get_map_epoch()) {
     }
   };
-  return get_message().get_connection().copy(
-  ).then([&, this](auto &&conn) {
-    return with_effect_on_obc(
-      notify_ctx_t{ get_message(), std::move(conn) },
-      [&] (auto& ctx) {
-       try {
-         auto bp = osd_op.indata.cbegin();
-         uint32_t ver; // obsolete
-         ceph::decode(ver, bp);
-         ceph::decode(ctx.ninfo.timeout, bp);
-         ceph::decode(ctx.ninfo.bl, bp);
-       } catch (const buffer::error&) {
-         ctx.ninfo.timeout = 0;
-       }
-       if (!ctx.ninfo.timeout) {
-         using crimson::common::local_conf;
-         ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
-       }
-       ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
-       ctx.ninfo.cookie = osd_op.op.notify.cookie;
-       // return our unique notify id to the client
-       ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
-       return seastar::now();
-      },
-      [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
-       auto alive_watchers = obc->watchers | boost::adaptors::map_values
-         | boost::adaptors::filtered(
-           [] (const auto& w) {
-             // FIXME: filter as for the `is_ping` in `Watch::start_notify`
-             return w->is_alive();
-           });
-       return crimson::osd::Notify::create_n_propagate(
-         std::begin(alive_watchers),
-         std::end(alive_watchers),
-         std::move(ctx.conn),
-         ctx.ninfo,
-         ctx.client_gid,
-         obc->obs.oi.user_version);
-      });
-  });
+  return with_effect_on_obc(
+    notify_ctx_t{ get_message(), conn },
+    [&](auto& ctx) {
+      try {
+        auto bp = osd_op.indata.cbegin();
+        uint32_t ver; // obsolete
+        ceph::decode(ver, bp);
+        ceph::decode(ctx.ninfo.timeout, bp);
+        ceph::decode(ctx.ninfo.bl, bp);
+      } catch (const buffer::error&) {
+        ctx.ninfo.timeout = 0;
+      }
+      if (!ctx.ninfo.timeout) {
+        using crimson::common::local_conf;
+        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
+      }
+      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+      ctx.ninfo.cookie = osd_op.op.notify.cookie;
+      // return our unique notify id to the client
+      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
+      return seastar::now();
+    },
+    [](auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+      auto alive_watchers = obc->watchers | boost::adaptors::map_values
+        | boost::adaptors::filtered(
+          [] (const auto& w) {
+            // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+            return w->is_alive();
+          });
+      return crimson::osd::Notify::create_n_propagate(
+        std::begin(alive_watchers),
+        std::end(alive_watchers),
+        std::move(ctx.conn),
+        ctx.ninfo,
+        ctx.client_gid,
+        obc->obs.oi.user_version);
+    }
+  );
 }
 
 OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_list_watchers(
@@ -1046,11 +1045,13 @@ OpsExecuter::OpsExecuter(Ref<PG> pg,
                          ObjectContextRef _obc,
                          const OpInfo& op_info,
                          abstracted_msg_t&& msg,
+                         crimson::net::ConnectionRef conn,
                          const SnapContext& _snapc)
   : pg(std::move(pg)),
     obc(std::move(_obc)),
     op_info(op_info),
     msg(std::move(msg)),
+    conn(conn),
     snapc(_snapc)
 {
   if (op_info.may_write() && should_clone(*obc, snapc)) {
index 14b2881d695f8b234fef43f12bed44f98838713a..697adffdb2184f92a039ce47ff23cb66498dbb07 100644 (file)
@@ -100,7 +100,6 @@ public:
   // with other message types than just the `MOSDOp`. The type erasure
   // happens in the ctor of `OpsExecuter`.
   struct ExecutableMessage {
-    virtual const crimson::net::ConnectionFRef &get_connection() const = 0;
     virtual osd_reqid_t get_reqid() const = 0;
     virtual utime_t get_mtime() const = 0;
     virtual epoch_t get_map_epoch() const = 0;
@@ -115,9 +114,6 @@ public:
   public:
     ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) {
     }
-    const crimson::net::ConnectionFRef &get_connection() const final {
-      return pimpl->get_connection();
-    }
     osd_reqid_t get_reqid() const final {
       return pimpl->get_reqid();
     }
@@ -177,6 +173,7 @@ private:
     ceph::static_ptr<ExecutableMessage,
                      sizeof(ExecutableMessagePimpl<void>)>;
   abstracted_msg_t msg;
+  crimson::net::ConnectionRef conn;
   std::optional<osd_op_params_t> osd_op_params;
   bool user_modify = false;
   ceph::os::Transaction txn;
@@ -363,6 +360,7 @@ private:
               ObjectContextRef obc,
               const OpInfo& op_info,
               abstracted_msg_t&& msg,
+              crimson::net::ConnectionRef conn,
               const SnapContext& snapc);
 
 public:
@@ -371,6 +369,7 @@ public:
               ObjectContextRef obc,
               const OpInfo& op_info,
               const MsgT& msg,
+              crimson::net::ConnectionRef conn,
               const SnapContext& snapc)
     : OpsExecuter(
         std::move(pg),
@@ -379,6 +378,7 @@ public:
         abstracted_msg_t{
           std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{},
           &msg},
+        conn,
         snapc) {
   }
 
index 298eac501dffdaa9041f66b4e67edb051e8cbcf9..d23a82ab0524379e340fb3700ecc2ba6539f1e15 100644 (file)
@@ -308,7 +308,7 @@ ClientRequest::do_process(
                      __func__, m->get_hobj());
     }
   }
-  return pg->do_osd_ops(m, obc, op_info, snapc).safe_then_unpack_interruptible(
+  return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
     [this, pg, &ihref](auto submitted, auto all_completed) mutable {
       return submitted.then_interruptible([this, pg, &ihref] {
        return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
index e7620b6af96afa26c2bb22c09fa206b7bcb55473..4338ac4169759120a5555afe1c0318834f7fc86d 100644 (file)
@@ -32,7 +32,7 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
   // used by put_historic
   ShardServices *put_historic_shard_services = nullptr;
 
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   // must be after conn due to ConnectionPipeline's life-time
   Ref<MOSDOp> m;
   OpInfo op_info;
@@ -202,10 +202,23 @@ public:
   spg_t get_pgid() const {
     return m->get_spg();
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return instance_handle->handle; }
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
   seastar::future<> with_pg_int(
     ShardServices &shard_services, Ref<PG> pg);
 
index eb89acdd00e6e5c51801121e882e46c465e6088d..5dfb290f945c3fccdecb39c9ed2ed02d8cfd028c 100644 (file)
@@ -61,7 +61,7 @@ seastar::future<> LogMissingRequest::with_pg(
 
   IRef ref = this;
   return interruptor::with_interruption([this, pg] {
-    return pg->do_update_log_missing(req);
+    return pg->do_update_log_missing(req, conn);
   }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
 }
 
index dc3cdfb01e4a98431ce2eb6ffe3fa868bbaacd7a..4ab87996f3afbf68f37b7b6d6a0031ab045cfeb8 100644 (file)
@@ -34,10 +34,23 @@ public:
   spg_t get_pgid() const {
     return req->get_spg();
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
   seastar::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
@@ -53,7 +66,7 @@ public:
 private:
   ClientRequest::PGPipeline &pp(PG &pg);
 
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDPGUpdateLogMissing> req;
index dbbce3294d9af3e2492a377c5272c5a0faa3c988..cb39e9f6c2b42eca1a360c9ce9bf6766270b6576 100644 (file)
@@ -34,10 +34,23 @@ public:
   spg_t get_pgid() const {
     return req->get_spg();
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
   seastar::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
@@ -53,7 +66,7 @@ public:
 private:
   ClientRequest::PGPipeline &pp(PG &pg);
 
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDPGUpdateLogMissingReply> req;
index 44442cc01eedecaad1105ec3bea976259b2d0684..d9c9da58a17fcc088d9579ab784c2717830f0976 100644 (file)
@@ -107,7 +107,7 @@ public:
 
 class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
 protected:
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   // must be after conn due to ConnectionPipeline's life-time
   PipelineHandle handle;
 
@@ -153,9 +153,22 @@ public:
   spg_t get_pgid() const {
     return pgid;
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return evt.get_epoch_sent(); }
+
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
 };
 
 class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> {
index 7e5dd76e0a8a7ac784d5fbb013ea6b193d4a4115..68655b8da517198147916e358e4f07cb077cf2b4 100644 (file)
@@ -30,7 +30,7 @@ seastar::future<> RecoverySubRequest::with_pg(
   track_event<StartEvent>();
   IRef opref = this;
   return interruptor::with_interruption([this, pgref] {
-    return pgref->get_recovery_backend()->handle_recovery_op(m);
+    return pgref->get_recovery_backend()->handle_recovery_op(m, conn);
   }, [](std::exception_ptr) {
     return seastar::now();
   }, pgref).finally([this, opref, pgref] {
index c7b7efb5182d9b68cc93cf0860ca2b331bc5b738..07c7c95b5e0fe1396015419e41434d3de12f6ce6 100644 (file)
@@ -37,10 +37,23 @@ public:
   spg_t get_pgid() const {
     return m->get_spg();
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
   seastar::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
@@ -55,7 +68,7 @@ public:
   > tracking_events;
 
 private:
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDFastDispatchOp> m;
index 2716fb0b9293b2847637e99051f6cb72312f8f90..78d97ecf439fa0d85cd3f332ec39ed0fd64df109 100644 (file)
@@ -34,10 +34,23 @@ public:
   spg_t get_pgid() const {
     return req->get_spg();
   }
-  ConnectionPipeline &get_connection_pipeline();
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return req->get_min_epoch(); }
 
+  ConnectionPipeline &get_connection_pipeline();
+  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
+    assert(conn);
+    return conn.get_foreign(
+    ).then([this](auto f_conn) {
+      conn.reset();
+      return f_conn;
+    });
+  }
+  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
+    assert(!conn);
+    conn = make_local_shared_foreign(std::move(_conn));
+  }
+
   seastar::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
@@ -55,7 +68,7 @@ public:
 private:
   ClientRequest::PGPipeline &pp(PG &pg);
 
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   PipelineHandle handle;
   Ref<MOSDRepOp> req;
 };
index 997ad0953fd2636f6eb06fbfaee364644c664225..2b4a9da79946f21209981879f736fb6ec9b79e44 100644 (file)
@@ -972,6 +972,7 @@ seastar::future<> PG::submit_error_log(
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
+  crimson::net::ConnectionRef conn,
   ObjectContextRef obc,
   const OpInfo &op_info,
   const SnapContext& snapc)
@@ -981,7 +982,7 @@ PG::do_osd_ops(
   }
   return do_osd_ops_execute<MURef<MOSDOpReply>>(
     seastar::make_lw_shared<OpsExecuter>(
-      Ref<PG>{this}, obc, op_info, *m, snapc),
+      Ref<PG>{this}, obc, op_info, *m, conn, snapc),
     m->ops,
     [this, m, obc, may_write = op_info.may_write(),
      may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
@@ -1103,7 +1104,13 @@ PG::do_osd_ops(
     [=, this, &ops, &op_info](auto &msg_params) {
     return do_osd_ops_execute<void>(
       seastar::make_lw_shared<OpsExecuter>(
-        Ref<PG>{this}, std::move(obc), op_info, msg_params, SnapContext{}),
+        Ref<PG>{this},
+        std::move(obc),
+        op_info,
+        msg_params,
+        msg_params.get_connection(),
+        SnapContext{}
+      ),
       ops,
       std::move(success_func),
       std::move(failure_func));
@@ -1300,7 +1307,8 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
 }
 
 PG::interruptible_future<> PG::do_update_log_missing(
-  Ref<MOSDPGUpdateLogMissing> m)
+  Ref<MOSDPGUpdateLogMissing> m,
+  crimson::net::ConnectionRef conn)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
@@ -1322,7 +1330,7 @@ PG::interruptible_future<> PG::do_update_log_missing(
 
   return interruptor::make_interruptible(shard_services.get_store().do_transaction(
     coll_ref, std::move(t))).then_interruptible(
-    [m, lcod=peering_state.get_info().last_complete, this] {
+    [m, conn, lcod=peering_state.get_info().last_complete, this] {
     if (!peering_state.pg_has_reset_since(m->get_epoch())) {
       peering_state.update_last_complete_ondisk(lcod);
       auto reply =
@@ -1334,7 +1342,7 @@ PG::interruptible_future<> PG::do_update_log_missing(
           m->get_tid(),
           lcod);
       reply->set_priority(CEPH_MSG_PRIO_HIGH);
-      return m->get_connection()->send(std::move(reply));
+      return conn->send(std::move(reply));
     }
     return seastar::now();
   });
index aecd295087526d99b17508990864127461cf92e1..f401151bdb71fb7a53b438a79c53fd1e1c3693d1 100644 (file)
@@ -534,7 +534,9 @@ public:
   void replica_clear_repop_obc(
     const std::vector<pg_log_entry_t> &logv);
   void handle_rep_op_reply(const MOSDRepOpReply& m);
-  interruptible_future<> do_update_log_missing(Ref<MOSDPGUpdateLogMissing> m);
+  interruptible_future<> do_update_log_missing(
+    Ref<MOSDPGUpdateLogMissing> m,
+    crimson::net::ConnectionRef conn);
   interruptible_future<> do_update_log_missing_reply(
                          Ref<MOSDPGUpdateLogMissingReply> m);
 
@@ -562,6 +564,7 @@ private:
                do_osd_ops_iertr::future<Ret>>;
   do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
     Ref<MOSDOp> m,
+    crimson::net::ConnectionRef conn,
     ObjectContextRef obc,
     const OpInfo &op_info,
     const SnapContext& snapc);
@@ -770,7 +773,7 @@ private:
 };
 
 struct PG::do_osd_ops_params_t {
-  crimson::net::ConnectionFRef &get_connection() const {
+  crimson::net::ConnectionRef &get_connection() const {
     return conn;
   }
   osd_reqid_t get_reqid() const {
@@ -791,8 +794,9 @@ struct PG::do_osd_ops_params_t {
   // Only used by InternalClientRequest, no op flags
   bool has_flag(uint32_t flag) const {
     return false;
- }
-  crimson::net::ConnectionFRef &conn;
+  }
+
+  crimson::net::ConnectionRef &conn;
   osd_reqid_t reqid;
   utime_t mtime;
   epoch_t map_epoch;
index fc56e64eb1b974d7e025f92caed2878847266e75..14d9c56b874c007be60937ad2d202ca0793c56fc 100644 (file)
@@ -148,6 +148,37 @@ public:
       });
   }
 
+  template <typename T, typename F>
+  auto with_remote_shard_state_and_op(
+      core_id_t core,
+      typename T::IRef &&op,
+      F &&f) {
+    ceph_assert(seastar::this_shard_id() == PRIMARY_CORE);
+    if (seastar::this_shard_id() == core) {
+      auto &target_shard_services = shard_services.local();
+      return std::invoke(
+        std::move(f),
+        target_shard_services.local_state,
+        target_shard_services,
+        std::move(op));
+    }
+    return op->prepare_remote_submission(
+    ).then([op=std::move(op), f=std::move(f), this, core
+           ](auto f_conn) mutable {
+      return shard_services.invoke_on(
+        core,
+        [f=std::move(f), op=std::move(op), f_conn=std::move(f_conn)
+        ](auto &target_shard_services) mutable {
+        op->finish_remote_submission(std::move(f_conn));
+        return std::invoke(
+          std::move(f),
+          target_shard_services.local_state,
+          target_shard_services,
+          std::move(op));
+      });
+    });
+  }
+
   /// Runs opref on the appropriate core, creating the pg as necessary.
   template <typename T>
   seastar::future<> run_with_pg_maybe_create(
@@ -163,11 +194,11 @@ public:
       op->get_pgid());
 
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state(
-      core,
-      [op=std::move(op)](
-       PerShardState &per_shard_state,
-       ShardServices &shard_services) mutable {
+    return with_remote_shard_state_and_op<T>(
+      core, std::move(op),
+      [](PerShardState &per_shard_state,
+         ShardServices &shard_services,
+         typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
@@ -207,11 +238,11 @@ public:
       op->get_pgid());
 
     get_local_state().registry.remove_from_registry(*op);
-    return with_remote_shard_state(
-      core,
-      [op=std::move(op)](
-       PerShardState &per_shard_state,
-       ShardServices &shard_services) mutable {
+    return with_remote_shard_state_and_op<T>(
+      core, std::move(op),
+      [](PerShardState &per_shard_state,
+         ShardServices &shard_services,
+         typename T::IRef op) {
        per_shard_state.registry.add_to_registry(*op);
        auto &logger = crimson::get_logger(ceph_subsys_osd);
        auto &opref = *op;
index 7d6570bcf57098866174e3be4e43b4d6513a5bbe..b5394bfdc485dfc63b21e28cb7fc7cbd780a56e1 100644 (file)
@@ -69,7 +69,8 @@ void RecoveryBackend::WaitForObjectRecovery::stop() {
 }
 
 void RecoveryBackend::handle_backfill_finish(
-  MOSDPGBackfill& m)
+  MOSDPGBackfill& m,
+  crimson::net::ConnectionRef conn)
 {
   logger().debug("{}", __func__);
   ceph_assert(!pg.is_primary());
@@ -80,7 +81,7 @@ void RecoveryBackend::handle_backfill_finish(
     m.query_epoch,
     spg_t(pg.get_pgid().pgid, pg.get_primary().shard));
   reply->set_priority(pg.get_recovery_op_priority());
-  std::ignore = m.get_connection()->send(std::move(reply));
+  std::ignore = conn->send(std::move(reply));
   shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
     static_cast<crimson::osd::PG*>(&pg),
     pg.get_pg_whoami(),
@@ -123,7 +124,8 @@ RecoveryBackend::handle_backfill_finish_ack(
 
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_backfill(
-  MOSDPGBackfill& m)
+  MOSDPGBackfill& m,
+  crimson::net::ConnectionRef conn)
 {
   logger().debug("{}", __func__);
   if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
@@ -132,7 +134,7 @@ RecoveryBackend::handle_backfill(
   }
   switch (m.op) {
     case MOSDPGBackfill::OP_BACKFILL_FINISH:
-      handle_backfill_finish(m);
+      handle_backfill_finish(m, conn);
       [[fallthrough]];
     case MOSDPGBackfill::OP_BACKFILL_PROGRESS:
       return handle_backfill_progress(m);
@@ -224,7 +226,8 @@ RecoveryBackend::scan_for_backfill(
 
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_scan_get_digest(
-  MOSDPGScan& m)
+  MOSDPGScan& m,
+  crimson::net::ConnectionRef conn)
 {
   logger().debug("{}", __func__);
   if (false /* FIXME: check for backfill too full */) {
@@ -243,7 +246,7 @@ RecoveryBackend::handle_scan_get_digest(
     crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
     crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
   ).then_interruptible(
-    [this, query_epoch=m.query_epoch, &conn=*(m.get_connection())
+    [this, query_epoch=m.query_epoch, conn
     ](auto backfill_interval) {
       auto reply = crimson::make_message<MOSDPGScan>(
        MOSDPGScan::OP_SCAN_DIGEST,
@@ -254,7 +257,7 @@ RecoveryBackend::handle_scan_get_digest(
        backfill_interval.begin,
        backfill_interval.end);
       encode(backfill_interval.objects, reply->get_data());
-      return conn.send(std::move(reply));
+      return conn->send(std::move(reply));
     });
 }
 
@@ -285,7 +288,8 @@ RecoveryBackend::handle_scan_digest(
 
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_scan(
-  MOSDPGScan& m)
+  MOSDPGScan& m,
+  crimson::net::ConnectionRef conn)
 {
   logger().debug("{}", __func__);
   if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
@@ -294,7 +298,7 @@ RecoveryBackend::handle_scan(
   }
   switch (m.op) {
     case MOSDPGScan::OP_SCAN_GET_DIGEST:
-      return handle_scan_get_digest(m);
+      return handle_scan_get_digest(m, conn);
     case MOSDPGScan::OP_SCAN_DIGEST:
       return handle_scan_digest(m);
     default:
@@ -306,15 +310,16 @@ RecoveryBackend::handle_scan(
 
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_recovery_op(
-  Ref<MOSDFastDispatchOp> m)
+  Ref<MOSDFastDispatchOp> m,
+  crimson::net::ConnectionRef conn)
 {
   switch (m->get_header().type) {
   case MSG_OSD_PG_BACKFILL:
-    return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m));
+    return handle_backfill(*boost::static_pointer_cast<MOSDPGBackfill>(m), conn);
   case MSG_OSD_PG_BACKFILL_REMOVE:
     return handle_backfill_remove(*boost::static_pointer_cast<MOSDPGBackfillRemove>(m));
   case MSG_OSD_PG_SCAN:
-    return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m));
+    return handle_scan(*boost::static_pointer_cast<MOSDPGScan>(m), conn);
   default:
     return seastar::make_exception_future<>(
        std::invalid_argument(fmt::format("invalid request type: {}",
index 64be1862074d87679d7ec93ff895970222f3a461..3eed892c22582f57fc6d1b1a425c637cf4d9a8a4 100644 (file)
@@ -65,7 +65,8 @@ public:
   }
 
   virtual interruptible_future<> handle_recovery_op(
-    Ref<MOSDFastDispatchOp> m);
+    Ref<MOSDFastDispatchOp> m,
+    crimson::net::ConnectionRef conn);
 
   virtual interruptible_future<> recover_object(
     const hobject_t& soid,
@@ -210,18 +211,23 @@ protected:
   virtual seastar::future<> on_stop() = 0;
 private:
   void handle_backfill_finish(
-    MOSDPGBackfill& m);
+    MOSDPGBackfill& m,
+    crimson::net::ConnectionRef conn);
   interruptible_future<> handle_backfill_progress(
     MOSDPGBackfill& m);
   interruptible_future<> handle_backfill_finish_ack(
     MOSDPGBackfill& m);
-  interruptible_future<> handle_backfill(MOSDPGBackfill& m);
+  interruptible_future<> handle_backfill(
+    MOSDPGBackfill& m,
+    crimson::net::ConnectionRef conn);
 
   interruptible_future<> handle_scan_get_digest(
-    MOSDPGScan& m);
+    MOSDPGScan& m,
+    crimson::net::ConnectionRef conn);
   interruptible_future<> handle_scan_digest(
     MOSDPGScan& m);
   interruptible_future<> handle_scan(
-    MOSDPGScan& m);
+    MOSDPGScan& m,
+    crimson::net::ConnectionRef conn);
   interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
 };
index b2e3dfbd6382675b3b8e9cc997a86fc31604325d..24b990d78b757e7ebbd1ff0cecd932a7073a7f3f 100644 (file)
@@ -1156,7 +1156,9 @@ ReplicatedRecoveryBackend::handle_recovery_delete_reply(
 }
 
 RecoveryBackend::interruptible_future<>
-ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+ReplicatedRecoveryBackend::handle_recovery_op(
+  Ref<MOSDFastDispatchOp> m,
+  crimson::net::ConnectionRef conn)
 {
   switch (m->get_header().type) {
   case MSG_OSD_PG_PULL:
@@ -1174,7 +1176,7 @@ ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
        boost::static_pointer_cast<MOSDPGRecoveryDeleteReply>(m));
   default:
     // delegate to parent class for handling backend-agnostic recovery ops.
-    return RecoveryBackend::handle_recovery_op(std::move(m));
+    return RecoveryBackend::handle_recovery_op(std::move(m), conn);
   }
 }
 
index 1ada19b18a079a635c43ab6029f8aa3f24fa6b21..b023b7417e5fbab3ae9ad084242d583849807d2d 100644 (file)
@@ -23,7 +23,8 @@ public:
     : RecoveryBackend(pg, shard_services, coll, backend)
   {}
   interruptible_future<> handle_recovery_op(
-    Ref<MOSDFastDispatchOp> m) final;
+    Ref<MOSDFastDispatchOp> m,
+    crimson::net::ConnectionRef conn) final;
 
   interruptible_future<> recover_object(
     const hobject_t& soid,
index ffdb17827063b9891a7906c6a3d80f09d4eefe19..f71d915bb9d7ab6b3e14352b7ba42a0f5430bca7 100644 (file)
@@ -78,7 +78,7 @@ Watch::~Watch()
   logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
 }
 
-seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
 {
   if (this->conn == conn) {
     logger().debug("conn={} already connected", conn);
@@ -218,7 +218,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
   return out;
 }
 
-Notify::Notify(crimson::net::ConnectionFRef conn,
+Notify::Notify(crimson::net::ConnectionRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
index a37703569c7ef2c86c57810285493f7ab5a7b1b1..0f7c9df544ac6bee8eb3eb1a6961a350cbcf1667 100644 (file)
@@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   struct private_ctag_t{};
 
   std::set<NotifyRef, std::less<>> in_progress_notifies;
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   crimson::osd::ObjectContextRef obc;
 
   watch_info_t winfo;
@@ -67,7 +67,7 @@ public:
   }
   ~Watch();
 
-  seastar::future<> connect(crimson::net::ConnectionFRef, bool);
+  seastar::future<> connect(crimson::net::ConnectionRef, bool);
   void disconnect();
   bool is_alive() const {
     return true;
@@ -131,7 +131,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs);
 class Notify : public seastar::enable_shared_from_this<Notify> {
   std::set<WatchRef> watchers;
   const notify_info_t ninfo;
-  crimson::net::ConnectionFRef conn;
+  crimson::net::ConnectionRef conn;
   const uint64_t client_gid;
   const uint64_t user_version;
   bool complete{false};
@@ -152,14 +152,14 @@ class Notify : public seastar::enable_shared_from_this<Notify> {
   /// Called on Notify timeout
   void do_notify_timeout();
 
-  Notify(crimson::net::ConnectionFRef conn,
+  Notify(crimson::net::ConnectionRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
   template <class WatchIteratorT>
   Notify(WatchIteratorT begin,
          WatchIteratorT end,
-         crimson::net::ConnectionFRef conn,
+         crimson::net::ConnectionRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
@@ -205,7 +205,7 @@ public:
 template <class WatchIteratorT>
 Notify::Notify(WatchIteratorT begin,
                WatchIteratorT end,
-               crimson::net::ConnectionFRef conn,
+               crimson::net::ConnectionRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
index e10981794b4e8da90fb367c995e8e7559820e266..fdf32b5e09f2bce20439f1c4529316ad1ce95f51 100644 (file)
@@ -314,6 +314,10 @@ Message *decode_message(CephContext *cct,
                         ceph::bufferlist& data,
                         Message::ConnectionRef conn)
 {
+#ifdef WITH_SEASTAR
+  // In crimson, conn is independently maintained outside Message.
+  ceph_assert(conn == nullptr);
+#endif
   // verify crc
   if (crcflags & MSG_CRC_HEADER) {
     __u32 front_crc = front.crc32c(0);
index 6ebf06346c3f06a72f21365904efb92b4fc8733a..9eec1c5bb8385ae176c439734e95eccae596ac3e 100644 (file)
@@ -252,10 +252,8 @@ class Message : public RefCountedObject {
 public:
 #ifdef WITH_SEASTAR
   using ConnectionRef = crimson::net::ConnectionRef;
-  using ConnectionFRef = crimson::net::ConnectionFRef;
 #else
   using ConnectionRef = ::ConnectionRef;
-  using ConnectionFRef = ::ConnectionRef;
 #endif // WITH_SEASTAR
 
 protected:
@@ -276,7 +274,7 @@ protected:
   /* time at which message was fully read */
   utime_t recv_complete_stamp;
 
-  ConnectionFRef connection;
+  ConnectionRef connection;
 
   uint32_t magic = 0;
 
@@ -351,8 +349,18 @@ protected:
       completion_hook->complete(0);
   }
 public:
-  const ConnectionFRef& get_connection() const { return connection; }
+  const ConnectionRef& get_connection() const {
+#ifdef WITH_SEASTAR
+    // In crimson, conn is independently maintained outside Message.
+    ceph_abort();
+#endif
+    return connection;
+  }
   void set_connection(ConnectionRef c) {
+#ifdef WITH_SEASTAR
+    // In crimson, conn is independently maintained outside Message.
+    ceph_assert(c == nullptr);
+#endif
     connection = std::move(c);
   }
   CompletionHook* get_completion_hook() { return completion_hook; }
index 8f14bafb6522573b03358dd083d9ddd024987b06..6fc9c1d7750c80e4527a5a7cf5525c5566c97bf6 100644 (file)
@@ -14,6 +14,7 @@
 #include "crimson/net/Dispatcher.h"
 #include "crimson/net/Messenger.h"
 #include "crimson/net/Interceptor.h"
+#include "crimson/net/SocketConnection.h"
 
 #include <map>
 #include <random>
@@ -495,6 +496,7 @@ using crimson::net::Dispatcher;
 using crimson::net::Interceptor;
 using crimson::net::Messenger;
 using crimson::net::MessengerRef;
+using crimson::net::SocketConnection;
 using crimson::net::SocketPolicy;
 using crimson::net::tag_bp_t;
 using namespace ceph::net::test;
@@ -550,8 +552,8 @@ struct ConnResult {
   unsigned cnt_reset_dispatched = 0;
   unsigned cnt_remote_reset_dispatched = 0;
 
-  ConnResult(Connection& conn, unsigned index)
-    : conn(conn.shared_from_this()), index(index) {}
+  ConnResult(ConnectionRef conn, unsigned index)
+    : conn(conn), index(index) {}
 
   template <typename T>
   void _assert_eq(const char* expr_actual, T actual,
@@ -697,16 +699,23 @@ struct TestInterceptor : public Interceptor {
   }
 
  private:
-  void register_conn(Connection& conn) override {
+  void register_conn(SocketConnection& _conn) override {
+    auto conn = _conn.get_local_shared_foreign_from_this();
+    auto result = find_result(conn);
+    if (result != nullptr) {
+      logger().error("The connection [{}] {} already exists when register {}",
+                     result->index, *result->conn, _conn);
+      ceph_abort();
+    }
     unsigned index = results.size();
     results.emplace_back(conn, index);
-    conns[conn.shared_from_this()] = index;
+    conns[conn] = index;
     notify();
-    logger().info("[{}] {} new connection registered", index, conn);
+    logger().info("[{}] {} new connection registered", index, _conn);
   }
 
-  void register_conn_closed(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_closed(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked closed connection: {}", conn);
       ceph_abort();
@@ -719,8 +728,8 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} closed({})", result->index, conn, result->state);
   }
 
-  void register_conn_ready(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_ready(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked ready connection: {}", conn);
       ceph_abort();
@@ -731,8 +740,8 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} ready", result->index, conn);
   }
 
-  void register_conn_replaced(Connection& conn) override {
-    auto result = find_result(conn.shared_from_this());
+  void register_conn_replaced(SocketConnection& conn) override {
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked replaced connection: {}", conn);
       ceph_abort();
@@ -742,10 +751,10 @@ struct TestInterceptor : public Interceptor {
     logger().info("[{}] {} {}", result->index, conn, result->state);
   }
 
-  bp_action_t intercept(Connection& conn, Breakpoint bp) override {
+  bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
     ++breakpoints_counter[bp].counter;
 
-    auto result = find_result(conn.shared_from_this());
+    auto result = find_result(conn.get_local_shared_foreign_from_this());
     if (result == nullptr) {
       logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
                      conn, bp, breakpoints_counter[bp].counter);
index 1dd910be9a60d372be3d32e35dbbe511aae6a9c4..b3a1d910b5dba0828a6fc4347c4fbacc454b0062 100644 (file)
@@ -118,17 +118,15 @@ class SyntheticDispatcher final
     auto p = m->get_data().cbegin();
     decode(pl, p);
     if (pl.who == Payload::PING) {
-      logger().info(" {} conn= {} {}", __func__,
-        *m->get_connection(), pl);
-      return reply_message(m, pl);
+      logger().info(" {} conn= {} {}", __func__, *con, pl);
+      return reply_message(m, con, pl);
     } else {
       ceph_assert(pl.who == Payload::PONG);
       if (sent.count(pl.seq)) {
-        logger().info(" {} conn= {} {}", __func__,
-          m->get_connection(), pl);
-        ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq);
+        logger().info(" {} conn= {} {}", __func__, *con, pl);
+        ceph_assert(conn_sent[&*con].front() == pl.seq);
         ceph_assert(pl.data.contents_equal(sent[pl.seq]));
-        conn_sent[&*m->get_connection()].pop_front();
+        conn_sent[&*con].pop_front();
         sent.erase(pl.seq);
       }
 
@@ -150,7 +148,10 @@ class SyntheticDispatcher final
     clear_pending(con);
   }
 
-  std::optional<seastar::future<>> reply_message(const MessageRef m, Payload& pl) {
+  std::optional<seastar::future<>> reply_message(
+      const MessageRef m,
+      crimson::net::ConnectionRef con,
+      Payload& pl) {
     pl.who = Payload::PONG;
     bufferlist bl;
     encode(pl, bl);
@@ -158,9 +159,9 @@ class SyntheticDispatcher final
     rm->set_data(bl);
     if (verbose) {
       logger().info("{} conn= {} reply i= {}",
-        __func__, m->get_connection(), pl.seq);
+        __func__, *con, pl.seq);
     }
-    return m->get_connection()->send(std::move(rm));
+    return con->send(std::move(rm));
   }
 
   seastar::future<> send_message_wrap(crimson::net::ConnectionRef con,