extent_offset += extent.get_bptr().length();
}
assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
- return encode_record(rsize, std::move(record), block_size, base, nonce);
+ return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce);
}
void add_extent(LogicalCachedExtentRef& extent) {
extents.emplace_back(extent);
void set_base(segment_off_t b) {
base = b;
}
- segment_off_t get_base() {
+ segment_off_t get_base() const { // const-qualified by this patch: the body only reads `base`
return base;
}
void clear() {
{
auto ret = std::make_unique<scan_extents_ret_bare>();
auto* extents = ret.get();
- return read_segment_header(cursor.get_offset().segment
+ return read_segment_header(cursor.get_segment_id()
).handle_error(
scan_extents_ertr::pass_further{},
crimson::ct_error::assert_all{
found_record_handler_t &handler)
{
auto& segment_manager =
- *segment_managers[cursor.offset.segment.device_id()];
- if (cursor.offset.offset == 0) {
- cursor.offset.offset = segment_manager.get_block_size();
+ *segment_managers[cursor.get_segment_id().device_id()];
+ if (cursor.get_segment_offset() == 0) {
+ cursor.increment(segment_manager.get_block_size());
}
auto retref = std::make_unique<size_t>(0);
auto &budget_used = *retref;
-> scan_valid_records_ertr::future<seastar::stop_iteration> {
return [=, &handler, &cursor, &budget_used] {
if (!cursor.last_valid_header_found) {
- return read_validate_record_metadata(cursor.offset, nonce
+ return read_validate_record_metadata(cursor.seq.offset, nonce
).safe_then([=, &cursor](auto md) {
logger().debug(
"ExtentReader::scan_valid_records: read complete {}",
- cursor.offset);
+ cursor.seq);
if (!md) {
logger().debug(
"ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
- cursor.offset);
+ cursor.seq);
cursor.last_valid_header_found = true;
return scan_valid_records_ertr::now();
} else {
+ auto new_committed_to = md->first.committed_to;
logger().debug(
- "ExtentReader::scan_valid_records: valid record read at {}",
- cursor.offset);
- cursor.last_committed = paddr_t{
- cursor.offset.segment,
- md->first.committed_to};
+ "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
+ cursor.seq,
+ new_committed_to);
+ ceph_assert(cursor.last_committed == journal_seq_t() ||
+ cursor.last_committed <= new_committed_to);
+ cursor.last_committed = new_committed_to;
cursor.pending_records.emplace_back(
- cursor.offset,
+ cursor.seq.offset,
md->first,
md->second);
- cursor.offset.offset +=
- md->first.dlength + md->first.mdlength;
+ cursor.increment(md->first.dlength + md->first.mdlength);
+ ceph_assert(new_committed_to == journal_seq_t() ||
+ new_committed_to < cursor.seq);
return scan_valid_records_ertr::now();
}
}).safe_then([=, &cursor, &budget_used, &handler] {
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
auto &next = cursor.pending_records.front();
- if (next.offset > cursor.last_committed) {
+ journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
+ if (cursor.last_committed == journal_seq_t() ||
+ next_seq > cursor.last_committed) {
return scan_valid_records_ertr::make_ready_future<
seastar::stop_iteration>(seastar::stop_iteration::yes);
}
{
logger().debug("Journal::replay_segment: starting at {}", seq);
return seastar::do_with(
- scan_valid_records_cursor(seq.offset),
+ scan_valid_records_cursor(seq),
ExtentReader::found_record_handler_t(
[=, &handler](paddr_t base,
const record_header_t &header,
logger().debug(
"JournalSegmentManager::mark_committed: committed_to {} => {}",
committed_to, new_committed_to);
- assert(new_committed_to.segment_seq <=
- get_segment_seq());
- if (new_committed_to.segment_seq ==
- get_segment_seq()) {
- assert(committed_to.offset.offset < new_committed_to.offset.offset);
- committed_to = new_committed_to;
- }
+ assert(committed_to == journal_seq_t() ||
+ committed_to <= new_committed_to);
+ committed_to = new_committed_to;
}
Journal::JournalSegmentManager::initialize_segment_ertr::future<>
auto header = segment_header_t{
seq,
segment.get_segment_id(),
- segment_provider->get_journal_tail_target(),
+ new_tail,
current_segment_nonce,
false};
logger().debug(
bl.append(bp);
written_to = 0;
- // FIXME: improve committed_to to point to another segment
- committed_to = get_current_write_seq();
return write(bl
).safe_then([this, new_tail, write_size=bl.length()
](journal_seq_t write_start_seq) {
- auto committed_to = write_start_seq;
- committed_to.offset.offset += write_size;
- mark_committed(committed_to);
segment_provider->update_journal_tail_committed(new_tail);
});
}
ceph::bufferlist Journal::RecordBatch::encode_records(
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
logger().debug(
record_t&& record,
const record_size_t& rsize,
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t segment_nonce)
{
logger().debug(
return current_segment_nonce;
}
- segment_off_t get_committed_to() const {
- assert(committed_to.segment_seq ==
- get_segment_seq());
- return committed_to.offset.offset;
+ journal_seq_t get_committed_to() const { // now hands back the full journal_seq_t, not just the in-segment offset
+ return committed_to; // the former same-segment assert is removed together with the offset-only return
}
segment_seq_t get_segment_seq() const {
// Encode the batched records for write.
ceph::bufferlist encode_records(
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t segment_nonce);
// Set the write result and reset for reuse
record_t&&,
const record_size_t&,
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t segment_nonce);
private:
record_size_t rsize,
record_t &&record,
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce)
{
bufferlist data_bl;
// Fixed portion
extent_len_t mdlength; // block aligned, length of metadata
extent_len_t dlength; // block aligned, length of data
- uint32_t deltas; // number of deltas
- uint32_t extents; // number of extents
+ uint32_t deltas; // number of deltas
+ uint32_t extents; // number of extents
segment_nonce_t segment_nonce;// nonce of containing segment
- segment_off_t committed_to; // records in this segment prior to committed_to
- // have been fully written
+ journal_seq_t committed_to; // records prior to committed_to have been
+ // fully written, maybe in another segment.
checksum_t data_crc; // crc of data payload
record_size_t rsize,
record_t &&record,
size_t block_size,
- segment_off_t committed_to,
+ const journal_seq_t& committed_to,
segment_nonce_t current_segment_nonce = 0);
/// scan segment for end incrementally
struct scan_valid_records_cursor {
bool last_valid_header_found = false;
- paddr_t offset;
- paddr_t last_committed;
+ journal_seq_t seq;
+ journal_seq_t last_committed;
struct found_record_t {
paddr_t offset;
return last_valid_header_found && pending_records.empty();
}
- paddr_t get_offset() const {
- return offset;
+ segment_id_t get_segment_id() const { // replaces get_offset(): callers read the pieces of `seq` through narrow accessors
+ return seq.offset.segment; // segment component of the cursor position
+ }
+
+ segment_off_t get_segment_offset() const { // in-segment offset component of the cursor position
+ return seq.offset.offset;
+ }
+
+ void increment(segment_off_t off) { // advances the cursor by `off` within the current segment
+ seq.offset.offset += off; // only the in-segment offset changes; segment id and segment_seq are left as-is
}
scan_valid_records_cursor(
- paddr_t offset)
- : offset(offset) {}
+ journal_seq_t seq) // the starting position is now a journal_seq_t rather than a bare paddr_t
+ : seq(seq) {} // only `seq` is initialised here; the other members keep their in-class/default initial values
};
}
SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
{
if (!scan_cursor) {
- paddr_t next = P_ADDR_NULL;
- next.segment = get_next_gc_target();
- if (next == P_ADDR_NULL) {
+ journal_seq_t next = get_next_gc_target();
+ if (next == journal_seq_t()) {
logger().debug(
"SegmentCleaner::do_gc: no segments to gc");
return seastar::now();
}
- next.offset = 0;
scan_cursor =
std::make_unique<ExtentReader::scan_extents_cursor>(
next);
logger().debug(
"SegmentCleaner::do_gc: starting gc on segment {}",
- scan_cursor->get_offset().segment);
+ scan_cursor->seq);
} else {
ceph_assert(!scan_cursor->is_complete());
}
});
}).si_then([this, &t] {
if (scan_cursor->is_complete()) {
- t.mark_segment_to_release(scan_cursor->get_offset().segment);
+ t.mark_segment_to_release(scan_cursor->get_segment_id());
}
return ecb->submit_transaction_direct(t);
});
void update_journal_tail_target(journal_seq_t target);
- void init_journal_tail(journal_seq_t tail) {
- journal_tail_target = journal_tail_committed = tail;
- }
-
void init_mkfs(journal_seq_t head) {
journal_tail_target = head;
journal_tail_committed = head;
assert(ret >= 0);
}
- segment_id_t get_next_gc_target() const {
- segment_id_t ret = NULL_SEG_ID;
+ journal_seq_t get_next_gc_target() const { // picks the closed, out-of-journal segment with the lowest get_usage(); returns journal_seq_t() if none
+ segment_id_t id = NULL_SEG_ID; // best candidate so far; NULL_SEG_ID means nothing selected yet
segment_seq_t seq = NULL_SEG_SEQ;
int64_t least_live_bytes = std::numeric_limits<int64_t>::max();
for (auto it = segments.begin();
it != segments.end();
++it) {
- auto id = it->first;
+ auto _id = it->first; // id of the segment being examined; renamed so it no longer shadows the running best `id`
const auto& segment_info = it->second;
if (segment_info.is_closed() &&
!segment_info.is_in_journal(journal_tail_committed) &&
- space_tracker->get_usage(id) < least_live_bytes) {
- ret = id;
+ space_tracker->get_usage(_id) < least_live_bytes) { // strict '<': on equal usage the earlier-iterated segment is kept
+ id = _id; // from here on `id` equals `_id`, so the unchanged get_usage(id) line below reads the same segment
seq = segment_info.journal_segment_seq;
least_live_bytes = space_tracker->get_usage(id);
}
}
- if (ret != NULL_SEG_ID) {
+ if (id != NULL_SEG_ID) { // a candidate was selected by the loop above
crimson::get_logger(ceph_subsys_seastore).debug(
"SegmentCleaner::get_next_gc_target: segment {} seq {}",
- ret,
+ id,
seq);
+ return journal_seq_t{seq, {id, 0}}; // chosen segment's journal_segment_seq paired with {segment id, offset 0}
+ } else {
+ return journal_seq_t(); // default-constructed value is the "no target" sentinel
}
- return ret;
}
SpaceTrackerIRef get_empty_space_tracker() const {
if (!scan_cursor)
return 0;
- return scan_cursor->get_offset().offset;
+ return scan_cursor->get_segment_offset();
}
/// Returns free space available for writes