git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: mark committed_to at the write boundary
author: Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 5 Nov 2021 08:00:16 +0000 (16:00 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 9 Nov 2021 01:45:54 +0000 (09:45 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h

index b2d4aaf43ea084d52c3f61e77075849519816839..eb7084418a51517d5d9c8e2994002f85da878d79 100644 (file)
@@ -355,11 +355,16 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
     write_start_seq,
     write_start_seq.offset.offset + write_length,
     write_length);
+  assert(write_length > 0);
   assert((write_length % segment_manager.get_block_size()) == 0);
   assert(!needs_roll(write_length));
 
   auto write_start_offset = written_to;
   written_to += write_length;
+  auto write_result = write_result_t{
+    write_start_seq,
+    static_cast<segment_off_t>(write_length)
+  };
   return current_journal_segment->write(
     write_start_offset, to_write
   ).handle_error(
@@ -367,8 +372,8 @@ Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
     crimson::ct_error::assert_all{
       "Invalid error in JournalSegmentManager::write"
     }
-  ).safe_then([write_start_seq] {
-    return write_start_seq;
+  ).safe_then([write_result] {
+    return write_result;
   });
 }
 
@@ -418,8 +423,7 @@ Journal::JournalSegmentManager::initialize_segment(Segment& segment)
 
   written_to = 0;
   return write(bl
-  ).safe_then([this, new_tail, write_size=bl.length()
-              ](journal_seq_t write_start_seq) {
+  ).safe_then([this, new_tail](auto) {
     segment_provider->update_journal_tail_committed(new_tail);
   });
 }
@@ -442,25 +446,35 @@ Journal::RecordBatch::add_pending(
   auto new_encoded_length = get_encoded_length(rsize);
   assert(new_encoded_length < MAX_SEG_OFF);
   encoded_length = new_encoded_length;
+  bool is_first;
   if (state == state_t::EMPTY) {
     assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<std::optional<journal_seq_t> >();
+    io_promise = seastar::shared_promise<maybe_result_t>();
+    is_first = true;
   } else {
     assert(io_promise.has_value());
+    is_first = false;
   }
   state = state_t::PENDING;
 
   return io_promise->get_shared_future(
-  ).then([record_start_offset
-         ](auto batch_write_start) -> add_pending_ret {
-    if (!batch_write_start.has_value()) {
+  ).then([record_start_offset, is_first
+         ](auto maybe_write_result) -> add_pending_ret {
+    if (!maybe_write_result.has_value()) {
       return crimson::ct_error::input_output_error::make();
     }
-    auto record_write_start = batch_write_start.value();
-    record_write_start.offset.offset += record_start_offset;
+    auto write_result = maybe_write_result.value();
+    if (is_first) {
+      assert(record_start_offset == 0);
+    } else {
+      assert(record_start_offset > 0);
+      write_result.write_start_seq.offset.offset += record_start_offset;
+      // only the first record should update JournalSegmentManager::committed_to
+      write_result.write_length = 0;
+    }
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
-      record_write_start);
+      write_result);
   });
 }
 
@@ -495,14 +509,14 @@ ceph::bufferlist Journal::RecordBatch::encode_records(
 }
 
 void Journal::RecordBatch::set_result(
-  std::optional<journal_seq_t> batch_write_start)
+  maybe_result_t maybe_write_result)
 {
-  if (batch_write_start.has_value()) {
+  if (maybe_write_result.has_value()) {
     logger().debug(
-      "Journal::RecordBatch::set_result: batches={}, write_start {} => {}",
+      "Journal::RecordBatch::set_result: batches={}, write_start {} + {}",
       records.size(),
-      *batch_write_start,
-      batch_write_start->offset.offset + encoded_length);
+      maybe_write_result->write_start_seq,
+      maybe_write_result->write_length);
   } else {
     logger().error(
       "Journal::RecordBatch::set_result: batches={}, write is failed!",
@@ -515,7 +529,7 @@ void Journal::RecordBatch::set_result(
   encoded_length = 0;
   records.clear();
   record_sizes.clear();
-  io_promise->set_value(batch_write_start);
+  io_promise->set_value(maybe_write_result);
   io_promise.reset();
 }
 
@@ -600,10 +614,10 @@ void Journal::RecordSubmitter::update_state()
 
 void Journal::RecordSubmitter::finish_submit_batch(
   RecordBatch* p_batch,
-  std::optional<journal_seq_t> result)
+  maybe_result_t maybe_result)
 {
   assert(p_batch->is_submitting());
-  p_batch->set_result(result);
+  p_batch->set_result(maybe_result);
   free_batch_ptrs.push_back(p_batch);
   decrement_io_with_flush();
 }
@@ -621,8 +635,8 @@ void Journal::RecordSubmitter::flush_current_batch()
     journal_segment_manager.get_committed_to(),
     journal_segment_manager.get_nonce());
   std::ignore = journal_segment_manager.write(to_write
-  ).safe_then([this, p_batch](journal_seq_t write_start) {
-    finish_submit_batch(p_batch, write_start);
+  ).safe_then([this, p_batch](auto write_result) {
+    finish_submit_batch(p_batch, write_result);
   }).handle_error(
     crimson::ct_error::all_same_way([this, p_batch](auto e) {
       logger().error(
@@ -638,23 +652,6 @@ void Journal::RecordSubmitter::flush_current_batch()
   });
 }
 
-seastar::future<std::pair<paddr_t, journal_seq_t>>
-Journal::RecordSubmitter::mark_record_committed_in_order(
-  OrderingHandle& handle,
-  const journal_seq_t& write_start_seq,
-  const record_size_t& rsize)
-{
-  return handle.enter(write_pipeline->finalize
-  ).then([this, write_start_seq, rsize] {
-    auto committed_to = write_start_seq;
-    committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
-    journal_segment_manager.mark_committed(committed_to);
-    return std::make_pair(
-      write_start_seq.offset.add_offset(rsize.mdlength),
-      write_start_seq);
-  });
-}
-
 Journal::RecordSubmitter::submit_pending_ret
 Journal::RecordSubmitter::submit_pending(
   record_t&& record,
@@ -692,8 +689,18 @@ Journal::RecordSubmitter::submit_pending(
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut=std::move(write_fut)]() mutable {
     return std::move(write_fut);
-  }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
-    return mark_record_committed_in_order(handle, write_start, rsize);
+  }).safe_then([this, &handle, rsize](auto write_result) {
+    return handle.enter(write_pipeline->finalize
+    ).then([this, write_result, rsize] {
+      if (write_result.write_length > 0) {
+        auto committed_to = write_result.write_start_seq;
+        committed_to.offset.offset += write_result.write_length;
+        journal_segment_manager.mark_committed(committed_to);
+      }
+      return std::make_pair(
+        write_result.write_start_seq.offset.add_offset(rsize.mdlength),
+        write_result.write_start_seq);
+    });
   });
 }
 
index be7c49695617ad258414c8c8337a6b3a8ea8dc25..893968822c20bbd70f8dd87a1c9b63f09649f2c4 100644 (file)
@@ -179,8 +179,12 @@ private:
 
     // write the buffer, return the write start
     // May be called concurrently, writes may complete in any order.
+    struct write_result_t {
+      journal_seq_t write_start_seq;
+      segment_off_t write_length;
+    };
     using write_ertr = base_ertr;
-    using write_ret = write_ertr::future<journal_seq_t>;
+    using write_ret = write_ertr::future<write_result_t>;
     write_ret write(ceph::bufferlist to_write);
 
     // mark write committed in order
@@ -278,6 +282,10 @@ private:
 
     // Add to the batch, the future will be resolved after the batch is
     // written.
+    //
+    // Set write_result_t::write_length to 0 if the record is not the first one
+    // in the batch.
+    using add_pending_result_t = JournalSegmentManager::write_result_t;
     using add_pending_ertr = JournalSegmentManager::write_ertr;
     using add_pending_ret = JournalSegmentManager::write_ret;
     add_pending_ret add_pending(record_t&&, const record_size_t&);
@@ -289,7 +297,8 @@ private:
         segment_nonce_t segment_nonce);
 
     // Set the write result and reset for reuse
