class OnCommit final: public Context
{
int cpuid;
- Context* on_commit = nullptr;
+ Context *oncommit;
+ seastar::promise<> &alien_done;
public:
- seastar::promise<> alien_done;
- OnCommit(int id, ceph::os::Transaction& txn): cpuid(id) {
- if (txn.has_contexts()) {
- on_commit = txn.get_on_commit();
- }
- }
+ OnCommit(
+ int id,
+ seastar::promise<> &done,
+ Context *oncommit,
+ ceph::os::Transaction& txn)
+ : cpuid(id), oncommit(oncommit),
+ alien_done(done) {}
void finish(int) final {
- auto fut = seastar::alien::submit_to(cpuid, [this] {
- if (on_commit) {
- on_commit->complete(0);
- }
+ return seastar::alien::submit_to(cpuid, [this] {
+ if (oncommit) oncommit->complete(0);
alien_done.set_value();
return seastar::make_ready_future<>();
- });
- fut.wait();
+ }).wait();
}
};
}
ceph::os::Transaction&& txn)
{
logger().debug("{}", __func__);
- auto callback =
- std::make_unique<OnCommit>(seastar::engine().cpu_id(), txn);
- return seastar::do_with(std::move(txn), std::move(callback),
- [this, ch] (ceph::os::Transaction &txn, auto &callback) {
- return seastar::with_gate(transaction_gate, [this, ch, &txn, &callback] {
- return tp_mutex.lock().then ([this, ch, &txn, &callback] {
- return tp->submit([=, &txn, &callback] {
- txn.register_on_commit(callback.get());
- auto c = static_cast<AlienCollection*>(ch.get());
- return store->queue_transaction(c->collection, std::move(txn));
- });
- }).then([this, &callback] (int) {
- tp_mutex.unlock();
- return callback->alien_done.get_future();
+ auto id = seastar::engine().cpu_id();
+ auto done = seastar::promise<>();
+ return seastar::do_with(
+ std::move(txn),
+ std::move(done),
+ [this, ch, id] (auto &txn, auto &done) {
+ return seastar::with_gate(transaction_gate, [this, ch, id, &txn, &done] {
+ return tp_mutex.lock().then ([this, ch, id, &txn, &done] {
+ Context *crimson_wrapper =
+ ceph::os::Transaction::collect_all_contexts(txn);
+ return tp->submit([this, ch, id, crimson_wrapper, &txn, &done] {
+ txn.register_on_commit(new OnCommit(id, done, crimson_wrapper, txn));
+ auto c = static_cast<AlienCollection*>(ch.get());
+ return store->queue_transaction(c->collection, std::move(txn));
+ });
+ }).then([this, &done] (int) {
+ tp_mutex.unlock();
+ return done.get_future();
+ });
});
});
- });
}
seastar::future<> AlienStore::write_meta(const std::string& key,