git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: wait ool writes in DeviceSubmission phase
author Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 23 Jul 2024 09:11:44 +0000 (17:11 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Wed, 7 Aug 2024 04:51:53 +0000 (12:51 +0800)
So that waiting for the ool writes is moved out of the collection lock.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/journal/circular_bounded_journal.cc
src/crimson/os/seastore/journal/segmented_journal.cc
src/crimson/os/seastore/ordering_handle.h

index 1ac7c68484b1761ea02798fc58575202213e5df5..a958fbdfd028a0f9aca7b43fe6311fbf61db496e 100644 (file)
@@ -28,8 +28,7 @@ SegmentedOolWriter::SegmentedOolWriter(
 {
 }
 
-SegmentedOolWriter::alloc_write_ertr::future<>
-SegmentedOolWriter::write_record(
+void SegmentedOolWriter::write_record(
   Transaction& t,
   record_t&& record,
   std::list<LogicalCachedExtentRef>&& extents,
@@ -63,15 +62,20 @@ SegmentedOolWriter::write_record(
     extent_addr = extent_addr.as_seg_paddr().add_offset(
         extent->get_length());
   }
-  return std::move(ret.future
-  ).safe_then([this, FNAME, &t,
-               record_base=ret.record_base_regardless_md
-              ](record_locator_t ret) {
-    TRACET("{} finish {}=={}",
-           t, segment_allocator.get_name(), ret, record_base);
-    // ool won't write metadata, so the paddrs must be equal
-    assert(ret.record_block_base == record_base.offset);
+  // t might be destructed inside write_future
+  auto write_future = seastar::with_gate(write_guard,
+    [this, FNAME, tid=t.get_trans_id(),
+     record_base=ret.record_base_regardless_md,
+     submit_fut=std::move(ret.future)]() mutable {
+    return std::move(submit_fut
+    ).safe_then([this, FNAME, tid, record_base](record_locator_t ret) {
+      TRACE("trans.{} {} finish {}=={}",
+            tid, segment_allocator.get_name(), ret, record_base);
+      // ool won't write metadata, so the paddrs must be equal
+      assert(ret.record_block_base == record_base.offset);
+    });
   });
+  t.get_handle().add_write_future(std::move(write_future));
 }
 
 SegmentedOolWriter::alloc_write_iertr::future<>
@@ -108,19 +112,15 @@ SegmentedOolWriter::do_write(
       DEBUGT("{} extents={} submit {} extents and roll, unavailable ...",
              t, segment_allocator.get_name(),
              extents.size(), num_extents);
-      auto fut_write = alloc_write_ertr::now();
       if (num_extents > 0) {
         assert(record_submitter.check_action(record.size) !=
                action_t::ROLL);
-        fut_write = write_record(
+        write_record(
             t, std::move(record), std::move(pending_extents),
             true/* with_atomic_roll_segment */);
       }
       return trans_intr::make_interruptible(
-        record_submitter.roll_segment(
-        ).safe_then([fut_write=std::move(fut_write)]() mutable {
-          return std::move(fut_write);
-        })
+        record_submitter.roll_segment()
       ).si_then([this, &t, &extents] {
         return do_write(t, extents);
       });
@@ -151,15 +151,12 @@ SegmentedOolWriter::do_write(
       DEBUGT("{} extents={} submit {} extents ...",
              t, segment_allocator.get_name(),
              extents.size(), pending_extents.size());
-      return trans_intr::make_interruptible(
-        write_record(t, std::move(record), std::move(pending_extents))
-      ).si_then([this, &t, &extents] {
-        if (!extents.empty()) {
-          return do_write(t, extents);
-        } else {
-          return alloc_write_iertr::now();
-        }
-      });
+      write_record(t, std::move(record), std::move(pending_extents));
+      if (!extents.empty()) {
+        return do_write(t, extents);
+      } else {
+        return alloc_write_iertr::now();
+      }
     }
     // SUBMIT_NOT_FULL: evaluate the next extent
   }
@@ -169,8 +166,8 @@ SegmentedOolWriter::do_write(
          t, segment_allocator.get_name(),
          num_extents);
   assert(num_extents > 0);
-  return trans_intr::make_interruptible(
-    write_record(t, std::move(record), std::move(pending_extents)));
+  write_record(t, std::move(record), std::move(pending_extents));
+  return alloc_write_iertr::now();
 }
 
 SegmentedOolWriter::alloc_write_iertr::future<>
@@ -994,13 +991,11 @@ RandomBlockOolWriter::alloc_write_ool_extents(
   if (extents.empty()) {
     return alloc_write_iertr::now();
   }
-  return seastar::with_gate(write_guard, [this, &t, &extents] {
-    return do_write(t, extents);
-  });
+  do_write(t, extents);
+  return alloc_write_iertr::now();
 }
 
-RandomBlockOolWriter::alloc_write_iertr::future<>
-RandomBlockOolWriter::do_write(
+void RandomBlockOolWriter::do_write(
   Transaction& t,
   std::list<CachedExtentRef>& extents)
 {
@@ -1053,8 +1048,10 @@ RandomBlockOolWriter::do_write(
     }
   }
 
-  return trans_intr::make_interruptible(
-    seastar::do_with(std::move(writes),
+  // t might be destructed inside write_future
+  auto write_future = seastar::with_gate(write_guard,
+    [writes=std::move(writes)]() mutable {
+    return seastar::do_with(std::move(writes),
       [](auto& writes) {
       return crimson::do_for_each(writes,
         [](auto& info) {
@@ -1065,8 +1062,9 @@ RandomBlockOolWriter::do_write(
             "Invalid error when writing record"}
         );
       });
-    })
-  );
+    });
+  });
+  t.get_handle().add_write_future(std::move(write_future));
 }
 
 }
index 7c4110c053ef5060d1860c73e4ae4198997928e0..0f2d55ef04aeb5d99ee829aae1fb7dcf28de8b60 100644 (file)
@@ -115,7 +115,7 @@ private:
     Transaction& t,
     std::list<CachedExtentRef> &extent);
 
-  alloc_write_ertr::future<> write_record(
+  void write_record(
     Transaction& t,
     record_t&& record,
     std::list<LogicalCachedExtentRef> &&extents,
@@ -195,7 +195,7 @@ private:
     ceph::bufferptr bp;
     RandomBlockManager* rbm;
   };
-  alloc_write_iertr::future<> do_write(
+  void do_write(
     Transaction& t,
     std::list<CachedExtentRef> &extent);
 
index 9ee8b1b997f0ae39118a19e1c91802eec6504984..4da70f72c4cd334edf120d8fbfa0d2ac5abdfaa6 100644 (file)
@@ -97,7 +97,9 @@ CircularBoundedJournal::do_submit_record(
   auto submit_ret = record_submitter.submit(std::move(record));
   // submit_ret.record_base_regardless_md is wrong for journaling
   return handle.enter(write_pipeline->device_submission
-  ).then([submit_fut=std::move(submit_ret.future)]() mutable {
+  ).then([&handle] {
+    return handle.take_write_future();
+  }).safe_then([submit_fut=std::move(submit_ret.future)]() mutable {
     return std::move(submit_fut);
   }).safe_then([FNAME, this, &handle](record_locator_t result) {
     return handle.enter(write_pipeline->finalize
index eca45f113c25c2f18c4f9c445346d8246d965964..81e8c5a62c781f3e2bb3a211dd51260fc3efbc06 100644 (file)
@@ -396,7 +396,9 @@ SegmentedJournal::do_submit_record(
     auto submit_ret = record_submitter.submit(std::move(record));
     // submit_ret.record_base_regardless_md is wrong for journaling
     return handle.enter(write_pipeline->device_submission
-    ).then([submit_fut=std::move(submit_ret.future)]() mutable {
+    ).then([&handle] {
+      return handle.take_write_future();
+    }).safe_then([submit_fut=std::move(submit_ret.future)]() mutable {
       return std::move(submit_fut);
     }).safe_then([FNAME, this, &handle](record_locator_t result) {
       return handle.enter(write_pipeline->finalize
index 8ab8442acd9f7acc6b8073beb6bc5ce556fa47b3..cfa8620587552104013a5bf3559fcc918b0e7f8c 100644 (file)
@@ -122,6 +122,11 @@ struct OrderingHandle {
   std::unique_ptr<OperationProxy> op;
   seastar::shared_mutex *collection_ordering_lock = nullptr;
 
+  using write_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+  // the pending writes that should complete at DeviceSubmission phase
+  write_ertr::future<> write_future = write_ertr::now();
+
   // in the future we might add further constructors / template to type
   // erasure while extracting the location of tracking events.
   OrderingHandle(std::unique_ptr<OperationProxy> op) : op(std::move(op)) {}
@@ -144,6 +149,20 @@ struct OrderingHandle {
     }
   }
 
+  void add_write_future(write_ertr::future<>&& fut) {
+    auto appended = std::move(write_future
+    ).safe_then([fut=std::move(fut)]() mutable {
+      return std::move(fut);
+    });
+    write_future = std::move(appended);
+  }
+
+  write_ertr::future<> take_write_future() {
+    auto ret = std::move(write_future);
+    write_future = write_ertr::now();
+    return ret;
+  }
+
   template <typename T>
   seastar::future<> enter(T &t) {
     return op->enter(t);
@@ -151,6 +170,10 @@ struct OrderingHandle {
 
   void exit() {
     op->exit();
+
+    auto ignore_writes = std::move(write_future);
+    std::ignore = ignore_writes;
+    write_future = write_ertr::now();
   }
 
   seastar::future<> complete() {
@@ -159,6 +182,9 @@ struct OrderingHandle {
 
   ~OrderingHandle() {
     maybe_release_collection_lock();
+
+    assert(write_future.available());
+    assert(!write_future.failed());
   }
 };