git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: introduce JournalAllocator to generalize SegmentAllocator
author: myoungwon oh <ohmyoungwon@gmail.com>
Wed, 5 Apr 2023 04:37:29 +0000 (04:37 +0000)
committer: myoungwon oh <ohmyoungwon@gmail.com>
Thu, 20 Apr 2023 13:01:27 +0000 (13:01 +0000)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
src/crimson/os/seastore/journal/record_submitter.cc
src/crimson/os/seastore/journal/record_submitter.h
src/crimson/os/seastore/journal/segment_allocator.h

index 09cac5b2671a00283df7b3b20bdd6f2725c80676..5ca53b436d51b57312892a22f0fd7fe1ed94d49c 100644 (file)
@@ -128,10 +128,10 @@ RecordSubmitter::RecordSubmitter(
   std::size_t batch_capacity,
   std::size_t batch_flush_size,
   double preferred_fullness,
-  SegmentAllocator& sa)
+  JournalAllocator& ja)
   : io_depth_limit{io_depth},
     preferred_fullness{preferred_fullness},
-    segment_allocator{sa},
+    journal_allocator{ja},
     batches(new RecordBatch[io_depth + 1])
 {
   LOG_PREFIX(RecordSubmitter);
@@ -158,7 +158,7 @@ bool RecordSubmitter::is_available() const
 #ifndef NDEBUG
   if (ret) {
     // unconditional invariants
-    ceph_assert(segment_allocator.can_write());
+    ceph_assert(journal_allocator.can_write());
     ceph_assert(p_current_batch != nullptr);
     ceph_assert(!p_current_batch->is_submitting());
     // the current batch accepts a further write
@@ -166,7 +166,7 @@ bool RecordSubmitter::is_available() const
     if (!p_current_batch->is_empty()) {
       auto submit_length =
         p_current_batch->get_submit_size().get_encoded_length();
-      ceph_assert(!segment_allocator.needs_roll(submit_length));
+      ceph_assert(!journal_allocator.needs_roll(submit_length));
     }
     // I'm not rolling
   }
@@ -199,8 +199,8 @@ RecordSubmitter::check_action(
 {
   assert(is_available());
   auto eval = p_current_batch->evaluate_submit(
-      rsize, segment_allocator.get_block_size());
-  if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+      rsize, journal_allocator.get_block_size());
+  if (journal_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
     return action_t::ROLL;
   } else if (eval.is_full) {
     return action_t::SUBMIT_FULL;
@@ -242,7 +242,7 @@ RecordSubmitter::roll_segment()
       return roll_segment_ertr::now();
     } else {
       // start rolling in background
-      std::ignore = segment_allocator.roll(
+      std::ignore = journal_allocator.roll(
       ).safe_then([FNAME, this] {
         // good
         DEBUG("{} rolling done, available", get_name());
@@ -276,12 +276,9 @@ RecordSubmitter::submit(
   LOG_PREFIX(RecordSubmitter::submit);
   ceph_assert(is_available());
   assert(check_action(record.size) != action_t::ROLL);
-  segment_allocator.get_provider().update_modify_time(
-      segment_allocator.get_segment_id(),
-      record.modify_time,
-      record.extents.size());
+  journal_allocator.update_modify_time(record);
   auto eval = p_current_batch->evaluate_submit(
-      record.size, segment_allocator.get_block_size());
+      record.size, journal_allocator.get_block_size());
   bool needs_flush = (
       state == state_t::IDLE ||
       eval.submit_size.get_fullness() > preferred_fullness ||
@@ -296,13 +293,13 @@ RecordSubmitter::submit(
     increment_io();
     auto [to_write, sizes] = p_current_batch->submit_pending_fast(
       std::move(record),
-      segment_allocator.get_block_size(),
-      committed_to,
-      segment_allocator.get_nonce());
+      journal_allocator.get_block_size(),
+      get_committed_to(),
+      journal_allocator.get_nonce());
     DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
-          get_name(), sizes, committed_to, num_outstanding_io);
+          get_name(), sizes, get_committed_to(), num_outstanding_io);
     account_submission(1, sizes);
-    return segment_allocator.write(std::move(to_write)
+    return journal_allocator.write(std::move(to_write)
     ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
       return record_locator_t{
         write_result.start_seq.offset.add_offset(mdlength),
@@ -316,7 +313,7 @@ RecordSubmitter::submit(
   auto write_fut = p_current_batch->add_pending(
     get_name(),
     std::move(record),
-    segment_allocator.get_block_size());
+    journal_allocator.get_block_size());
   if (needs_flush) {
     if (state == state_t::FULL) {
       // #2 block concurrent submissions due to lack of resource
@@ -358,7 +355,7 @@ RecordSubmitter::submit(
 RecordSubmitter::open_ret
 RecordSubmitter::open(bool is_mkfs)
 {
-  return segment_allocator.open(is_mkfs
+  return journal_allocator.open(is_mkfs
   ).safe_then([this](journal_seq_t ret) {
     LOG_PREFIX(RecordSubmitter::open);
     DEBUG("{} register metrics", get_name());
@@ -420,16 +417,16 @@ RecordSubmitter::open(bool is_mkfs)
 RecordSubmitter::close_ertr::future<>
 RecordSubmitter::close()
 {
+  committed_to = JOURNAL_SEQ_NULL;
   ceph_assert(state == state_t::IDLE);
   ceph_assert(num_outstanding_io == 0);
-  committed_to = JOURNAL_SEQ_NULL;
   ceph_assert(p_current_batch != nullptr);
   ceph_assert(p_current_batch->is_empty());
   ceph_assert(!wait_available_promise.has_value());
   has_io_error = false;
   ceph_assert(!wait_unfull_flush_promise.has_value());
   metrics.clear();
-  return segment_allocator.close();
+  return journal_allocator.close();
 }
 
 void RecordSubmitter::update_state()
@@ -511,11 +508,11 @@ void RecordSubmitter::flush_current_batch()
   increment_io();
   auto num = p_batch->get_num_records();
   auto [to_write, sizes] = p_batch->encode_batch(
-    committed_to, segment_allocator.get_nonce());
+    get_committed_to(), journal_allocator.get_nonce());
   DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
-        get_name(), num, sizes, committed_to, num_outstanding_io);
+        get_name(), num, sizes, get_committed_to(), num_outstanding_io);
   account_submission(num, sizes);
-  std::ignore = segment_allocator.write(std::move(to_write)
+  std::ignore = journal_allocator.write(std::move(to_write)
   ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
     TRACE("{} {} records, {}, write done with {}",
           get_name(), num, sizes, write_result);
index 118d5a7b5d75bddcc737789da512ddfefd866477..eedd2dd8cfd5ee7c70910e7cd420a8dada422758 100644 (file)
@@ -13,7 +13,6 @@
 #include "crimson/common/errorator.h"
 #include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
-#include "crimson/os/seastore/journal/segment_allocator.h"
 
 namespace crimson::os::seastore {
   class SegmentProvider;
@@ -22,6 +21,38 @@ namespace crimson::os::seastore {
 
 namespace crimson::os::seastore::journal {
 
+class JournalAllocator {
+public:
+  using base_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+  virtual const std::string& get_name() const = 0;
+  
+  virtual void update_modify_time(record_t& record) = 0;
+
+  virtual extent_len_t get_block_size() const = 0;
+
+  using close_ertr = base_ertr;
+  virtual close_ertr::future<> close() = 0;
+
+  virtual segment_nonce_t get_nonce() const  = 0;
+
+  using write_ertr = base_ertr;
+  using write_ret = write_ertr::future<write_result_t>;
+  virtual write_ret write(ceph::bufferlist&& to_write) = 0;
+
+  virtual bool can_write() const = 0;
+  
+  using roll_ertr = base_ertr;
+  virtual roll_ertr::future<> roll() = 0;
+
+  virtual bool needs_roll(std::size_t length) const = 0;
+
+  using open_ertr = base_ertr;
+  using open_ret = open_ertr::future<journal_seq_t>;
+  virtual open_ret open(bool is_mkfs) = 0;
+
+};
+
 /**
  * RecordBatch
  *
@@ -111,7 +142,7 @@ public:
   //
   // Set write_result_t::write_length to 0 if the record is not the first one
   // in the batch.
-  using add_pending_ertr = SegmentAllocator::write_ertr;
+  using add_pending_ertr = JournalAllocator::write_ertr;
   using add_pending_ret = add_pending_ertr::future<record_locator_t>;
   add_pending_ret add_pending(
       const std::string& name,
@@ -204,10 +235,10 @@ public:
                   std::size_t batch_capacity,
                   std::size_t batch_flush_size,
                   double preferred_fullness,
-                  SegmentAllocator&);
+                 JournalAllocator&);
 
   const std::string& get_name() const {
-    return segment_allocator.get_name();
+    return journal_allocator.get_name();
   }
 
   journal_seq_t get_committed_to() const {
@@ -287,7 +318,7 @@ private:
   std::size_t io_depth_limit;
   double preferred_fullness;
 
-  SegmentAllocator& segment_allocator;
+  JournalAllocator& journal_allocator;
   // committed_to may be in a previous journal segment
   journal_seq_t committed_to = JOURNAL_SEQ_NULL;
 
index 8cab895f8c76e0b4c384a6bc8b3c892285371839..292c23070ba74e57925a080ae55b67d5af96b5f0 100644 (file)
@@ -13,6 +13,8 @@
 #include "crimson/common/errorator.h"
 #include "crimson/os/seastore/segment_manager_group.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
+#include "crimson/os/seastore/async_cleaner.h"
 
 namespace crimson::os::seastore {
   class SegmentProvider;
@@ -26,27 +28,19 @@ namespace crimson::os::seastore::journal {
  *
  * Maintain an available segment for writes.
  */
-class SegmentAllocator {
-  using base_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
+class SegmentAllocator : public JournalAllocator {
 
  public:
+  // SegmentAllocator specific methods
   SegmentAllocator(JournalTrimmer *trimmer,
                    data_category_t category,
                    rewrite_gen_t gen,
                    SegmentProvider &sp,
                    SegmentSeqAllocator &ssa);
 
-  const std::string& get_name() const {
-    return print_name;
-  }
-
-  SegmentProvider &get_provider() {
-    return segment_provider;
-  }
-
-  extent_len_t get_block_size() const {
-    return sm_group.get_block_size();
+  segment_id_t get_segment_id() const {
+    assert(can_write());
+    return current_segment->get_segment_id();
   }
 
   extent_len_t get_max_write_length() const {
@@ -55,27 +49,27 @@ class SegmentAllocator {
            sm_group.get_rounded_tail_length();
   }
 
-  bool can_write() const {
-    return !!current_segment;
+ public:
+  // overriding methods
+  const std::string& get_name() const final {
+    return print_name;
   }
 
-  segment_id_t get_segment_id() const {
-    assert(can_write());
-    return current_segment->get_segment_id();
+  extent_len_t get_block_size() const final {
+    return sm_group.get_block_size();
   }
 
-  segment_nonce_t get_nonce() const {
-    assert(can_write());
-    return current_segment_nonce;
+  bool can_write() const final {
+    return !!current_segment;
   }
 
-  segment_off_t get_written_to() const {
+  segment_nonce_t get_nonce() const final {
     assert(can_write());
-    return written_to;
+    return current_segment_nonce;
   }
 
   // returns true iff the current segment has insufficient space
-  bool needs_roll(std::size_t length) const {
+  bool needs_roll(std::size_t length) const final {
     assert(can_write());
     assert(current_segment->get_write_capacity() ==
            sm_group.get_segment_size());
@@ -85,24 +79,26 @@ class SegmentAllocator {
   }
 
   // open for write and generate the correct print name
-  using open_ertr = base_ertr;
-  using open_ret = open_ertr::future<journal_seq_t>;
-  open_ret open(bool is_mkfs);
+  open_ret open(bool is_mkfs) final;
 
   // close the current segment and initialize next one
-  using roll_ertr = base_ertr;
-  roll_ertr::future<> roll();
+  roll_ertr::future<> roll() final;
 
   // write the buffer, return the write result
   //
   // May be called concurrently, but writes may complete in any order.
   // If rolling/opening, no write is allowed.
-  using write_ertr = base_ertr;
-  using write_ret = write_ertr::future<write_result_t>;
-  write_ret write(ceph::bufferlist&& to_write);
+  write_ret write(ceph::bufferlist&& to_write) final;
 
   using close_ertr = base_ertr;
-  close_ertr::future<> close();
+  close_ertr::future<> close() final;
+
+  void update_modify_time(record_t& record) final {
+    segment_provider.update_modify_time(
+      get_segment_id(),
+      record.modify_time,
+      record.extents.size());
+  }
 
  private:
   open_ret do_open(bool is_mkfs);