]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/segment_cleaner: unify update_segment_avail_bytes()
authorYingxin Cheng <yingxin.cheng@intel.com>
Fri, 29 Apr 2022 08:59:38 +0000 (16:59 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 13 May 2022 07:51:20 +0000 (15:51 +0800)
* unify update_segment_avail_bytes() and set_journal_head() interfaces.
* reuse segment_info_t::written_to to get the current journal head.
* more strict validations about journal head maintainence.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/segment_allocator.cc
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/os/seastore/transaction_manager.cc
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc

index ad82a122db1115dfc2a032eb1317c12efd7a7672..0ea508b31517e7d8113d0bec68f38a50c4209dc3 100644 (file)
@@ -86,11 +86,8 @@ SegmentAllocator::do_open()
     auto new_journal_seq = journal_seq_t{
       new_segment_seq,
       paddr_t::make_seg_paddr(segment_id, written_to)};
-    if (type == segment_type_t::OOL) {
-      // FIXME: improve the special handling for OOL
-      segment_provider.update_segment_avail_bytes(
-          new_journal_seq.offset);
-    }
+    segment_provider.update_segment_avail_bytes(
+        type, new_journal_seq.offset);
     return sref->write(0, bl
     ).handle_error(
       open_ertr::pass_further{},
@@ -165,13 +162,11 @@ SegmentAllocator::write(ceph::bufferlist to_write)
     static_cast<seastore_off_t>(write_length)
   };
   written_to += write_length;
-  if (type == segment_type_t::OOL) {
-    // FIXME: improve the special handling for OOL
-    segment_provider.update_segment_avail_bytes(
-      paddr_t::make_seg_paddr(
-        current_segment->get_segment_id(), written_to)
-    );
-  }
+  segment_provider.update_segment_avail_bytes(
+    type,
+    paddr_t::make_seg_paddr(
+      current_segment->get_segment_id(), written_to)
+  );
   return current_segment->write(
     write_start_offset, to_write
   ).handle_error(
index 6acd8d16e63730e6a2e646d4f3cab984df33d912..9d8b2b54957ae6c96a6d5d70cd204997c60e632e 100644 (file)
@@ -20,15 +20,14 @@ SET_SUBSYS(seastore_cleaner);
 namespace crimson::os::seastore {
 
 void segment_info_t::set_open(
-    segment_seq_t _seq, segment_type_t _type, std::size_t seg_size)
+    segment_seq_t _seq, segment_type_t _type)
 {
   ceph_assert(_seq != NULL_SEG_SEQ);
   ceph_assert(_type != segment_type_t::NULL_SEG);
-  ceph_assert(seg_size > 0);
   state = Segment::segment_state_t::OPEN;
   seq = _seq;
   type = _type;
-  open_avail_bytes = seg_size;
+  written_to = 0;
 }
 
 void segment_info_t::set_empty()
@@ -38,7 +37,7 @@ void segment_info_t::set_empty()
   type = segment_type_t::NULL_SEG;
   last_modified = {};
   last_rewritten = {};
-  open_avail_bytes = 0;
+  written_to = 0;
 }
 
 void segment_info_t::set_closed()
@@ -48,14 +47,14 @@ void segment_info_t::set_closed()
 }
 
 void segment_info_t::init_closed(
-    segment_seq_t _seq, segment_type_t _type)
+    segment_seq_t _seq, segment_type_t _type, std::size_t seg_size)
 {
   ceph_assert(_seq != NULL_SEG_SEQ);
   ceph_assert(_type != segment_type_t::NULL_SEG);
   state = Segment::segment_state_t::CLOSED;
   seq = _seq;
   type = _type;
-  open_avail_bytes = 0;
+  written_to = seg_size;
 }
 
 std::ostream& operator<<(std::ostream &out, const segment_info_t &info)
@@ -69,7 +68,7 @@ std::ostream& operator<<(std::ostream &out, const segment_info_t &info)
         << ", type=" << info.type
         << ", last_modified=" << info.last_modified.time_since_epoch()
         << ", last_rewritten=" << info.last_rewritten.time_since_epoch()
-        << ", open_avail_bytes=" << info.open_avail_bytes;
+        << ", written_to=" << info.written_to;
   }
   return out << ")";
 }
@@ -80,7 +79,9 @@ void segments_info_t::reset()
 
   segment_size = 0;
 
+  journal_segment_id = NULL_SEG_ID;
   num_in_journal = 0;
+
   num_open = 0;
   num_empty = 0;
   num_closed = 0;
