{
LOG_PREFIX(TransactionManager::do_submit_transaction);
SUBDEBUGT(seastore_t, "start, entering ool_writes", tref);
- return trans_intr::make_interruptible(
+ co_await trans_intr::make_interruptible(
tref.get_handle().enter(write_pipeline.ool_writes_and_lba_updates)
- ).then_interruptible([this, FNAME, &tref,
- dispatch_result = std::move(dispatch_result)] {
- return seastar::do_with(std::move(dispatch_result),
- [this, FNAME, &tref](auto &dispatch_result) {
- SUBTRACET(seastore_t, "write delayed ool extents", tref);
- return epm->write_delayed_ool_extents(tref, dispatch_result.alloc_map
- ).handle_error_interruptible(
- crimson::ct_error::input_output_error::pass_further(),
- crimson::ct_error::assert_all("invalid error")
- );
- });
- }).si_then([&tref, FNAME, this] {
- return seastar::do_with(
- tref.get_valid_pre_alloc_list(),
- [this, FNAME, &tref](auto &allocated_extents) {
- return update_lba_mappings(tref, allocated_extents
- ).si_then([this, FNAME, &tref, &allocated_extents] {
- auto num_extents = allocated_extents.size();
- SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
- return epm->write_preallocated_ool_extents(tref, allocated_extents
- ).handle_error_interruptible(
- crimson::ct_error::input_output_error::pass_further(),
- crimson::ct_error::assert_all("invalid error")
- );
- });
- });
- }).si_then([this, FNAME, &tref] {
- SUBTRACET(seastore_t, "entering prepare", tref);
- return tref.get_handle().enter(write_pipeline.prepare);
- }).si_then([this, FNAME, &tref, trim_alloc_to=std::move(trim_alloc_to)]() mutable
- -> submit_transaction_iertr::future<> {
- if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
- SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to);
- cache->trim_backref_bufs(*trim_alloc_to);
- }
+ );
- auto record = cache->prepare_record(
- tref,
- journal->get_trimmer().get_journal_head(),
- journal->get_trimmer().get_dirty_tail());
+ SUBTRACET(seastore_t, "write delayed ool extents", tref);
+ co_await epm->write_delayed_ool_extents(
+ tref, dispatch_result.alloc_map
+ );
- tref.get_handle().maybe_release_collection_lock();
- if (tref.get_src() == Transaction::src_t::MUTATE) {
- --(shard_stats.processing_inlock_io_num);
- ++(shard_stats.processing_postlock_io_num);
- }
+ auto allocated_extents = tref.get_valid_pre_alloc_list();
+ co_await update_lba_mappings(tref, allocated_extents);
- SUBTRACET(seastore_t, "submitting record", tref);
- return journal->submit_record(
- std::move(record),
- tref.get_handle(),
- tref.get_src(),
- [this, FNAME, &tref](record_locator_t submit_result)
- {
- SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
- auto start_seq = submit_result.write_result.start_seq;
- journal->get_trimmer().set_journal_head(start_seq);
- cache->complete_commit(
- tref,
- submit_result.record_block_base,
- start_seq);
- journal->get_trimmer().update_journal_tails(
- cache->get_oldest_dirty_from().value_or(start_seq),
- cache->get_oldest_backref_dirty_from().value_or(start_seq));
- }).safe_then([&tref] {
- return tref.get_handle().complete();
+ auto num_extents = allocated_extents.size();
+ SUBTRACET(seastore_t, "process {} allocated extents", tref, num_extents);
+ co_await epm->write_preallocated_ool_extents(tref, allocated_extents);
+
+ SUBTRACET(seastore_t, "entering prepare", tref);
+ co_await trans_intr::make_interruptible(
+ tref.get_handle().enter(write_pipeline.prepare)
+ );
+
+ if (trim_alloc_to && *trim_alloc_to != JOURNAL_SEQ_NULL) {
+ SUBTRACET(seastore_t, "trim backref_bufs to {}", tref, *trim_alloc_to);
+ cache->trim_backref_bufs(*trim_alloc_to);
+ }
+
+ auto record = cache->prepare_record(
+ tref,
+ journal->get_trimmer().get_journal_head(),
+ journal->get_trimmer().get_dirty_tail());
+
+ tref.get_handle().maybe_release_collection_lock();
+ if (tref.get_src() == Transaction::src_t::MUTATE) {
+ --(shard_stats.processing_inlock_io_num);
+ ++(shard_stats.processing_postlock_io_num);
+ }
+
+ SUBTRACET(seastore_t, "submitting record", tref);
+ co_await journal->submit_record(
+ std::move(record),
+ tref.get_handle(),
+ tref.get_src(),
+ [this, FNAME, &tref](record_locator_t submit_result) {
+ SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
+ auto start_seq = submit_result.write_result.start_seq;
+ journal->get_trimmer().set_journal_head(start_seq);
+ cache->complete_commit(
+ tref,
+ submit_result.record_block_base,
+ start_seq);
+ journal->get_trimmer().update_journal_tails(
+ cache->get_oldest_dirty_from().value_or(start_seq),
+ cache->get_oldest_backref_dirty_from().value_or(start_seq));
}).handle_error(
submit_transaction_iertr::pass_further{},
crimson::ct_error::assert_all{"Hit error submitting to journal"}
);
- });
+
+ co_await trans_intr::make_interruptible(
+ tref.get_handle().complete()
+ );
}
seastar::future<> TransactionManager::flush(OrderingHandle &handle)