git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: refactor, introduce JournalSegmentManager
author: Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 21 Oct 2021 07:31:47 +0000 (15:31 +0800)
committer: Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 29 Oct 2021 01:33:38 +0000 (09:33 +0800)
In preparation for the follow-up batching control.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h

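diff --git a/src/crimson/os/seastore/journal.cc b/src/crimson/os/seastore/journal.cc
index cdda5c50747aeeda06b3b44652db25b5096e568a..f4b7d4ef03276b4c49b49b5aeb2f9baccda16bb7 100644
--- a/src/crimson/os/seastore/journal.cc
+++ b/src/crimson/os/seastore/journal.cc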
@@ -49,55 +49,12 @@ segment_nonce_t generate_nonce(
     sizeof(meta.seastore_id.uuid));
 }
 
-Journal::Journal(SegmentManager &segment_manager, ExtentReader& scanner)
-  : segment_manager(segment_manager), scanner(scanner) {}
-
-
-Journal::initialize_segment_ertr::future<segment_seq_t>
-Journal::initialize_segment(Segment &segment)
-{
-  auto new_tail = segment_provider->get_journal_tail_target();
-  // write out header
-  ceph_assert(segment.get_write_ptr() == 0);
-  bufferlist bl;
-
-  segment_seq_t seq = next_journal_segment_seq++;
-  current_segment_nonce = generate_nonce(
-    seq, segment_manager.get_meta());
-  auto header = segment_header_t{
-    seq,
-    segment.get_segment_id(),
-    segment_provider->get_journal_tail_target(),
-    current_segment_nonce,
-    false};
-  logger().debug(
-    "initialize_segment {} journal_tail_target {}, header {}",
-    segment.get_segment_id(),
-    new_tail,
-    header);
-  encode(header, bl);
-
-  bufferptr bp(
-    ceph::buffer::create_page_aligned(
-      segment_manager.get_block_size()));
-  bp.zero();
-  auto iter = bl.cbegin();
-  iter.copy(bl.length(), bp.c_str());
-  bl.clear();
-  bl.append(bp);
-
-  written_to = segment_manager.get_block_size();
-  committed_to = 0;
-  return segment.write(0, bl).safe_then(
-    [=] {
-      segment_provider->update_journal_tail_committed(new_tail);
-      return seq;
-    },
-    initialize_segment_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in Journal::initialize_segment"
-    });
-}
+Journal::Journal(
+  SegmentManager& segment_manager,
+  ExtentReader& scanner)
+  : journal_segment_manager(segment_manager),
+    scanner(scanner)
+{}
 
 Journal::write_record_ret Journal::write_record(
   record_size_t rsize,
@@ -105,98 +62,26 @@ Journal::write_record_ret Journal::write_record(
   OrderingHandle &handle)
 {
   ceph::bufferlist to_write = encode_record(
-    rsize, std::move(record), segment_manager.get_block_size(),
-    committed_to, current_segment_nonce);
-  auto target = written_to;
-  assert((to_write.length() % segment_manager.get_block_size()) == 0);
-  written_to += to_write.length();
-  logger().debug(
-    "write_record, mdlength {}, dlength {}, target {}",
-    rsize.mdlength,
-    rsize.dlength,
-    target);
-
-  auto segment_id = current_journal_segment->get_segment_id();
-
+    rsize,
+    std::move(record),
+    journal_segment_manager.get_block_size(),
+    journal_segment_manager.get_committed_to(),
+    journal_segment_manager.get_nonce());
   // Start write under the current exclusive stage, but wait for it
   // in the device_submission concurrent stage to permit multiple
   // overlapping writes.
-  auto write_fut = current_journal_segment->write(target, to_write);
+  auto write_fut = journal_segment_manager.write(to_write);
   return handle.enter(write_pipeline->device_submission
   ).then([write_fut = std::move(write_fut)]() mutable {
-    return std::move(write_fut
-    ).handle_error(
-      write_record_ertr::pass_further{},
-      crimson::ct_error::assert_all{
-       "Invalid error in Journal::write_record"
-      }
-    );
-  }).safe_then([this, &handle] {
-    return handle.enter(write_pipeline->finalize);
-  }).safe_then([this, target, segment_id] {
-    logger().debug(
-      "write_record: commit target {}",
-      target);
-    if (segment_id == current_journal_segment->get_segment_id()) {
-      assert(committed_to < target);
-      committed_to = target;
-    }
-    return write_record_ret(
-      write_record_ertr::ready_future_marker{},
-      paddr_t{
-       segment_id,
-       target});
-  });
-}
-
-bool Journal::needs_roll(segment_off_t length) const
-{
-  return length + written_to >
-    current_journal_segment->get_write_capacity();
-}
-
-Journal::roll_journal_segment_ertr::future<segment_seq_t>
-Journal::roll_journal_segment()
-{
-  auto old_segment_id = current_journal_segment ?
-    current_journal_segment->get_segment_id() :
-    NULL_SEG_ID;
-
-  return (current_journal_segment ?
-         current_journal_segment->close() :
-         Segment::close_ertr::now()).safe_then([this] {
-      return segment_provider->get_segment(segment_manager.get_device_id());
-    }).safe_then([this](auto segment) {
-      return segment_manager.open(segment);
-    }).safe_then([this](auto sref) {
-      current_journal_segment = sref;
-      written_to = 0;
-      return initialize_segment(*current_journal_segment);
-    }).safe_then([=](auto seq) {
-      if (old_segment_id != NULL_SEG_ID) {
-       segment_provider->close_segment(old_segment_id);
-      }
-      segment_provider->set_journal_segment(
-       current_journal_segment->get_segment_id(),
-       seq);
-      return seq;
-    }).handle_error(
-      roll_journal_segment_ertr::pass_further{},
-      crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
-    );
-}
-
-Journal::open_for_write_ret Journal::open_for_write()
-{
-  return roll_journal_segment().safe_then([this](auto seq) {
-    return open_for_write_ret(
-      open_for_write_ertr::ready_future_marker{},
-      journal_seq_t{
-       seq,
-       paddr_t{
-         current_journal_segment->get_segment_id(),
-           static_cast<segment_off_t>(segment_manager.get_block_size())}
-      });
+    return std::move(write_fut);
+  }).safe_then([this, &handle, rsize](journal_seq_t write_start) {
+    return handle.enter(write_pipeline->finalize
+    ).then([this, write_start, rsize] {
+      auto committed_to = write_start;
+      committed_to.offset.offset += (rsize.mdlength + rsize.dlength);
+      journal_segment_manager.mark_committed(committed_to);
+      return write_start.offset;
+    });
   });
 }
 
@@ -218,8 +103,8 @@ Journal::prep_replay_segments(
        rt.second.journal_segment_seq;
     });
 
-  next_journal_segment_seq =
-    segments.rbegin()->second.journal_segment_seq + 1;
+  journal_segment_manager.set_segment_seq(
+    segments.rbegin()->second.journal_segment_seq);
   std::for_each(
     segments.begin(),
     segments.end(),
@@ -254,7 +139,7 @@ Journal::prep_replay_segments(
   } else {
     replay_from = paddr_t{
       from->first,
-      (segment_off_t)segment_manager.get_block_size()};
+      (segment_off_t)journal_segment_manager.get_block_size()};
   }
   auto ret = replay_segments_t(segments.end() - from);
   std::transform(
@@ -264,7 +149,7 @@ Journal::prep_replay_segments(
        p.second.journal_segment_seq,
        paddr_t{
          p.first,
-         (segment_off_t)segment_manager.get_block_size()}};
+         (segment_off_t)journal_segment_manager.get_block_size()}};
       logger().debug(
        "Journal::prep_replay_segments: replaying from  {}",
        ret);
@@ -383,4 +268,149 @@ Journal::replay_ret Journal::replay(
     });
 }
 
+Journal::JournalSegmentManager::JournalSegmentManager(
+  SegmentManager& segment_manager)
+  : segment_manager{segment_manager}
+{
+  reset();
+}
+
+Journal::JournalSegmentManager::close_ertr::future<>
+Journal::JournalSegmentManager::close()
+{
+  return (
+    current_journal_segment ?
+    current_journal_segment->close() :
+    Segment::close_ertr::now()
+  ).handle_error(
+    close_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::close()"
+    }
+  ).finally([this] {
+    reset();
+  });
+}
+
+Journal::JournalSegmentManager::roll_ertr::future<>
+Journal::JournalSegmentManager::roll()
+{
+  auto old_segment_id = current_journal_segment ?
+    current_journal_segment->get_segment_id() :
+    NULL_SEG_ID;
+
+  return (
+    current_journal_segment ?
+    current_journal_segment->close() :
+    Segment::close_ertr::now()
+  ).safe_then([this] {
+    return segment_provider->get_segment(segment_manager.get_device_id());
+  }).safe_then([this](auto segment) {
+    return segment_manager.open(segment);
+  }).safe_then([this](auto sref) {
+    current_journal_segment = sref;
+    return initialize_segment(*current_journal_segment);
+  }).safe_then([this, old_segment_id] {
+    if (old_segment_id != NULL_SEG_ID) {
+      segment_provider->close_segment(old_segment_id);
+    }
+    segment_provider->set_journal_segment(
+      current_journal_segment->get_segment_id(),
+      get_segment_seq());
+  }).handle_error(
+    roll_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::roll"
+    }
+  );
+}
+
+Journal::JournalSegmentManager::write_ret
+Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
+{
+  auto write_length = to_write.length();
+  auto write_start_seq = get_current_write_seq();
+  logger().debug(
+    "JournalSegmentManager::write: write_start {} => {}, length={}",
+    write_start_seq,
+    write_start_seq.offset.offset + write_length,
+    write_length);
+  assert((write_length % segment_manager.get_block_size()) == 0);
+  assert(!needs_roll(write_length));
+
+  auto write_start_offset = written_to;
+  written_to += write_length;
+  return current_journal_segment->write(
+    write_start_offset, to_write
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::write"
+    }
+  ).safe_then([write_start_seq] {
+    return write_start_seq;
+  });
+}
+
+void Journal::JournalSegmentManager::mark_committed(
+  const journal_seq_t& new_committed_to)
+{
+  logger().debug(
+    "JournalSegmentManager::mark_committed: committed_to {} => {}",
+    committed_to, new_committed_to);
+  assert(new_committed_to.segment_seq <=
+         get_segment_seq());
+  if (new_committed_to.segment_seq ==
+      get_segment_seq()) {
+    assert(committed_to.offset.offset < new_committed_to.offset.offset);
+    committed_to = new_committed_to;
+  }
+}
+
+Journal::JournalSegmentManager::initialize_segment_ertr::future<>
+Journal::JournalSegmentManager::initialize_segment(Segment& segment)
+{
+  auto new_tail = segment_provider->get_journal_tail_target();
+  // write out header
+  ceph_assert(segment.get_write_ptr() == 0);
+  bufferlist bl;
+
+  segment_seq_t seq = next_journal_segment_seq++;
+  current_segment_nonce = generate_nonce(
+    seq, segment_manager.get_meta());
+  auto header = segment_header_t{
+    seq,
+    segment.get_segment_id(),
+    segment_provider->get_journal_tail_target(),
+    current_segment_nonce,
+    false};
+  logger().debug(
+    "JournalSegmentManager::initialize_segment: segment_id {} journal_tail_target {}, header {}",
+    segment.get_segment_id(),
+    new_tail,
+    header);
+  encode(header, bl);
+
+  bufferptr bp(
+    ceph::buffer::create_page_aligned(
+      segment_manager.get_block_size()));
+  bp.zero();
+  auto iter = bl.cbegin();
+  iter.copy(bl.length(), bp.c_str());
+  bl.clear();
+  bl.append(bp);
+
+  written_to = 0;
+  // FIXME: improve committed_to to point to another segment
+  committed_to = get_current_write_seq();
+  return write(bl
+  ).safe_then([this, new_tail, write_size=bl.length()
+              ](journal_seq_t write_start_seq) {
+    auto committed_to = write_start_seq;
+    committed_to.offset.offset += write_size;
+    mark_committed(committed_to);
+    segment_provider->update_journal_tail_committed(new_tail);
+  });
+}
+
 }
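diff --git a/src/crimson/os/seastore/journal.h b/src/crimson/os/seastore/journal.h
index 7c6eb51074c05dce7d87c22743dfe6d650929865..064020dcc1c5baa5851d75e24dd06991662122db 100644
--- a/src/crimson/os/seastore/journal.h
+++ b/src/crimson/os/seastore/journal.h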
@@ -4,6 +4,7 @@
 #pragma once
 
 #include <boost/intrusive_ptr.hpp>
+#include <optional>
 
 #include <seastar/core/future.hh>
 
@@ -34,7 +35,7 @@ public:
    * Gets the current journal segment sequence.
    */
   segment_seq_t get_segment_seq() const {
-    return next_journal_segment_seq - 1;
+    return journal_segment_manager.get_segment_seq();
   }
 
   /**
@@ -48,6 +49,7 @@ public:
    */
   void set_segment_provider(SegmentProvider *provider) {
     segment_provider = provider;
+    journal_segment_manager.set_segment_provider(provider);
   }
 
   /**
@@ -59,7 +61,9 @@ public:
     crimson::ct_error::input_output_error
     >;
   using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
-  open_for_write_ret open_for_write();
+  open_for_write_ret open_for_write() {
+    return journal_segment_manager.open();
+  }
 
   /**
    * close journal
@@ -69,19 +73,7 @@ public:
   using close_ertr = crimson::errorator<
     crimson::ct_error::input_output_error>;
   close_ertr::future<> close() {
-    return (
-      current_journal_segment ?
-      current_journal_segment->close() :
-      Segment::close_ertr::now()
-    ).handle_error(
-      close_ertr::pass_further{},
-      crimson::ct_error::assert_all{
-       "Error during Journal::close()"
-      }
-    ).finally([this] {
-      current_journal_segment.reset();
-      reset_soft_state();
-    });
+    return journal_segment_manager.close();
   }
 
   /**
@@ -102,23 +94,24 @@ public:
   ) {
     assert(write_pipeline);
     auto rsize = get_encoded_record_length(
-      record, segment_manager.get_block_size());
+      record, journal_segment_manager.get_block_size());
     auto total = rsize.mdlength + rsize.dlength;
-    if (total > max_record_length()) {
+    auto max_record_length = journal_segment_manager.get_max_write_length();
+    if (total > max_record_length) {
       auto &logger = crimson::get_logger(ceph_subsys_seastore);
       logger.error(
        "Journal::submit_record: record size {} exceeds max {}",
        total,
-       max_record_length()
+       max_record_length
       );
       return crimson::ct_error::erange::make();
     }
-    auto roll = needs_roll(total)
-      ? roll_journal_segment().safe_then([](auto){})
-      : roll_journal_segment_ertr::now();
+    auto roll = journal_segment_manager.needs_roll(total)
+      ? journal_segment_manager.roll()
+      : JournalSegmentManager::roll_ertr::now();
     return roll.safe_then(
       [this, rsize, record=std::move(record), &handle]() mutable {
-       auto seq = next_journal_segment_seq - 1;
+       auto seq = journal_segment_manager.get_segment_seq();
        return write_record(
          rsize, std::move(record),
          handle
@@ -151,32 +144,111 @@ public:
   }
 
 private:
-  SegmentProvider *segment_provider = nullptr;
-  SegmentManager &segment_manager;
+  class JournalSegmentManager {
+  public:
+    JournalSegmentManager(SegmentManager&);
+
+    using base_ertr = crimson::errorator<
+        crimson::ct_error::input_output_error>;
+    extent_len_t get_max_write_length() const {
+      return segment_manager.get_segment_size() -
+             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+                     size_t(segment_manager.get_block_size()));
+    }
 
-  segment_seq_t next_journal_segment_seq = 0;
-  segment_nonce_t current_segment_nonce = 0;
+    segment_off_t get_block_size() const {
+      return segment_manager.get_block_size();
+    }
 
-  SegmentRef current_journal_segment;
-  segment_off_t written_to = 0;
-  segment_off_t committed_to = 0;
+    segment_nonce_t get_nonce() const {
+      return current_segment_nonce;
+    }
 
-  ExtentReader& scanner;
-  WritePipeline *write_pipeline = nullptr;
+    segment_off_t get_committed_to() const {
+      assert(committed_to.segment_seq ==
+             get_segment_seq());
+      return committed_to.offset.offset;
+    }
 
-  void reset_soft_state() {
-    next_journal_segment_seq = 0;
-    current_segment_nonce = 0;
-    written_to = 0;
-    committed_to = 0;
-  }
+    segment_seq_t get_segment_seq() const {
+      return next_journal_segment_seq - 1;
+    }
 
-  /// prepare segment for writes, writes out segment header
-  using initialize_segment_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error>;
-  initialize_segment_ertr::future<segment_seq_t> initialize_segment(
-    Segment &segment);
+    void set_segment_provider(SegmentProvider* provider) {
+      segment_provider = provider;
+    }
+
+    void set_segment_seq(segment_seq_t current_seq) {
+      next_journal_segment_seq = (current_seq + 1);
+    }
+
+    using open_ertr = base_ertr;
+    using open_ret = open_ertr::future<journal_seq_t>;
+    open_ret open() {
+      return roll().safe_then([this] {
+        return get_current_write_seq();
+      });
+    }
+
+    using close_ertr = base_ertr;
+    close_ertr::future<> close();
 
+    // returns true iff the current segment has insufficient space
+    bool needs_roll(std::size_t length) const {
+      auto write_capacity = current_journal_segment->get_write_capacity();
+      return length + written_to > std::size_t(write_capacity);
+    }
+
+    // close the current segment and initialize next one
+    using roll_ertr = base_ertr;
+    roll_ertr::future<> roll();
+
+    // write the buffer, return the write start
+    // May be called concurrently, writes may complete in any order.
+    using write_ertr = base_ertr;
+    using write_ret = write_ertr::future<journal_seq_t>;
+    write_ret write(ceph::bufferlist to_write);
+
+    // mark write committed in order
+    void mark_committed(const journal_seq_t& new_committed_to);
+
+  private:
+    journal_seq_t get_current_write_seq() const {
+      assert(current_journal_segment);
+      return journal_seq_t{
+        get_segment_seq(),
+        {current_journal_segment->get_segment_id(), written_to}
+      };
+    }
+
+    void reset() {
+      next_journal_segment_seq = 0;
+      current_segment_nonce = 0;
+      current_journal_segment.reset();
+      written_to = 0;
+      committed_to = {};
+    }
+
+    // prepare segment for writes, writes out segment header
+    using initialize_segment_ertr = base_ertr;
+    initialize_segment_ertr::future<> initialize_segment(Segment&);
+
+    SegmentProvider* segment_provider;
+    SegmentManager& segment_manager;
+
+    segment_seq_t next_journal_segment_seq;
+    segment_nonce_t current_segment_nonce;
+
+    SegmentRef current_journal_segment;
+    segment_off_t written_to;
+    // committed_to may be in a previous journal segment
+    journal_seq_t committed_to;
+  };
+
+  SegmentProvider* segment_provider = nullptr;
+  JournalSegmentManager journal_segment_manager;
+  ExtentReader& scanner;
+  WritePipeline *write_pipeline = nullptr;
 
   /// do record write
   using write_record_ertr = crimson::errorator<
@@ -187,14 +259,6 @@ private:
     record_t &&record,
     OrderingHandle &handle);
 
-  /// close current segment and initialize next one
-  using roll_journal_segment_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error>;
-  roll_journal_segment_ertr::future<segment_seq_t> roll_journal_segment();
-
-  /// returns true iff current segment has insufficient space
-  bool needs_roll(segment_off_t length) const;
-
   /// return ordered vector of segments to replay
   using replay_segments_t = std::vector<
     std::pair<journal_seq_t, segment_header_t>>;
@@ -219,19 +283,7 @@ private:
     segment_header_t header,         ///< [in] segment header
     delta_handler_t &delta_handler   ///< [in] processes deltas in order
   );
-
-  extent_len_t max_record_length() const;
 };
 using JournalRef = std::unique_ptr<Journal>;
 
 }
-
-namespace crimson::os::seastore {
-
-inline extent_len_t Journal::max_record_length() const {
-  return segment_manager.get_segment_size() -
-    p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
-           size_t(segment_manager.get_block_size()));
-}
-
-}