SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
{
LOG_PREFIX(Journal::close);
- INFO("closing");
+ INFO("closing, committed_to={}",
+ record_submitter.get_committed_to());
metrics.clear();
return journal_segment_manager.close();
}
{
LOG_PREFIX(JournalSegmentManager::close);
if (current_journal_segment) {
- INFO("segment_id={}, seq={}, "
- "written_to={}, committed_to={}, nonce={}",
+ INFO("segment_id={}, seq={}, written_to={}, nonce={}",
current_journal_segment->get_segment_id(),
get_segment_seq(),
written_to,
- committed_to,
current_segment_nonce);
} else {
INFO("no current journal segment");
current_journal_segment->get_segment_id() :
NULL_SEG_ID;
if (current_journal_segment) {
- INFO("closing segment {}, seq={}, "
- "written_to={}, committed_to={}, nonce={}",
+ INFO("closing segment {}, seq={}, written_to={}, nonce={}",
old_segment_id,
get_segment_seq(),
written_to,
- committed_to,
current_segment_nonce);
}
});
}
-void SegmentedJournal::JournalSegmentManager::mark_committed(
- const journal_seq_t& new_committed_to)
-{
- LOG_PREFIX(JournalSegmentManager::mark_committed);
- TRACE("{} => {}", committed_to, new_committed_to);
- assert(committed_to == JOURNAL_SEQ_NULL ||
- committed_to <= new_committed_to);
- committed_to = new_committed_to;
-}
-
SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
{
increment_io();
auto num = p_batch->get_num_records();
- auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_batch->encode_batch(
- committed_to, journal_segment_manager.get_nonce());
+ journal_committed_to, journal_segment_manager.get_nonce());
DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
- num, sizes, committed_to, num_outstanding_io);
+ num, sizes, journal_committed_to, num_outstanding_io);
account_submission(num, sizes);
std::ignore = journal_segment_manager.write(to_write
).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
if (do_flush && p_current_batch->is_empty()) {
// fast path with direct write
increment_io();
- auto committed_to = journal_segment_manager.get_committed_to();
auto [to_write, sizes] = p_current_batch->submit_pending_fast(
std::move(record),
journal_segment_manager.get_block_size(),
- committed_to,
+ journal_committed_to,
journal_segment_manager.get_nonce());
DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
- (void*)&handle, sizes, committed_to, num_outstanding_io);
+ (void*)&handle, sizes, journal_committed_to, num_outstanding_io);
account_submission(1, sizes);
return journal_segment_manager.write(to_write
).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
return handle.enter(write_pipeline->finalize
).then([this, FNAME, submit_result, &handle] {
DEBUG("H{} finish with {}", (void*)&handle, submit_result);
- journal_segment_manager.mark_committed(
- submit_result.write_result.get_end_seq());
+ auto new_committed_to = submit_result.write_result.get_end_seq();
+ assert(journal_committed_to == JOURNAL_SEQ_NULL ||
+ journal_committed_to <= new_committed_to);
+ journal_committed_to = new_committed_to;
return submit_result;
});
});
return current_segment_nonce;
}
- journal_seq_t get_committed_to() const {
- return committed_to;
- }
-
segment_seq_t get_segment_seq() const {
return next_journal_segment_seq - 1;
}
using write_ertr = base_ertr;
using write_ret = write_ertr::future<write_result_t>;
write_ret write(ceph::bufferlist to_write);
-
- // mark write committed in order
- void mark_committed(const journal_seq_t& new_committed_to);
private:
journal_seq_t get_current_write_seq() const {
current_segment_nonce = 0;
current_journal_segment.reset();
written_to = 0;
- committed_to = JOURNAL_SEQ_NULL;
}
// prepare segment for writes, writes out segment header
SegmentRef current_journal_segment;
seastore_off_t written_to;
- // committed_to may be in a previous journal segment
- journal_seq_t committed_to;
};
class RecordBatch {
return stats.record_group_data_bytes;
}
+ journal_seq_t get_committed_to() const {
+ return journal_committed_to;
+ }
+
void reset_stats() {
stats = {};
}
WritePipeline* write_pipeline = nullptr;
JournalSegmentManager& journal_segment_manager;
+ // committed_to may be in a previous journal segment
+ journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
+
std::unique_ptr<RecordBatch[]> batches;
std::size_t current_batch_index;
// should not be nullptr after constructed