]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/os/futurized_store: support cross core store calling
authorchunmei liu <chunmei.liu@ibm.com>
Wed, 16 Jul 2025 03:32:21 +0000 (20:32 -0700)
committerChunmei Liu <chunmei.liu@ibm.com>
Thu, 27 Nov 2025 09:38:28 +0000 (09:38 +0000)
Signed-off-by: chunmei liu <chunmei.liu@ibm.com>
src/crimson/os/futurized_store.cc
src/crimson/os/futurized_store.h

index edca056c18a890a8d24b7315988b77352ef10050..da4aa5bf8dc24288d771ee480c72648b9d29b3d3 100644 (file)
@@ -33,4 +33,35 @@ FuturizedStore::create(const std::string& type,
   }
 }
 
+seastar::future<> with_store_do_transaction(
+  crimson::os::FuturizedStore::StoreShardRef store,
+  FuturizedStore::Shard::CollectionRef ch,
+  ceph::os::Transaction&& txn)
+{
+  std::unique_ptr<Context> on_commit(
+    ceph::os::Transaction::collect_all_contexts(txn));
+  const auto original_core = seastar::this_shard_id();
+  if (store.get_owner_shard() == seastar::this_shard_id()) {
+    return store->do_transaction_no_callbacks(
+      std::move(ch), std::move(txn)
+    ).then([on_commit=std::move(on_commit)]() mutable {
+      auto c = on_commit.release();
+      if (c) c->complete(0);
+      return seastar::now();
+    });
+  } else {
+    return seastar::smp::submit_to(
+      store.get_owner_shard(),
+      [f_store=store.get(), ch=std::move(ch), txn=std::move(txn)]() mutable {
+      return f_store->do_transaction_no_callbacks(
+        std::move(ch), std::move(txn));
+    }).then([original_core, on_commit=std::move(on_commit)]() mutable {
+      return seastar::smp::submit_to(original_core, [on_commit=std::move(on_commit)]() mutable {
+        auto c = on_commit.release();
+        if (c) c->complete(0);
+        return seastar::now();
+      });
+    });
+  }
+}
 }
index 851dc60e367e6466e87f31d1a120879a9134a036..eb932fca72a33b6095169e8fdeb9d6cfd1db19e5 100644 (file)
@@ -149,7 +149,7 @@ public:
     virtual seastar::future<> set_collection_opts(CollectionRef c,
                                         const pool_opts_t& opts) = 0;
 
-  protected:
+  public:
     virtual seastar::future<> do_transaction_no_callbacks(
       CollectionRef ch,
       ceph::os::Transaction&& txn) = 0;
@@ -259,4 +259,67 @@ public:
 protected:
   const core_id_t primary_core;
 };
+
+template<auto MemberFunc, typename... Args>
+auto with_store(crimson::os::FuturizedStore::StoreShardRef store, Args&&... args)
+{
+  using raw_return_type = decltype((std::declval<crimson::os::FuturizedStore::Shard>().*MemberFunc)(std::forward<Args>(args)...));
+
+  constexpr bool is_errorator = is_errorated_future_v<raw_return_type>;
+  constexpr bool is_seastar_future = seastar::is_future<raw_return_type>::value && !is_errorator;
+  constexpr bool is_plain = !is_errorator && !is_seastar_future;
+  const auto original_core = seastar::this_shard_id();
+  if (store.get_owner_shard() == seastar::this_shard_id()) {
+    if constexpr (is_plain) {
+      return seastar::make_ready_future<raw_return_type>(
+        ((*store).*MemberFunc)(std::forward<Args>(args)...));
+    } else {
+      return ((*store).*MemberFunc)(std::forward<Args>(args)...);
+    }
+  } else {
+    if constexpr (is_errorator) {
+      auto fut = seastar::smp::submit_to(
+        store.get_owner_shard(),
+        [f_store=store.get(), args=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+          return std::apply([f_store](auto&&... args) {
+            return ((*f_store).*MemberFunc)(std::forward<decltype(args)>(args)...).to_base();
+          }, std::move(args));
+        }).then([original_core] (auto&& result) {
+          return seastar::smp::submit_to(original_core,
+            [result = std::forward<decltype(result)>(result)]() mutable {
+              return std::forward<decltype(result)>(result);
+          });
+        });
+      return raw_return_type(std::move(fut));
+    } else {
+      auto fut = seastar::smp::submit_to(
+        store.get_owner_shard(),
+        [f_store=store.get(), args=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+        return std::apply([f_store](auto&&... args) {
+          return ((*f_store).*MemberFunc)(std::forward<decltype(args)>(args)...);
+        }, std::move(args));
+      });
+      if constexpr (std::is_same_v<raw_return_type, seastar::future<>>) {
+        return fut.then([original_core] {
+          return seastar::smp::submit_to(original_core, [] {
+            return seastar::make_ready_future<>();
+          });
+        });
+      } else {
+        return fut.then([original_core](auto&& result) {
+          return seastar::smp::submit_to(original_core,
+            [result = std::forward<decltype(result)>(result)]() mutable {
+              return std::forward<decltype(result)>(result);
+          });
+        });
+      }
+    }
+  }
+}
+
+seastar::future<> with_store_do_transaction(
+  crimson::os::FuturizedStore::StoreShardRef store,
+  FuturizedStore::Shard::CollectionRef ch,
+  ceph::os::Transaction&& txn);
+
 }