]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/epm: simplify gating writes for Writer
authorYingxin Cheng <yingxin.cheng@intel.com>
Tue, 15 Feb 2022 01:28:04 +0000 (09:28 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 24 Feb 2022 12:24:26 +0000 (20:24 +0800)
Dropped open_segment_wrapper_t.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h

index 796c63ce932e46bf9bb6828ef88da326a567a601..3d92d4b329e6e83e77789c43150b5957843609b8 100644 (file)
@@ -37,25 +37,23 @@ SegmentedAllocator::Writer::_write(
   Transaction& t,
   ool_record_t& record)
 {
+  LOG_PREFIX(SegmentedAllocator::Writer::_write);
   auto record_size = record.get_encoded_record_length();
   allocated_to += record_size.get_encoded_length();
   segment_provider.update_segment_avail_bytes(
     paddr_t::make_seg_paddr(
-      current_segment->segment->get_segment_id(),
+      current_segment->get_segment_id(),
       allocated_to));
   bufferlist bl = record.encode(
-      current_segment->segment->get_segment_id(),
+      current_segment->get_segment_id(),
       0);
-  seastar::promise<> pr;
-  current_segment->inflight_writes.emplace_back(pr.get_future());
-  LOG_PREFIX(SegmentedAllocator::Writer::_write);
 
   DEBUGT(
     "written {} extents, {} bytes to segment {} at {}",
     t,
     record.get_num_extents(),
     bl.length(),
-    current_segment->segment->get_segment_id(),
+    current_segment->get_segment_id(),
     record.get_base());
 
   // account transactional ool writes before write()
@@ -68,18 +66,10 @@ SegmentedAllocator::Writer::_write(
   stats.num_records += 1;
 
   return trans_intr::make_interruptible(
-    current_segment->segment->write(record.get_base(), bl).safe_then(
-      [this, FNAME, pr=std::move(pr), &t,
-       it=(--current_segment->inflight_writes.end()),
-       cs=current_segment]() mutable {
-        if (cs->outdated) {
-          DEBUGT("segment rolled", t);
-          pr.set_value();
-        } else{
-          DEBUGT("segment not rolled", t);
-          current_segment->inflight_writes.erase(it);
-        }
-        return seastar::now();
+    current_segment->write(record.get_base(), bl
+    ).safe_then([FNAME, &t, &record, cs=current_segment] {
+      DEBUGT("written {} {}",
+             t, cs->get_segment_id(), record.get_base());
     })
   ).si_then([FNAME, &record, &t] {
     for (auto& ool_extent : record.get_extents()) {
@@ -140,7 +130,7 @@ SegmentedAllocator::Writer::write(
                   "end of segment, writing {} extents to segment {} at {}",
                   t,
                   num_extents,
-                  current_segment->segment->get_segment_id(),
+                  current_segment->get_segment_id(),
                   allocated_to);
                 return (num_extents ?
                         _write(t, record) :
@@ -160,7 +150,7 @@ SegmentedAllocator::Writer::write(
               "writing {} extents to segment {} at {}",
               t,
               record.get_num_extents(),
-              current_segment->segment->get_segment_id(),
+              current_segment->get_segment_id(),
               allocated_to);
             return _write(t, record);
           }
@@ -173,22 +163,26 @@ SegmentedAllocator::Writer::write(
     });
   };
 
-  if (rolling_segment) {
-    return segment_rotation_guard.wait([this] {
-        return !rolling_segment;
-      }, std::move(write_func));
+  return seastar::with_gate(write_guard,
+    [this, write_func=std::move(write_func)]() mutable
+  {
+    if (rolling_segment) {
+      return segment_rotation_guard.wait([this] {
+          return !rolling_segment;
+        }, std::move(write_func));
 
-  } else if (!current_segment) {
-    return trans_intr::make_interruptible(roll_segment(true)).si_then(
-      [write_func=std::move(write_func)] {
-      return write_func();
-    });
-  }
-  return write_func();
+    } else if (!current_segment) {
+      return trans_intr::make_interruptible(roll_segment(true)).si_then(
+        [write_func=std::move(write_func)] {
+        return write_func();
+      });
+    }
+    return write_func();
+  });
 }
 
 bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const {
-  return allocated_to + length > current_segment->segment->get_write_capacity();
+  return allocated_to + length > current_segment->get_write_capacity();
 }
 
 SegmentedAllocator::Writer::init_segment_ertr::future<>
@@ -225,44 +219,39 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
     rolling_segment = true;
   }
   assert(rolling_segment);
-  if (current_segment) {
-    (void) seastar::with_gate(writer_guard, [this] {
-      auto fut = seastar::now();
-      if (!current_segment->inflight_writes.empty()) {
-        fut = seastar::when_all_succeed(
-          current_segment->inflight_writes.begin(),
-          current_segment->inflight_writes.end());
-      }
-      current_segment->outdated = true;
-      return fut.then(
-        [cs=std::move(current_segment), this, it=(--open_segments.end())] {
-        return cs->segment->close().safe_then([this, cs, it] {
-          LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
-          assert((*it).get() == cs.get());
-          segment_provider.close_segment(cs->segment->get_segment_id());
-          open_segments.erase(it);
-          DEBUG("closed segment: {}", cs->segment->get_segment_id());
+  return [this, FNAME] {
+    if (current_segment) {
+      auto seg_to_close = std::move(current_segment);
+      if (write_guard.is_closed()) {
+        DEBUG("write_guard is closed, should be stopping");
+        return seg_to_close->close(
+        ).safe_then([seg_to_close=std::move(seg_to_close)] {});
+      } else {
+        DEBUG("rolling OOL segment, close {} ...", seg_to_close->get_segment_id());
+        (void) seastar::with_gate(write_guard,
+          [this, seg_to_close=std::move(seg_to_close)]() mutable
+        {
+          return seg_to_close->close(
+          ).safe_then([this, seg_to_close=std::move(seg_to_close)] {
+            segment_provider.close_segment(seg_to_close->get_segment_id());
+          });
         });
-      });
-    }).handle_exception_type([](seastar::gate_closed_exception e) {
-      LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
-      DEBUG(" writer_guard closed, should be stopping");
-      return seastar::now();
-    });
-  }
-
-  auto new_segment_id = segment_provider.get_segment(
-      segment_manager.get_device_id(), OOL_SEG_SEQ);
-  return segment_manager.open(new_segment_id
-  ).safe_then([this](auto segref) {
-    LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
+        return Segment::close_ertr::now();
+      }
+    } else {
+      DEBUG("rolling OOL segment, no current ...");
+      return Segment::close_ertr::now();
+    }
+  }().safe_then([this] {
+    auto new_segment_id = segment_provider.get_segment(
+        segment_manager.get_device_id(), OOL_SEG_SEQ);
+    return segment_manager.open(new_segment_id);
+  }).safe_then([this, FNAME](auto segref) {
     DEBUG("opened new segment: {}", segref->get_segment_id());
-    return init_segment(*segref).safe_then([segref=std::move(segref), this] {
-      LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
-      assert(!current_segment.get());
-      current_segment.reset(new open_segment_wrapper_t());
-      current_segment->segment = segref;
-      open_segments.emplace_back(current_segment);
+    return init_segment(*segref
+    ).safe_then([segref=std::move(segref), this, FNAME] {
+      assert(!current_segment);
+      current_segment = segref;
       rolling_segment = false;
       segment_rotation_guard.broadcast();
       DEBUG("inited new segment: {}", segref->get_segment_id());
index 485ed4a069b8a4bc98c02ef73ea493278ae9e223..5d97534f858869a5d29f56e142f1f8d20f4b9861 100644 (file)
@@ -151,17 +151,6 @@ public:
 };
 using ExtentAllocatorRef = std::unique_ptr<ExtentAllocator>;
 
-struct open_segment_wrapper_t : public boost::intrusive_ref_counter<
-  open_segment_wrapper_t,
-  boost::thread_unsafe_counter> {
-  SegmentRef segment;
-  std::list<seastar::future<>> inflight_writes;
-  bool outdated = false;
-};
-
-using open_segment_wrapper_ref =
-  boost::intrusive_ptr<open_segment_wrapper_t>;
-
 class SegmentProvider;
 
 /**
@@ -191,11 +180,14 @@ class SegmentedAllocator : public ExtentAllocator {
     write_iertr::future<> write(
       Transaction& t,
       std::list<LogicalCachedExtentRef>& extent) final;
+
     stop_ertr::future<> stop() final {
-      return writer_guard.close().then([this] {
-        return crimson::do_for_each(open_segments, [](auto& seg_wrapper) {
-          return seg_wrapper->segment->close();
-        });
+      return write_guard.close().then([this] {
+        if (current_segment) {
+          return current_segment->close();
+        } else {
+          return Segment::close_ertr::now();
+        }
       });
     }
   private:
@@ -219,11 +211,10 @@ class SegmentedAllocator : public ExtentAllocator {
 
     SegmentProvider& segment_provider;
     SegmentManager& segment_manager;
-    open_segment_wrapper_ref current_segment;
-    std::list<open_segment_wrapper_ref> open_segments;
+    SegmentRef current_segment;
     seastore_off_t allocated_to = 0;
     crimson::condition_variable segment_rotation_guard;
-    seastar::gate writer_guard;
+    seastar::gate write_guard;
     bool rolling_segment = false;
   };
 public: