A record may not have its own base paddr if record headers are merged.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
auto segment_nonce = segment_header.segment_nonce;
return seastar::do_with(
found_record_handler_t([extents, this](
- paddr_t base,
+ record_locator_t locator,
const record_header_t& header,
const bufferlist& mdbuf) mutable -> scan_valid_records_ertr::future<>
{
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"ExtentReader::scan_extents: unable to decode extents for record {}",
- base);
+ locator.record_block_base);
return crimson::ct_error::input_output_error::make();
}
- paddr_t extent_offset = base.add_offset(header.mdlength);
+ paddr_t extent_offset = locator.record_block_base;
logger().debug("ExtentReader::scan_extents: decoded {} extents",
maybe_record_extent_infos->size());
for (const auto &i : *maybe_record_extent_infos) {
auto& next = cursor.pending_records.front();
auto total_length = next.header.dlength + next.header.mdlength;
budget_used += total_length;
+ auto locator = record_locator_t{
+ next.offset.add_offset(next.header.mdlength),
+ write_result_t{
+ journal_seq_t{
+ cursor.seq.segment_seq,
+ next.offset
+ },
+ static_cast<segment_off_t>(total_length)
+ }
+ };
return handler(
- next.offset,
+ locator,
next.header,
next.mdbuffer
).safe_then([&cursor] {
size_t>;
using found_record_handler_t = std::function<
scan_valid_records_ertr::future<>(
- paddr_t record_block_base,
+ record_locator_t record_locator,
// callee may assume header and bl will remain valid until
// returned future resolves
const record_header_t &header,
- const bufferlist &bl)>;
+ const bufferlist &mdbuf)>;
scan_valid_records_ret scan_valid_records(
scan_valid_records_cursor &cursor, ///< [in, out] cursor, updated during call
segment_nonce_t nonce, ///< [in] nonce for segment
return seastar::do_with(
scan_valid_records_cursor(seq),
ExtentReader::found_record_handler_t([=, &handler](
- paddr_t base,
+ record_locator_t locator,
const record_header_t& header,
const bufferlist& mdbuf)
-> ExtentReader::scan_valid_records_ertr::future<>
// This should be impossible, we did check the crc on the mdbuf
logger().error(
"Journal::replay_segment: unable to decode deltas for record {}",
- base);
+ locator.record_block_base);
return crimson::ct_error::input_output_error::make();
}
std::move(*maybe_record_deltas_list),
[=](auto &deltas)
{
- logger().debug("Journal::replay_segment: decoded {} deltas at base {}",
+ logger().debug("Journal::replay_segment: decoded {} deltas at block_base {}",
deltas.size(),
- base);
+ locator.record_block_base);
return crimson::do_for_each(
deltas,
[=](auto &delta)
seq.segment_seq)) {
return replay_ertr::now();
} else {
- auto offsets = submit_result_t{
- base.add_offset(header.mdlength),
- write_result_t{
- journal_seq_t{seq.segment_seq, base},
- static_cast<segment_off_t>(header.mdlength + header.dlength)
- }
- };
- return handler(offsets, delta);
+ return handler(locator, delta);
}
});
});
if (!maybe_write_result.has_value()) {
return crimson::ct_error::input_output_error::make();
}
- auto submit_result = submit_result_t{
+ auto submit_result = record_locator_t{
maybe_write_result->start_seq.offset.add_offset(block_start_offset),
*maybe_write_result
};
journal_segment_manager.get_nonce());
return journal_segment_manager.write(to_write
).safe_then([rsize](auto write_result) {
- return submit_result_t{
+ return record_locator_t{
write_result.start_seq.offset.add_offset(rsize.mdlength),
write_result
};
*
* write record with the ordering handle
*/
- struct write_result_t {
- journal_seq_t start_seq;
- segment_off_t length;
-
- journal_seq_t get_end_seq() const {
- return start_seq.add_offset(length);
- }
- };
- struct submit_result_t {
- paddr_t record_block_base;
- write_result_t write_result;
- };
using submit_record_ertr = crimson::errorator<
crimson::ct_error::erange,
crimson::ct_error::input_output_error
>;
using submit_record_ret = submit_record_ertr::future<
- submit_result_t
+ record_locator_t
>;
submit_record_ret submit_record(
record_t &&record,
using replay_ertr = SegmentManager::read_ertr;
using replay_ret = replay_ertr::future<>;
using delta_handler_t = std::function<
- replay_ret(const submit_result_t&,
+ replay_ret(const record_locator_t&,
const delta_info_t&)>;
replay_ret replay(
std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
// Set write_result_t::length to 0 if the record is not the first one
// in the batch.
using add_pending_ertr = JournalSegmentManager::write_ertr;
- using add_pending_ret = add_pending_ertr::future<submit_result_t>;
+ using add_pending_ret = add_pending_ertr::future<record_locator_t>;
add_pending_ret add_pending(record_t&&, const record_size_t&);
// Encode the batched records for write.
using submit_pending_ertr = JournalSegmentManager::write_ertr;
using submit_pending_ret = submit_pending_ertr::future<
- submit_result_t>;
+ record_locator_t>;
submit_pending_ret submit_pending(
record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce = 0);
+struct write_result_t {
+ journal_seq_t start_seq;
+ segment_off_t length;
+
+ journal_seq_t get_end_seq() const {
+ return start_seq.add_offset(length);
+ }
+};
+
+struct record_locator_t {
+ paddr_t record_block_base;
+ write_result_t write_result;
+};
+
/// scan segment for end incrementally
struct scan_valid_records_cursor {
bool last_valid_header_found = false;