From: chunmei liu Date: Wed, 16 Jul 2025 03:32:21 +0000 (-0700) Subject: crimson/os/futurized_store: support cross core store calling X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e628ea2e4502b5d3e301edeb4d8aa79e579066a1;p=ceph-ci.git crimson/os/futurized_store: support cross core store calling Signed-off-by: chunmei liu --- diff --git a/src/crimson/os/futurized_store.cc b/src/crimson/os/futurized_store.cc index edca056c18a..ffcf6fe6b6a 100644 --- a/src/crimson/os/futurized_store.cc +++ b/src/crimson/os/futurized_store.cc @@ -33,4 +33,35 @@ FuturizedStore::create(const std::string& type, } } +seastar::future<> with_store_do_transaction( + BackendStore store, + FuturizedStore::Shard::CollectionRef ch, + ceph::os::Transaction&& txn) +{ + std::unique_ptr on_commit( + ceph::os::Transaction::collect_all_contexts(txn)); + const auto original_core = seastar::this_shard_id(); + if (store.shard_id == original_core || store.shard_id == GLOBAL_STORE) { + return store.f_store.get_sharded_store(store.store_index).do_transaction_no_callbacks( + std::move(ch), std::move(txn) + ).then([on_commit=std::move(on_commit)]() mutable { + auto c = on_commit.release(); + if (c) c->complete(0); + return seastar::now(); + }); + } else { + return seastar::smp::submit_to( + store.shard_id, + [store, ch=std::move(ch), txn=std::move(txn)]() mutable { + return store.f_store.get_sharded_store(store.store_index).do_transaction_no_callbacks( + std::move(ch), std::move(txn)); + }).then([original_core, on_commit=std::move(on_commit)]() mutable { + return seastar::smp::submit_to(original_core, [on_commit=std::move(on_commit)]() mutable { + auto c = on_commit.release(); + if (c) c->complete(0); + return seastar::now(); + }); + }); + } +} } diff --git a/src/crimson/os/futurized_store.h b/src/crimson/os/futurized_store.h index 487fa6e81eb..3abc364dac8 100644 --- a/src/crimson/os/futurized_store.h +++ b/src/crimson/os/futurized_store.h @@ -154,7 +154,7 @@ public: virtual seastar::future<> set_collection_opts(CollectionRef c, const pool_opts_t& opts) = 0; - protected: + public: virtual seastar::future<> do_transaction_no_callbacks( CollectionRef ch, ceph::os::Transaction&& txn) = 0; @@ -258,4 +258,69 @@ public: protected: const core_id_t primary_core; }; + +template +auto with_store(BackendStore store, Args&&... args) +{ + using raw_return_type = decltype((std::declval().*MemberFunc)(std::forward(args)...)); + + constexpr bool is_errorator = is_errorated_future_v; + constexpr bool is_seastar_future = seastar::is_future::value && !is_errorator; + constexpr bool is_plain = !is_errorator && !is_seastar_future; + const auto original_core = seastar::this_shard_id(); + if (store.shard_id == seastar::this_shard_id() || store.shard_id == GLOBAL_STORE) { + if constexpr (is_plain) { + return seastar::make_ready_future( + (store.f_store.get_sharded_store(store.store_index).*MemberFunc)(std::forward(args)...)); + } else { + return (store.f_store.get_sharded_store(store.store_index).*MemberFunc)(std::forward(args)...); + } + } else { + if constexpr (is_errorator) { + auto fut = seastar::smp::submit_to( + store.shard_id, + [store, args=std::make_tuple(std::forward(args)...)]() mutable { + return std::apply([store](auto&&... args) { + return (store.f_store.get_sharded_store(store.store_index).*MemberFunc)( + std::forward(args)...).to_base(); + }, std::move(args)); + }).then([original_core] (auto&& result) { + return seastar::smp::submit_to(original_core, + [result = std::forward(result)]() mutable { + return std::forward(result); + }); + }); + return raw_return_type(std::move(fut)); + } else { + auto fut = seastar::smp::submit_to( + store.shard_id, + [store, args=std::make_tuple(std::forward(args)...)]() mutable { + return std::apply([store](auto&&... args) { + return (store.f_store.get_sharded_store(store.store_index).*MemberFunc)( + std::forward(args)...); + }, std::move(args)); + }); + if constexpr (std::is_same_v>) { + return fut.then([original_core] { + return seastar::smp::submit_to(original_core, [] { + return seastar::make_ready_future<>(); + }); + }); + } else { + return fut.then([original_core](auto&& result) { + return seastar::smp::submit_to(original_core, + [result = std::forward(result)]() mutable { + return std::forward(result); + }); + }); + } + } + } +} + +seastar::future<> with_store_do_transaction( + BackendStore store, + FuturizedStore::Shard::CollectionRef ch, + ceph::os::Transaction&& txn); + }