git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: fix updates to journal seq head and target (PR branch: 43853/head)
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 8 Nov 2021 03:52:50 +0000 (11:52 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 9 Nov 2021 01:45:54 +0000 (09:45 +0800)
* update journal sequences at the write boundary;
* update journal head to the sequence of write end;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/seastore_types.h
src/crimson/os/seastore/transaction_manager.cc
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc

index eb7084418a51517d5d9c8e2994002f85da878d79..408acf2774197d97b65bf51b9c6c7ae9bc3bdd08 100644 (file)
@@ -207,9 +207,15 @@ Journal::replay_segment(
                     seq.segment_seq)) {
                  return replay_ertr::now();
                } else {
-                 return handler(
-                   journal_seq_t{seq.segment_seq, base},
+                 auto offsets = submit_result_t{
                    base.add_offset(header.mdlength),
+                   write_result_t{
+                     journal_seq_t{seq.segment_seq, base},
+                     static_cast<segment_off_t>(header.mdlength + header.dlength)
+                   }
+                 };
+                 return handler(
+                   offsets,
                    delta);
                }
              });
@@ -440,41 +446,33 @@ Journal::RecordBatch::add_pending(
   assert(state != state_t::SUBMITTING);
   assert(can_batch(rsize));
 
-  auto record_start_offset = encoded_length;
+  auto block_start_offset = encoded_length + rsize.mdlength;
   records.push_back(std::move(record));
   record_sizes.push_back(rsize);
   auto new_encoded_length = get_encoded_length(rsize);
   assert(new_encoded_length < MAX_SEG_OFF);
   encoded_length = new_encoded_length;
-  bool is_first;
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
     io_promise = seastar::shared_promise<maybe_result_t>();
-    is_first = true;
   } else {
     assert(io_promise.has_value());
-    is_first = false;
   }
   state = state_t::PENDING;
 
   return io_promise->get_shared_future(
-  ).then([record_start_offset, is_first
+  ).then([block_start_offset
          ](auto maybe_write_result) -> add_pending_ret {
     if (!maybe_write_result.has_value()) {
       return crimson::ct_error::input_output_error::make();
     }
-    auto write_result = maybe_write_result.value();
-    if (is_first) {
-      assert(record_start_offset == 0);
-    } else {
-      assert(record_start_offset > 0);
-      write_result.write_start_seq.offset.offset += record_start_offset;
-      // only the first record should update JournalSegmentManager::committed_to
-      write_result.write_length = 0;
-    }
+    auto submit_result = submit_result_t{
+      maybe_write_result->start_seq.offset.add_offset(block_start_offset),
+      *maybe_write_result
+    };
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
-      write_result);
+      submit_result);
   });
 }
 
