git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore: introduce SegmentAllocator and integrate with Journal
author Yingxin Cheng <yingxin.cheng@intel.com>
Fri, 18 Feb 2022 06:47:17 +0000 (14:47 +0800)
committer Yingxin Cheng <yingxin.cheng@intel.com>
Thu, 24 Feb 2022 12:55:37 +0000 (20:55 +0800)
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal/segment_allocator.cc [new file with mode: 0644]
src/crimson/os/seastore/journal/segment_allocator.h [new file with mode: 0644]
src/crimson/os/seastore/journal/segmented_journal.cc
src/crimson/os/seastore/journal/segmented_journal.h

index 5664bcbdb66e5f2570c065ffa715ac6118ef8929..cec75b10471d4e66c456b378633956405a10a9a9 100644 (file)
@@ -39,6 +39,7 @@ set(crimson_seastore_srcs
   random_block_manager/nvme_manager.cc
   random_block_manager/nvmedevice.cc
   journal/segmented_journal.cc
+  journal/segment_allocator.cc
   journal.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc
new file mode 100644 (file)
index 0000000..b1245c6
--- /dev/null
@@ -0,0 +1,210 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "segment_allocator.h"
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/segment_cleaner.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
+// Derive a per-segment nonce as crc32c of the store uuid seeded with the
+// segment sequence number, tying each written segment to this store instance.
+static segment_nonce_t generate_nonce(
+  segment_seq_t seq,
+  const seastore_meta_t &meta)
+{
+  return ceph_crc32c(
+    seq,
+    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+    sizeof(meta.seastore_id.uuid));
+}
+
+// Construct an allocator for either JOURNAL or OOL segments; NULL_SEG is
+// not a valid allocator type.
+SegmentAllocator::SegmentAllocator(
+  segment_type_t type,
+  SegmentProvider &sp,
+  SegmentManager &sm)
+  : type{type},
+    segment_provider{sp},
+    segment_manager{sm}
+{
+  ceph_assert(type != segment_type_t::NULL_SEG);
+  // start in the closed state with the type-appropriate initial seq
+  reset();
+}
+
+// Resume sequence numbering, e.g. after replay the caller passes
+// (last replayed journal seq + 1); seq must match this allocator's type.
+void SegmentAllocator::set_next_segment_seq(segment_seq_t seq)
+{
+  LOG_PREFIX(SegmentAllocator::set_next_segment_seq);
+  INFO("{} {} next_segment_seq={}",
+       type, get_device_id(), segment_seq_printer_t{seq});
+  assert(type == segment_seq_to_type(seq));
+  next_segment_seq = seq;
+}
+
+// Acquire a segment from the provider, write its header into the first
+// block, and make it the current segment. Returns the journal seq of the
+// first writable position (just past the header).
+SegmentAllocator::open_ertr::future<journal_seq_t>
+SegmentAllocator::open()
+{
+  LOG_PREFIX(SegmentAllocator::open);
+  ceph_assert(!current_segment);
+  segment_seq_t new_segment_seq;
+  if (type == segment_type_t::JOURNAL) {
+    // JOURNAL consumes a fresh sequence number per open
+    new_segment_seq = next_segment_seq++;
+  } else { // OOL
+    // OOL segments all share the constant OOL sequence (see reset())
+    new_segment_seq = next_segment_seq;
+  }
+  assert(new_segment_seq == get_current_segment_seq());
+  ceph_assert(segment_seq_to_type(new_segment_seq) == type);
+  auto new_segment_id = segment_provider.get_segment(
+      get_device_id(), new_segment_seq);
+  return segment_manager.open(new_segment_id
+  ).handle_error(
+    open_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentAllocator::open open"
+    }
+  ).safe_then([this, FNAME, new_segment_seq](auto sref) {
+    // initialize new segment
+    journal_seq_t new_journal_tail;
+    if (type == segment_type_t::JOURNAL) {
+      new_journal_tail = segment_provider.get_journal_tail_target();
+      current_segment_nonce = generate_nonce(
+          new_segment_seq, segment_manager.get_meta());
+    } else { // OOL
+      // OOL segments carry no deltas and use a zero nonce
+      new_journal_tail = NO_DELTAS;
+      assert(current_segment_nonce == 0);
+    }
+    segment_id_t segment_id = sref->get_segment_id();
+    auto header = segment_header_t{
+      new_segment_seq,
+      segment_id,
+      new_journal_tail,
+      current_segment_nonce};
+    INFO("{} {} writing header to new segment ... -- {}",
+         type, get_device_id(), header);
+
+    // pad the encoded header out to one full, page-aligned block
+    auto header_length = segment_manager.get_block_size();
+    bufferlist bl;
+    encode(header, bl);
+    bufferptr bp(ceph::buffer::create_page_aligned(header_length));
+    bp.zero();
+    auto iter = bl.cbegin();
+    iter.copy(bl.length(), bp.c_str());
+    bl.clear();
+    bl.append(bp);
+
+    ceph_assert(sref->get_write_ptr() == 0);
+    assert((unsigned)header_length == bl.length());
+    // user writes begin right after the header block
+    written_to = header_length;
+    auto new_journal_seq = journal_seq_t{
+      new_segment_seq,
+      paddr_t::make_seg_paddr(segment_id, written_to)};
+    return sref->write(0, bl
+    ).handle_error(
+      open_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+        "Invalid error in SegmentAllocator::open write"
+      }
+    ).safe_then([this,
+                 FNAME,
+                 new_journal_seq,
+                 new_journal_tail,
+                 sref=std::move(sref)]() mutable {
+      ceph_assert(!current_segment);
+      current_segment = std::move(sref);
+      if (type == segment_type_t::JOURNAL) {
+        segment_provider.update_journal_tail_committed(new_journal_tail);
+      }
+      DEBUG("{} {} rolled new segment id={}",
+            type, get_device_id(), current_segment->get_segment_id());
+      ceph_assert(new_journal_seq.segment_seq == get_current_segment_seq());
+      return new_journal_seq;
+    });
+  });
+}
+
+// Close the current segment (notifying the provider) and open the next one.
+SegmentAllocator::roll_ertr::future<>
+SegmentAllocator::roll()
+{
+  ceph_assert(can_write());
+  return close_segment(true).safe_then([this] {
+    return open().discard_result();
+  });
+}
+
+// Write a block-aligned buffer at the current write position and return the
+// resulting write_result_t (start seq + length). The caller must have
+// checked needs_roll() first.
+SegmentAllocator::write_ret
+SegmentAllocator::write(ceph::bufferlist to_write)
+{
+  LOG_PREFIX(SegmentAllocator::write);
+  assert(can_write());
+  auto write_length = to_write.length();
+  auto write_start_offset = written_to;
+  auto write_start_seq = journal_seq_t{
+    get_current_segment_seq(),
+    paddr_t::make_seg_paddr(
+      current_segment->get_segment_id(), write_start_offset)
+  };
+  TRACE("{} {} {}~{}", type, get_device_id(), write_start_seq, write_length);
+  assert(write_length > 0);
+  assert((write_length % segment_manager.get_block_size()) == 0);
+  assert(!needs_roll(write_length));
+
+  auto write_result = write_result_t{
+    write_start_seq,
+    static_cast<seastore_off_t>(write_length)
+  };
+  // bump written_to before issuing the async write so concurrent callers
+  // are assigned disjoint offsets even though completions may reorder
+  written_to += write_length;
+  return current_segment->write(
+    write_start_offset, to_write
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentAllocator::write"
+    }
+  ).safe_then([write_result] {
+    return write_result;
+  });
+}
+
+// Final close: release the current segment if any, then unconditionally
+// return the allocator to its initial state.
+SegmentAllocator::close_ertr::future<>
+SegmentAllocator::close()
+{
+  return [this] {
+    LOG_PREFIX(SegmentAllocator::close);
+    if (current_segment) {
+      // false: unlike roll(), do not report the segment to the provider
+      return close_segment(false);
+    } else {
+      INFO("{} {} no current segment", type, get_device_id());
+      return close_segment_ertr::now();
+    }
+  }().finally([this] {
+    // reset runs on both the success and error paths
+    reset();
+  });
+}
+
+// Close the currently open segment; current_segment becomes null
+// immediately. is_rolling selects whether the provider is notified
+// (roll() passes true, the final close() passes false).
+SegmentAllocator::close_segment_ertr::future<>
+SegmentAllocator::close_segment(bool is_rolling)
+{
+  LOG_PREFIX(SegmentAllocator::close_segment);
+  assert(can_write());
+  // detach first; the segment stays alive via seg_to_close below
+  auto seg_to_close = std::move(current_segment);
+  auto close_segment_id = seg_to_close->get_segment_id();
+  INFO("{} {} close segment id={}, seq={}, written_to={}, nonce={}",
+       type, get_device_id(),
+       close_segment_id,
+       segment_seq_printer_t{get_current_segment_seq()},
+       written_to,
+       current_segment_nonce);
+  if (is_rolling) {
+    segment_provider.close_segment(close_segment_id);
+  }
+  return seg_to_close->close(
+  ).safe_then([seg_to_close=std::move(seg_to_close)] {
+    // capture keeps the SegmentRef alive until the close completes
+  }).handle_error(
+    close_segment_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in SegmentAllocator::close_segment"
+    }
+  );
+}
+
+}
diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h
new file mode 100644 (file)
index 0000000..ad36cfd
--- /dev/null
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/segment_manager.h"
+
+namespace crimson::os::seastore {
+  class SegmentProvider;
+}
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * SegmentAllocator
+ *
+ * Maintain an available segment for writes.
+ */
+class SegmentAllocator {
+  using base_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+
+ public:
+  SegmentAllocator(segment_type_t type,
+                   SegmentProvider &sp,
+                   SegmentManager &sm);
+
+  device_id_t get_device_id() const {
+    return segment_manager.get_device_id();
+  }
+
+  seastore_off_t get_block_size() const {
+    return segment_manager.get_block_size();
+  }
+
+  // largest single write: segment size minus the header's share.
+  // NOTE(review): p2align rounds the encoded header size *down* (to 0 when
+  // it is smaller than a block), yet open() reserves one full block for the
+  // header -- p2roundup looks intended; confirm upstream.
+  extent_len_t get_max_write_length() const {
+    return segment_manager.get_segment_size() -
+           p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+                   size_t(segment_manager.get_block_size()));
+  }
+
+  device_segment_id_t get_num_segments() const {
+    return segment_manager.get_num_segments();
+  }
+
+  // true while a segment is open for writes
+  bool can_write() const {
+    return !!current_segment;
+  }
+
+  segment_nonce_t get_nonce() const {
+    assert(can_write());
+    return current_segment_nonce;
+  }
+
+  // resume sequence numbering (e.g. after replay); seq must match type
+  void set_next_segment_seq(segment_seq_t);
+
+  // returns true iff the current segment has insufficient space
+  bool needs_roll(std::size_t length) const {
+    assert(can_write());
+    auto write_capacity = current_segment->get_write_capacity();
+    return length + written_to > std::size_t(write_capacity);
+  }
+
+  // open for write
+  using open_ertr = base_ertr;
+  using open_ret = open_ertr::future<journal_seq_t>;
+  open_ret open();
+
+  // close the current segment and initialize next one
+  using roll_ertr = base_ertr;
+  roll_ertr::future<> roll();
+
+  // write the buffer, return the write result
+  //
+  // May be called concurrently, but writes may complete in any order.
+  // If rolling/opening, no write is allowed.
+  using write_ertr = base_ertr;
+  using write_ret = write_ertr::future<write_result_t>;
+  write_ret write(ceph::bufferlist to_write);
+
+  using close_ertr = base_ertr;
+  close_ertr::future<> close();
+
+ private:
+  // return to the initial (closed) state; JOURNAL restarts numbering at 0,
+  // OOL uses the fixed OOL_SEG_SEQ sentinel
+  void reset() {
+    current_segment.reset();
+    if (type == segment_type_t::JOURNAL) {
+      next_segment_seq = 0;
+    } else { // OOL
+      next_segment_seq = OOL_SEG_SEQ;
+    }
+    current_segment_nonce = 0;
+    written_to = 0;
+  }
+
+  // FIXME: remove the unnecessary is_rolling
+  using close_segment_ertr = base_ertr;
+  close_segment_ertr::future<> close_segment(bool is_rolling);
+
+  // seq of the currently open segment; open() has already advanced
+  // next_segment_seq past it for JOURNAL, hence the -1
+  segment_seq_t get_current_segment_seq() const {
+    segment_seq_t ret;
+    if (type == segment_type_t::JOURNAL) {
+      assert(next_segment_seq != 0);
+      ret = next_segment_seq - 1;
+    } else { // OOL
+      ret = next_segment_seq;
+    }
+    assert(segment_seq_to_type(ret) == type);
+    return ret;
+  }
+
+  const segment_type_t type; // JOURNAL or OOL
+  SegmentProvider &segment_provider;
+  SegmentManager &segment_manager;
+  SegmentRef current_segment;  // null when closed
+  segment_seq_t next_segment_seq;  // seq to assign at the next open()
+  segment_nonce_t current_segment_nonce;  // always 0 for OOL
+  seastore_off_t written_to;  // segment offset of the next write
+};
+
+}
index 851e6cc2182cf75c3596eb1d5ea3ac3a9cc6e10a..ce677b55c9d8d20d4085ce002493141450fc1049 100644 (file)
@@ -26,22 +26,14 @@ SET_SUBSYS(seastore_journal);
 
 namespace crimson::os::seastore::journal {
 
-segment_nonce_t generate_nonce(
-  segment_seq_t seq,
-  const seastore_meta_t &meta)
-{
-  return ceph_crc32c(
-    seq,
-    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
-    sizeof(meta.seastore_id.uuid));
-}
-
 SegmentedJournal::SegmentedJournal(
   SegmentManager &segment_manager,
   ExtentReader &scanner,
   SegmentProvider &segment_provider)
   : segment_provider(segment_provider),
-    journal_segment_manager(segment_manager, segment_provider),
+    journal_segment_allocator(segment_type_t::JOURNAL,
+                              segment_provider,
+                              segment_manager),
     record_submitter(crimson::common::get_conf<uint64_t>(
                        "seastore_journal_iodepth_limit"),
                      crimson::common::get_conf<uint64_t>(
@@ -50,7 +42,7 @@ SegmentedJournal::SegmentedJournal(
                        "seastore_journal_batch_flush_size"),
                      crimson::common::get_conf<double>(
                        "seastore_journal_batch_preferred_fullness"),
-                     journal_segment_manager),
+                     journal_segment_allocator),
     scanner(scanner)
 {
   register_metrics();
@@ -59,8 +51,8 @@ SegmentedJournal::SegmentedJournal(
 SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
 {
   LOG_PREFIX(Journal::open_for_write);
-  INFO("device_id={}", journal_segment_manager.get_device_id());
-  return journal_segment_manager.open();
+  INFO("device_id={}", journal_segment_allocator.get_device_id());
+  return journal_segment_allocator.open();
 }
 
 SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
@@ -69,7 +61,7 @@ SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
   INFO("closing, committed_to={}",
        record_submitter.get_committed_to());
   metrics.clear();
-  return journal_segment_manager.close();
+  return journal_segment_allocator.close();
 }
 
 SegmentedJournal::prep_replay_segments_fut
@@ -89,15 +81,15 @@ SegmentedJournal::prep_replay_segments(
        rt.second.journal_segment_seq;
     });
 
-  journal_segment_manager.set_segment_seq(
-    segments.rbegin()->second.journal_segment_seq);
+  journal_segment_allocator.set_next_segment_seq(
+    segments.rbegin()->second.journal_segment_seq + 1);
   std::for_each(
     segments.begin(),
     segments.end(),
     [this, FNAME](auto &seg)
   {
     if (seg.first != seg.second.physical_segment_id ||
-        seg.first.device_id() != journal_segment_manager.get_device_id() ||
+        seg.first.device_id() != journal_segment_allocator.get_device_id() ||
         seg.second.get_type() != segment_type_t::JOURNAL) {
       ERROR("illegal journal segment for replay -- {}", seg.second);
       ceph_abort();
@@ -124,7 +116,7 @@ SegmentedJournal::prep_replay_segments(
   } else {
     replay_from = paddr_t::make_seg_paddr(
       from->first,
-      journal_segment_manager.get_block_size());
+      journal_segment_allocator.get_block_size());
   }
 
   auto num_segments = segments.end() - from;
@@ -138,7 +130,7 @@ SegmentedJournal::prep_replay_segments(
        p.second.journal_segment_seq,
        paddr_t::make_seg_paddr(
          p.first,
-         journal_segment_manager.get_block_size())
+         journal_segment_allocator.get_block_size())
       };
       return std::make_pair(ret, p.second);
     });
