]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/EPM: integrate Writer with RecordSubmitter
authorYingxin Cheng <yingxin.cheng@intel.com>
Mon, 14 Mar 2022 08:05:33 +0000 (16:05 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 18 Mar 2022 06:13:46 +0000 (14:13 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/extent_placement_manager.cc
src/crimson/os/seastore/extent_placement_manager.h

index e838fdd08ceb10791c59b0680efb9015395579ca..fcb9cb3947859e84a9201397b693dcff05338110 100644 (file)
@@ -3,7 +3,9 @@
 
 #include "crimson/os/seastore/extent_placement_manager.h"
 
-SET_SUBSYS(seastore_tm);
+#include "crimson/common/config_proxy.h"
+
+SET_SUBSYS(seastore_journal);
 
 namespace crimson::os::seastore {
 
@@ -22,35 +24,43 @@ SegmentedAllocator::SegmentedAllocator(
   );
 }
 
+SegmentedAllocator::Writer::Writer(
+  SegmentProvider& sp,
+  SegmentManager& sm)
+  : segment_allocator("OOL", segment_type_t::OOL, sp, sm),
+    record_submitter(crimson::common::get_conf<uint64_t>(
+                       "seastore_journal_iodepth_limit"),
+                     crimson::common::get_conf<uint64_t>(
+                       "seastore_journal_batch_capacity"),
+                     crimson::common::get_conf<Option::size_t>(
+                       "seastore_journal_batch_flush_size"),
+                     crimson::common::get_conf<double>(
+                       "seastore_journal_batch_preferred_fullness"),
+                     segment_allocator)
+{
+}
+
 SegmentedAllocator::Writer::open_ertr::future<>
 SegmentedAllocator::Writer::open()
 {
   return segment_allocator.open().discard_result();
 }
 
-SegmentedAllocator::Writer::write_iertr::future<>
-SegmentedAllocator::Writer::_write(
+SegmentedAllocator::Writer::write_ertr::future<>
+SegmentedAllocator::Writer::write_record(
   Transaction& t,
   record_t&& record,
   std::list<LogicalCachedExtentRef>&& extents)
 {
-  LOG_PREFIX(SegmentedAllocator::Writer::_write);
+  LOG_PREFIX(SegmentedAllocator::Writer::write_record);
   assert(extents.size());
   assert(extents.size() == record.extents.size());
   assert(!record.deltas.size());
-  auto record_group = record_group_t(
-      std::move(record), segment_allocator.get_block_size());
-  auto record_size = record_group.size;
-  ceph::bufferlist bl = encode_records(
-      record_group,
-      JOURNAL_SEQ_NULL,
-      segment_allocator.get_nonce()); // 0
-  assert(bl.length() == record_size.get_encoded_length());
-
-  DEBUGT("writing {} bytes to segment {}",
-         t, bl.length(), segment_allocator.get_segment_id());
 
   // account transactional ool writes before write()
+  // TODO: drop the incorrect size and fix the metrics
+  auto record_size = record_group_size_t(
+      record.size, segment_allocator.get_block_size());
   auto& stats = t.get_ool_write_stats();
   stats.extents.num += extents.size();
   stats.extents.bytes += record_size.dlength;
@@ -59,22 +69,23 @@ SegmentedAllocator::Writer::_write(
   stats.data_bytes += record_size.dlength;
   stats.num_records += 1;
 
-  return trans_intr::make_interruptible(
-    segment_allocator.write(bl)
-  ).si_then([FNAME, record_size, &t,
-             extents=std::move(extents)](write_result_t wr) mutable {
-    assert(wr.start_seq.segment_seq == OOL_SEG_SEQ);
-    paddr_t extent_addr = wr.start_seq.offset;
-    extent_addr = extent_addr.as_seg_paddr().add_offset(
-        record_size.get_mdlength());
+  return record_submitter.submit(std::move(record)
+  ).safe_then([this, FNAME, &t, extents=std::move(extents)
+              ](record_locator_t ret) mutable {
+    assert(ret.write_result.start_seq.segment_seq == OOL_SEG_SEQ);
+    DEBUGT("{} finish with {} and {} extents",
+           t, segment_allocator.get_name(),
+           ret, extents.size());
+    paddr_t extent_addr = ret.record_block_base;
     for (auto& extent : extents) {
-      TRACET("ool extent written at {} -- {}", t, *extent, extent_addr);
+      TRACET("{} ool extent written at {} -- {}",
+             t, segment_allocator.get_name(),
+             extent_addr, *extent);
       extent->hint = placement_hint_t::NUM_HINTS; // invalidate hint
       t.mark_delayed_extent_ool(extent, extent_addr);
       extent_addr = extent_addr.as_seg_paddr().add_offset(
           extent->get_length());
     }
-    assert(extent_addr == wr.get_end_seq().offset);
   });
 }
 
@@ -85,15 +96,16 @@ SegmentedAllocator::Writer::do_write(
 {
   LOG_PREFIX(SegmentedAllocator::Writer::do_write);
   assert(!extents.empty());
-  if (roll_promise.has_value()) {
+  if (!record_submitter.is_available()) {
+    DEBUGT("{} extents={} wait ...",
+           t, segment_allocator.get_name(),
+           extents.size());
     return trans_intr::make_interruptible(
-      roll_promise->get_shared_future()
-    ).then_interruptible([this, &t, &extents] {
+      record_submitter.wait_available()
+    ).si_then([this, &t, &extents] {
       return do_write(t, extents);
     });
   }
-  assert(segment_allocator.can_write());
-
   record_t record;
   std::list<LogicalCachedExtentRef> pending_extents;
 
@@ -113,32 +125,33 @@ SegmentedAllocator::Writer::do_write(
     auto& extent = *it;
     record_size_t wouldbe_rsize = record.size;
     wouldbe_rsize.account_extent(extent->get_bptr().length());
-    auto wouldbe_length = record_group_size_t(
-      wouldbe_rsize, segment_allocator.get_block_size()
-    ).get_encoded_length();
-    if (segment_allocator.needs_roll(wouldbe_length)) {
-      // reached the segment end, write and roll
-      assert(!roll_promise.has_value());
-      roll_promise = seastar::shared_promise<>();
+    using action_t = journal::RecordSubmitter::action_t;
+    action_t action = record_submitter.check_action(wouldbe_rsize);
+    if (action == action_t::ROLL) {
       auto num_extents = pending_extents.size();
-      DEBUGT("end of segment, writing {} extents", t, num_extents);
-      return (num_extents ?
-              _write(t, std::move(record), std::move(pending_extents)) :
-              write_iertr::now()
-      ).si_then([this] {
-        return segment_allocator.roll();
-      }).finally([this] {
-        roll_promise->set_value();
-        roll_promise.reset();
-      }).si_then([this, &t, &extents] {
-        if (!extents.empty()) {
-          return do_write(t, extents);
-        }
-        return write_iertr::now();
+      DEBUGT("{} extents={} submit {} extents and roll, unavailable ...",
+             t, segment_allocator.get_name(),
+             extents.size(), num_extents);
+      auto fut_write = write_ertr::now();
+      if (num_extents > 0) {
+        assert(record_submitter.check_action(record.size) !=
+               action_t::ROLL);
+        fut_write = write_record(
+            t, std::move(record), std::move(pending_extents));
+      }
+      return trans_intr::make_interruptible(
+        record_submitter.roll_segment(
+        ).safe_then([fut_write=std::move(fut_write)]() mutable {
+          return std::move(fut_write);
+        })
+      ).si_then([this, &t, &extents] {
+        return do_write(t, extents);
       });
     }
 
-    DEBUGT("add extent to record -- {}", t, *extent);
+    TRACET("{} extents={} add extent to record -- {}",
+           t, segment_allocator.get_name(),
+           extents.size(), *extent);
     if (commit_type == record_commit_type_t::MODIFY) {
       extent->set_last_modified(commit_time);
     } else {
@@ -156,10 +169,32 @@ SegmentedAllocator::Writer::do_write(
       extent->get_last_modified().time_since_epoch().count()});
     pending_extents.push_back(extent);
     it = extents.erase(it);
+
+    assert(record_submitter.check_action(record.size) == action);
+    if (action == action_t::SUBMIT_FULL) {
+      DEBUGT("{} extents={} submit {} extents ...",
+             t, segment_allocator.get_name(),
+             extents.size(), pending_extents.size());
+      return trans_intr::make_interruptible(
+        write_record(t, std::move(record), std::move(pending_extents))
+      ).si_then([this, &t, &extents] {
+        if (!extents.empty()) {
+          return do_write(t, extents);
+        } else {
+          return write_iertr::now();
+        }
+      });
+    }
+    // SUBMIT_NOT_FULL: evaluate the next extent
   }
 
-  DEBUGT("writing {} extents", t, pending_extents.size());
-  return _write(t, std::move(record), std::move(pending_extents));
+  auto num_extents = pending_extents.size();
+  DEBUGT("{} submit the rest {} extents ...",
+         t, segment_allocator.get_name(),
+         num_extents);
+  assert(num_extents > 0);
+  return trans_intr::make_interruptible(
+    write_record(t, std::move(record), std::move(pending_extents)));
 }
 
 SegmentedAllocator::Writer::write_iertr::future<>
index f9444bf7eb303c0ba25b680b68b16dfd93077995..263dc0cf42181d631f067e4657a5aba524495987 100644 (file)
@@ -4,7 +4,6 @@
 #pragma once
 
 #include "seastar/core/gate.hh"
-#include "seastar/core/shared_future.hh"
 
 #include "crimson/os/seastore/cached_extent.h"
 #include "crimson/os/seastore/journal/segment_allocator.h"
@@ -27,7 +26,8 @@ public:
   using open_ertr = base_ertr;
   virtual open_ertr::future<> open() = 0;
 
-  using write_iertr = trans_iertr<base_ertr>;
+  using write_ertr = base_ertr;
+  using write_iertr = trans_iertr<write_ertr>;
   virtual write_iertr::future<> write(
     Transaction& t,
     std::list<LogicalCachedExtentRef>& extent) = 0;
@@ -75,9 +75,7 @@ class SegmentProvider;
 class SegmentedAllocator : public ExtentAllocator {
   class Writer : public ExtentOolWriter {
   public:
-    Writer(SegmentProvider& sp, SegmentManager& sm)
-      : segment_allocator("OOL", segment_type_t::OOL, sp, sm) {}
-
+    Writer(SegmentProvider& sp, SegmentManager& sm);
     Writer(Writer &&) = default;
 
     open_ertr::future<> open() final;
@@ -99,13 +97,13 @@ class SegmentedAllocator : public ExtentAllocator {
       Transaction& t,
       std::list<LogicalCachedExtentRef>& extent);
 
-    write_iertr::future<> _write(
+    write_ertr::future<> write_record(
       Transaction& t,
       record_t&& record,
       std::list<LogicalCachedExtentRef>&& extents);
 
     journal::SegmentAllocator segment_allocator;
-    std::optional<seastar::shared_promise<>> roll_promise;
+    journal::RecordSubmitter record_submitter;
     seastar::gate write_guard;
   };
 public:
@@ -131,7 +129,7 @@ public:
     Transaction& t,
     std::list<LogicalCachedExtentRef>& extents) final {
     LOG_PREFIX(SegmentedAllocator::alloc_ool_extents_paddr);
-    SUBDEBUGT(seastore_tm, "start", t);
+    SUBDEBUGT(seastore_journal, "start", t);
     return seastar::do_with(
       std::map<Writer*, std::list<LogicalCachedExtentRef>>(),
       [this, extents=std::move(extents), &t](auto& alloc_map) {
@@ -164,7 +162,7 @@ public:
   void add_allocator(device_type_t type, ExtentAllocatorRef&& allocator) {
     allocators[type].emplace_back(std::move(allocator));
     LOG_PREFIX(ExtentPlacementManager::add_allocator);
-    SUBDEBUG(seastore_tm, "allocators for {}: {}",
+    SUBDEBUG(seastore_journal, "allocators for {}: {}",
       type,
       allocators[type].size());
   }
@@ -172,7 +170,7 @@ public:
   using open_ertr = ExtentOolWriter::open_ertr;
   open_ertr::future<> open() {
     LOG_PREFIX(ExtentPlacementManager::open);
-    SUBINFO(seastore_tm, "started");
+    SUBINFO(seastore_journal, "started");
     return crimson::do_for_each(allocators, [](auto& allocators_item) {
       return crimson::do_for_each(allocators_item.second, [](auto& allocator) {
         return allocator->open();
@@ -228,7 +226,7 @@ public:
     Transaction& t,
     const std::list<LogicalCachedExtentRef>& delayed_extents) {
     LOG_PREFIX(ExtentPlacementManager::delayed_alloc_or_ool_write);
-    SUBDEBUGT(seastore_tm, "start with {} delayed extents",
+    SUBDEBUGT(seastore_journal, "start with {} delayed extents",
               t, delayed_extents.size());
     return seastar::do_with(
         std::map<ExtentAllocator*, std::list<LogicalCachedExtentRef>>(),
@@ -251,7 +249,7 @@ public:
   using close_ertr = ExtentOolWriter::stop_ertr;
   close_ertr::future<> close() {
     LOG_PREFIX(ExtentPlacementManager::close);
-    SUBINFO(seastore_tm, "started");
+    SUBINFO(seastore_journal, "started");
     return crimson::do_for_each(allocators, [](auto& allocators_item) {
       return crimson::do_for_each(allocators_item.second, [](auto& allocator) {
         return allocator->stop();