]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: drop a foreign-copy to shard-0 for every pg operation
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 11 Dec 2023 06:38:51 +0000 (14:38 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 22 Jan 2024 08:52:52 +0000 (16:52 +0800)
By using ConnectionRef before pg submission, and after that, change to
use ConnectionXcoreRef.

The intent is to drop the foreign copy of the connection to shard 0 at
pg submission time. This should remove two pairs of crosscore
communications in shard 0 for each I/O, one for connection-ref foreign
copy, another for connection-ref destruction.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
27 files changed:
src/crimson/common/local_shared_foreign_ptr.h
src/crimson/net/Fwd.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/logmissing_request.cc
src/crimson/osd/osd_operations/logmissing_request.h
src/crimson/osd/osd_operations/logmissing_request_reply.cc
src/crimson/osd/osd_operations/logmissing_request_reply.h
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/peering_event.h
src/crimson/osd/osd_operations/recovery_subrequest.cc
src/crimson/osd/osd_operations/recovery_subrequest.h
src/crimson/osd/osd_operations/replicated_request.cc
src/crimson/osd/osd_operations/replicated_request.h
src/crimson/osd/osd_operations/scrub_events.cc
src/crimson/osd/osd_operations/scrub_events.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_shard_manager.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/recovery_backend.h
src/crimson/osd/replicated_recovery_backend.cc
src/crimson/osd/replicated_recovery_backend.h
src/crimson/osd/watch.cc
src/crimson/osd/watch.h

index c4bd1099a029590628b6344b1f51292d83516bfd..675442273e5b186ced6646a2d558209026c4936b 100644 (file)
@@ -27,9 +27,6 @@ namespace crimson {
  */
 template <typename PtrType>
 class local_shared_foreign_ptr {
-  using element_type = typename std::pointer_traits<PtrType>::element_type;
-  using pointer = element_type*;
-
   seastar::lw_shared_ptr<seastar::foreign_ptr<PtrType>> ptr;
 
   /// Wraps a pointer object and remembers the current core.
@@ -43,6 +40,9 @@ class local_shared_foreign_ptr {
     seastar::foreign_ptr<T> &&);
 
 public:
+  using element_type = typename std::pointer_traits<PtrType>::element_type;
+  using pointer = element_type*;
+
   /// Constructs a null local_shared_foreign_ptr<>.
   local_shared_foreign_ptr() = default;
 
index 3a56cf5bb0a63c5b17037d274c70886aa2ad0dc8..ad8eedd47773dceed3f27695df70e5d7704461aa 100644 (file)
@@ -38,6 +38,8 @@ class Connection;
 using ConnectionLRef = seastar::shared_ptr<Connection>;
 using ConnectionFRef = seastar::foreign_ptr<ConnectionLRef>;
 using ConnectionRef = ::crimson::local_shared_foreign_ptr<ConnectionLRef>;
+using ConnectionFFRef = seastar::foreign_ptr<ConnectionRef>;
+using ConnectionXcoreRef = ::crimson::local_shared_foreign_ptr<ConnectionRef>;
 
 class Dispatcher;
 class ChainedDispatchers;
index 034fdde716935a9b42dfff784f8f75232f7ee206..4e2e1861f9994f7787dfb956bdb0091ea50233fd 100644 (file)
@@ -160,13 +160,13 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
   logger().debug("{}", __func__);
   struct connect_ctx_t {
     ObjectContext::watch_key_t key;
-    crimson::net::ConnectionRef conn;
+    crimson::net::ConnectionXcoreRef conn;
     watch_info_t info;
 
     connect_ctx_t(
       const OSDOp& osd_op,
       const ExecutableMessage& msg,
-      crimson::net::ConnectionRef conn)
+      crimson::net::ConnectionXcoreRef conn)
       : key(osd_op.op.watch.cookie, msg.get_reqid().name),
         conn(conn),
         info(create_watch_info(osd_op, msg, conn->get_peer_addr())) {
@@ -323,13 +323,13 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
     return crimson::ct_error::enoent::make();
   }
   struct notify_ctx_t {
-    crimson::net::ConnectionRef conn;
+    crimson::net::ConnectionXcoreRef conn;
     notify_info_t ninfo;
     const uint64_t client_gid;
     const epoch_t epoch;
 
     notify_ctx_t(const ExecutableMessage& msg,
-                 crimson::net::ConnectionRef conn)
+                 crimson::net::ConnectionXcoreRef conn)
       : conn(conn),
         client_gid(msg.get_reqid().name.num()),
         epoch(msg.get_map_epoch()) {
@@ -1067,7 +1067,7 @@ OpsExecuter::OpsExecuter(Ref<PG> pg,
                          ObjectContextRef _obc,
                          const OpInfo& op_info,
                          abstracted_msg_t&& msg,
-                         crimson::net::ConnectionRef conn,
+                         crimson::net::ConnectionXcoreRef conn,
                          const SnapContext& _snapc)
   : pg(std::move(pg)),
     obc(std::move(_obc)),
index 1230b1c5a2e58bf3bd705a976020ee25db5e2dc1..556e2f3b94a35ec4fd3eaebe429cf1080eebda00 100644 (file)
@@ -113,10 +113,10 @@ public:
   class ExecutableMessagePimpl final : ExecutableMessage {
     const ImplT* pimpl;
     // In crimson, conn is independently maintained outside Message.
-    const crimson::net::ConnectionRef conn;
+    const crimson::net::ConnectionXcoreRef conn;
   public:
     ExecutableMessagePimpl(const ImplT* pimpl,
-                           const crimson::net::ConnectionRef conn)
+                           const crimson::net::ConnectionXcoreRef conn)
       : pimpl(pimpl), conn(conn) {
     }
 
@@ -185,7 +185,7 @@ private:
     ceph::static_ptr<ExecutableMessage,
                      sizeof(ExecutableMessagePimpl<void>)>;
   abstracted_msg_t msg;
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionXcoreRef conn;
   std::optional<osd_op_params_t> osd_op_params;
   bool user_modify = false;
   ceph::os::Transaction txn;
@@ -372,7 +372,7 @@ private:
               ObjectContextRef obc,
               const OpInfo& op_info,
               abstracted_msg_t&& msg,
-              crimson::net::ConnectionRef conn,
+              crimson::net::ConnectionXcoreRef conn,
               const SnapContext& snapc);
 
 public:
@@ -381,7 +381,7 @@ public:
               ObjectContextRef obc,
               const OpInfo& op_info,
               const MsgT& msg,
-              crimson::net::ConnectionRef conn,
+              crimson::net::ConnectionXcoreRef conn,
               const SnapContext& snapc)
     : OpsExecuter(
         std::move(pg),
index d1eb9fbb350dd499f396b74198afb9d4bddaa246..460f413efe381c57b0aca0e1545dd2ff883032f4 100644 (file)
@@ -49,7 +49,7 @@ ClientRequest::ClientRequest(
   ShardServices &_shard_services, crimson::net::ConnectionRef conn,
   Ref<MOSDOp> &&m)
   : shard_services(&_shard_services),
-    conn(std::move(conn)),
+    l_conn(std::move(conn)),
     m(std::move(m)),
     instance_handle(new instance_handle_t)
 {}
@@ -76,7 +76,8 @@ void ClientRequest::dump_detail(Formatter *f) const
 
 ConnectionPipeline &ClientRequest::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).client_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).client_request_conn_pipeline;
 }
 
 PerShardPipeline &ClientRequest::get_pershard_pipeline(
@@ -117,7 +118,7 @@ seastar::future<> ClientRequest::with_pg_int(Ref<PG> pgref)
       PG &pg = *pgref;
       if (pg.can_discard_op(*m)) {
        return shard_services->send_incremental_map(
-         std::ref(*conn), m->get_map_epoch()
+         std::ref(get_foreign_connection()), m->get_map_epoch()
        ).then([FNAME, this, this_instance_id, pgref] {
          DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
          pgref->client_request_orderer.remove_request(*this);
@@ -196,7 +197,7 @@ ClientRequest::process_pg_op(
     m
   ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
     // TODO: gate the crosscore sending
-    return conn->send_with_throttling(std::move(reply));
+    return get_foreign_connection().send_with_throttling(std::move(reply));
   });
 }
 
@@ -211,7 +212,7 @@ auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
   reply->set_reply_versions(eversion_t(), 0);
   reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
   // TODO: gate the crosscore sending
-  return conn->send_with_throttling(std::move(reply));
+  return get_foreign_connection().send_with_throttling(std::move(reply));
 }
 
 ClientRequest::interruptible_future<>
@@ -237,7 +238,7 @@ ClientRequest::process_op(
           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
        reply->set_reply_versions(completed->version, completed->user_version);
         // TODO: gate the crosscore sending
-        return conn->send_with_throttling(std::move(reply));
+        return get_foreign_connection().send_with_throttling(std::move(reply));
       } else {
        DEBUGDPP("{}.{}: not completed, entering get_obc stage",
                 *pg, *this, this_instance_id);
@@ -323,9 +324,10 @@ ClientRequest::do_process(
     return reply_op_error(pg, -ENAMETOOLONG);
   } else if (m->get_hobj().oid.name.empty()) {
     return reply_op_error(pg, -EINVAL);
-  } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) {
+  } else if (pg->get_osdmap()->is_blocklisted(
+        get_foreign_connection().get_peer_addr())) {
     DEBUGDPP("{}.{}: {} is blocklisted",
-            *pg, *this, this_instance_id, conn->get_peer_addr());
+            *pg, *this, this_instance_id, get_foreign_connection().get_peer_addr());
     return reply_op_error(pg, -EBLOCKLISTED);
   }
 
