For out-of-line write only.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
stats.md_bytes += record.size.get_raw_mdlength();
stats.num_records += 1;
- return record_submitter.submit(
+ auto ret = record_submitter.submit(
std::move(record),
- with_atomic_roll_segment
- ).safe_then([this, FNAME, &t, extents=std::move(extents)
+ with_atomic_roll_segment);
+ return std::move(ret.future
+ ).safe_then([this, FNAME, &t,
+ record_base=ret.record_base_regardless_md,
+ extents=std::move(extents)
](record_locator_t ret) mutable {
- DEBUGT("{} finish with {} and {} extents",
+ DEBUGT("{} finish with {}={} and {} extents",
t, segment_allocator.get_name(),
- ret, extents.size());
+ ret, record_base, extents.size());
paddr_t extent_addr = ret.record_block_base;
+ // ool won't write metadata, so the paddrs must be equal
+ assert(record_base.offset == extent_addr);
for (auto& extent : extents) {
TRACET("{} ool extent written at {} -- {}",
t, segment_allocator.get_name(),
(void*)&handle,
action == RecordSubmitter::action_t::SUBMIT_FULL ?
"FULL" : "NOT_FULL");
- auto submit_fut = record_submitter.submit(std::move(record));
+ auto submit_ret = record_submitter.submit(std::move(record));
+ // submit_ret.record_base_regardless_md is wrong for journaling
return handle.enter(write_pipeline->device_submission
- ).then([submit_fut=std::move(submit_fut)]() mutable {
+ ).then([submit_fut=std::move(submit_ret.future)]() mutable {
return std::move(submit_fut);
}).safe_then([FNAME, this, &handle](record_locator_t result) {
return handle.enter(write_pipeline->finalize
namespace crimson::os::seastore::journal {
-RecordBatch::add_pending_ret
+RecordBatch::add_pending_ret_t
RecordBatch::add_pending(
const std::string& name,
record_t&& record,
assert(write_base.has_value());
assert(io_promise.has_value());
- return io_promise->get_shared_future(
- ).then([dlength_offset, FNAME, &name, write_base=*write_base
- ](auto maybe_promise_result) -> add_pending_ret {
+ auto _write_base = *write_base;
+ auto fut = io_promise->get_shared_future(
+ ).then([dlength_offset, FNAME, &name, _write_base
+ ](auto maybe_promise_result) -> add_pending_fut {
if (!maybe_promise_result.has_value()) {
ERROR("{} write failed", name);
return crimson::ct_error::input_output_error::make();
}
auto submit_result = record_locator_t{
- write_base.offset.add_offset(
+ _write_base.offset.add_offset(
maybe_promise_result->mdlength + dlength_offset),
- write_result_t{write_base, maybe_promise_result->write_length}
+ write_result_t{_write_base, maybe_promise_result->write_length}
};
TRACE("{} write finish with {}", name, submit_result);
- return add_pending_ret(
+ return add_pending_fut(
add_pending_ertr::ready_future_marker{},
submit_result);
});
+ _write_base.offset = _write_base.offset.add_offset(dlength_offset);
+ return {_write_base, std::move(fut)};
}
RecordBatch::encode_ret_t RecordBatch::encode_batch(
write_result_t result{
journal_allocator.get_written_to(),
to_write.length()};
- return journal_allocator.write(std::move(to_write)
+ auto write_fut = journal_allocator.write(std::move(to_write)
).safe_then([mdlength=sizes.get_mdlength(), result] {
return record_locator_t{
result.start_seq.offset.add_offset(mdlength),
}).finally([this] {
decrement_io_with_flush();
});
+ return {result.start_seq, std::move(write_fut)};
}
// indirect batched write
std::optional<journal_seq_t> maybe_write_base;
assert(*p_current_batch->get_write_base() ==
journal_allocator.get_written_to());
}
- auto write_fut = p_current_batch->add_pending(
+ auto ret = p_current_batch->add_pending(
get_name(),
std::move(record),
journal_allocator.get_block_size(),
num_outstanding_io);
assert(!p_current_batch->needs_flush());
}
- return write_fut;
+ return ret;
}
RecordSubmitter::open_ret
//
// write_base must be assigned when the state is empty
using add_pending_ertr = JournalAllocator::write_ertr;
- using add_pending_ret = add_pending_ertr::future<record_locator_t>;
- add_pending_ret add_pending(
+ using add_pending_fut = add_pending_ertr::future<record_locator_t>;
+ struct add_pending_ret_t {
+ // The supposed record base if no metadata,
+ // only useful in case of ool.
+ journal_seq_t record_base_regardless_md;
+ add_pending_fut future;
+ };
+ add_pending_ret_t add_pending(
const std::string& name,
record_t&&,
extent_len_t block_size,
roll_segment_ertr::future<> roll_segment();
// when available, submit the record if possible
- using submit_ertr = base_ertr;
- using submit_ret = submit_ertr::future<record_locator_t>;
+ using submit_ret = RecordBatch::add_pending_ret_t;
submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
void update_committed_to(const journal_seq_t& new_committed_to) {
(void*)&handle,
action == RecordSubmitter::action_t::SUBMIT_FULL ?
"FULL" : "NOT_FULL");
- auto submit_fut = record_submitter.submit(std::move(record));
+ auto submit_ret = record_submitter.submit(std::move(record));
+ // submit_ret.record_base_regardless_md is wrong for journaling
return handle.enter(write_pipeline->device_submission
- ).then([submit_fut=std::move(submit_fut)]() mutable {
+ ).then([submit_fut=std::move(submit_ret.future)]() mutable {
return std::move(submit_fut);
}).safe_then([FNAME, this, &handle](record_locator_t result) {
return handle.enter(write_pipeline->finalize