git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: fix ordered updates to JournalSegmentManager::committed_to 43754/head
author Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 1 Nov 2021 08:28:59 +0000 (16:28 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 4 Nov 2021 05:34:23 +0000 (13:34 +0800)
Journal segment should not update committed_to during rolling, as there
might still be pending writes from the previous segment.

A side effect is that committed_to now needs to include a segment_seq_t so
that it can point to a previous segment.
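
A minimal sketch of the intended invariant, using simplified stand-in types
(seq_t and journal_like_t are not the real seastore definitions): committed_to
carries the segment sequence plus an offset, compares lexicographically across
segments, and only ever advances; rolling to a new segment no longer bumps it,
so it may legitimately keep pointing into the previous segment.

    // Hypothetical, simplified illustration of the patched behavior.
    #include <cassert>
    #include <cstdint>
    #include <tuple>

    struct seq_t {               // stands in for journal_seq_t
      uint64_t segment_seq;      // which journal segment, in write order
      uint64_t offset;           // offset within that segment
      friend bool operator<=(const seq_t& l, const seq_t& r) {
        return std::tie(l.segment_seq, l.offset) <=
               std::tie(r.segment_seq, r.offset);
      }
      friend bool operator==(const seq_t& l, const seq_t& r) {
        return l.segment_seq == r.segment_seq && l.offset == r.offset;
      }
    };

    struct journal_like_t {
      seq_t committed_to{};      // may lag behind the currently open segment

      void mark_committed(const seq_t& new_committed_to) {
        // committed_to only moves forward; it is not touched when the
        // journal rolls, so it can still reference a previous segment.
        assert(committed_to == seq_t{} || committed_to <= new_committed_to);
        committed_to = new_committed_to;
      }
    };

    int main() {
      journal_like_t j;
      j.mark_committed({1, 4096});   // a write completes in segment seq 1
      // the journal has rolled to segment seq 2, but committed_to stays in
      // seq 1 until the writes still pending there actually complete
      j.mark_committed({1, 8192});
      j.mark_committed({2, 4096});
      assert((j.committed_to == seq_t{2, 4096}));
    }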

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/extent_reader.cc
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.cc
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h

index e8bde09b44d0cfa67f4a90179837e4664056b301..a691f036ee4af273bc57bcf86083fd53cfa25284 100644 (file)
@@ -69,7 +69,7 @@ public:
       extent_offset += extent.get_bptr().length();
     }
     assert(extent_offset == (segment_off_t)(base + rsize.mdlength + rsize.dlength));
-    return encode_record(rsize, std::move(record), block_size, base, nonce);
+    return encode_record(rsize, std::move(record), block_size, journal_seq_t(), nonce);
   }
   void add_extent(LogicalCachedExtentRef& extent) {
     extents.emplace_back(extent);
@@ -87,7 +87,7 @@ public:
   void set_base(segment_off_t b) {
     base = b;
   }
-  segment_off_t get_base() {
+  segment_off_t get_base() const {
     return base;
   }
   void clear() {
index 984be2d5198be37fc2107d0da5b6d2354ddc9c62..a3dca456bf54234b8cc9d33db958a8863fc1ce8b 100644 (file)
@@ -62,7 +62,7 @@ ExtentReader::scan_extents_ret ExtentReader::scan_extents(
 {
   auto ret = std::make_unique<scan_extents_ret_bare>();
   auto* extents = ret.get();
-  return read_segment_header(cursor.get_offset().segment
+  return read_segment_header(cursor.get_segment_id()
   ).handle_error(
     scan_extents_ertr::pass_further{},
     crimson::ct_error::assert_all{
@@ -114,9 +114,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
   found_record_handler_t &handler)
 {
   auto& segment_manager =
-    *segment_managers[cursor.offset.segment.device_id()];
-  if (cursor.offset.offset == 0) {
-    cursor.offset.offset = segment_manager.get_block_size();
+    *segment_managers[cursor.get_segment_id().device_id()];
+  if (cursor.get_segment_offset() == 0) {
+    cursor.increment(segment_manager.get_block_size());
   }
   auto retref = std::make_unique<size_t>(0);
   auto &budget_used = *retref;
@@ -125,30 +125,33 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
     -> scan_valid_records_ertr::future<seastar::stop_iteration> {
       return [=, &handler, &cursor, &budget_used] {
        if (!cursor.last_valid_header_found) {
-         return read_validate_record_metadata(cursor.offset, nonce
+         return read_validate_record_metadata(cursor.seq.offset, nonce
          ).safe_then([=, &cursor](auto md) {
            logger().debug(
              "ExtentReader::scan_valid_records: read complete {}",
-             cursor.offset);
+             cursor.seq);
            if (!md) {
              logger().debug(
                "ExtentReader::scan_valid_records: found invalid header at {}, presumably at end",
-               cursor.offset);
+               cursor.seq);
              cursor.last_valid_header_found = true;
              return scan_valid_records_ertr::now();
            } else {
+             auto new_committed_to = md->first.committed_to;
              logger().debug(
-               "ExtentReader::scan_valid_records: valid record read at {}",
-               cursor.offset);
-             cursor.last_committed = paddr_t{
-               cursor.offset.segment,
-               md->first.committed_to};
+               "ExtentReader::scan_valid_records: valid record read at {}, now committed at {}",
+               cursor.seq,
+               new_committed_to);
+             ceph_assert(cursor.last_committed == journal_seq_t() ||
+                         cursor.last_committed <= new_committed_to);
+             cursor.last_committed = new_committed_to;
              cursor.pending_records.emplace_back(
-               cursor.offset,
+               cursor.seq.offset,
                md->first,
                md->second);
-             cursor.offset.offset +=
-               md->first.dlength + md->first.mdlength;
+             cursor.increment(md->first.dlength + md->first.mdlength);
+             ceph_assert(new_committed_to == journal_seq_t() ||
+                         new_committed_to < cursor.seq);
              return scan_valid_records_ertr::now();
            }
          }).safe_then([=, &cursor, &budget_used, &handler] {
@@ -166,7 +169,9 @@ ExtentReader::scan_valid_records_ret ExtentReader::scan_valid_records(
                    seastar::stop_iteration>(seastar::stop_iteration::yes);
                }
                auto &next = cursor.pending_records.front();
-               if (next.offset > cursor.last_committed) {
+               journal_seq_t next_seq = {cursor.seq.segment_seq, next.offset};
+               if (cursor.last_committed == journal_seq_t() ||
+                   next_seq > cursor.last_committed) {
                  return scan_valid_records_ertr::make_ready_future<
                    seastar::stop_iteration>(seastar::stop_iteration::yes);
                }
index 55057b88bdf38c117c6791b8f09621917eafba9f..b2d4aaf43ea084d52c3f61e77075849519816839 100644 (file)
@@ -170,7 +170,7 @@ Journal::replay_segment(
 {
   logger().debug("Journal::replay_segment: starting at {}", seq);
   return seastar::do_with(
-    scan_valid_records_cursor(seq.offset),
+    scan_valid_records_cursor(seq),
     ExtentReader::found_record_handler_t(
       [=, &handler](paddr_t base,
                    const record_header_t &header,
@@ -378,13 +378,9 @@ void Journal::JournalSegmentManager::mark_committed(
   logger().debug(
     "JournalSegmentManager::mark_committed: committed_to {} => {}",
     committed_to, new_committed_to);
-  assert(new_committed_to.segment_seq <=
-         get_segment_seq());
-  if (new_committed_to.segment_seq ==
-      get_segment_seq()) {
-    assert(committed_to.offset.offset < new_committed_to.offset.offset);
-    committed_to = new_committed_to;
-  }
+  assert(committed_to == journal_seq_t() ||
+         committed_to <= new_committed_to);
+  committed_to = new_committed_to;
 }
 
 Journal::JournalSegmentManager::initialize_segment_ertr::future<>
@@ -401,7 +397,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
   auto header = segment_header_t{
     seq,
     segment.get_segment_id(),
-    segment_provider->get_journal_tail_target(),
+    new_tail,
     current_segment_nonce,
     false};
   logger().debug(
@@ -421,14 +417,9 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
   bl.append(bp);
 
   written_to = 0;
-  // FIXME: improve committed_to to point to another segment
-  committed_to = get_current_write_seq();
   return write(bl
   ).safe_then([this, new_tail, write_size=bl.length()
               ](journal_seq_t write_start_seq) {
-    auto committed_to = write_start_seq;
-    committed_to.offset.offset += write_size;
-    mark_committed(committed_to);
     segment_provider->update_journal_tail_committed(new_tail);
   });
 }
@@ -475,7 +466,7 @@ Journal::RecordBatch::add_pending(
 
 ceph::bufferlist Journal::RecordBatch::encode_records(
   size_t block_size,
-  segment_off_t committed_to,
+  const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
   logger().debug(
@@ -532,7 +523,7 @@ ceph::bufferlist Journal::RecordBatch::submit_pending_fast(
   record_t&& record,
   const record_size_t& rsize,
   size_t block_size,
-  segment_off_t committed_to,
+  const journal_seq_t& committed_to,
   segment_nonce_t segment_nonce)
 {
   logger().debug(
index 6e0aa3942ff84f3ca6314bbd688f95ba397883a2..be7c49695617ad258414c8c8337a6b3a8ea8dc25 100644 (file)
@@ -140,10 +140,8 @@ private:
       return current_segment_nonce;
     }
 
-    segment_off_t get_committed_to() const {
-      assert(committed_to.segment_seq ==
-             get_segment_seq());
-      return committed_to.offset.offset;
+    journal_seq_t get_committed_to() const {
+      return committed_to;
     }
 
     segment_seq_t get_segment_seq() const {
@@ -287,7 +285,7 @@ private:
     // Encode the batched records for write.
     ceph::bufferlist encode_records(
         size_t block_size,
-        segment_off_t committed_to,
+        const journal_seq_t& committed_to,
         segment_nonce_t segment_nonce);
 
     // Set the write result and reset for reuse
@@ -304,7 +302,7 @@ private:
         record_t&&,
         const record_size_t&,
         size_t block_size,
-        segment_off_t committed_to,
+        const journal_seq_t& committed_to,
         segment_nonce_t segment_nonce);
 
   private:
index 63fa9759f3e9d18941d2f56dbd956c9e8a6faf36..cafe6fe6a8de0823955c72c86c1c6abeed79f4a9 100644 (file)
@@ -150,7 +150,7 @@ ceph::bufferlist encode_record(
   record_size_t rsize,
   record_t &&record,
   size_t block_size,
-  segment_off_t committed_to,
+  const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce)
 {
   bufferlist data_bl;
index 226010713884a6ea0293ff2740760ca24105c4b7..a7fc29a46ffc3ca2886e3d222dcb0894f387817a 100644 (file)
@@ -1143,11 +1143,11 @@ struct record_header_t {
   // Fixed portion
   extent_len_t  mdlength;       // block aligned, length of metadata
   extent_len_t  dlength;        // block aligned, length of data
-  uint32_t deltas;                // number of deltas
-  uint32_t extents;               // number of extents
+  uint32_t deltas;              // number of deltas
+  uint32_t extents;             // number of extents
   segment_nonce_t segment_nonce;// nonce of containing segment
-  segment_off_t committed_to;   // records in this segment prior to committed_to
-                                // have been fully written
+  journal_seq_t committed_to;   // records prior to committed_to have been
+                                // fully written, maybe in another segment.
   checksum_t data_crc;          // crc of data payload
 
 
@@ -1188,14 +1188,14 @@ ceph::bufferlist encode_record(
   record_size_t rsize,
   record_t &&record,
   size_t block_size,
-  segment_off_t committed_to,
+  const journal_seq_t& committed_to,
   segment_nonce_t current_segment_nonce = 0);
 
 /// scan segment for end incrementally
 struct scan_valid_records_cursor {
   bool last_valid_header_found = false;
-  paddr_t offset;
-  paddr_t last_committed;
+  journal_seq_t seq;
+  journal_seq_t last_committed;
 
   struct found_record_t {
     paddr_t offset;
@@ -1214,13 +1214,21 @@ struct scan_valid_records_cursor {
     return last_valid_header_found && pending_records.empty();
   }
 
-  paddr_t get_offset() const {
-    return offset;
+  segment_id_t get_segment_id() const {
+    return seq.offset.segment;
+  }
+
+  segment_off_t get_segment_offset() const {
+    return seq.offset.offset;
+  }
+
+  void increment(segment_off_t off) {
+    seq.offset.offset += off;
   }
 
   scan_valid_records_cursor(
-    paddr_t offset)
-    : offset(offset) {}
+    journal_seq_t seq)
+    : seq(seq) {}
 };
 
 }
index 23e0aebc648d303f88e95d6def7ec41320a89881..e209cf47fbf9f822070009d61a6ea13cd9031d30 100644 (file)
@@ -321,20 +321,18 @@ SegmentCleaner::gc_trim_journal_ret SegmentCleaner::gc_trim_journal()
 SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
 {
   if (!scan_cursor) {
-    paddr_t next = P_ADDR_NULL;
-    next.segment = get_next_gc_target();
-    if (next == P_ADDR_NULL) {
+    journal_seq_t next = get_next_gc_target();
+    if (next == journal_seq_t()) {
       logger().debug(
        "SegmentCleaner::do_gc: no segments to gc");
       return seastar::now();
     }
-    next.offset = 0;
     scan_cursor =
       std::make_unique<ExtentReader::scan_extents_cursor>(
        next);
     logger().debug(
       "SegmentCleaner::do_gc: starting gc on segment {}",
-      scan_cursor->get_offset().segment);
+      scan_cursor->seq);
   } else {
     ceph_assert(!scan_cursor->is_complete());
   }
@@ -384,7 +382,7 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
             });
           }).si_then([this, &t] {
             if (scan_cursor->is_complete()) {
-              t.mark_segment_to_release(scan_cursor->get_offset().segment);
+              t.mark_segment_to_release(scan_cursor->get_segment_id());
             }
             return ecb->submit_transaction_direct(t);
           });
index aaac218395cf944cfccfacba25747458ad2e3765..c1732aca6ce097198b05ce5fe24452e9aab00a64 100644 (file)
@@ -683,10 +683,6 @@ public:
 
   void update_journal_tail_target(journal_seq_t target);
 
-  void init_journal_tail(journal_seq_t tail) {
-    journal_tail_target = journal_tail_committed = tail;
-  }
-
   void init_mkfs(journal_seq_t head) {
     journal_tail_target = head;
     journal_tail_committed = head;
@@ -773,30 +769,32 @@ public:
     assert(ret >= 0);
   }
 
-  segment_id_t get_next_gc_target() const {
-    segment_id_t ret = NULL_SEG_ID;
+  journal_seq_t get_next_gc_target() const {
+    segment_id_t id = NULL_SEG_ID;
     segment_seq_t seq = NULL_SEG_SEQ;
     int64_t least_live_bytes = std::numeric_limits<int64_t>::max();
     for (auto it = segments.begin();
         it != segments.end();
         ++it) {
-      auto id = it->first;
+      auto _id = it->first;
       const auto& segment_info = it->second;
       if (segment_info.is_closed() &&
          !segment_info.is_in_journal(journal_tail_committed) &&
-         space_tracker->get_usage(id) < least_live_bytes) {
-       ret = id;
+         space_tracker->get_usage(_id) < least_live_bytes) {
+       id = _id;
        seq = segment_info.journal_segment_seq;
        least_live_bytes = space_tracker->get_usage(id);
       }
     }
-    if (ret != NULL_SEG_ID) {
+    if (id != NULL_SEG_ID) {
       crimson::get_logger(ceph_subsys_seastore).debug(
        "SegmentCleaner::get_next_gc_target: segment {} seq {}",
-       ret,
+       id,
        seq);
+      return journal_seq_t{seq, {id, 0}};
+    } else {
+      return journal_seq_t();
     }
-    return ret;
   }
 
   SpaceTrackerIRef get_empty_space_tracker() const {
@@ -987,7 +985,7 @@ private:
     if (!scan_cursor)
       return 0;
 
-    return scan_cursor->get_offset().offset;
+    return scan_cursor->get_segment_offset();
   }
 
   /// Returns free space available for writes