From fe90f687c0e700d32a1bd140d614313ca190716c Mon Sep 17 00:00:00 2001 From: chunmei liu Date: Tue, 15 Jul 2025 20:32:21 -0700 Subject: [PATCH] crimson/os/futurized_store: support cross core store calling Signed-off-by: chunmei liu --- src/crimson/os/futurized_store.cc | 31 +++++++++++++++ src/crimson/os/futurized_store.h | 65 ++++++++++++++++++++++++++++++- 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/src/crimson/os/futurized_store.cc b/src/crimson/os/futurized_store.cc index edca056c18a..da4aa5bf8dc 100644 --- a/src/crimson/os/futurized_store.cc +++ b/src/crimson/os/futurized_store.cc @@ -33,4 +33,35 @@ FuturizedStore::create(const std::string& type, } } +seastar::future<> with_store_do_transaction( + crimson::os::FuturizedStore::StoreShardRef store, + FuturizedStore::Shard::CollectionRef ch, + ceph::os::Transaction&& txn) +{ + std::unique_ptr on_commit( + ceph::os::Transaction::collect_all_contexts(txn)); + const auto original_core = seastar::this_shard_id(); + if (store.get_owner_shard() == seastar::this_shard_id()) { + return store->do_transaction_no_callbacks( + std::move(ch), std::move(txn) + ).then([on_commit=std::move(on_commit)]() mutable { + auto c = on_commit.release(); + if (c) c->complete(0); + return seastar::now(); + }); + } else { + return seastar::smp::submit_to( + store.get_owner_shard(), + [f_store=store.get(), ch=std::move(ch), txn=std::move(txn)]() mutable { + return f_store->do_transaction_no_callbacks( + std::move(ch), std::move(txn)); + }).then([original_core, on_commit=std::move(on_commit)]() mutable { + return seastar::smp::submit_to(original_core, [on_commit=std::move(on_commit)]() mutable { + auto c = on_commit.release(); + if (c) c->complete(0); + return seastar::now(); + }); + }); + } +} } diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index 851dc60e367..eb932fca72a 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -149,7 +149,7 @@ public: virtual seastar::future<> set_collection_opts(CollectionRef c, const pool_opts_t& opts) = 0; - protected: + public: virtual seastar::future<> do_transaction_no_callbacks( CollectionRef ch, ceph::os::Transaction&& txn) = 0; @@ -259,4 +259,67 @@ public: protected: const core_id_t primary_core; }; + +template +auto with_store(crimson::os::FuturizedStore::StoreShardRef store, Args&&... args) +{ + using raw_return_type = decltype((std::declval().*MemberFunc)(std::forward(args)...)); + + constexpr bool is_errorator = is_errorated_future_v; + constexpr bool is_seastar_future = seastar::is_future::value && !is_errorator; + constexpr bool is_plain = !is_errorator && !is_seastar_future; + const auto original_core = seastar::this_shard_id(); + if (store.get_owner_shard() == seastar::this_shard_id()) { + if constexpr (is_plain) { + return seastar::make_ready_future( + ((*store).*MemberFunc)(std::forward(args)...)); + } else { + return ((*store).*MemberFunc)(std::forward(args)...); + } + } else { + if constexpr (is_errorator) { + auto fut = seastar::smp::submit_to( + store.get_owner_shard(), + [f_store=store.get(), args=std::make_tuple(std::forward(args)...)]() mutable { + return std::apply([f_store](auto&&... args) { + return ((*f_store).*MemberFunc)(std::forward(args)...).to_base(); + }, std::move(args)); + }).then([original_core] (auto&& result) { + return seastar::smp::submit_to(original_core, + [result = std::forward(result)]() mutable { + return std::forward(result); + }); + }); + return raw_return_type(std::move(fut)); + } else { + auto fut = seastar::smp::submit_to( + store.get_owner_shard(), + [f_store=store.get(), args=std::make_tuple(std::forward(args)...)]() mutable { + return std::apply([f_store](auto&&... args) { + return ((*f_store).*MemberFunc)(std::forward(args)...); + }, std::move(args)); + }); + if constexpr (std::is_same_v>) { + return fut.then([original_core] { + return seastar::smp::submit_to(original_core, [] { + return seastar::make_ready_future<>(); + }); + }); + } else { + return fut.then([original_core](auto&& result) { + return seastar::smp::submit_to(original_core, + [result = std::forward(result)]() mutable { + return std::forward(result); + }); + }); + } + } + } +} + +seastar::future<> with_store_do_transaction( + crimson::os::FuturizedStore::StoreShardRef store, + FuturizedStore::Shard::CollectionRef ch, + ceph::os::Transaction&& txn); + } -- 2.47.3