@@ -361,7 +363,9 @@ ClientRequest::do_process(
               *pg, *this, this_instance_id, m->get_hobj());
     }
   }
-  return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
+  return pg->do_osd_ops(
+    m, r_conn, obc, op_info, snapc
+  ).safe_then_unpack_interruptible(
     [FNAME, this, pg, this_instance_id, &ihref](
       auto submitted, auto all_completed) mutable {
       return submitted.then_interruptible(
@@ -379,7 +383,9 @@ ClientRequest::do_process(
                 reply=std::move(reply)]() mutable {
                  DEBUGDPP("{}.{}: sending response",
                           *pg, *this, this_instance_id);
-                 return conn->send(std::move(reply));
+                 // TODO: gate the crosscore sending
+                 return get_foreign_connection(
+                      ).send_with_throttling(std::move(reply));
                });
            }, crimson::ct_error::eagain::handle(
              [this, pg, this_instance_id, &ihref]() mutable {
index b374aacbbe940fa2f86603327f1f98b37a18e84d..8bf396232a6656112af71f36daee63657dc24072 100644 (file)
@@ -32,7 +32,9 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
   // Initially set to primary core, updated to pg core after with_pg()
   ShardServices *shard_services = nullptr;
 
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   // must be after conn due to ConnectionPipeline's life-time
   Ref<MOSDOp> m;
   OpInfo op_info;
@@ -224,22 +226,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 
   seastar::future<> with_pg_int(Ref<PG> pg);
index 5f92ccddfcdc2f879b634195e4267e4cf14daf12..7e979131f0604288ab5fdb9343a0cfdb2495c19d 100644 (file)
@@ -22,7 +22,7 @@ namespace crimson::osd {
 
 LogMissingRequest::LogMissingRequest(crimson::net::ConnectionRef&& conn,
                       Ref<MOSDPGUpdateLogMissing> &&req)
-  : conn{std::move(conn)},
+  : l_conn{std::move(conn)},
     req{std::move(req)}
 {}
 
@@ -48,7 +48,8 @@ void LogMissingRequest::dump_detail(Formatter *f) const
 
 ConnectionPipeline &LogMissingRequest::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).client_request_conn_pipeline;
 }
 
 PerShardPipeline &LogMissingRequest::get_pershard_pipeline(
@@ -81,7 +82,7 @@ seastar::future<> LogMissingRequest::with_pg(
           std::move(trigger), req->min_epoch);
       });
     }).then_interruptible([this, pg](auto) {
-      return pg->do_update_log_missing(req, conn);
+      return pg->do_update_log_missing(req, r_conn);
     }).then_interruptible([this] {
       logger().debug("{}: complete", *this);
       return handle.complete();
index 5b01fee17b868adcf8502336202ad7c85f8cae78..51c9d540cb5ac329ef8da4d9a4c23ac1bcb65438 100644 (file)
@@ -41,22 +41,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 
   seastar::future<> with_pg(
@@ -77,7 +86,9 @@ public:
 private:
   ClientRequest::PGPipeline &client_pp(PG &pg);
 
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDPGUpdateLogMissing> req;
index 20a9bf7dd00e62e666696a3649a758d44c8e8ead..39b3debf10f596daa2b6f08d3e54d7d143719250 100644 (file)
@@ -21,7 +21,7 @@ namespace crimson::osd {
 LogMissingRequestReply::LogMissingRequestReply(
   crimson::net::ConnectionRef&& conn,
   Ref<MOSDPGUpdateLogMissingReply> &&req)
-  : conn{std::move(conn)},
+  : l_conn{std::move(conn)},
     req{std::move(req)}
 {}
 
@@ -46,7 +46,8 @@ void LogMissingRequestReply::dump_detail(Formatter *f) const
 
 ConnectionPipeline &LogMissingRequestReply::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).replicated_request_conn_pipeline;
 }
 
 PerShardPipeline &LogMissingRequestReply::get_pershard_pipeline(
index b01cae15421db4cdaf449d47d4ee04634ff1fab8..c741b41bd0f5730e63eafdd83272eeed7cda1acf 100644 (file)
@@ -41,22 +41,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 
   seastar::future<> with_pg(
@@ -75,7 +84,9 @@ public:
 private:
   ClientRequest::PGPipeline &client_pp(PG &pg);
 
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDPGUpdateLogMissingReply> req;
index f12699e5e6a216c0e50e8822c60dac25f96d61b1..5c5c73e008676447029699fa12ae5b92159b2360 100644 (file)
@@ -135,7 +135,8 @@ PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
 
 ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).client_request_conn_pipeline;
 }
 
 PerShardPipeline &RemotePeeringEvent::get_pershard_pipeline(
index dad1076a93bb4800508c5aad9e637fa12e25027b..1e6bd957289ff87274b2d8491bb90795dbe87085 100644 (file)
@@ -101,7 +101,9 @@ public:
 
 class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
 protected:
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   // must be after conn due to ConnectionPipeline's life-time
   PipelineHandle handle;
 
@@ -117,7 +119,7 @@ public:
   template <typename... Args>
   RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) :
     PeeringEvent(std::forward<Args>(args)...),
-    conn(conn)
+    l_conn(conn)
   {}
 
   std::tuple<
@@ -146,22 +148,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 };
 
index 1ea7ae12869efd63b1ac5b97425a9f5c94da98f2..06a3be1662c26800d185e34d0c6eb67a7f088084 100644 (file)
@@ -35,7 +35,7 @@ seastar::future<> RecoverySubRequest::with_pg(
   return interruptor::with_interruption([this, pgref] {
     LOG_PREFIX(RecoverySubRequest::with_pg);
     DEBUGI("{}: {}", "RecoverySubRequest::with_pg", *this);
-    return pgref->get_recovery_backend()->handle_recovery_op(m, conn
+    return pgref->get_recovery_backend()->handle_recovery_op(m, r_conn
     ).then_interruptible([this] {
       LOG_PREFIX(RecoverySubRequest::with_pg);
       DEBUGI("{}: complete", *this);
@@ -52,7 +52,8 @@ seastar::future<> RecoverySubRequest::with_pg(
 
 ConnectionPipeline &RecoverySubRequest::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).client_request_conn_pipeline;
 }
 
 PerShardPipeline &RecoverySubRequest::get_pershard_pipeline(
index 31e6045cb0eeb2c65be07fc197a95acab5d3c166..17c2faf97ea98af3b2c785d8615c10f6739178bc 100644 (file)
@@ -22,7 +22,7 @@ public:
   RecoverySubRequest(
     crimson::net::ConnectionRef conn,
     Ref<MOSDFastDispatchOp>&& m)
-    : conn(conn), m(m) {}
+    : l_conn(conn), m(m) {}
 
   void print(std::ostream& out) const final
   {
@@ -44,22 +44,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 
   seastar::future<> with_pg(
@@ -77,7 +86,9 @@ public:
   > tracking_events;
 
 private:
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   // must be after `conn` to ensure the ConnectionPipeline's is alive
   PipelineHandle handle;
   Ref<MOSDFastDispatchOp> m;
index 4e97db5be982cd1084e5899405e3a220054f5bea..dc2adc37efefb2c02a8acda3d5202ed9f979142d 100644 (file)
@@ -22,7 +22,7 @@ namespace crimson::osd {
 
 RepRequest::RepRequest(crimson::net::ConnectionRef&& conn,
                       Ref<MOSDRepOp> &&req)
-  : conn{std::move(conn)},
+  : l_conn{std::move(conn)},
     req{std::move(req)}
 {}
 
@@ -48,7 +48,8 @@ void RepRequest::dump_detail(Formatter *f) const
 
 ConnectionPipeline &RepRequest::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).client_request_conn_pipeline;
 }
 
 PerShardPipeline &RepRequest::get_pershard_pipeline(
index 32cf271788ba8dd574650ca53805a1229336ceb9..ff5dea6d6db3153246bf6760b4d356aa5a2d22c6 100644 (file)
@@ -41,22 +41,31 @@ public:
 
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
   }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
   }
 
   seastar::future<> with_pg(
@@ -77,7 +86,9 @@ public:
 private:
   ClientRequest::PGPipeline &client_pp(PG &pg);
 
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   PipelineHandle handle;
   Ref<MOSDRepOp> req;
 };
index 4f54cf0b274f304af642c881e95689c98105a029..9246233448b5bf152e0f75a24d7674cc2916e9d6 100644 (file)
@@ -20,7 +20,8 @@ PGPeeringPipeline &RemoteScrubEventBaseT<T>::get_peering_pipeline(PG &pg)
 template <class T>
 ConnectionPipeline &RemoteScrubEventBaseT<T>::get_connection_pipeline()
 {
-  return get_osd_priv(conn.get()).peering_request_conn_pipeline;
+  return get_osd_priv(&get_local_connection()
+         ).peering_request_conn_pipeline;
 }
 
 template <class T>
index 0793983d8c66230632752b3f89e7cd3cbcb3d176..6aa7035d18562868e830567081ae4d8fc12d8852 100644 (file)
@@ -24,7 +24,9 @@ class RemoteScrubEventBaseT : public PhasedOperationT<T> {
 
   PipelineHandle handle;
 
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionRef l_conn;
+  crimson::net::ConnectionXcoreRef r_conn;
+
   epoch_t epoch;
   spg_t pgid;
 
@@ -38,17 +40,41 @@ protected:
 public:
   RemoteScrubEventBaseT(
     crimson::net::ConnectionRef conn, epoch_t epoch, spg_t pgid)
-    : conn(conn), epoch(epoch), pgid(pgid) {}
+    : l_conn(std::move(conn)), epoch(epoch), pgid(pgid) {}
 
   PGPeeringPipeline &get_peering_pipeline(PG &pg);
+
   ConnectionPipeline &get_connection_pipeline();
+
   PerShardPipeline &get_pershard_pipeline(ShardServices &);
 
-  crimson::net::Connection &get_connection() {
-    assert(conn);
-    return *conn;
+  crimson::net::Connection &get_local_connection() {
+    assert(l_conn);
+    assert(!r_conn);
+    return *l_conn;
   };
 
+  crimson::net::Connection &get_foreign_connection() {
+    assert(r_conn);
+    assert(!l_conn);
+    return *r_conn;
+  };
+
+  crimson::net::ConnectionFFRef prepare_remote_submission() {
+    assert(l_conn);
+    assert(!r_conn);
+    auto ret = seastar::make_foreign(std::move(l_conn));
+    l_conn.reset();
+    return ret;
+  }
+
+  void finish_remote_submission(crimson::net::ConnectionFFRef conn) {
+    assert(conn);
+    assert(!l_conn);
+    assert(!r_conn);
+    r_conn = make_local_shared_foreign(std::move(conn));
+  }
+
   static constexpr bool can_create() { return false; }
 
   spg_t get_pgid() const {
@@ -58,19 +84,6 @@ public:
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return epoch; }
 
-  seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() {
-    assert(conn);
-    return conn.get_foreign(
-    ).then([this](auto f_conn) {
-      conn.reset();
-      return f_conn;
-    });
-  }
-  void finish_remote_submission(crimson::net::ConnectionFRef _conn) {
-    assert(!conn);
-    conn = make_local_shared_foreign(std::move(_conn));
-  }
-
   seastar::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
index 7638b2fcdbeb2c75f771639a8eb673cf9e9e1777..8d12c42873843bc3117a976dbf5f466e5c3a9a1c 100644 (file)
@@ -1059,7 +1059,7 @@ seastar::future<std::optional<eversion_t>> PG::submit_error_log(
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
-  crimson::net::ConnectionRef conn,
+  crimson::net::ConnectionXcoreRef conn,
   ObjectContextRef obc,
   const OpInfo &op_info,
   const SnapContext& snapc)
@@ -1382,7 +1382,7 @@ void PG::handle_rep_op_reply(const MOSDRepOpReply& m)
 
 PG::interruptible_future<> PG::do_update_log_missing(
   Ref<MOSDPGUpdateLogMissing> m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   if (__builtin_expect(stopping, false)) {
     return seastar::make_exception_future<>(
index b829ea177dbbfafd19c2798f135a63671f02da5a..e103d3bb5eec1a09b2113cfb734ec29edfdccafc 100644 (file)
@@ -526,7 +526,7 @@ public:
   void handle_rep_op_reply(const MOSDRepOpReply& m);
   interruptible_future<> do_update_log_missing(
     Ref<MOSDPGUpdateLogMissing> m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
   interruptible_future<> do_update_log_missing_reply(
                          Ref<MOSDPGUpdateLogMissingReply> m);
 
@@ -570,7 +570,7 @@ private:
                do_osd_ops_iertr::future<Ret>>;
   do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
     Ref<MOSDOp> m,
-    crimson::net::ConnectionRef conn,
+    crimson::net::ConnectionXcoreRef conn,
     ObjectContextRef obc,
     const OpInfo &op_info,
     const SnapContext& snapc);
@@ -798,7 +798,7 @@ private:
 };
 
 struct PG::do_osd_ops_params_t {
-  crimson::net::ConnectionRef &get_connection() const {
+  crimson::net::ConnectionXcoreRef &get_connection() const {
     return conn;
   }
   osd_reqid_t get_reqid() const {
@@ -826,7 +826,7 @@ struct PG::do_osd_ops_params_t {
     return orig_source_inst.name;
   }
 
-  crimson::net::ConnectionRef &conn;
+  crimson::net::ConnectionXcoreRef &conn;
   osd_reqid_t reqid;
   utime_t mtime;
   epoch_t map_epoch;
index 1559dde0cb1b3519291b20e0aeb4ca4f51014a26..011a11605fd97a428cd8e57448937b6c82fce817 100644 (file)
@@ -156,7 +156,8 @@ public:
       ShardServices &target_shard_services,
       typename T::IRef &&op,
       F &&f) {
-    auto &crosscore_ordering = get_osd_priv(&op->get_connection()).crosscore_ordering;
+    auto &crosscore_ordering = get_osd_priv(
+        &op->get_foreign_connection()).crosscore_ordering;
     if (crosscore_ordering.proceed_or_wait(cc_seq)) {
       return std::invoke(
         std::move(f),
@@ -182,6 +183,8 @@ public:
       F &&f) {
     ceph_assert(op->use_count() == 1);
     if (seastar::this_shard_id() == core) {
+      auto f_conn = op->prepare_remote_submission();
+      op->finish_remote_submission(std::move(f_conn));
       auto &target_shard_services = shard_services.local();
       return std::invoke(
         std::move(f),
@@ -190,17 +193,17 @@ public:
     }
     // Note: the ordering in only preserved until f is invoked.
     auto &opref = *op;
-    auto &crosscore_ordering = get_osd_priv(&opref.get_connection()).crosscore_ordering;
+    auto &crosscore_ordering = get_osd_priv(
+        &opref.get_local_connection()).crosscore_ordering;
     auto cc_seq = crosscore_ordering.prepare_submit(core);
     auto &logger = crimson::get_logger(ceph_subsys_osd);
     logger.debug("{}: send {} to the remote pg core {}",
                  opref, cc_seq, core);
     return opref.get_handle().complete(
-    ).then([&opref, this] {
-      get_local_state().registry.remove_from_registry(opref);
-      return opref.prepare_remote_submission();
-    }).then([op=std::move(op), f=std::move(f), this, core, cc_seq
-            ](auto f_conn) mutable {
+    ).then([this, core, cc_seq,
+            op=std::move(op), f=std::move(f)]() mutable {
+      get_local_state().registry.remove_from_registry(*op);
+      auto f_conn = op->prepare_remote_submission();
       return shard_services.invoke_on(
         core,
         [this, cc_seq,
index b5394bfdc485dfc63b21e28cb7fc7cbd780a56e1..e5ba9f8d562b5b36e8058997ccaebe7205d09083 100644 (file)
@@ -70,7 +70,7 @@ void RecoveryBackend::WaitForObjectRecovery::stop() {
 
 void RecoveryBackend::handle_backfill_finish(
   MOSDPGBackfill& m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   logger().debug("{}", __func__);
   ceph_assert(!pg.is_primary());
@@ -125,7 +125,7 @@ RecoveryBackend::handle_backfill_finish_ack(
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_backfill(
   MOSDPGBackfill& m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   logger().debug("{}", __func__);
   if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
@@ -227,7 +227,7 @@ RecoveryBackend::scan_for_backfill(
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_scan_get_digest(
   MOSDPGScan& m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   logger().debug("{}", __func__);
   if (false /* FIXME: check for backfill too full */) {
@@ -289,7 +289,7 @@ RecoveryBackend::handle_scan_digest(
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_scan(
   MOSDPGScan& m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   logger().debug("{}", __func__);
   if (pg.old_peering_msg(m.map_epoch, m.query_epoch)) {
@@ -311,7 +311,7 @@ RecoveryBackend::handle_scan(
 RecoveryBackend::interruptible_future<>
 RecoveryBackend::handle_recovery_op(
   Ref<MOSDFastDispatchOp> m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   switch (m->get_header().type) {
   case MSG_OSD_PG_BACKFILL:
index abf6958915966d51ece04ceee3e3726f3a09eded..c65bf56539241b6b10a638f3dc9a191334853bdb 100644 (file)
@@ -66,7 +66,7 @@ public:
 
   virtual interruptible_future<> handle_recovery_op(
     Ref<MOSDFastDispatchOp> m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
 
   virtual interruptible_future<> recover_object(
     const hobject_t& soid,
@@ -212,22 +212,22 @@ protected:
 private:
   void handle_backfill_finish(
     MOSDPGBackfill& m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
   interruptible_future<> handle_backfill_progress(
     MOSDPGBackfill& m);
   interruptible_future<> handle_backfill_finish_ack(
     MOSDPGBackfill& m);
   interruptible_future<> handle_backfill(
     MOSDPGBackfill& m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
 
   interruptible_future<> handle_scan_get_digest(
     MOSDPGScan& m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
   interruptible_future<> handle_scan_digest(
     MOSDPGScan& m);
   interruptible_future<> handle_scan(
     MOSDPGScan& m,
-    crimson::net::ConnectionRef conn);
+    crimson::net::ConnectionXcoreRef conn);
   interruptible_future<> handle_backfill_remove(MOSDPGBackfillRemove& m);
 };
index 9a1060c440e2736c0c568f4b052a4b2c57788ffe..a62cb793272d5fc836b0b2c2c8311d23ef192c79 100644 (file)
@@ -1305,7 +1305,7 @@ ReplicatedRecoveryBackend::handle_recovery_delete_reply(
 RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_recovery_op(
   Ref<MOSDFastDispatchOp> m,
-  crimson::net::ConnectionRef conn)
+  crimson::net::ConnectionXcoreRef conn)
 {
   switch (m->get_header().type) {
   case MSG_OSD_PG_PULL:
index cd1d9e067289e3c240d331e70d1cf7d329a528b1..aac4531ee95ab39583e3a3610d29f3246324c940 100644 (file)
@@ -25,7 +25,7 @@ public:
   {}
   interruptible_future<> handle_recovery_op(
     Ref<MOSDFastDispatchOp> m,
-    crimson::net::ConnectionRef conn) final;
+    crimson::net::ConnectionXcoreRef conn) final;
 
   interruptible_future<> recover_object(
     const hobject_t& soid,
index 4573333c3dab7ec1ea238007a9f080e9230cf7bf..088054c5a6ed6fc4c5b38eea1c4a7c9daa7b1273 100644 (file)
@@ -78,7 +78,7 @@ Watch::~Watch()
   logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
 }
 
-seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionXcoreRef conn, bool)
 {
   if (this->conn == conn) {
     logger().debug("conn={} already connected", conn);
@@ -235,7 +235,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
   return out;
 }
 
-Notify::Notify(crimson::net::ConnectionRef conn,
+Notify::Notify(crimson::net::ConnectionXcoreRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
index b3982141d86e82da02cd51740fad8ee81f275709..64708febd09ce101f91502ca4e73103ff1ddd844 100644 (file)
@@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   struct private_ctag_t{};
 
   std::set<NotifyRef, std::less<>> in_progress_notifies;
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionXcoreRef conn;
   crimson::osd::ObjectContextRef obc;
 
   watch_info_t winfo;
@@ -67,7 +67,7 @@ public:
   }
   ~Watch();
 
-  seastar::future<> connect(crimson::net::ConnectionRef, bool);
+  seastar::future<> connect(crimson::net::ConnectionXcoreRef, bool);
   void disconnect();
   bool is_alive() const {
     return true;
@@ -131,7 +131,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs);
 class Notify : public seastar::enable_shared_from_this<Notify> {
   std::set<WatchRef> watchers;
   const notify_info_t ninfo;
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionXcoreRef conn;
   const uint64_t client_gid;
   const uint64_t user_version;
   bool complete{false};
@@ -154,14 +154,14 @@ class Notify : public seastar::enable_shared_from_this<Notify> {
   /// Called on Notify timeout
   void do_notify_timeout();
 
-  Notify(crimson::net::ConnectionRef conn,
+  Notify(crimson::net::ConnectionXcoreRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
   template <class WatchIteratorT>
   Notify(WatchIteratorT begin,
          WatchIteratorT end,
-         crimson::net::ConnectionRef conn,
+         crimson::net::ConnectionXcoreRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
@@ -207,7 +207,7 @@ public:
 template <class WatchIteratorT>
 Notify::Notify(WatchIteratorT begin,
                WatchIteratorT end,
-               crimson::net::ConnectionRef conn,
+               crimson::net::ConnectionXcoreRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)