class OnCommit final: public Context
{
const int cpuid;
- Context *oncommit;
seastar::alien::instance &alien;
seastar::promise<> &alien_done;
public:
OnCommit(
int id,
seastar::promise<> &done,
- Context *oncommit,
seastar::alien::instance &alien,
ceph::os::Transaction& txn)
: cpuid(id),
- oncommit(oncommit),
alien(alien),
alien_done(done) {
}
void finish(int) final {
return seastar::alien::submit_to(alien, cpuid, [this] {
- if (oncommit) {
- oncommit->complete(0);
- }
alien_done.set_value();
return seastar::make_ready_future<>();
}).wait();
});
}
-seastar::future<> AlienStore::do_transaction(CollectionRef ch,
- ceph::os::Transaction&& txn)
+seastar::future<> AlienStore::do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn)
{
logger().debug("{}", __func__);
auto id = seastar::this_shard_id();
AlienCollection* alien_coll = static_cast<AlienCollection*>(ch.get());
// moving the `ch` is crucial for buildability on newer S* versions.
return alien_coll->with_lock([this, ch=std::move(ch), id, &txn, &done] {
- Context *crimson_wrapper =
- ceph::os::Transaction::collect_all_contexts(txn);
assert(tp);
return tp->submit(ch->get_cid().hash_to_shard(tp->size()),
- [this, ch, id, crimson_wrapper, &txn, &done, &alien=seastar::engine().alien()] {
- txn.register_on_commit(new OnCommit(id, done, crimson_wrapper,
- alien, txn));
+ [this, ch, id, &txn, &done, &alien=seastar::engine().alien()] {
+ txn.register_on_commit(new OnCommit(id, done, alien, txn));
auto c = static_cast<AlienCollection*>(ch.get());
return store->queue_transaction(c->collection, std::move(txn));
});
seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
seastar::future<std::vector<coll_t>> list_collections() final;
- seastar::future<> do_transaction(CollectionRef c,
- ceph::os::Transaction&& txn) final;
+ seastar::future<> do_transaction_no_callbacks(
+ CollectionRef c,
+ ceph::os::Transaction&& txn) final;
// error injection
seastar::future<> inject_data_error(const ghobject_t& o) final;
o->omap_header);
}
-seastar::future<> CyanStore::do_transaction(CollectionRef ch,
- ceph::os::Transaction&& t)
+seastar::future<> CyanStore::do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& t)
{
using ceph::os::Transaction;
int r = 0;
logger().error("{}", str.str());
ceph_assert(r == 0);
}
- for (auto i : {
- t.get_on_applied(),
- t.get_on_commit(),
- t.get_on_applied_sync()}) {
- if (i) {
- i->complete(0);
- }
- }
return seastar::now();
}
seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
seastar::future<std::vector<coll_t>> list_collections() final;
- seastar::future<> do_transaction(CollectionRef ch,
- ceph::os::Transaction&& txn) final;
+ seastar::future<> do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) final;
seastar::future<> write_meta(const std::string& key,
const std::string& value) final;
virtual seastar::future<CollectionRef> open_collection(const coll_t& cid) = 0;
virtual seastar::future<std::vector<coll_t>> list_collections() = 0;
- virtual seastar::future<> do_transaction(CollectionRef ch,
- ceph::os::Transaction&& txn) = 0;
+protected:
+ virtual seastar::future<> do_transaction_no_callbacks(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) = 0;
+
+public:
+ seastar::future<> do_transaction(
+ CollectionRef ch,
+ ceph::os::Transaction&& txn) {
+ std::unique_ptr<Context> on_commit(
+ ceph::os::Transaction::collect_all_contexts(txn));
+ return do_transaction_no_callbacks(
+ std::move(ch), std::move(txn)
+ ).then([on_commit=std::move(on_commit)]() mutable {
+ auto c = on_commit.release();
+ if (c) c->complete(0);
+ return seastar::now();
+ });
+ }
+
+
/**
* flush
*
seastar::future<std::vector<coll_t>> list_collections() final {
return proxy(&T::list_collections);
}
- seastar::future<> do_transaction(
+ seastar::future<> do_transaction_no_callbacks(
CollectionRef ch,
ceph::os::Transaction &&txn) final {
- return proxy(&T::do_transaction, std::move(ch), std::move(txn));
+ return proxy(&T::do_transaction_no_callbacks, std::move(ch), std::move(txn));
}
seastar::future<> flush(CollectionRef ch) final {
return proxy(&T::flush, std::move(ch));
abort();
}
-seastar::future<> SeaStore::do_transaction(
+seastar::future<> SeaStore::do_transaction_no_callbacks(
CollectionRef _ch,
ceph::os::Transaction&& _t)
{
}).si_then([this, &ctx] {
return transaction_manager->submit_transaction(*ctx.transaction);
});
- }).safe_then([&ctx]() {
- for (auto i : {
- ctx.ext_transaction.get_on_applied(),
- ctx.ext_transaction.get_on_commit(),
- ctx.ext_transaction.get_on_applied_sync()}) {
- if (i) {
- i->complete(0);
- }
- }
- return seastar::now();
});
});
}
seastar::future<CollectionRef> open_collection(const coll_t& cid) final;
seastar::future<std::vector<coll_t>> list_collections() final;
- seastar::future<> do_transaction(
+ seastar::future<> do_transaction_no_callbacks(
CollectionRef ch,
ceph::os::Transaction&& txn) final;