@@ -134,11 +135,13 @@ void segments_info_t::init_closed(
        segment, segment_seq_printer_t{seq}, type,
        segment_info, num_empty, num_open, num_closed);
   ceph_assert(segment_info.is_empty());
-  segment_info.init_closed(seq, type);
+  segment_info.init_closed(seq, type, get_segment_size());
   ceph_assert(num_empty > 0);
   --num_empty;
   ++num_closed;
   if (type == segment_type_t::JOURNAL) {
+    // init_closed won't initialize journal_segment_id
+    ceph_assert(get_journal_head() == JOURNAL_SEQ_NULL);
     ++num_in_journal;
   }
   ceph_assert(avail_bytes >= get_segment_size());
@@ -155,11 +158,18 @@ void segments_info_t::mark_open(
        segment, segment_seq_printer_t{seq}, type,
        segment_info, num_empty, num_open, num_closed);
   ceph_assert(segment_info.is_empty());
-  segment_info.set_open(seq, type, get_segment_size());
+  segment_info.set_open(seq, type);
   ceph_assert(num_empty > 0);
   --num_empty;
   ++num_open;
   if (type == segment_type_t::JOURNAL) {
+    if (journal_segment_id != NULL_SEG_ID) {
+      auto& last_journal_segment = segments[journal_segment_id];
+      ceph_assert(last_journal_segment.is_closed());
+      ceph_assert(last_journal_segment.type == segment_type_t::JOURNAL);
+      ceph_assert(last_journal_segment.seq + 1 == seq);
+    }
+    journal_segment_id = segment;
     ++num_in_journal;
   }
   ++count_open;
