}
}
+seastar::future<> with_store_do_transaction(
+ crimson::os::FuturizedStore::StoreShardRef store,
+ FuturizedStore::Shard::CollectionRef ch,
+ ceph::os::Transaction&& txn)
+{
+ std::unique_ptr<Context> on_commit(
+ ceph::os::Transaction::collect_all_contexts(txn));
+ const auto original_core = seastar::this_shard_id();
+ if (store.get_owner_shard() == seastar::this_shard_id()) {
+ return store->do_transaction_no_callbacks(
+ std::move(ch), std::move(txn)
+ ).then([on_commit=std::move(on_commit)]() mutable {
+ auto c = on_commit.release();
+ if (c) c->complete(0);
+ return seastar::now();
+ });
+ } else {
+ return seastar::smp::submit_to(
+ store.get_owner_shard(),
+ [f_store=store.get(), ch=std::move(ch), txn=std::move(txn)]() mutable {
+ return f_store->do_transaction_no_callbacks(
+ std::move(ch), std::move(txn));
+ }).then([original_core, on_commit=std::move(on_commit)]() mutable {
+ return seastar::smp::submit_to(original_core, [on_commit=std::move(on_commit)]() mutable {
+ auto c = on_commit.release();
+ if (c) c->complete(0);
+ return seastar::now();
+ });
+ });
+ }
+}
}
virtual seastar::future<> set_collection_opts(CollectionRef c,
const pool_opts_t& opts) = 0;
- protected:
+ public:
virtual seastar::future<> do_transaction_no_callbacks(
CollectionRef ch,
ceph::os::Transaction&& txn) = 0;
protected:
const core_id_t primary_core;
};
+
+template<auto MemberFunc, typename... Args>
+auto with_store(crimson::os::FuturizedStore::StoreShardRef store, Args&&... args)
+{
+ using raw_return_type = decltype((std::declval<crimson::os::FuturizedStore::Shard>().*MemberFunc)(std::forward<Args>(args)...));
+
+ constexpr bool is_errorator = is_errorated_future_v<raw_return_type>;
+ constexpr bool is_seastar_future = seastar::is_future<raw_return_type>::value && !is_errorator;
+ constexpr bool is_plain = !is_errorator && !is_seastar_future;
+ const auto original_core = seastar::this_shard_id();
+ if (store.get_owner_shard() == seastar::this_shard_id()) {
+ if constexpr (is_plain) {
+ return seastar::make_ready_future<raw_return_type>(
+ ((*store).*MemberFunc)(std::forward<Args>(args)...));
+ } else {
+ return ((*store).*MemberFunc)(std::forward<Args>(args)...);
+ }
+ } else {
+ if constexpr (is_errorator) {
+ auto fut = seastar::smp::submit_to(
+ store.get_owner_shard(),
+ [f_store=store.get(), args=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+ return std::apply([f_store](auto&&... args) {
+ return ((*f_store).*MemberFunc)(std::forward<decltype(args)>(args)...).to_base();
+ }, std::move(args));
+ }).then([original_core] (auto&& result) {
+ return seastar::smp::submit_to(original_core,
+ [result = std::forward<decltype(result)>(result)]() mutable {
+ return std::forward<decltype(result)>(result);
+ });
+ });
+ return raw_return_type(std::move(fut));
+ } else {
+ auto fut = seastar::smp::submit_to(
+ store.get_owner_shard(),
+ [f_store=store.get(), args=std::make_tuple(std::forward<Args>(args)...)]() mutable {
+ return std::apply([f_store](auto&&... args) {
+ return ((*f_store).*MemberFunc)(std::forward<decltype(args)>(args)...);
+ }, std::move(args));
+ });
+ if constexpr (std::is_same_v<raw_return_type, seastar::future<>>) {
+ return fut.then([original_core] {
+ return seastar::smp::submit_to(original_core, [] {
+ return seastar::make_ready_future<>();
+ });
+ });
+ } else {
+ return fut.then([original_core](auto&& result) {
+ return seastar::smp::submit_to(original_core,
+ [result = std::forward<decltype(result)>(result)]() mutable {
+ return std::forward<decltype(result)>(result);
+ });
+ });
+ }
+ }
+ }
+}
+
+seastar::future<> with_store_do_transaction(
+ crimson::os::FuturizedStore::StoreShardRef store,
+ FuturizedStore::Shard::CollectionRef ch,
+ ceph::os::Transaction&& txn);
+
}