@@ -254,10 +246,10 @@ SegmentedJournal::find_journal_segments()
       return crimson::do_for_each(
        boost::counting_iterator<device_segment_id_t>(0),
        boost::counting_iterator<device_segment_id_t>(
-         journal_segment_manager.get_num_segments()),
+         journal_segment_allocator.get_num_segments()),
        [this, &ret](device_segment_id_t d_segment_id) {
          segment_id_t segment_id{
-           journal_segment_manager.get_device_id(),
+           journal_segment_allocator.get_device_id(),
            d_segment_id};
          return scanner.read_segment_header(
            segment_id
@@ -367,154 +359,6 @@ void SegmentedJournal::register_metrics()
   );
 }
 
-SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
-  SegmentManager& segment_manager,
-  SegmentProvider& segment_provider)
-  : segment_provider{segment_provider}, segment_manager{segment_manager}
-{
-  reset();
-}
-
-SegmentedJournal::JournalSegmentManager::open_ret
-SegmentedJournal::JournalSegmentManager::open()
-{
-  return roll().safe_then([this] {
-    return get_current_write_seq();
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::close_ertr::future<>
-SegmentedJournal::JournalSegmentManager::close()
-{
-  LOG_PREFIX(JournalSegmentManager::close);
-  if (current_journal_segment) {
-    INFO("segment_id={}, seq={}, written_to={}, nonce={}",
-         current_journal_segment->get_segment_id(),
-         get_segment_seq(),
-         written_to,
-         current_segment_nonce);
-  } else {
-    INFO("no current journal segment");
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).handle_error(
-    close_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::close()"
-    }
-  ).finally([this] {
-    reset();
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
-SegmentedJournal::JournalSegmentManager::roll()
-{
-  LOG_PREFIX(JournalSegmentManager::roll);
-  auto old_segment_id = current_journal_segment ?
-    current_journal_segment->get_segment_id() :
-    NULL_SEG_ID;
-  if (current_journal_segment) {
-    INFO("closing segment {}, seq={}, written_to={}, nonce={}",
-         old_segment_id,
-         get_segment_seq(),
-         written_to,
-         current_segment_nonce);
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).safe_then([this] {
-    auto new_segment_id = segment_provider->get_segment(
-        get_device_id(), next_journal_segment_seq);
-    return segment_manager.open(new_segment_id);
-  }).safe_then([this](auto sref) {
-    current_journal_segment = sref;
-    return initialize_segment(*current_journal_segment);
-  }).safe_then([this, old_segment_id] {
-    if (old_segment_id != NULL_SEG_ID) {
-      segment_provider.close_segment(old_segment_id);
-    }
-  }).handle_error(
-    roll_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::roll"
-    }
-  );
-}
-
-SegmentedJournal::JournalSegmentManager::write_ret
-SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
-{
-  LOG_PREFIX(JournalSegmentManager::write);
-  auto write_length = to_write.length();
-  auto write_start_seq = get_current_write_seq();
-  TRACE("{}~{}", write_start_seq, write_length);
-  assert(write_length > 0);
-  assert((write_length % segment_manager.get_block_size()) == 0);
-  assert(!needs_roll(write_length));
-
-  auto write_start_offset = written_to;
-  written_to += write_length;
-  auto write_result = write_result_t{
-    write_start_seq,
-    static_cast<seastore_off_t>(write_length)
-  };
-  return current_journal_segment->write(
-    write_start_offset, to_write
-  ).handle_error(
-    write_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::write"
-    }
-  ).safe_then([write_result] {
-    return write_result;
-  });
-}
-
-SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
-SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
-{
-  LOG_PREFIX(JournalSegmentManager::initialize_segment);
-  auto new_tail = segment_provider.get_journal_tail_target();
-  // write out header
-  ceph_assert(segment.get_write_ptr() == 0);
-  bufferlist bl;
-
-  segment_seq_t seq = next_journal_segment_seq++;
-  current_segment_nonce = generate_nonce(
-    seq, segment_manager.get_meta());
-  auto header = segment_header_t{
-    seq,
-    segment.get_segment_id(),
-    new_tail,
-    current_segment_nonce};
-  INFO("writing {} ...", header);
-  ceph_assert(header.get_type() == segment_type_t::JOURNAL);
-  encode(header, bl);
-
-  bufferptr bp(
-    ceph::buffer::create_page_aligned(
-      segment_manager.get_block_size()));
-  bp.zero();
-  auto iter = bl.cbegin();
-  iter.copy(bl.length(), bp.c_str());
-  bl.clear();
-  bl.append(bp);
-
-  written_to = 0;
-  return write(bl
-  ).safe_then([this, new_tail](auto) {
-    segment_provider.update_journal_tail_committed(new_tail);
-  });
-}
-
 SegmentedJournal::RecordBatch::add_pending_ret
 SegmentedJournal::RecordBatch::add_pending(
   record_t&& record,
@@ -630,10 +474,10 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
   std::size_t batch_capacity,
   std::size_t batch_flush_size,
   double preferred_fullness,
-  JournalSegmentManager& jsm)
+  SegmentAllocator& jsa)
   : io_depth_limit{io_depth},
     preferred_fullness{preferred_fullness},
-    journal_segment_manager{jsm},
+    journal_segment_allocator{jsa},
     batches(new RecordBatch[io_depth + 1])
 {
   LOG_PREFIX(RecordSubmitter);
@@ -664,9 +508,9 @@ SegmentedJournal::RecordSubmitter::submit(
   assert(write_pipeline);
   auto expected_size = record_group_size_t(
       record.size,
-      journal_segment_manager.get_block_size()
+      journal_segment_allocator.get_block_size()
   ).get_encoded_length();
-  auto max_record_length = journal_segment_manager.get_max_write_length();
+  auto max_record_length = journal_segment_allocator.get_max_write_length();
   if (expected_size > max_record_length) {
     ERROR("H{} {} exceeds max record size {}",
           (void*)&handle, record, max_record_length);
@@ -755,11 +599,11 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch()
   increment_io();
   auto num = p_batch->get_num_records();
   auto [to_write, sizes] = p_batch->encode_batch(
-    journal_committed_to, journal_segment_manager.get_nonce());
+    journal_committed_to, journal_segment_allocator.get_nonce());
   DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
         num, sizes, journal_committed_to, num_outstanding_io);
   account_submission(num, sizes);
-  std::ignore = journal_segment_manager.write(to_write
+  std::ignore = journal_segment_allocator.write(to_write
   ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
     TRACE("{} records, {}, write done with {}", num, sizes, write_result);
     finish_submit_batch(p_batch, write_result);
@@ -791,13 +635,13 @@ SegmentedJournal::RecordSubmitter::submit_pending(
       increment_io();
       auto [to_write, sizes] = p_current_batch->submit_pending_fast(
         std::move(record),
-        journal_segment_manager.get_block_size(),
+        journal_segment_allocator.get_block_size(),
         journal_committed_to,
-        journal_segment_manager.get_nonce());
+        journal_segment_allocator.get_nonce());
       DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
             (void*)&handle, sizes, journal_committed_to, num_outstanding_io);
       account_submission(1, sizes);
-      return journal_segment_manager.write(to_write
+      return journal_segment_allocator.write(to_write
       ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
         return record_locator_t{
           write_result.start_seq.offset.add_offset(mdlength),
@@ -811,7 +655,7 @@ SegmentedJournal::RecordSubmitter::submit_pending(
       auto write_fut = p_current_batch->add_pending(
         std::move(record),
         handle,
-        journal_segment_manager.get_block_size());
+        journal_segment_allocator.get_block_size());
       if (do_flush) {
         DEBUG("H{} added pending and flush", (void*)&handle);
         flush_current_batch();
@@ -851,15 +695,15 @@ SegmentedJournal::RecordSubmitter::do_submit(
     // can increment io depth
     assert(!wait_submit_promise.has_value());
     auto maybe_new_size = p_current_batch->can_batch(
-        record, journal_segment_manager.get_block_size());
+        record, journal_segment_allocator.get_block_size());
     if (!maybe_new_size.has_value() ||
         (maybe_new_size->get_encoded_length() >
-         journal_segment_manager.get_max_write_length())) {
+         journal_segment_allocator.get_max_write_length())) {
       TRACE("H{} flush", (void*)&handle);
       assert(p_current_batch->is_pending());
       flush_current_batch();
       return do_submit(std::move(record), handle);
-    } else if (journal_segment_manager.needs_roll(
+    } else if (journal_segment_allocator.needs_roll(
           maybe_new_size->get_encoded_length())) {
       if (p_current_batch->is_pending()) {
         TRACE("H{} flush and roll", (void*)&handle);
@@ -867,7 +711,7 @@ SegmentedJournal::RecordSubmitter::do_submit(
       } else {
         TRACE("H{} roll", (void*)&handle);
       }
-      return journal_segment_manager.roll(
+      return journal_segment_allocator.roll(
       ).safe_then([this, record=std::move(record), &handle]() mutable {
         return do_submit(std::move(record), handle);
       });
@@ -881,11 +725,11 @@ SegmentedJournal::RecordSubmitter::do_submit(
   assert(state == state_t::FULL);
   // cannot increment io depth
   auto maybe_new_size = p_current_batch->can_batch(
-      record, journal_segment_manager.get_block_size());
+      record, journal_segment_allocator.get_block_size());
   if (!maybe_new_size.has_value() ||
       (maybe_new_size->get_encoded_length() >
-       journal_segment_manager.get_max_write_length()) ||
-      journal_segment_manager.needs_roll(
+       journal_segment_allocator.get_max_write_length()) ||
+      journal_segment_allocator.needs_roll(
         maybe_new_size->get_encoded_length())) {
     if (!wait_submit_promise.has_value()) {
       wait_submit_promise = seastar::promise<>();
index 23e8dbb04719b9b9095f2af18fac738995f202a6..17e4056e7156936ab5de84c39c5a21f6cab84dca 100644 (file)
 #include "crimson/os/seastore/segment_cleaner.h"
 #include "crimson/os/seastore/journal.h"
 #include "crimson/os/seastore/extent_reader.h"
-#include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/osd/exceptions.h"
+#include "segment_allocator.h"
 
 namespace crimson::os::seastore::journal {
 
@@ -58,96 +58,6 @@ public:
   }
 
 private:
-  class JournalSegmentManager {
-  public:
-    JournalSegmentManager(SegmentManager&, SegmentProvider&);
-
-    using base_ertr = crimson::errorator<
-        crimson::ct_error::input_output_error>;
-    extent_len_t get_max_write_length() const {
-      return segment_manager.get_segment_size() -
-             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
-                     size_t(segment_manager.get_block_size()));
-    }
-
-    device_id_t get_device_id() const {
-      return segment_manager.get_device_id();
-    }
-
-    device_segment_id_t get_num_segments() const {
-      return segment_manager.get_num_segments();
-    }
-
-    seastore_off_t get_block_size() const {
-      return segment_manager.get_block_size();
-    }
-
-    segment_nonce_t get_nonce() const {
-      return current_segment_nonce;
-    }
-
-    segment_seq_t get_segment_seq() const {
-      return next_journal_segment_seq - 1;
-    }
-
-    void set_segment_seq(segment_seq_t current_seq) {
-      next_journal_segment_seq = (current_seq + 1);
-    }
-
-    using open_ertr = base_ertr;
-    using open_ret = open_ertr::future<journal_seq_t>;
-    open_ret open();
-
-    using close_ertr = base_ertr;
-    close_ertr::future<> close();
-
-    // returns true iff the current segment has insufficient space
-    bool needs_roll(std::size_t length) const {
-      auto write_capacity = current_journal_segment->get_write_capacity();
-      return length + written_to > std::size_t(write_capacity);
-    }
-
-    // close the current segment and initialize next one
-    using roll_ertr = base_ertr;
-    roll_ertr::future<> roll();
-
-    // write the buffer, return the write result
-    // May be called concurrently, writes may complete in any order.
-    using write_ertr = base_ertr;
-    using write_ret = write_ertr::future<write_result_t>;
-    write_ret write(ceph::bufferlist to_write);
-    
-  private:
-    journal_seq_t get_current_write_seq() const {
-      assert(current_journal_segment);
-      return journal_seq_t{
-        get_segment_seq(),
-        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
-         written_to)
-      };
-    }
-
-    void reset() {
-      next_journal_segment_seq = 0;
-      current_segment_nonce = 0;
-      current_journal_segment.reset();
-      written_to = 0;
-    }
-
-    // prepare segment for writes, writes out segment header
-    using initialize_segment_ertr = base_ertr;
-    initialize_segment_ertr::future<> initialize_segment(Segment&);
-
-    SegmentProvider& segment_provider;
-    SegmentManager& segment_manager;
-
-    segment_seq_t next_journal_segment_seq;
-    segment_nonce_t current_segment_nonce;
-
-    SegmentRef current_journal_segment;
-    seastore_off_t written_to;
-  };
-
   class RecordBatch {
     enum class state_t {
       EMPTY = 0,
@@ -212,7 +122,7 @@ private:
     //
     // Set write_result_t::write_length to 0 if the record is not the first one
     // in the batch.
-    using add_pending_ertr = JournalSegmentManager::write_ertr;
+    using add_pending_ertr = SegmentAllocator::write_ertr;
     using add_pending_ret = add_pending_ertr::future<record_locator_t>;
     add_pending_ret add_pending(
         record_t&&,
@@ -290,7 +200,7 @@ private:
                     std::size_t batch_capacity,
                     std::size_t batch_flush_size,
                     double preferred_fullness,
-                    JournalSegmentManager&);
+                    SegmentAllocator&);
 
     grouped_io_stats get_record_batch_stats() const {
       return stats.record_batch_stats;
@@ -355,7 +265,7 @@ private:
 
     void flush_current_batch();
 
-    using submit_pending_ertr = JournalSegmentManager::write_ertr;
+    using submit_pending_ertr = SegmentAllocator::write_ertr;
     using submit_pending_ret = submit_pending_ertr::future<
       record_locator_t>;
     submit_pending_ret submit_pending(
@@ -371,7 +281,7 @@ private:
     double preferred_fullness;
 
     WritePipeline* write_pipeline = nullptr;
-    JournalSegmentManager& journal_segment_manager;
+    SegmentAllocator& journal_segment_allocator;
     // committed_to may be in a previous journal segment
     journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
 
@@ -392,7 +302,7 @@ private:
   };
 
   SegmentProvider& segment_provider;
-  JournalSegmentManager journal_segment_manager;
+  SegmentAllocator journal_segment_allocator;
   RecordSubmitter record_submitter;
   ExtentReader& scanner;
   seastar::metrics::metric_group metrics;