on_submission_func_t &&on_submission)
{
LOG_PREFIX(CircularBoundedJournal::do_submit_record);
- if (!record_submitter.is_available()) {
- DEBUG("H{} wait ...", (void*)&handle);
- return record_submitter.wait_available(
- ).safe_then([this, record=std::move(record), &handle,
- on_submission=std::move(on_submission)]() mutable {
- return do_submit_record(
- std::move(record), handle, std::move(on_submission));
- });
- }
- auto action = record_submitter.check_action(record.size);
- if (action == RecordSubmitter::action_t::ROLL) {
- return record_submitter.roll_segment(
- ).safe_then([this, record=std::move(record), &handle,
- on_submission=std::move(on_submission)]() mutable {
- return do_submit_record(
- std::move(record), handle, std::move(on_submission));
- });
+
+ RecordSubmitter::action_t action;
+ while (true) {
+ if (!record_submitter.is_available()) {
+ DEBUG("H{} wait ...", (void*)&handle);
+ co_await record_submitter.wait_available();
+ continue;
+ }
+ action = record_submitter.check_action(record.size);
+ if (action == RecordSubmitter::action_t::ROLL) {
+ co_await record_submitter.roll_segment();
+ continue;
+ }
+ break;
}
DEBUG("H{} submit {} ...",
"FULL" : "NOT_FULL");
auto submit_ret = record_submitter.submit(std::move(record));
// submit_ret.record_base_regardless_md is wrong for journaling
- return handle.enter(write_pipeline->device_submission
- ).then([submit_fut=std::move(submit_ret.future)]() mutable {
- return std::move(submit_fut);
- }).safe_then([FNAME, this, &handle, on_submission=std::move(on_submission)
- ](record_locator_t result) mutable {
- return handle.enter(write_pipeline->finalize
- ).then([FNAME, this, result, &handle,
- on_submission=std::move(on_submission)] {
- DEBUG("H{} finish with {}", (void*)&handle, result);
- auto new_committed_to = result.write_result.get_end_seq();
- record_submitter.update_committed_to(new_committed_to);
- std::invoke(on_submission, result);
- return seastar::now();
- });
- });
+ co_await handle.enter(write_pipeline->device_submission);
+
+ record_locator_t result = co_await std::move(submit_ret.future);
+
+ co_await handle.enter(write_pipeline->finalize);
+
+ DEBUG("H{} finish with {}", (void*)&handle, result);
+ auto new_committed_to = result.write_result.get_end_seq();
+ record_submitter.update_committed_to(new_committed_to);
+ std::invoke(on_submission, result);
}
Journal::replay_ret CircularBoundedJournal::replay_segment(