auto& segment_manager =
*segment_managers[cursor.get_segment_id().device_id()];
if (cursor.get_segment_offset() == 0) {
- cursor.increment(segment_manager.get_block_size());
+ cursor.increment_seq(segment_manager.get_block_size());
}
auto retref = std::make_unique<size_t>(0);
auto &budget_used = *retref;
auto new_committed_to = header.committed_to;
DEBUG("valid record read at {}, now committed at {}",
cursor.seq, new_committed_to);
- ceph_assert(cursor.last_committed == journal_seq_t() ||
- cursor.last_committed <= new_committed_to);
- cursor.last_committed = new_committed_to;
- cursor.pending_record_groups.emplace_back(
- cursor.seq.offset,
- header,
- std::move(md_bl));
- cursor.increment(header.dlength + header.mdlength);
- ceph_assert(new_committed_to == journal_seq_t() ||
- new_committed_to < cursor.seq);
+ cursor.emplace_record_group(header, std::move(md_bl));
return scan_valid_records_ertr::now();
}
}).safe_then([=, &cursor, &budget_used, &handler] {
next.header,
next.mdbuffer
).safe_then([&cursor] {
- cursor.pending_record_groups.pop_front();
+ cursor.pop_record_group();
});
}
(block_size * blocks_per_segment) + s.get_segment_off());
}
+void scan_valid_records_cursor::emplace_record_group(
+ const record_group_header_t& header, ceph::bufferlist&& md_bl)
+{
+ auto new_committed_to = header.committed_to;
+ ceph_assert(last_committed == journal_seq_t() ||
+ last_committed <= new_committed_to);
+ last_committed = new_committed_to;
+ pending_record_groups.emplace_back(
+ seq.offset,
+ header,
+ std::move(md_bl));
+ increment_seq(header.dlength + header.mdlength);
+ ceph_assert(new_committed_to == journal_seq_t() ||
+ new_committed_to < seq);
+}
}
bool last_valid_header_found = false;
journal_seq_t seq;
journal_seq_t last_committed;
+ std::size_t num_consumed_records = 0;
struct found_record_group_t {
paddr_t offset;
return seq.offset.as_seg_paddr().get_segment_off();
}
- void increment(segment_off_t off) {
+ void increment_seq(segment_off_t off) {
auto& seg_addr = seq.offset.as_seg_paddr();
seg_addr.set_segment_off(
seg_addr.get_segment_off() + off);
}
+ void emplace_record_group(const record_group_header_t&, ceph::bufferlist&&);
+
+ void pop_record_group() {
+ assert(!pending_record_groups.empty());
+ ++num_consumed_records;
+ pending_record_groups.pop_front();
+ }
+
scan_valid_records_cursor(
journal_seq_t seq)
: seq(seq) {}