seq.segment_seq)) {
return replay_ertr::now();
} else {
- return handler(
- journal_seq_t{seq.segment_seq, base},
+ auto offsets = submit_result_t{
base.add_offset(header.mdlength),
+ write_result_t{
+ journal_seq_t{seq.segment_seq, base},
+ static_cast<segment_off_t>(header.mdlength + header.dlength)
+ }
+ };
+ return handler(
+ offsets,
delta);
}
});
assert(state != state_t::SUBMITTING);
assert(can_batch(rsize));
- auto record_start_offset = encoded_length;
+ auto block_start_offset = encoded_length + rsize.mdlength;
records.push_back(std::move(record));
record_sizes.push_back(rsize);
auto new_encoded_length = get_encoded_length(rsize);
assert(new_encoded_length < MAX_SEG_OFF);
encoded_length = new_encoded_length;
- bool is_first;
if (state == state_t::EMPTY) {
assert(!io_promise.has_value());
io_promise = seastar::shared_promise<maybe_result_t>();
- is_first = true;
} else {
assert(io_promise.has_value());
- is_first = false;
}
state = state_t::PENDING;
return io_promise->get_shared_future(
- ).then([record_start_offset, is_first
+ ).then([block_start_offset
](auto maybe_write_result) -> add_pending_ret {
if (!maybe_write_result.has_value()) {
return crimson::ct_error::input_output_error::make();
}
- auto write_result = maybe_write_result.value();
- if (is_first) {
- assert(record_start_offset == 0);
- } else {
- assert(record_start_offset > 0);
- write_result.write_start_seq.offset.offset += record_start_offset;
- // only the first record should update JournalSegmentManager::committed_to
- write_result.write_length = 0;
- }
+ auto submit_result = submit_result_t{
+ maybe_write_result->start_seq.offset.add_offset(block_start_offset),
+ *maybe_write_result
+ };
return add_pending_ret(
add_pending_ertr::ready_future_marker{},
- write_result);
+ submit_result);
});
}
logger().debug(
"Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
records.size(),
- maybe_write_result->write_start_seq,
- maybe_write_result->write_length);
+ maybe_write_result->start_seq,
+ maybe_write_result->length);
+ assert(maybe_write_result->length == encoded_length);
} else {
logger().error(
"Journal::RecordBatch::set_result: batches={}, write is failed!",
journal_segment_manager.get_committed_to(),
journal_segment_manager.get_nonce());
return journal_segment_manager.write(to_write
- ).finally([this] {
+ ).safe_then([rsize](auto write_result) {
+ return submit_result_t{
+ write_result.start_seq.offset.add_offset(rsize.mdlength),
+ write_result
+ };
+ }).finally([this] {
decrement_io_with_flush();
});
} else {
return handle.enter(write_pipeline->device_submission
).then([write_fut=std::move(write_fut)]() mutable {
return std::move(write_fut);
- }).safe_then([this, &handle, rsize](auto write_result) {
+ }).safe_then([this, &handle](auto submit_result) {
return handle.enter(write_pipeline->finalize
- ).then([this, write_result, rsize] {
- if (write_result.write_length > 0) {
- auto committed_to = write_result.write_start_seq;
- committed_to.offset.offset += write_result.write_length;
- journal_segment_manager.mark_committed(committed_to);
- }
- return std::make_pair(
- write_result.write_start_seq.offset.add_offset(rsize.mdlength),
- write_result.write_start_seq);
+ ).then([this, submit_result] {
+ journal_segment_manager.mark_committed(
+ submit_result.write_result.get_end_seq());
+ return submit_result;
});
});
}
/**
* submit_record
*
- * @param write record and returns offset of first block and seq
+ * write record with the ordering handle
*/
+ // Result of one journal device write: where it started and how long it was.
+ struct write_result_t {
+ // journal sequence (segment seq + in-segment paddr) at which the write began
+ journal_seq_t start_seq;
+ // total bytes written; set from mdlength + dlength at the write site
+ segment_off_t length;
+
+ // journal sequence one byte past the end of this write
+ journal_seq_t get_end_seq() const {
+ return start_seq.add_offset(length);
+ }
+ };
+ // Result handed back to a record submitter: the (possibly batched) device
+ // write result plus where this record's data blocks start inside it.
+ struct submit_result_t {
+ // start paddr of the record's data blocks (write start + record mdlength)
+ paddr_t record_block_base;
+ write_result_t write_result;
+ };
using submit_record_ertr = crimson::errorator<
crimson::ct_error::erange,
crimson::ct_error::input_output_error
>;
using submit_record_ret = submit_record_ertr::future<
- std::pair<paddr_t, journal_seq_t>
+ submit_result_t
>;
submit_record_ret submit_record(
record_t &&record,
using replay_ertr = SegmentManager::read_ertr;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
- replay_ret(journal_seq_t seq,
- paddr_t record_block_base,
+ replay_ret(const submit_result_t&,
const delta_info_t&)>;
replay_ret replay(
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
using roll_ertr = base_ertr;
roll_ertr::future<> roll();
- // write the buffer, return the write start
+ // write the buffer, return the write result
// May be called concurrently, writes may complete in any order.
- struct write_result_t {
- journal_seq_t write_start_seq;
- segment_off_t write_length;
- };
using write_ertr = base_ertr;
using write_ret = write_ertr::future<write_result_t>;
write_ret write(ceph::bufferlist to_write);
//
- // Set write_result_t::write_length to 0 if the record is not the first one
- // in the batch.
+ // Each pending record translates the batched write_result_t into its own
+ // submit_result_t using its block start offset within the batch.
- using add_pending_result_t = JournalSegmentManager::write_result_t;
using add_pending_ertr = JournalSegmentManager::write_ertr;
- using add_pending_ret = JournalSegmentManager::write_ret;
+ using add_pending_ret = add_pending_ertr::future<submit_result_t>;
add_pending_ret add_pending(record_t&&, const record_size_t&);
// Encode the batched records for write.
segment_nonce_t segment_nonce);
// Set the write result and reset for reuse
- using maybe_result_t = std::optional<add_pending_result_t>;
- void set_result(maybe_result_t maybe_write_result);
+ using maybe_result_t = std::optional<write_result_t>;
+ void set_result(maybe_result_t maybe_write_end_seq);
// The fast path that is equivalent to submit a single record as a batch.
//
using submit_pending_ertr = JournalSegmentManager::write_ertr;
using submit_pending_ret = submit_pending_ertr::future<
- std::pair<paddr_t, journal_seq_t> >;
+ submit_result_t>;
submit_pending_ret submit_pending(
record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
segment_seq_t segment_seq = 0;
paddr_t offset;
+ // Return a copy advanced by o bytes within the same segment sequence.
+ journal_seq_t add_offset(segment_off_t o) const {
+ return {segment_seq, offset.add_offset(o)};
+ }
+
DENC(journal_seq_t, v, p) {
DENC_START(1, 1, p);
denc(v.segment_seq, p);
[this](auto&& segments) {
return journal->replay(
std::move(segments),
- [this](auto seq, auto paddr, const auto &e) {
- auto fut = cache->replay_delta(seq, paddr, e);
+ [this](const auto &offsets, const auto &e) {
+ auto start_seq = offsets.write_result.start_seq;
segment_cleaner->update_journal_tail_target(
- cache->get_oldest_dirty_from().value_or(seq));
- return fut;
+ cache->get_oldest_dirty_from().value_or(start_seq));
+ return cache->replay_delta(
+ start_seq,
+ offsets.record_block_base,
+ e);
});
}).safe_then([this] {
return journal->open_for_write();
DEBUGT("about to submit to journal", tref);
return journal->submit_record(std::move(record), tref.get_handle()
- ).safe_then([this, FNAME, &tref](auto p) mutable {
- auto [addr, journal_seq] = p;
- DEBUGT("journal commit to {} seq {}", tref, addr, journal_seq);
- segment_cleaner->set_journal_head(journal_seq);
- cache->complete_commit(tref, addr, journal_seq, segment_cleaner.get());
+ ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
+ auto start_seq = submit_result.write_result.start_seq;
+ auto end_seq = submit_result.write_result.get_end_seq();
+ DEBUGT("journal commit to record_block_base={}, start_seq={}, end_seq={}",
+ tref,
+ submit_result.record_block_base,
+ start_seq,
+ end_seq);
+ segment_cleaner->set_journal_head(end_seq);
+ cache->complete_commit(
+ tref,
+ submit_result.record_block_base,
+ start_seq,
+ segment_cleaner.get());
lba_manager->complete_transaction(tref);
segment_cleaner->update_journal_tail_target(
- cache->get_oldest_dirty_from().value_or(journal_seq));
+ cache->get_oldest_dirty_from().value_or(start_seq));
auto to_release = tref.get_segment_to_release();
if (to_release != NULL_SEG_ID) {
return segment_manager.release(to_release
{
auto record = cache->prepare_record(*t);
return journal->submit_record(std::move(record), t->get_handle()).safe_then(
- [this, t=std::move(t)](auto p) mutable {
- auto [addr, seq] = p;
- cache->complete_commit(*t, addr, seq);
+ [this, t=std::move(t)](auto submit_result) mutable {
+ cache->complete_commit(
+ *t,
+ submit_result.record_block_base,
+ submit_result.write_result.start_seq);
complete_commit(*t);
}).handle_error(crimson::ct_error::assert_all{});
}
replay(
[&advance,
&delta_checker]
- (auto seq, auto base, const auto &di) mutable {
+ (const auto &offsets, const auto &di) mutable {
if (!delta_checker) {
EXPECT_FALSE("No Deltas Left");
}
- if (!(*delta_checker)(base, di)) {
+ if (!(*delta_checker)(offsets.record_block_base, di)) {
delta_checker = std::nullopt;
advance();
}