@@ -201,35 +211,40 @@ void segments_info_t::mark_closed(
   ceph_assert(num_open > 0);
   --num_open;
   ++num_closed;
-  ceph_assert(avail_bytes >= segment_info.open_avail_bytes);
-  avail_bytes -= segment_info.open_avail_bytes;
+  ceph_assert(get_segment_size() >= segment_info.written_to);
+  auto seg_avail_bytes = get_segment_size() - segment_info.written_to;
+  ceph_assert(avail_bytes >= seg_avail_bytes);
+  avail_bytes -= seg_avail_bytes;
   ++count_close;
 }
 
 void segments_info_t::update_written_to(
+    segment_type_t type,
     paddr_t offset)
 {
   LOG_PREFIX(segments_info_t::update_written_to);
   auto& saddr = offset.as_seg_paddr();
   auto& segment_info = segments[saddr.get_segment_id()];
   if (!segment_info.is_open()) {
-    ERROR("segment is not open, not updating, offset={}, {}",
-          offset, segment_info);
+    ERROR("segment is not open, not updating, type={}, offset={}, {}",
+          type, offset, segment_info);
     ceph_abort();
   }
 
-  auto new_avail = get_segment_size() - saddr.get_segment_off();
-  if (segment_info.open_avail_bytes < new_avail) {
-    ERROR("open_avail_bytes should not increase! offset={}, {}",
-          offset, segment_info);
+  auto new_written_to = static_cast<std::size_t>(saddr.get_segment_off());
+  ceph_assert(new_written_to <= get_segment_size());
+  if (segment_info.written_to > new_written_to) {
+    ERROR("written_to should not decrease! type={}, offset={}, {}",
+          type, offset, segment_info);
     ceph_abort();
   }
 
-  DEBUG("offset={}, {}", offset, segment_info);
-  auto avail_deduction = segment_info.open_avail_bytes - new_avail;
+  DEBUG("type={}, offset={}, {}", type, offset, segment_info);
+  ceph_assert(type == segment_info.type);
+  auto avail_deduction = new_written_to - segment_info.written_to;
   ceph_assert(avail_bytes >= avail_deduction);
   avail_bytes -= avail_deduction;
-  segment_info.open_avail_bytes = new_avail;
+  segment_info.written_to = new_written_to;
 }
 
 bool SpaceTrackerSimple::equals(const SpaceTrackerI &_other) const
@@ -499,6 +514,7 @@ void SegmentCleaner::update_journal_tail_target(
 
   journal_seq_t target = std::min(dirty_replay_from, alloc_replay_from);
   ceph_assert(target != JOURNAL_SEQ_NULL);
+  auto journal_head = segments.get_journal_head();
   ceph_assert(journal_head == JOURNAL_SEQ_NULL ||
               journal_head >= target);
   if (journal_tail_target == JOURNAL_SEQ_NULL ||
@@ -528,6 +544,7 @@ void SegmentCleaner::update_journal_tail_committed(journal_seq_t committed)
   if (committed == JOURNAL_SEQ_NULL) {
     return;
   }
+  auto journal_head = segments.get_journal_head();
   ceph_assert(journal_head == JOURNAL_SEQ_NULL ||
               journal_head >= committed);
 
@@ -902,7 +919,6 @@ SegmentCleaner::mount_ret SegmentCleaner::mount()
   stats = {};
   journal_tail_target = JOURNAL_SEQ_NULL;
   journal_tail_committed = JOURNAL_SEQ_NULL;
-  journal_head = JOURNAL_SEQ_NULL;
   dirty_extents_replay_from = JOURNAL_SEQ_NULL;
   alloc_info_replay_from = JOURNAL_SEQ_NULL;
   
@@ -1103,7 +1119,7 @@ void SegmentCleaner::complete_init()
 {
   LOG_PREFIX(SegmentCleaner::complete_init);
   INFO("done, start GC");
-  ceph_assert(journal_head != JOURNAL_SEQ_NULL);
+  ceph_assert(segments.get_journal_head() != JOURNAL_SEQ_NULL);
   init_complete = true;
   gc_process.start();
 }
index 1af9c443ad4f15697c6e69509c6e4452b57a5b59..5b3f617fd6725c242ebdb15d0fbdd8882c0f9a2f 100644 (file)
@@ -41,7 +41,7 @@ struct segment_info_t {
   time_point last_modified;
   time_point last_rewritten;
 
-  std::size_t open_avail_bytes = 0;
+  std::size_t written_to = 0;
 
   bool is_in_journal(journal_seq_t tail_committed) const {
     return type == segment_type_t::JOURNAL &&
@@ -60,9 +60,9 @@ struct segment_info_t {
     return state == Segment::segment_state_t::OPEN;
   }
 
-  void init_closed(segment_seq_t, segment_type_t);
+  void init_closed(segment_seq_t, segment_type_t, std::size_t);
 
-  void set_open(segment_seq_t, segment_type_t, std::size_t segment_size);
+  void set_open(segment_seq_t, segment_type_t);
 
   void set_empty();
 
@@ -141,6 +141,21 @@ public:
   std::size_t get_available_bytes() const {
     return avail_bytes;
   }
+  journal_seq_t get_journal_head() const {
+    if (unlikely(journal_segment_id == NULL_SEG_ID)) {
+      return JOURNAL_SEQ_NULL;
+    }
+    auto &segment_info = segments[journal_segment_id];
+    assert(!segment_info.is_empty());
+    assert(segment_info.type == segment_type_t::JOURNAL);
+    assert(segment_info.seq != NULL_SEG_SEQ);
+    return journal_seq_t{
+      segment_info.seq,
+      paddr_t::make_seg_paddr(
+        journal_segment_id,
+        segment_info.written_to)
+    };
+  }
 
   void reset();
 
@@ -155,7 +170,7 @@ public:
 
   void mark_closed(segment_id_t);
 
-  void update_written_to(paddr_t offset);
+  void update_written_to(segment_type_t, paddr_t);
 
   void update_last_modified_rewritten(
       segment_id_t id, time_point last_modified, time_point last_rewritten) {
@@ -168,7 +183,9 @@ private:
 
   std::size_t segment_size;
 
+  segment_id_t journal_segment_id;
   std::size_t num_in_journal;
+
   std::size_t num_open;
   std::size_t num_empty;
   std::size_t num_closed;
@@ -201,7 +218,7 @@ public:
 
   virtual void update_journal_tail_committed(journal_seq_t tail_committed) = 0;
 
-  virtual void update_segment_avail_bytes(paddr_t offset) = 0;
+  virtual void update_segment_avail_bytes(segment_type_t, paddr_t) = 0;
 
   virtual SegmentManagerGroup* get_segment_manager_group() = 0;
 
@@ -612,9 +629,6 @@ private:
   /// most recently committed journal_tail
   journal_seq_t journal_tail_committed;
 
-  /// head of journal
-  journal_seq_t journal_head;
-
   ExtentCallbackInterface *ecb = nullptr;
 
   /// populated if there is an IO blocked on hard limits
@@ -657,8 +671,8 @@ public:
 
   void update_journal_tail_committed(journal_seq_t committed) final;
 
-  void update_segment_avail_bytes(paddr_t offset) final {
-    segments.update_written_to(offset);
+  void update_segment_avail_bytes(segment_type_t type, paddr_t offset) final {
+    segments.update_written_to(type, offset);
     gc_process.maybe_wake_on_space_used();
   }
 
@@ -681,19 +695,11 @@ public:
   void update_alloc_info_replay_from(
     journal_seq_t alloc_replay_from);
 
-  void init_mkfs(journal_seq_t head) {
-    ceph_assert(head != JOURNAL_SEQ_NULL);
-    journal_tail_target = head;
-    journal_tail_committed = head;
-    journal_head = head;
-  }
-
-  void set_journal_head(journal_seq_t head) {
-    ceph_assert(head != JOURNAL_SEQ_NULL);
-    assert(journal_head == JOURNAL_SEQ_NULL || head >= journal_head);
-    journal_head = head;
-    segments.update_written_to(head.offset);
-    gc_process.maybe_wake_on_space_used();
+  void init_mkfs() {
+    auto journal_head = segments.get_journal_head();
+    ceph_assert(journal_head != JOURNAL_SEQ_NULL);
+    journal_tail_target = journal_head;
+    journal_tail_committed = journal_head;
   }
 
   using release_ertr = SegmentManagerGroup::release_ertr;
@@ -878,7 +884,7 @@ private:
     journal_seq_t limit);
 
   journal_seq_t get_dirty_tail() const {
-    auto ret = journal_head;
+    auto ret = segments.get_journal_head();
     ceph_assert(ret != JOURNAL_SEQ_NULL);
     if (ret.segment_seq >= config.target_journal_segments) {
       ret.segment_seq -= config.target_journal_segments;
@@ -890,7 +896,7 @@ private:
   }
 
   journal_seq_t get_dirty_tail_limit() const {
-    auto ret = journal_head;
+    auto ret = segments.get_journal_head();
     ceph_assert(ret != JOURNAL_SEQ_NULL);
     if (ret.segment_seq >= config.max_journal_segments) {
       ret.segment_seq -= config.max_journal_segments;
@@ -1148,7 +1154,7 @@ private:
        get_available_ratio(),
        should_block_on_gc(),
        gc_should_reclaim_space(),
-       journal_head,
+       segments.get_journal_head(),
        journal_tail_target,
        journal_tail_committed,
        get_dirty_tail(),
index dcbbe014b94a2b7f4a29e4b9dc0889239e3277f8..9c6ebce08b686b320ba092f7fa5dadb26fe82cb9 100644 (file)
@@ -48,8 +48,8 @@ TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
   return segment_cleaner->mount(
   ).safe_then([this] {
     return journal->open_for_write();
-  }).safe_then([this](auto addr) {
-    segment_cleaner->init_mkfs(addr);
+  }).safe_then([this](auto) {
+    segment_cleaner->init_mkfs();
     return epm->open();
   }).safe_then([this, FNAME]() {
     return with_transaction_intr(
@@ -108,8 +108,7 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
       });
   }).safe_then([this] {
     return journal->open_for_write();
-  }).safe_then([this, FNAME](auto addr) {
-    segment_cleaner->set_journal_head(addr);
+  }).safe_then([this, FNAME](auto) {
     return seastar::do_with(
       create_weak_transaction(
         Transaction::src_t::READ, "mount"),
@@ -371,8 +370,6 @@ TransactionManager::submit_transaction_direct(
       (auto submit_result) mutable {
       SUBDEBUGT(seastore_t, "committed with {}", tref, submit_result);
       auto start_seq = submit_result.write_result.start_seq;
-      auto end_seq = submit_result.write_result.get_end_seq();
-      segment_cleaner->set_journal_head(end_seq);
       if (seq_to_trim && *seq_to_trim != JOURNAL_SEQ_NULL) {
        cache->trim_backref_bufs(*seq_to_trim);
       }
index bde23e84f35ea31e794cccf75e030df83f8dd93c..49821be3b4ae9c2f29914d02c857627dbd4c7063 100644 (file)
@@ -75,7 +75,7 @@ struct btree_test_base :
 
   void update_journal_tail_committed(journal_seq_t committed) final {}
 
-  void update_segment_avail_bytes(paddr_t offset) final {}
+  void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
 
   SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }
 
index 4641987810f5ccafa830a9583acee6459554d8f2..91600ca10b8c16cac7513f9a5138b70e3bfd6c07 100644 (file)
@@ -124,7 +124,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
 
   void update_journal_tail_committed(journal_seq_t paddr) final {}
 
-  void update_segment_avail_bytes(paddr_t offset) final {}
+  void update_segment_avail_bytes(segment_type_t, paddr_t) final {}
 
   SegmentManagerGroup* get_segment_manager_group() final { return sms.get(); }