* unify update_segment_avail_bytes() and set_journal_head() interfaces.
* reuse segment_info_t::written_to to get the current journal head.
* more strict validations about journal head maintainence.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
auto new_journal_seq = journal_seq_t{
new_segment_seq,
paddr_t::make_seg_paddr(segment_id, written_to)};
- if (type == segment_type_t::OOL) {
- // FIXME: improve the special handling for OOL
- segment_provider.update_segment_avail_bytes(
- new_journal_seq.offset);
- }
+ segment_provider.update_segment_avail_bytes(
+ type, new_journal_seq.offset);
return sref->write(0, bl
).handle_error(
open_ertr::pass_further{},
static_cast<seastore_off_t>(write_length)
};
written_to += write_length;
- if (type == segment_type_t::OOL) {
- // FIXME: improve the special handling for OOL
- segment_provider.update_segment_avail_bytes(
- paddr_t::make_seg_paddr(
- current_segment->get_segment_id(), written_to)
- );
- }
+ segment_provider.update_segment_avail_bytes(
+ type,
+ paddr_t::make_seg_paddr(
+ current_segment->get_segment_id(), written_to)
+ );
return current_segment->write(
write_start_offset, to_write
).handle_error(
namespace crimson::os::seastore {
void segment_info_t::set_open(
- segment_seq_t _seq, segment_type_t _type, std::size_t seg_size)
+ segment_seq_t _seq, segment_type_t _type)
{
ceph_assert(_seq != NULL_SEG_SEQ);
ceph_assert(_type != segment_type_t::NULL_SEG);
- ceph_assert(seg_size > 0);
state = Segment::segment_state_t::OPEN;
seq = _seq;
type = _type;
- open_avail_bytes = seg_size;
+ written_to = 0;
}
void segment_info_t::set_empty()
type = segment_type_t::NULL_SEG;
last_modified = {};
last_rewritten = {};
- open_avail_bytes = 0;
+ written_to = 0;
}
void segment_info_t::set_closed()
}
void segment_info_t::init_closed(
- segment_seq_t _seq, segment_type_t _type)
+ segment_seq_t _seq, segment_type_t _type, std::size_t seg_size)
{
ceph_assert(_seq != NULL_SEG_SEQ);
ceph_assert(_type != segment_type_t::NULL_SEG);
state = Segment::segment_state_t::CLOSED;
seq = _seq;
type = _type;
- open_avail_bytes = 0;
+ written_to = seg_size;
}
std::ostream& operator<<(std::ostream &out, const segment_info_t &info)
<< ", type=" << info.type
<< ", last_modified=" << info.last_modified.time_since_epoch()
<< ", last_rewritten=" << info.last_rewritten.time_since_epoch()
- << ", open_avail_bytes=" << info.open_avail_bytes;
+ << ", written_to=" << info.written_to;
}
return out << ")";
}
segment_size = 0;
+ journal_segment_id = NULL_SEG_ID;
num_in_journal = 0;
+
num_open = 0;
num_empty = 0;
num_closed = 0;
segment, segment_seq_printer_t{seq}, type,
segment_info, num_empty, num_open, num_closed);
ceph_assert(segment_info.is_empty());
- segment_info.init_closed(seq, type);
+ segment_info.init_closed(seq, type, get_segment_size());
ceph_assert(num_empty > 0);
--num_empty;
++num_closed;
if (type == segment_type_t::JOURNAL) {
+ // init_closed won't initialize journal_segment_id
+ ceph_assert(get_journal_head() == JOURNAL_SEQ_NULL);
++num_in_journal;
}
ceph_assert(avail_bytes >= get_segment_size());
segment, segment_seq_printer_t{seq}, type,
segment_info, num_empty, num_open, num_closed);
ceph_assert(segment_info.is_empty());
- segment_info.set_open(seq, type, get_segment_size());
+ segment_info.set_open(seq, type);
ceph_assert(num_empty > 0);
--num_empty;
++num_open;
if (type == segment_type_t::JOURNAL) {
+ if (journal_segment_id != NULL_SEG_ID) {
+ auto& last_journal_segment = segments[journal_segment_id];
+ ceph_assert(last_journal_segment.is_closed());
+ ceph_assert(last_journal_segment.type == segment_type_t::JOURNAL);
+ ceph_assert(last_journal_segment.seq + 1 == seq);
+ }
+ journal_segment_id = segment;
++num_in_journal;
}
++count_open;
ceph_assert(num_open > 0);
--num_open;
++num_closed;
- ceph_assert(avail_bytes >= segment_info.open_avail_bytes);
- avail_bytes -= segment_info.open_avail_bytes;
+ ceph_assert(get_segment_size() >= segment_info.written_to);
+ auto seg_avail_bytes = get_segment_size() - segment_info.written_to;
+ ceph_assert(avail_bytes >= seg_avail_bytes);
+ avail_bytes -= seg_avail_bytes;
++count_close;
}
void segments_info_t::update_written_to(
+ segment_type_t type,
paddr_t offset)
{
LOG_PREFIX(segments_info_t::update_written_to);
auto& saddr = offset.as_seg_paddr();
auto& segment_info = segments[saddr.get_segment_id()];
if (!segment_info.is_open()) {
- ERROR("segment is not open, not updating, offset={}, {}",
- offset, segment_info);
+ ERROR("segment is not open, not updating, type={}, offset={}, {}",
+ type, offset, segment_info);
ceph_abort();
}
- auto new_avail = get_segment_size() - saddr.get_segment_off();
- if (segment_info.open_avail_bytes < new_avail) {
- ERROR("open_avail_bytes should not increase! offset={}, {}",
- offset, segment_info);
+ auto new_written_to = static_cast<std::size_t>(saddr.get_segment_off());
+ ceph_assert(new_written_to <= get_segment_size());
+ if (segment_info.written_to > new_written_to) {
+ ERROR("written_to should not decrease! type={}, offset={}, {}",
+ type, offset, segment_info);
ceph_abort();
}
- DEBUG("offset={}, {}", offset, segment_info);
- auto avail_deduction = segment_info.open_avail_bytes - new_avail;
+ DEBUG("type={}, offset={}, {}", type, offset, segment_info);
+ ceph_assert(type == segment_info.type);
+ auto avail_deduction = new_written_to - segment_info.written_to;
ceph_assert(avail_bytes >= avail_deduction);
avail_bytes -= avail_deduction;
- segment_info.open_avail_bytes = new_avail;
+ segment_info.written_to = new_written_to;
}
bool SpaceTrackerSimple::equals(const SpaceTrackerI &_other) const
journal_seq_t target = std::min(dirty_replay_from, alloc_replay_from);
ceph_assert(target != JOURNAL_SEQ_NULL);
+ auto journal_head = segments.get_journal_head();
ceph_assert(journal_head == JOURNAL_SEQ_NULL ||
journal_head >= target);
if (journal_tail_target == JOURNAL_SEQ_NULL ||
if (committed == JOURNAL_SEQ_NULL) {
return;
}
+ auto journal_head = segments.get_journal_head();
ceph_assert(journal_head == JOURNAL_SEQ_NULL ||
journal_head >= committed);
stats = {};
journal_tail_target = JOURNAL_SEQ_NULL;
journal_tail_committed = JOURNAL_SEQ_NULL;
- journal_head = JOURNAL_SEQ_NULL;
dirty_extents_replay_from = JOURNAL_SEQ_NULL;
alloc_info_replay_from = JOURNAL_SEQ_NULL;
{
LOG_PREFIX(SegmentCleaner::complete_init);
INFO("done, start GC");
- ceph_assert(journal_head != JOURNAL_SEQ_NULL);
+ ceph_assert(segments.get_journal_head() != JOURNAL_SEQ_NULL);
init_complete = true;
gc_process.start();
}
time_point last_modified;
time_point last_rewritten;
- std::size_t open_avail_bytes = 0;
+ std::size_t written_to = 0;
bool is_in_journal(journal_seq_t tail_committed) const {
return type == segment_type_t::JOURNAL &&
return state == Segment::segment_state_t::OPEN;
}
- void init_closed(segment_seq_t, segment_type_t);
+ void init_closed(segment_seq_t, segment_type_t, std::size_t);
- void set_open(segment_seq_t, segment_type_t, std::size_t segment_size);
+ void set_open(segment_seq_t, segment_type_t);
void set_empty();
std::size_t get_available_bytes() const {
return avail_bytes;
}
+ journal_seq_t get_journal_head() const {
+ if (unlikely(journal_segment_id == NULL_SEG_ID)) {
+ return JOURNAL_SEQ_NULL;
+ }
+ auto &segment_info = segments[journal_segment_id];
+ assert(!segment_info.is_empty());
+ assert(segment_info.type == segment_type_t::JOURNAL);
+ assert(segment_info.seq != NULL_SEG_SEQ);
+ return journal_seq_t{
+ segment_info.seq,
+ paddr_t::make_seg_paddr(
+ journal_segment_id,
+ segment_info.written_to)
+ };
+ }
void reset();
void mark_closed(segment_id_t);
- void update_written_to(paddr_t offset);
+ void update_written_to(segment_type_t, paddr_t);
void update_last_modified_rewritten(
segment_id_t id, time_point last_modified, time_point last_rewritten) {
std::size_t segment_size;
+ segment_id_t journal_segment_id;
std::size_t num_in_journal;
+
std::size_t num_open;
std::size_t num_empty;
std::size_t num_closed;
virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
- virtual void update_segment_avail_bytes(paddr_t offset) = 0;
+ virtual void update_segment_avail_bytes(segment_type_t, paddr_t) = 0;
virtual SegmentManagerGroup* get_segment_manager_group() = 0;
/// most recently committed journal_tail
journal_seq_t journal_tail_committed;
- /// head of journal
- journal_seq_t journal_head;
-
ExtentCallbackInterface *ecb = nullptr;
/// populated if there is an IO blocked on hard limits
void update_journal_tail_committed(journal_seq_t committed) final;
- void update_segment_avail_bytes(paddr_t offset) final {
- segments.update_written_to(offset);
+ void update_segment_avail_bytes(segment_type_t type, paddr_t offset) final {
+ segments.update_written_to(type, offset);
gc_process.maybe_wake_on_space_used();
}
void update_alloc_info_replay_from(
journal_seq_t alloc_replay_from);
- void init_mkfs(journal_seq_t head) {
- ceph_assert(head != JOURNAL_SEQ_NULL);
- journal_tail_target = head;
- journal_tail_committed = head;
- journal_head = head;
- }
-
- void set_journal_head(journal_seq_t head) {
- ceph_assert(head != JOURNAL_SEQ_NULL);
- assert(journal_head == JOURNAL_SEQ_NULL || head >= journal_head);
- journal_head = head;
- segments.update_written_to(head.offset);
- gc_process.maybe_wake_on_space_used();
+ void init_mkfs() {
+ auto journal_head = segments.get_journal_head();
+ ceph_assert(journal_head != JOURNAL_SEQ_NULL);
+ journal_tail_target = journal_head;
+ journal_tail_committed = journal_head;
}
using release_ertr = SegmentManagerGroup::release_ertr;
journal_seq_t limit);
journal_seq_t get_dirty_tail() const {
- auto ret = journal_head;
+ auto ret = segments.get_journal_head();
ceph_assert(ret != JOURNAL_SEQ_NULL);
if (ret.segment_seq >= config.target_journal_segments) {
ret.segment_seq -= config.target_journal_segments;
}
journal_seq_t get_dirty_tail_limit() const {
- auto ret = journal_head;
+ auto ret = segments.get_journal_head();
ceph_assert(ret != JOURNAL_SEQ_NULL);
if (ret.segment_seq >= config.max_journal_segments) {
ret.segment_seq -= config.max_journal_segments;
get_available_ratio(),
should_block_on_gc(),
gc_should_reclaim_space(),
- journal_head,
+ segments.get_journal_head(),
journal_tail_target,
journal_tail_committed,
get_dirty_tail(),
return segment_cleaner->mount(
).safe_then([this] {
return journal->open_for_write();
- }).safe_then([this](auto addr) {
- segment_cleaner->init_mkfs(addr);
+ }).safe_then([this](auto) {
+ segment_cleaner->init_mkfs();
return epm->open();
}).safe_then([this, FNAME]() {
return with_transaction_intr(
});
}).safe_then([this] {
return journal->open_for_write();
- }).safe_then([this, FNAME](auto addr) {
- segment_cleaner->set_journal_head(addr);
+ }).safe_then([this, FNAME](auto) {
return seastar::do_with(
create_weak_transaction(
Transaction::src_t::READ, "mount"),
(auto submit_result) mutable {
SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
auto start_seq = submit_result.write_result.start_seq;
- auto end_seq = submit_result.write_result.get_end_seq();
- segment_cleaner->set_journal_head(end_seq);
if (seq_to_trim && *seq_to_trim != JOURNAL_SEQ_NULL) {
cache->trim_backref_bufs(*seq_to_trim);
}
void update_journal_tail_committed(journal_seq_t committed) final {}
- void update_segment_avail_bytes(paddr_t offset) final {}
+ void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
void update_journal_tail_committed(journal_seq_t paddr) final {}
- void update_segment_avail_bytes(paddr_t offset) final {}
+ void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }