By introducing a callback upon completing submission.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
crimson::ct_error::erange,
crimson::ct_error::input_output_error
>;
- using submit_record_ret = submit_record_ertr::future<
- record_locator_t
- >;
- virtual submit_record_ret submit_record(
+ using on_submission_func_t = std::function<
+ void(record_locator_t)>;
+ virtual submit_record_ertr::future<> submit_record(
record_t &&record,
- OrderingHandle &handle
- ) = 0;
+ OrderingHandle &handle,
+ transaction_type_t t_src,
+ on_submission_func_t &&on_submission) = 0;
/**
* flush
virtual replay_ret replay(
delta_handler_t &&delta_handler) = 0;
- virtual seastar::future<> finish_commit(
- transaction_type_t type) = 0;
-
virtual ~Journal() {}
virtual backend_type_t get_type() = 0;
return record_submitter.close();
}
-CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record_ertr::future<>
CircularBoundedJournal::submit_record(
record_t &&record,
- OrderingHandle &handle)
+ OrderingHandle &handle,
+ transaction_type_t t_src,
+ on_submission_func_t &&on_submission)
{
LOG_PREFIX(CircularBoundedJournal::submit_record);
DEBUG("H{} {} start ...", (void*)&handle, record);
assert(write_pipeline);
- return do_submit_record(std::move(record), handle);
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission)
+ ).safe_then([this, t_src] {
+ if (is_trim_transaction(t_src)) {
+ return update_journal_tail(
+ trimmer.get_dirty_tail(),
+ trimmer.get_alloc_tail());
+ } else {
+ return seastar::now();
+ }
+ });
}
-CircularBoundedJournal::submit_record_ret
+CircularBoundedJournal::submit_record_ertr::future<>
CircularBoundedJournal::do_submit_record(
record_t &&record,
- OrderingHandle &handle)
+ OrderingHandle &handle,
+ on_submission_func_t &&on_submission)
{
LOG_PREFIX(CircularBoundedJournal::do_submit_record);
if (!record_submitter.is_available()) {
DEBUG("H{} wait ...", (void*)&handle);
return record_submitter.wait_available(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit_record(std::move(record), handle);
+ ).safe_then([this, record=std::move(record), &handle,
+ on_submission=std::move(on_submission)]() mutable {
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission));
});
}
auto action = record_submitter.check_action(record.size);
if (action == RecordSubmitter::action_t::ROLL) {
return record_submitter.roll_segment(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit_record(std::move(record), handle);
+ ).safe_then([this, record=std::move(record), &handle,
+ on_submission=std::move(on_submission)]() mutable {
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission));
});
}
return handle.enter(write_pipeline->device_submission
).then([submit_fut=std::move(submit_ret.future)]() mutable {
return std::move(submit_fut);
- }).safe_then([FNAME, this, &handle](record_locator_t result) {
+ }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission)
+ ](record_locator_t result) mutable {
return handle.enter(write_pipeline->finalize
- ).then([FNAME, this, result, &handle] {
+ ).then([FNAME, this, result, &handle,
+ on_submission=std::move(on_submission)] {
DEBUG("H{} finish with {}", (void*)&handle, result);
auto new_committed_to = result.write_result.get_end_seq();
record_submitter.update_committed_to(new_committed_to);
- return result;
+ std::invoke(on_submission, result);
+ return seastar::now();
});
});
}
});
}
-seastar::future<> CircularBoundedJournal::finish_commit(transaction_type_t type) {
- if (is_trim_transaction(type)) {
- return update_journal_tail(
- trimmer.get_dirty_tail(),
- trimmer.get_alloc_tail());
- }
- return seastar::now();
-}
-
}
return backend_type_t::RANDOM_BLOCK;
}
- submit_record_ret submit_record(
+ submit_record_ertr::future<> submit_record(
record_t &&record,
- OrderingHandle &handle
+ OrderingHandle &handle,
+ transaction_type_t t_src,
+ on_submission_func_t &&on_submission
) final;
seastar::future<> flush(
return cjs.get_records_start();
}
- seastar::future<> finish_commit(transaction_type_t type) final;
-
using cbj_delta_handler_t = std::function<
replay_ertr::future<bool>(
const record_locator_t&,
cbj_delta_handler_t &&delta_handler,
journal_seq_t tail);
- submit_record_ret do_submit_record(record_t &&record, OrderingHandle &handle);
+ submit_record_ertr::future<> do_submit_record(
+ record_t &&record,
+ OrderingHandle &handle,
+ on_submission_func_t &&on_submission);
void try_read_rolled_header(scan_valid_records_cursor &cursor) {
paddr_t addr = convert_abs_addr_to_paddr(
});
}
-SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record_ertr::future<>
SegmentedJournal::do_submit_record(
record_t &&record,
- OrderingHandle &handle)
+ OrderingHandle &handle,
+ on_submission_func_t &&on_submission)
{
LOG_PREFIX(SegmentedJournal::do_submit_record);
if (!record_submitter.is_available()) {
DEBUG("H{} wait ...", (void*)&handle);
return record_submitter.wait_available(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit_record(std::move(record), handle);
+ ).safe_then([this, record=std::move(record), &handle,
+ on_submission=std::move(on_submission)]() mutable {
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission));
});
}
auto action = record_submitter.check_action(record.size);
if (action == RecordSubmitter::action_t::ROLL) {
DEBUG("H{} roll, unavailable ...", (void*)&handle);
return record_submitter.roll_segment(
- ).safe_then([this, record=std::move(record), &handle]() mutable {
- return do_submit_record(std::move(record), handle);
+ ).safe_then([this, record=std::move(record), &handle,
+ on_submission=std::move(on_submission)]() mutable {
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission));
});
} else { // SUBMIT_FULL/NOT_FULL
DEBUG("H{} submit {} ...",
return handle.enter(write_pipeline->device_submission
).then([submit_fut=std::move(submit_ret.future)]() mutable {
return std::move(submit_fut);
- }).safe_then([FNAME, this, &handle](record_locator_t result) {
+ }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission)
+ ](record_locator_t result) mutable {
return handle.enter(write_pipeline->finalize
- ).then([FNAME, this, result, &handle] {
+ ).then([FNAME, this, result, &handle,
+ on_submission=std::move(on_submission)] {
DEBUG("H{} finish with {}", (void*)&handle, result);
auto new_committed_to = result.write_result.get_end_seq();
record_submitter.update_committed_to(new_committed_to);
- return result;
+ std::invoke(on_submission, result);
+ return seastar::now();
});
});
}
}
-SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record_ertr::future<>
SegmentedJournal::submit_record(
record_t &&record,
- OrderingHandle &handle)
+ OrderingHandle &handle,
+ transaction_type_t t_src,
+ on_submission_func_t &&on_submission)
{
LOG_PREFIX(SegmentedJournal::submit_record);
DEBUG("H{} {} start ...", (void*)&handle, record);
return crimson::ct_error::erange::make();
}
- return do_submit_record(std::move(record), handle);
+ return do_submit_record(
+ std::move(record), handle, std::move(on_submission));
}
}
close_ertr::future<> close() final;
- submit_record_ret submit_record(
+ submit_record_ertr::future<> submit_record(
record_t &&record,
- OrderingHandle &handle) final;
+ OrderingHandle &handle,
+ transaction_type_t t_src,
+ on_submission_func_t &&on_submission) final;
seastar::future<> flush(OrderingHandle &handle) final;
backend_type_t get_type() final {
return backend_type_t::SEGMENTED;
}
- seastar::future<> finish_commit(transaction_type_t type) {
- return seastar::now();
- }
bool is_checksum_needed() final {
// segmented journal always requires checksum
}
private:
- submit_record_ret do_submit_record(
+ submit_record_ertr::future<> do_submit_record(
record_t &&record,
- OrderingHandle &handle
- );
+ OrderingHandle &handle,
+ on_submission_func_t &&on_submission);
SegmentSeqAllocatorRef segment_seq_allocator;
SegmentAllocator journal_segment_allocator;
}
SUBTRACET(seastore_t, "submitting record", tref);
- return journal->submit_record(std::move(record), tref.get_handle()
- ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
+ return journal->submit_record(
+ std::move(record),
+ tref.get_handle(),
+ tref.get_src(),
+ [this, FNAME, &tref](record_locator_t submit_result)
+ {
SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
auto start_seq = submit_result.write_result.start_seq;
journal->get_trimmer().set_journal_head(start_seq);
journal->get_trimmer().update_journal_tails(
cache->get_oldest_dirty_from().value_or(start_seq),
cache->get_oldest_backref_dirty_from().value_or(start_seq));
- return journal->finish_commit(tref.get_src()
- ).then([&tref] {
- return tref.get_handle().complete();
- });
+ }).safe_then([&tref] {
+ return tref.get_handle().complete();
}).handle_error(
submit_transaction_iertr::pass_further{},
crimson::ct_error::assert_all{"Hit error submitting to journal"}
seastar::future<> submit_transaction(TransactionRef t)
{
auto record = cache->prepare_record(*t, JOURNAL_SEQ_NULL, JOURNAL_SEQ_NULL);
- return journal->submit_record(std::move(record), t->get_handle()).safe_then(
- [this, t=std::move(t)](auto submit_result) mutable {
- cache->complete_commit(
- *t,
+ return seastar::do_with(
+ std::move(t), [this, record=std::move(record)](auto& _t) mutable {
+ auto& t = *_t;
+ return journal->submit_record(
+ std::move(record),
+ t.get_handle(),
+ t.get_src(),
+ [this, &t](auto submit_result) {
+ cache->complete_commit(
+ t,
submit_result.record_block_base,
submit_result.write_result.start_seq);
- complete_commit(*t);
- }).handle_error(crimson::ct_error::assert_all{});
+ complete_commit(t);
+ }
+ ).handle_error(crimson::ct_error::assert_all{});
+ });
}
virtual LBAManager::mkfs_ret test_structure_setup(Transaction &t) = 0;
auto submit_record(record_t&& record) {
entries.push_back(record);
+ entry_validator_t& back = entries.back();
OrderingHandle handle = get_dummy_ordering_handle();
- auto [addr, w_result] = cbj->submit_record(
- std::move(record),
- handle).unsafe_get();
- entries.back().seq = w_result.start_seq;
- entries.back().entries = 1;
- entries.back().magic = cbj->get_cjs().get_cbj_header().magic;
- logger().debug("submit entry to addr {}", entries.back().seq);
- return convert_paddr_to_abs_addr(entries.back().seq.offset);
+ cbj->submit_record(
+ std::move(record),
+ handle,
+ transaction_type_t::MUTATE,
+ [this, &back](auto locator) {
+ back.seq = locator.write_result.start_seq;
+ back.entries = 1;
+ back.magic = cbj->get_cjs().get_cbj_header().magic;
+ logger().debug("submit entry to addr {}", back.seq);
+ }
+ ).unsafe_get();
+ return convert_paddr_to_abs_addr(back.seq.offset);
}
seastar::future<> tear_down_fut() final {
auto submit_record(T&&... _record) {
auto record{std::forward<T>(_record)...};
records.push_back(record);
+ record_validator_t& back = records.back();
OrderingHandle handle = get_dummy_ordering_handle();
- auto [addr, _] = journal->submit_record(
+ journal->submit_record(
std::move(record),
- handle).unsafe_get();
- records.back().record_final_offset = addr;
- return addr;
+ handle,
+ transaction_type_t::MUTATE,
+ [&back](auto locator) {
+ back.record_final_offset = locator.record_block_base;
+ }
+ ).unsafe_get();
+ return back.record_final_offset;
}
extent_t generate_extent(size_t blocks) {