Writer(std::string name, SegmentProvider& sp, SegmentManager& sm);
Writer(Writer &&) = default;
- open_ertr::future<> open() final;
+ open_ertr::future<> open() final {
+ return record_submitter.open().discard_result();
+ }
write_iertr::future<> write(
Transaction& t,
stop_ertr::future<> stop() final {
return write_guard.close().then([this] {
- return segment_allocator.close();
+ return record_submitter.close();
}).safe_then([this] {
write_guard = seastar::gate();
});
next_segment_seq = seq;
}
-SegmentAllocator::open_ertr::future<journal_seq_t>
-SegmentAllocator::open()
+SegmentAllocator::open_ret
+SegmentAllocator::do_open()
{
- LOG_PREFIX(SegmentAllocator::open);
+ LOG_PREFIX(SegmentAllocator::do_open);
ceph_assert(!current_segment);
- std::ostringstream oss;
- oss << "D" << device_id_printer_t{get_device_id()} << "_" << name;
- print_name = oss.str();
segment_seq_t new_segment_seq = get_new_segment_seq_and_increment();
auto new_segment_id = segment_provider.get_segment(
get_device_id(), new_segment_seq);
).handle_error(
open_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error in SegmentAllocator::open open"
+ "Invalid error in SegmentAllocator::do_open open"
}
).safe_then([this, FNAME, new_segment_seq](auto sref) {
// initialize new segment
).handle_error(
open_ertr::pass_further{},
crimson::ct_error::assert_all{
- "Invalid error in SegmentAllocator::open write"
+ "Invalid error in SegmentAllocator::do_open write"
}
).safe_then([this,
FNAME,
});
}
+SegmentAllocator::open_ret
+SegmentAllocator::open()
+{
+ LOG_PREFIX(SegmentAllocator::open);
+ std::ostringstream oss;
+ oss << "D" << device_id_printer_t{get_device_id()} << "_" << name;
+ print_name = oss.str();
+ INFO("{}", print_name);
+ return do_open();
+}
+
SegmentAllocator::roll_ertr::future<>
SegmentAllocator::roll()
{
ceph_assert(can_write());
return close_segment(true).safe_then([this] {
- return open().discard_result();
+ return do_open().discard_result();
});
}
return [this] {
LOG_PREFIX(SegmentAllocator::close);
if (current_segment) {
+ INFO("{} close current segment", print_name);
return close_segment(false);
} else {
INFO("{} no current segment", print_name);
return write_fut;
}
+RecordSubmitter::open_ret
+RecordSubmitter::open()
+{
+ return segment_allocator.open();
+}
+
+RecordSubmitter::close_ertr::future<>
+RecordSubmitter::close()
+{
+ assert(state == state_t::IDLE);
+ assert(num_outstanding_io == 0);
+ committed_to = JOURNAL_SEQ_NULL;
+ assert(p_current_batch != nullptr);
+ assert(p_current_batch->is_empty());
+ assert(!wait_available_promise.has_value());
+ has_io_error = false;
+ assert(!wait_unfull_flush_promise.has_value());
+ return segment_allocator.close();
+}
+
void RecordSubmitter::update_state()
{
if (num_outstanding_io == 0) {
return length + written_to > std::size_t(write_capacity);
}
- // open for write
+ // open for write and generate the correct print name
using open_ertr = base_ertr;
using open_ret = open_ertr::future<journal_seq_t>;
open_ret open();
close_ertr::future<> close();
private:
+ open_ret do_open();
+
void reset() {
current_segment.reset();
written_to = 0;
committed_to = new_committed_to;
}
+ // open for write, generate the correct print name, and register metrics
+ using open_ertr = base_ertr;
+ using open_ret = open_ertr::future<journal_seq_t>;
+ open_ret open();
+
+ using close_ertr = base_ertr;
+ close_ertr::future<> close();
+
private:
void update_state();
SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
{
- LOG_PREFIX(Journal::open_for_write);
- INFO("device_id={}", journal_segment_allocator.get_device_id());
- return journal_segment_allocator.open();
+ return record_submitter.open();
}
SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
INFO("closing, committed_to={}",
record_submitter.get_committed_to());
metrics.clear();
- return journal_segment_allocator.close();
+ return record_submitter.close();
}
SegmentedJournal::prep_replay_segments_fut