]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/epm: replace condition_variable by a shared_promise
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 15 Feb 2022 14:26:18 +0000 (22:26 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 24 Feb 2022 12:39:39 +0000 (20:39 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h

index 3d92d4b329e6e83e77789c43150b5957843609b8..5ba17f6038bb63a88fc4cce9314f84fe0004685e 100644 (file)
@@ -38,6 +38,7 @@ SegmentedAllocator::Writer::_write(
   ool_record_t& record)
 {
   LOG_PREFIX(SegmentedAllocator::Writer::_write);
+  record.set_base(allocated_to);
   auto record_size = record.get_encoded_record_length();
   allocated_to += record_size.get_encoded_length();
   segment_provider.update_segment_avail_bytes(
@@ -65,119 +66,103 @@ SegmentedAllocator::Writer::_write(
   stats.data_bytes += record_size.dlength;
   stats.num_records += 1;
 
+  for (auto& ool_extent : record.get_extents()) {
+    auto& lextent = ool_extent.get_lextent();
+    auto paddr = ool_extent.get_ool_paddr();
+    TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
+    lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
+    t.mark_delayed_extent_ool(lextent, paddr);
+  }
+
   return trans_intr::make_interruptible(
     current_segment->write(record.get_base(), bl
-    ).safe_then([FNAME, &t, &record, cs=current_segment] {
-      DEBUGT("written {} {}",
-             t, cs->get_segment_id(), record.get_base());
+    ).safe_then([FNAME, &t, base=record.get_base(), cs=current_segment] {
+      DEBUGT("written {} {}", t, cs->get_segment_id(), base);
     })
-  ).si_then([FNAME, &record, &t] {
-    for (auto& ool_extent : record.get_extents()) {
-      auto& lextent = ool_extent.get_lextent();
-      auto paddr = ool_extent.get_ool_paddr();
-      TRACET("ool extent written at {} -- {}", t, *lextent, paddr);
-      lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
-      t.mark_delayed_extent_ool(lextent, paddr);
-    }
-    record.clear();
-  });
-}
-
-void SegmentedAllocator::Writer::add_extent_to_write(
-  ool_record_t& record,
-  LogicalCachedExtentRef& extent) {
-  logger().debug(
-    "SegmentedAllocator::Writer::add_extent_to_write: "
-    "add extent {} to record",
-    extent);
-  extent->prepare_write();
-  record.add_extent(extent);
+  );
 }
 
 SegmentedAllocator::Writer::write_iertr::future<>
-SegmentedAllocator::Writer::write(
+SegmentedAllocator::Writer::do_write(
   Transaction& t,
   std::list<LogicalCachedExtentRef>& extents)
 {
-  auto write_func = [this, &extents, &t] {
-    return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
-      [this, &extents, &t](auto& record) {
-      return trans_intr::repeat([this, &record, &t, &extents]()
-        -> write_iertr::future<seastar::stop_iteration> {
-        if (extents.empty()) {
-          return seastar::make_ready_future<
-            seastar::stop_iteration>(seastar::stop_iteration::yes);
+  LOG_PREFIX(SegmentedAllocator::Writer::do_write);
+  assert(!extents.empty());
+  if (roll_promise.has_value()) {
+    return roll_promise->get_shared_future(
+    ).then([this, &t, &extents] {
+      return do_write(t, extents);
+    });
+  }
+  assert(current_segment);
+
+  ool_record_t record(segment_manager.get_block_size());
+  for (auto it = extents.begin(); it != extents.end();) {
+    auto& extent = *it;
+    auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent);
+    if (_needs_roll(wouldbe_length)) {
+      // reached the segment end, write and roll
+      assert(!roll_promise.has_value());
+      roll_promise = seastar::shared_promise<>();
+      auto num_extents = record.get_num_extents();
+      DEBUGT(
+        "end of segment, writing {} extents to segment {} at {}",
+        t,
+        num_extents,
+        current_segment->get_segment_id(),
+        allocated_to);
+      return (num_extents ?
+              _write(t, record) :
+              write_iertr::now()
+      ).si_then([this] {
+        return roll_segment();
+      }).finally([this] {
+        roll_promise->set_value();
+        roll_promise.reset();
+      }).si_then([this, &t, &extents] {
+        if (!extents.empty()) {
+          return do_write(t, extents);
         }
-
-        return segment_rotation_guard.wait(
-          [this] {
-            return !rolling_segment;
-          },
-          [this, &record, &extents, &t]() -> write_iertr::future<> {
-            LOG_PREFIX(SegmentedAllocator::Writer::write);
-            record.set_base(allocated_to);
-            for (auto it = extents.begin();
-                 it != extents.end();) {
-              auto& extent = *it;
-              auto wouldbe_length =
-                record.get_wouldbe_encoded_record_length(extent);
-              if (_needs_roll(wouldbe_length)) {
-                // reached the segment end, write and roll
-                assert(!rolling_segment);
-                rolling_segment = true;
-                auto num_extents = record.get_num_extents();
-                DEBUGT(
-                  "end of segment, writing {} extents to segment {} at {}",
-                  t,
-                  num_extents,
-                  current_segment->get_segment_id(),
-                  allocated_to);
-                return (num_extents ?
-                        _write(t, record) :
-                        write_iertr::now()
-                ).si_then([this]() mutable {
-                  return roll_segment(false);
-                }).finally([this] {
-                  rolling_segment = false;
-                  segment_rotation_guard.broadcast();
-                });
-              }
-              add_extent_to_write(record, extent);
-              it = extents.erase(it);
-            }
-
-            DEBUGT(
-              "writing {} extents to segment {} at {}",
-              t,
-              record.get_num_extents(),
-              current_segment->get_segment_id(),
-              allocated_to);
-            return _write(t, record);
-          }
-        ).si_then([]()
-          -> write_iertr::future<seastar::stop_iteration> {
-          return seastar::make_ready_future<
-            seastar::stop_iteration>(seastar::stop_iteration::no);
-        });
+        return write_iertr::now();
       });
-    });
-  };
+    }
+    DEBUGT("add extent to record -- {}", t, *extent);
+    extent->prepare_write();
+    record.add_extent(extent);
+    it = extents.erase(it);
+  }
 
-  return seastar::with_gate(write_guard,
-    [this, write_func=std::move(write_func)]() mutable
-  {
-    if (rolling_segment) {
-      return segment_rotation_guard.wait([this] {
-          return !rolling_segment;
-        }, std::move(write_func));
+  DEBUGT(
+    "writing {} extents to segment {} at {}",
+    t,
+    record.get_num_extents(),
+    current_segment->get_segment_id(),
+    allocated_to);
+  return _write(t, record);
+}
 
-    } else if (!current_segment) {
-      return trans_intr::make_interruptible(roll_segment(true)).si_then(
-        [write_func=std::move(write_func)] {
-        return write_func();
+SegmentedAllocator::Writer::write_iertr::future<>
+SegmentedAllocator::Writer::write(
+  Transaction& t,
+  std::list<LogicalCachedExtentRef>& extents)
+{
+  if (extents.empty()) {
+    return write_iertr::now();
+  }
+  return seastar::with_gate(write_guard, [this, &t, &extents] {
+    if (!roll_promise.has_value() && !current_segment) {
+      roll_promise = seastar::shared_promise<>();
+      return trans_intr::make_interruptible(
+        roll_segment().finally([this] {
+          roll_promise->set_value();
+          roll_promise.reset();
+        })
+      ).si_then([this, &t, &extents] {
+        return do_write(t, extents);
       });
     }
-    return write_func();
+    return do_write(t, extents);
   });
 }
 
@@ -212,13 +197,10 @@ SegmentedAllocator::Writer::init_segment(Segment& segment) {
 }
 
 SegmentedAllocator::Writer::roll_segment_ertr::future<>
-SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
+SegmentedAllocator::Writer::roll_segment() {
   LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
-  DEBUG("set_rolling {}", set_rolling);
-  if (set_rolling) {
-    rolling_segment = true;
-  }
-  assert(rolling_segment);
+  DEBUG("start");
+  assert(roll_promise.has_value());
   return [this, FNAME] {
     if (current_segment) {
       auto seg_to_close = std::move(current_segment);
@@ -252,8 +234,6 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
     ).safe_then([segref=std::move(segref), this, FNAME] {
       assert(!current_segment);
       current_segment = segref;
-      rolling_segment = false;
-      segment_rotation_guard.broadcast();
       DEBUG("inited new segment: {}", segref->get_segment_id());
     });
   }).handle_error(
index 5d97534f858869a5d29f56e142f1f8d20f4b9861..c582f04e5892a39aee05f54a5c974e7f245261d8 100644 (file)
@@ -4,8 +4,8 @@
 #pragma once
 
 #include "seastar/core/gate.hh"
+#include "seastar/core/shared_future.hh"
 
-#include "crimson/common/condition_variable.h"
 #include "crimson/os/seastore/cached_extent.h"
 #include "crimson/os/seastore/logging.h"
 #include "crimson/os/seastore/segment_manager.h"
@@ -191,6 +191,10 @@ class SegmentedAllocator : public ExtentAllocator {
       });
     }
   private:
+    write_iertr::future<> do_write(
+      Transaction& t,
+      std::list<LogicalCachedExtentRef>& extent);
+
     bool _needs_roll(seastore_off_t length) const;
 
     write_iertr::future<> _write(
@@ -199,23 +203,18 @@ class SegmentedAllocator : public ExtentAllocator {
 
     using roll_segment_ertr = crimson::errorator<
       crimson::ct_error::input_output_error>;
-    roll_segment_ertr::future<> roll_segment(bool);
+    roll_segment_ertr::future<> roll_segment();
 
     using init_segment_ertr = crimson::errorator<
       crimson::ct_error::input_output_error>;
     init_segment_ertr::future<> init_segment(Segment& segment);
 
-    void add_extent_to_write(
-      ool_record_t&,
-      LogicalCachedExtentRef& extent);
-
     SegmentProvider& segment_provider;
     SegmentManager& segment_manager;
     SegmentRef current_segment;
     seastore_off_t allocated_to = 0;
-    crimson::condition_variable segment_rotation_guard;
+    std::optional<seastar::shared_promise<>> roll_promise;
     seastar::gate write_guard;
-    bool rolling_segment = false;
   };
 public:
   SegmentedAllocator(