git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: allow pending i/o in a full record_submitter when rolling segments
author: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 27 Feb 2023 08:14:16 +0000 (16:14 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Mon, 27 Feb 2023 08:57:03 +0000 (16:57 +0800)
Fixes: https://tracker.ceph.com/issues/58824
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/journal/segment_allocator.cc
src/crimson/os/seastore/journal/segment_allocator.h

index 518a64be59e265ae8b202be05ad968f88d8f5afb..6805bde616a898d76e7e8df40952000bc6e86ee0 100644 (file)
@@ -32,7 +32,8 @@ SegmentedOolWriter::alloc_write_ertr::future<>
 SegmentedOolWriter::write_record(
   Transaction& t,
   record_t&& record,
-  std::list<LogicalCachedExtentRef>&& extents)
+  std::list<LogicalCachedExtentRef>&& extents,
+  bool with_atomic_roll_segment)
 {
   LOG_PREFIX(SegmentedOolWriter::write_record);
   assert(extents.size());
@@ -46,7 +47,9 @@ SegmentedOolWriter::write_record(
   stats.md_bytes += record.size.get_raw_mdlength();
   stats.num_records += 1;
 
-  return record_submitter.submit(std::move(record)
+  return record_submitter.submit(
+    std::move(record),
+    with_atomic_roll_segment
   ).safe_then([this, FNAME, &t, extents=std::move(extents)
               ](record_locator_t ret) mutable {
     DEBUGT("{} finish with {} and {} extents",
@@ -101,7 +104,8 @@ SegmentedOolWriter::do_write(
         assert(record_submitter.check_action(record.size) !=
                action_t::ROLL);
         fut_write = write_record(
-            t, std::move(record), std::move(pending_extents));
+            t, std::move(record), std::move(pending_extents),
+            true/* with_atomic_roll_segment */);
       }
       return trans_intr::make_interruptible(
         record_submitter.roll_segment(
index e4e59de06151c06d4ce76221c65ac2addf5cbcc2..7393dff3689e9fa04bf829256b8e65cb97a51089 100644 (file)
@@ -84,7 +84,8 @@ private:
   alloc_write_ertr::future<> write_record(
     Transaction& t,
     record_t&& record,
-    std::list<LogicalCachedExtentRef> &&extents);
+    std::list<LogicalCachedExtentRef> &&extents,
+    bool with_atomic_roll_segment=false);
 
   journal::SegmentAllocator segment_allocator;
   journal::RecordSubmitter record_submitter;
index f2cf9fd2dccc3ad24985024d1b26a10bf2c9d691..05d29d7532463b3b72b3b4fb426f30c916823057 100644 (file)
@@ -412,16 +412,18 @@ bool RecordSubmitter::is_available() const
              !has_io_error;
 #ifndef NDEBUG
   if (ret) {
-    // invariants when available
+    // unconditional invariants
     ceph_assert(segment_allocator.can_write());
     ceph_assert(p_current_batch != nullptr);
     ceph_assert(!p_current_batch->is_submitting());
+    // the current batch accepts a further write
     ceph_assert(!p_current_batch->needs_flush());
     if (!p_current_batch->is_empty()) {
       auto submit_length =
         p_current_batch->get_submit_size().get_encoded_length();
       ceph_assert(!segment_allocator.needs_roll(submit_length));
     }
+    // I'm not rolling
   }
 #endif
   return ret;
@@ -466,7 +468,8 @@ RecordSubmitter::roll_segment_ertr::future<>
 RecordSubmitter::roll_segment()
 {
   LOG_PREFIX(RecordSubmitter::roll_segment);
-  assert(is_available());
+  assert(p_current_batch->needs_flush() ||
+         is_available());
   // #1 block concurrent submissions due to rolling
   wait_available_promise = seastar::shared_promise<>();
   assert(!wait_unfull_flush_promise.has_value());
@@ -521,7 +524,9 @@ RecordSubmitter::roll_segment()
 }
 
 RecordSubmitter::submit_ret
-RecordSubmitter::submit(record_t&& record)
+RecordSubmitter::submit(
+    record_t&& record,
+    bool with_atomic_roll_segment)
 {
   LOG_PREFIX(RecordSubmitter::submit);
   assert(is_available());
@@ -574,16 +579,22 @@ RecordSubmitter::submit(record_t&& record)
             get_name(),
             p_current_batch->get_num_records(),
             num_outstanding_io);
-      wait_available_promise = seastar::shared_promise<>();
-      assert(!wait_unfull_flush_promise.has_value());
-      wait_unfull_flush_promise = seastar::promise<>();
-      // flush and mark available in background
-      std::ignore = wait_unfull_flush_promise->get_future(
-      ).finally([FNAME, this] {
-        DEBUG("{} flush done, available", get_name());
-        wait_available_promise->set_value();
-        wait_available_promise.reset();
-      });
+      if (with_atomic_roll_segment) {
+        // wait_available_promise and wait_unfull_flush_promise
+        // need to be delegated to the follow-up atomic roll_segment();
+        assert(p_current_batch->is_pending());
+      } else {
+        wait_available_promise = seastar::shared_promise<>();
+        assert(!wait_unfull_flush_promise.has_value());
+        wait_unfull_flush_promise = seastar::promise<>();
+        // flush and mark available in background
+        std::ignore = wait_unfull_flush_promise->get_future(
+        ).finally([FNAME, this] {
+          DEBUG("{} flush done, available", get_name());
+          wait_available_promise->set_value();
+          wait_available_promise.reset();
+        });
+      }
     } else {
       DEBUG("{} added pending, flush", get_name());
       flush_current_batch();
index 7faf2e1c45973e79fb0208d3a219a21dfd2c062f..e113cd2aecf7c5e2caa797762d0144fc43ecfbe2 100644 (file)
@@ -348,7 +348,7 @@ public:
   // when available, submit the record if possible
   using submit_ertr = base_ertr;
   using submit_ret = submit_ertr::future<record_locator_t>;
-  submit_ret submit(record_t&&);
+  submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
 
   void update_committed_to(const journal_seq_t& new_committed_to) {
     assert(new_committed_to != JOURNAL_SEQ_NULL);