-    void set_result(std::optional<journal_seq_t> batch_write_start);
+    using maybe_result_t = std::optional<add_pending_result_t>;
+    void set_result(maybe_result_t maybe_write_result);
 
     // The fast path that is equivalent to submit a single record as a batch.
     //
@@ -319,8 +328,7 @@ private:
     segment_off_t encoded_length = 0;
     std::vector<record_t> records;
     std::vector<record_size_t> record_sizes;
-    std::optional<seastar::shared_promise<
-        std::optional<journal_seq_t> > > io_promise;
+    std::optional<seastar::shared_promise<maybe_result_t> > io_promise;
   };
 
   class RecordSubmitter {
@@ -404,14 +412,11 @@ private:
       free_batch_ptrs.pop_front();
     }
 
-    void finish_submit_batch(RecordBatch*, std::optional<journal_seq_t>);
+    using maybe_result_t = RecordBatch::maybe_result_t;
+    void finish_submit_batch(RecordBatch*, maybe_result_t);
 
     void flush_current_batch();
 
-    seastar::future<std::pair<paddr_t, journal_seq_t>>
-    mark_record_committed_in_order(
-        OrderingHandle&, const journal_seq_t&, const record_size_t&);
-
     using submit_pending_ertr = JournalSegmentManager::write_ertr;
     using submit_pending_ret = submit_pending_ertr::future<
       std::pair<paddr_t, journal_seq_t> >;