@@ -515,8 +513,9 @@ void Journal::RecordBatch::set_result(
     logger().debug(
       "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
       records.size(),
-      maybe_write_result->write_start_seq,
-      maybe_write_result->write_length);
+      maybe_write_result->start_seq,
+      maybe_write_result->length);
+    assert(maybe_write_result->length == encoded_length);
   } else {
     logger().error(
       "Journal::RecordBatch::set_result: batches={}, write is failed!",
@@ -673,7 +672,12 @@ Journal::RecordSubmitter::submit_pending(
         journal_segment_manager.get_committed_to(),
         journal_segment_manager.get_nonce());
       return journal_segment_manager.write(to_write
-      ).finally([this] {
+      ).safe_then([rsize](auto write_result) {
+        return submit_result_t{
+          write_result.start_seq.offset.add_offset(rsize.mdlength),
+          write_result
+        };
+      }).finally([this] {
         decrement_io_with_flush();
       });
     } else {
@@ -689,17 +693,12 @@ Journal::RecordSubmitter::submit_pending(
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut=std::move(write_fut)]() mutable {
     return std::move(write_fut);
-  }).safe_then([this, &handle, rsize](auto write_result) {
+  }).safe_then([this, &handle](auto submit_result) {
     return handle.enter(write_pipeline->finalize
-    ).then([this, write_result, rsize] {
-      if (write_result.write_length > 0) {
-        auto committed_to = write_result.write_start_seq;
-        committed_to.offset.offset += write_result.write_length;
-        journal_segment_manager.mark_committed(committed_to);
-      }
-      return std::make_pair(
-        write_result.write_start_seq.offset.add_offset(rsize.mdlength),
-        write_result.write_start_seq);
+    ).then([this, submit_result] {
+      journal_segment_manager.mark_committed(
+          submit_result.write_result.get_end_seq());
+      return submit_result;
     });
   });
 }
index 893968822c20bbd70f8dd87a1c9b63f09649f2c4..dec7c89ab54f4203eec6ad5488e46db1cf2b1e44 100644 (file)
@@ -83,14 +83,26 @@ public:
   /**
    * submit_record
    *
-   * @param write record and returns offset of first block and seq
+   * write record with the ordering handle
    */
+  struct write_result_t {
+    journal_seq_t start_seq;
+    segment_off_t length;
+
+    journal_seq_t get_end_seq() const {
+      return start_seq.add_offset(length);
+    }
+  };
+  struct submit_result_t {
+    paddr_t record_block_base;
+    write_result_t write_result;
+  };
   using submit_record_ertr = crimson::errorator<
     crimson::ct_error::erange,
     crimson::ct_error::input_output_error
     >;
   using submit_record_ret = submit_record_ertr::future<
-    std::pair<paddr_t, journal_seq_t>
+    submit_result_t
     >;
   submit_record_ret submit_record(
     record_t &&record,
@@ -108,8 +120,7 @@ public:
   using replay_ertr = SegmentManager::read_ertr;
   using replay_ret = replay_ertr::future<>;
   using delta_handler_t = std::function<
-    replay_ret(journal_seq_t seq,
-              paddr_t record_block_base,
+    replay_ret(const submit_result_t&,
               const delta_info_t&)>;
   replay_ret replay(
     std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
@@ -177,12 +188,8 @@ private:
     using roll_ertr = base_ertr;
     roll_ertr::future<> roll();
 
-    // write the buffer, return the write start
+    // write the buffer, return the write result
     // May be called concurrently, writes may complete in any order.
-    struct write_result_t {
-      journal_seq_t write_start_seq;
-      segment_off_t write_length;
-    };
     using write_ertr = base_ertr;
     using write_ret = write_ertr::future<write_result_t>;
     write_ret write(ceph::bufferlist to_write);
@@ -285,9 +292,8 @@ private:
     //
     // Set write_result_t::write_length to 0 if the record is not the first one
     // in the batch.
-    using add_pending_result_t = JournalSegmentManager::write_result_t;
     using add_pending_ertr = JournalSegmentManager::write_ertr;
-    using add_pending_ret = JournalSegmentManager::write_ret;
+    using add_pending_ret = add_pending_ertr::future<submit_result_t>;
     add_pending_ret add_pending(record_t&&, const record_size_t&);
 
     // Encode the batched records for write.
@@ -297,8 +303,8 @@ private:
         segment_nonce_t segment_nonce);
 
     // Set the write result and reset for reuse
-    using maybe_result_t = std::optional<add_pending_result_t>;
-    void set_result(maybe_result_t maybe_write_result);
+    using maybe_result_t = std::optional<write_result_t>;
+    void set_result(maybe_result_t maybe_write_end_seq);
 
     // The fast path that is equivalent to submit a single record as a batch.
     //
@@ -419,7 +425,7 @@ private:
 
     using submit_pending_ertr = JournalSegmentManager::write_ertr;
     using submit_pending_ret = submit_pending_ertr::future<
-      std::pair<paddr_t, journal_seq_t> >;
+      submit_result_t>;
     submit_pending_ret submit_pending(
         record_t&&, const record_size_t&, OrderingHandle &handle, bool flush);
 
index a7fc29a46ffc3ca2886e3d222dcb0894f387817a..8ce7e8d77b06392793af513a8e4ce10c4da691e7 100644 (file)
@@ -585,6 +585,10 @@ struct journal_seq_t {
   segment_seq_t segment_seq = 0;
   paddr_t offset;
 
+  journal_seq_t add_offset(segment_off_t o) const {
+    return {segment_seq, offset.add_offset(o)};
+  }
+
   DENC(journal_seq_t, v, p) {
     DENC_START(1, 1, p);
     denc(v.segment_seq, p);
index 5d52fa83d2c17c149913e1d5b29f5783478f04a2..643becf009b1d1c44c0d79fd9ecdd13d2c360e39 100644 (file)
@@ -75,11 +75,14 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
     [this](auto&& segments) {
     return journal->replay(
       std::move(segments),
-      [this](auto seq, auto paddr, const auto &e) {
-      auto fut = cache->replay_delta(seq, paddr, e);
+      [this](const auto &offsets, const auto &e) {
+      auto start_seq = offsets.write_result.start_seq;
       segment_cleaner->update_journal_tail_target(
-       cache->get_oldest_dirty_from().value_or(seq));
-      return fut;
+          cache->get_oldest_dirty_from().value_or(start_seq));
+      return cache->replay_delta(
+          start_seq,
+          offsets.record_block_base,
+          e);
     });
   }).safe_then([this] {
     return journal->open_for_write();
@@ -273,14 +276,23 @@ TransactionManager::submit_transaction_direct(
     DEBUGT("about to submit to journal", tref);
 
     return journal->submit_record(std::move(record), tref.get_handle()
-    ).safe_then([this, FNAME, &tref](auto p) mutable {
-      auto [addr, journal_seq] = p;
-      DEBUGT("journal commit to {} seq {}", tref, addr, journal_seq);
-      segment_cleaner->set_journal_head(journal_seq);
-      cache->complete_commit(tref, addr, journal_seq, segment_cleaner.get());
+    ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
+      auto start_seq = submit_result.write_result.start_seq;
+      auto end_seq = submit_result.write_result.get_end_seq();
+      DEBUGT("journal commit to record_block_base={}, start_seq={}, end_seq={}",
+             tref,
+             submit_result.record_block_base,
+             start_seq,
+             end_seq);
+      segment_cleaner->set_journal_head(end_seq);
+      cache->complete_commit(
+          tref,
+          submit_result.record_block_base,
+          start_seq,
+          segment_cleaner.get());
       lba_manager->complete_transaction(tref);
       segment_cleaner->update_journal_tail_target(
-       cache->get_oldest_dirty_from().value_or(journal_seq));
+       cache->get_oldest_dirty_from().value_or(start_seq));
       auto to_release = tref.get_segment_to_release();
       if (to_release != NULL_SEG_ID) {
        return segment_manager.release(to_release
index 4c4bec1478059c161fb7872a560a263ab137eac2..9bfa40e4f39555106aa4e589e9883828224c6fc8 100644 (file)
@@ -58,9 +58,11 @@ struct btree_test_base :
   {
     auto record = cache->prepare_record(*t);
     return journal->submit_record(std::move(record), t->get_handle()).safe_then(
-      [this, t=std::move(t)](auto p) mutable {
-       auto [addr, seq] = p;
-       cache->complete_commit(*t, addr, seq);
+      [this, t=std::move(t)](auto submit_result) mutable {
+       cache->complete_commit(
+            *t,
+            submit_result.record_block_base,
+            submit_result.write_result.start_seq);
        complete_commit(*t);
       }).handle_error(crimson::ct_error::assert_all{});
   }
index bb69c81750259dc009d926f37f7b415b00584443..366f3f3b9b8a1f13b8dfb13c16d30ac47b4a1bf5 100644 (file)
@@ -193,11 +193,11 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
     replay(
       [&advance,
        &delta_checker]
-      (auto seq, auto base, const auto &di) mutable {
+      (const auto &offsets, const auto &di) mutable {
        if (!delta_checker) {
          EXPECT_FALSE("No Deltas Left");
        }
-       if (!(*delta_checker)(base, di)) {
+       if (!(*delta_checker)(offsets.record_block_base, di)) {
          delta_checker = std::nullopt;
          advance();
        }