git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
seastore: separate Journal interface from SegmentedJournal implementation 45089/head
author    myoungwon oh <myoungwon.oh@samsung.com>
          Fri, 13 Aug 2021 08:15:54 +0000 (17:15 +0900)
committer Samuel Just <sjust@redhat.com>
          Sun, 20 Feb 2022 23:45:29 +0000 (23:45 +0000)
A subsequent PR will introduce a CircularBoundedJournal implementation
for fast NVMe devices.

SegmentCleaner no longer needs a reference to Journal, so dispense with
the set_segment_provider machinery and simply pass the SegmentProvider
to the Journal constructor.
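
A rough sketch of the resulting wiring (a hypothetical call site; the
names segment_manager, extent_reader, and segment_cleaner are assumed,
as is SegmentCleaner being the SegmentProvider implementation, per the
existing seastore design):

  // SegmentProvider is now injected at construction time through the
  // journal factory, rather than via a later set_segment_provider() call.
  JournalRef journal = journal::make_segmented(
    segment_manager,    // SegmentManager backing the journal device
    extent_reader,      // ExtentReader used to scan records on replay
    *segment_cleaner);  // SegmentCleaner acting as the SegmentProvider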

Move responsibility for finding the journal segments into the journal
itself.  This does mean that we check the segment headers on the journal
device twice, but that should be a negligible amount of overhead on mount.
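
With segment discovery moved into the journal, replay() now takes only
the delta handler; a hedged sketch of a caller (the handler body and
the journal variable are assumed):

  // Previously the caller collected (segment_id_t, segment_header_t)
  // pairs and passed them to replay(); now the journal scans its own
  // device for segments whose headers are of segment_type_t::JOURNAL.
  return journal->replay(
    [](const record_locator_t &locator, const delta_info_t &delta) {
      // apply the delta in order ...
      return Journal::replay_ertr::now();
    });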

SegmentCleaner::init_segments no longer needs to return Journal
segments, so merge it into mount().

Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
Signed-off-by: Samuel Just <sjust@redhat.com>
13 files changed:
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/journal.cc
src/crimson/os/seastore/journal.h
src/crimson/os/seastore/journal/segmented_journal.cc [new file with mode: 0644]
src/crimson/os/seastore/journal/segmented_journal.h [new file with mode: 0644]
src/crimson/os/seastore/seastore.cc
src/crimson/os/seastore/segment_cleaner.cc
src/crimson/os/seastore/segment_cleaner.h
src/crimson/os/seastore/transaction_manager.cc
src/crimson/tools/store_nbd/tm_driver.cc
src/test/crimson/seastore/test_btree_lba_manager.cc
src/test/crimson/seastore/test_seastore_journal.cc
src/test/crimson/seastore/transaction_manager_test_state.h

index 55cfd425d84967c1f8405cca4644edc918090ca7..5664bcbdb66e5f2570c065ffa715ac6118ef8929 100644 (file)
@@ -6,7 +6,6 @@ set(crimson_seastore_srcs
   segment_manager/block.cc
   transaction_manager.cc
   transaction.cc
-  journal.cc
   cache.cc
   extent_reader.cc
   lba_manager.cc
@@ -39,6 +38,8 @@ set(crimson_seastore_srcs
   seastore.cc
   random_block_manager/nvme_manager.cc
   random_block_manager/nvmedevice.cc
+  journal/segmented_journal.cc
+  journal.cc
   ../../../test/crimson/seastore/test_block.cc
   ${PROJECT_SOURCE_DIR}/src/os/Transaction.cc
        )
index acba0e3ce411c6c1a88b63ac58fa4537e529c2ab..56e582390dc273f248bdd3ac350cf7c108de5991 100644 (file)
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
-#include <iostream>
+#include "journal.h"
+#include "journal/segmented_journal.h"
 
-#include <boost/iterator/counting_iterator.hpp>
+namespace crimson::os::seastore::journal {
 
-#include "include/intarith.h"
-
-#include "crimson/common/config_proxy.h"
-#include "crimson/os/seastore/journal.h"
-#include "crimson/os/seastore/logging.h"
-#include "crimson/os/seastore/segment_cleaner.h"
-
-SET_SUBSYS(seastore_journal);
-/*
- * format:
- * - H<handle-addr> information
- *
- * levels:
- * - INFO:  major initiation, closing, rolling and replay operations
- * - DEBUG: INFO details, major submit operations
- * - TRACE: DEBUG details
- */
-
-namespace crimson::os::seastore {
-
-segment_nonce_t generate_nonce(
-  segment_seq_t seq,
-  const seastore_meta_t &meta)
-{
-  return ceph_crc32c(
-    seq,
-    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
-    sizeof(meta.seastore_id.uuid));
-}
-
-Journal::Journal(
-  SegmentManager& segment_manager,
-  ExtentReader& scanner)
-  : journal_segment_manager(segment_manager),
-    record_submitter(crimson::common::get_conf<uint64_t>(
-                       "seastore_journal_iodepth_limit"),
-                     crimson::common::get_conf<uint64_t>(
-                       "seastore_journal_batch_capacity"),
-                     crimson::common::get_conf<Option::size_t>(
-                       "seastore_journal_batch_flush_size"),
-                     crimson::common::get_conf<double>(
-                       "seastore_journal_batch_preferred_fullness"),
-                     journal_segment_manager),
-    scanner(scanner)
-{
-  register_metrics();
-}
-
-Journal::open_for_write_ret Journal::open_for_write()
-{
-  LOG_PREFIX(Journal::open_for_write);
-  INFO("device_id={}", journal_segment_manager.get_device_id());
-  return journal_segment_manager.open();
-}
-
-Journal::close_ertr::future<> Journal::close()
-{
-  LOG_PREFIX(Journal::close);
-  INFO("closing");
-  metrics.clear();
-  return journal_segment_manager.close();
-}
-
-Journal::prep_replay_segments_fut
-Journal::prep_replay_segments(
-  std::vector<std::pair<segment_id_t, segment_header_t>> segments)
-{
-  LOG_PREFIX(Journal::prep_replay_segments);
-  if (segments.empty()) {
-    ERROR("no journal segments for replay");
-    return crimson::ct_error::input_output_error::make();
-  }
-  std::sort(
-    segments.begin(),
-    segments.end(),
-    [](const auto &lt, const auto &rt) {
-      return lt.second.journal_segment_seq <
-       rt.second.journal_segment_seq;
-    });
-
-  journal_segment_manager.set_segment_seq(
-    segments.rbegin()->second.journal_segment_seq);
-  std::for_each(
-    segments.begin(),
-    segments.end(),
-    [this, FNAME](auto &seg)
-  {
-    if (seg.first != seg.second.physical_segment_id ||
-        seg.first.device_id() != journal_segment_manager.get_device_id() ||
-        seg.second.get_type() != segment_type_t::JOURNAL) {
-      ERROR("illegal journal segment for replay -- {}", seg.second);
-      ceph_abort();
-    }
-  });
-
-  auto journal_tail = segments.rbegin()->second.journal_tail;
-  segment_provider->update_journal_tail_committed(journal_tail);
-  auto replay_from = journal_tail.offset;
-  auto from = segments.begin();
-  if (replay_from != P_ADDR_NULL) {
-    from = std::find_if(
-      segments.begin(),
-      segments.end(),
-      [&replay_from](const auto &seg) -> bool {
-       auto& seg_addr = replay_from.as_seg_paddr();
-       return seg.first == seg_addr.get_segment_id();
-      });
-    if (from->second.journal_segment_seq != journal_tail.segment_seq) {
-      ERROR("journal_tail {} does not match {}",
-            journal_tail, from->second);
-      ceph_abort();
-    }
-  } else {
-    replay_from = paddr_t::make_seg_paddr(
-      from->first,
-      journal_segment_manager.get_block_size());
-  }
-
-  auto num_segments = segments.end() - from;
-  INFO("{} segments to replay, from {}",
-       num_segments, replay_from);
-  auto ret = replay_segments_t(num_segments);
-  std::transform(
-    from, segments.end(), ret.begin(),
-    [this](const auto &p) {
-      auto ret = journal_seq_t{
-       p.second.journal_segment_seq,
-       paddr_t::make_seg_paddr(
-         p.first,
-         journal_segment_manager.get_block_size())
-      };
-      return std::make_pair(ret, p.second);
-    });
-  ret[0].first.offset = replay_from;
-  return prep_replay_segments_fut(
-    prep_replay_segments_ertr::ready_future_marker{},
-    std::move(ret));
-}
-
-Journal::replay_ertr::future<>
-Journal::replay_segment(
-  journal_seq_t seq,
-  segment_header_t header,
-  delta_handler_t &handler)
-{
-  LOG_PREFIX(Journal::replay_segment);
-  INFO("starting at {} -- {}", seq, header);
-  return seastar::do_with(
-    scan_valid_records_cursor(seq),
-    ExtentReader::found_record_handler_t([=, &handler](
-      record_locator_t locator,
-      const record_group_header_t& header,
-      const bufferlist& mdbuf)
-      -> ExtentReader::scan_valid_records_ertr::future<>
-    {
-      auto maybe_record_deltas_list = try_decode_deltas(
-          header, mdbuf, locator.record_block_base);
-      if (!maybe_record_deltas_list) {
-        // This should be impossible; we already checked the crc on the mdbuf
-        ERROR("unable to decode deltas for record {} at {}",
-              header, locator);
-        return crimson::ct_error::input_output_error::make();
-      }
-
-      return seastar::do_with(
-        std::move(*maybe_record_deltas_list),
-        [write_result=locator.write_result,
-         this,
-         FNAME,
-         &handler](auto& record_deltas_list)
-      {
-        return crimson::do_for_each(
-          record_deltas_list,
-          [write_result,
-           this,
-           FNAME,
-           &handler](record_deltas_t& record_deltas)
-        {
-          auto locator = record_locator_t{
-            record_deltas.record_block_base,
-            write_result
-          };
-          DEBUG("processing {} deltas at block_base {}",
-                record_deltas.deltas.size(),
-                locator);
-          return crimson::do_for_each(
-            record_deltas.deltas,
-            [locator,
-             this,
-             FNAME,
-             &handler](delta_info_t& delta)
-          {
-            /* The journal may validly contain deltas for extents in
-             * since-released segments.  We can detect those cases by
-             * checking whether the segment in question currently has a
-             * sequence number > the current journal segment seq. We can
-             * safely skip these deltas because the extent must already
-             * have been rewritten.
-             */
-            if (delta.paddr != P_ADDR_NULL) {
-              auto& seg_addr = delta.paddr.as_seg_paddr();
-              auto delta_paddr_segment_seq = segment_provider->get_seq(seg_addr.get_segment_id());
-              auto delta_paddr_segment_type = segment_seq_to_type(delta_paddr_segment_seq);
-              auto locator_segment_seq = locator.write_result.start_seq.segment_seq;
-              if (delta_paddr_segment_type == segment_type_t::NULL_SEG ||
-                  (delta_paddr_segment_type == segment_type_t::JOURNAL &&
-                   delta_paddr_segment_seq > locator_segment_seq)) {
-                SUBDEBUG(seastore_cache,
-                         "delta is obsolete, delta_paddr_segment_seq={}, locator_segment_seq={} -- {}",
-                         segment_seq_printer_t{delta_paddr_segment_seq},
-                         segment_seq_printer_t{locator_segment_seq},
-                         delta);
-                return replay_ertr::now();
-              }
-            }
-            return handler(locator, delta);
-          });
-        });
-      });
-    }),
-    [=](auto &cursor, auto &dhandler) {
-      return scanner.scan_valid_records(
-       cursor,
-       header.segment_nonce,
-       std::numeric_limits<size_t>::max(),
-       dhandler).safe_then([](auto){}
-      ).handle_error(
-       replay_ertr::pass_further{},
-       crimson::ct_error::assert_all{
-         "shouldn't meet with any other error other than replay_ertr"
-       }
-      );
-    }
-  );
-}
-
-Journal::replay_ret Journal::replay(
-  std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
-  delta_handler_t &&delta_handler)
-{
-  LOG_PREFIX(Journal::replay);
-  INFO("got {} segments", segment_headers.size());
-  return seastar::do_with(
-    std::move(delta_handler), replay_segments_t(),
-    [this, segment_headers=std::move(segment_headers)]
-    (auto &handler, auto &segments) mutable -> replay_ret
-  {
-    return prep_replay_segments(std::move(segment_headers)
-    ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
-      segments = std::move(replay_segs);
-      return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
-        return replay_segment(i.first, i.second, handler);
-      });
-    });
-  });
-}
-
-void Journal::register_metrics()
-{
-  LOG_PREFIX(Journal::register_metrics);
-  DEBUG("");
-  record_submitter.reset_stats();
-  namespace sm = seastar::metrics;
-  metrics.add_group(
-    "journal",
-    {
-      sm::make_counter(
-        "record_num",
-        [this] {
-          return record_submitter.get_record_batch_stats().num_io;
-        },
-        sm::description("total number of records submitted")
-      ),
-      sm::make_counter(
-        "record_batch_num",
-        [this] {
-          return record_submitter.get_record_batch_stats().num_io_grouped;
-        },
-        sm::description("total number of records batched")
-      ),
-      sm::make_counter(
-        "io_num",
-        [this] {
-          return record_submitter.get_io_depth_stats().num_io;
-        },
-        sm::description("total number of io submitted")
-      ),
-      sm::make_counter(
-        "io_depth_num",
-        [this] {
-          return record_submitter.get_io_depth_stats().num_io_grouped;
-        },
-        sm::description("total number of io depth")
-      ),
-      sm::make_counter(
-        "record_group_padding_bytes",
-        [this] {
-          return record_submitter.get_record_group_padding_bytes();
-        },
-        sm::description("bytes of metadata padding when write record groups")
-      ),
-      sm::make_counter(
-        "record_group_metadata_bytes",
-        [this] {
-          return record_submitter.get_record_group_metadata_bytes();
-        },
-        sm::description("bytes of raw metadata when write record groups")
-      ),
-      sm::make_counter(
-        "record_group_data_bytes",
-        [this] {
-          return record_submitter.get_record_group_data_bytes();
-        },
-        sm::description("bytes of data when write record groups")
-      ),
-    }
-  );
-}
-
-Journal::JournalSegmentManager::JournalSegmentManager(
-  SegmentManager& segment_manager)
-  : segment_manager{segment_manager}
-{
-  reset();
-}
-
-Journal::JournalSegmentManager::open_ret
-Journal::JournalSegmentManager::open()
-{
-  return roll().safe_then([this] {
-    return get_current_write_seq();
-  });
-}
-
-Journal::JournalSegmentManager::close_ertr::future<>
-Journal::JournalSegmentManager::close()
-{
-  LOG_PREFIX(JournalSegmentManager::close);
-  if (current_journal_segment) {
-    INFO("segment_id={}, seq={}, "
-         "written_to={}, committed_to={}, nonce={}",
-         current_journal_segment->get_segment_id(),
-         get_segment_seq(),
-         written_to,
-         committed_to,
-         current_segment_nonce);
-  } else {
-    INFO("no current journal segment");
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).handle_error(
-    close_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::close()"
-    }
-  ).finally([this] {
-    reset();
-  });
-}
-
-Journal::JournalSegmentManager::roll_ertr::future<>
-Journal::JournalSegmentManager::roll()
-{
-  LOG_PREFIX(JournalSegmentManager::roll);
-  auto old_segment_id = current_journal_segment ?
-    current_journal_segment->get_segment_id() :
-    NULL_SEG_ID;
-  if (current_journal_segment) {
-    INFO("closing segment {}, seq={}, "
-         "written_to={}, committed_to={}, nonce={}",
-         old_segment_id,
-         get_segment_seq(),
-         written_to,
-         committed_to,
-         current_segment_nonce);
-  }
-
-  return (
-    current_journal_segment ?
-    current_journal_segment->close() :
-    Segment::close_ertr::now()
-  ).safe_then([this] {
-    return segment_provider->get_segment(
-        get_device_id(), next_journal_segment_seq);
-  }).safe_then([this](auto segment) {
-    return segment_manager.open(segment);
-  }).safe_then([this](auto sref) {
-    current_journal_segment = sref;
-    return initialize_segment(*current_journal_segment);
-  }).safe_then([this, old_segment_id] {
-    if (old_segment_id != NULL_SEG_ID) {
-      segment_provider->close_segment(old_segment_id);
-    }
-  }).handle_error(
-    roll_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::roll"
-    }
-  );
-}
-
-Journal::JournalSegmentManager::write_ret
-Journal::JournalSegmentManager::write(ceph::bufferlist to_write)
-{
-  LOG_PREFIX(JournalSegmentManager::write);
-  auto write_length = to_write.length();
-  auto write_start_seq = get_current_write_seq();
-  TRACE("{}~{}", write_start_seq, write_length);
-  assert(write_length > 0);
-  assert((write_length % segment_manager.get_block_size()) == 0);
-  assert(!needs_roll(write_length));
-
-  auto write_start_offset = written_to;
-  written_to += write_length;
-  auto write_result = write_result_t{
-    write_start_seq,
-    static_cast<seastore_off_t>(write_length)
-  };
-  return current_journal_segment->write(
-    write_start_offset, to_write
-  ).handle_error(
-    write_ertr::pass_further{},
-    crimson::ct_error::assert_all{
-      "Invalid error in JournalSegmentManager::write"
-    }
-  ).safe_then([write_result] {
-    return write_result;
-  });
-}
-
-void Journal::JournalSegmentManager::mark_committed(
-  const journal_seq_t& new_committed_to)
-{
-  LOG_PREFIX(JournalSegmentManager::mark_committed);
-  TRACE("{} => {}", committed_to, new_committed_to);
-  assert(committed_to == journal_seq_t() ||
-         committed_to <= new_committed_to);
-  committed_to = new_committed_to;
-}
-
-Journal::JournalSegmentManager::initialize_segment_ertr::future<>
-Journal::JournalSegmentManager::initialize_segment(Segment& segment)
-{
-  LOG_PREFIX(JournalSegmentManager::initialize_segment);
-  auto new_tail = segment_provider->get_journal_tail_target();
-  // write out header
-  ceph_assert(segment.get_write_ptr() == 0);
-  bufferlist bl;
-
-  segment_seq_t seq = next_journal_segment_seq++;
-  current_segment_nonce = generate_nonce(
-    seq, segment_manager.get_meta());
-  auto header = segment_header_t{
-    seq,
-    segment.get_segment_id(),
-    new_tail,
-    current_segment_nonce};
-  INFO("writing {} ...", header);
-  ceph_assert(header.get_type() == segment_type_t::JOURNAL);
-  encode(header, bl);
-
-  bufferptr bp(
-    ceph::buffer::create_page_aligned(
-      segment_manager.get_block_size()));
-  bp.zero();
-  auto iter = bl.cbegin();
-  iter.copy(bl.length(), bp.c_str());
-  bl.clear();
-  bl.append(bp);
-
-  written_to = 0;
-  return write(bl
-  ).safe_then([this, new_tail](auto) {
-    segment_provider->update_journal_tail_committed(new_tail);
-  });
-}
-
-Journal::RecordBatch::add_pending_ret
-Journal::RecordBatch::add_pending(
-  record_t&& record,
-  OrderingHandle& handle,
-  extent_len_t block_size)
-{
-  LOG_PREFIX(RecordBatch::add_pending);
-  auto new_size = get_encoded_length_after(record, block_size);
-  auto dlength_offset = pending.size.dlength;
-  TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
-        (void*)&handle,
-        pending.get_size() + 1,
-        new_size.get_encoded_length(),
-        dlength_offset);
-  assert(state != state_t::SUBMITTING);
-  assert(can_batch(record, block_size).value() == new_size);
-
-  pending.push_back(
-      std::move(record), block_size);
-  assert(pending.size == new_size);
-  if (state == state_t::EMPTY) {
-    assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<maybe_promise_result_t>();
-  } else {
-    assert(io_promise.has_value());
-  }
-  state = state_t::PENDING;
-
-  return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &handle
-         ](auto maybe_promise_result) -> add_pending_ret {
-    if (!maybe_promise_result.has_value()) {
-      ERROR("H{} write failed", (void*)&handle);
-      return crimson::ct_error::input_output_error::make();
-    }
-    auto write_result = maybe_promise_result->write_result;
-    auto submit_result = record_locator_t{
-      write_result.start_seq.offset.add_offset(
-          maybe_promise_result->mdlength + dlength_offset),
-      write_result
-    };
-    TRACE("H{} write finish with {}", (void*)&handle, submit_result);
-    return add_pending_ret(
-      add_pending_ertr::ready_future_marker{},
-      submit_result);
-  });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-Journal::RecordBatch::encode_batch(
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  assert(state == state_t::PENDING);
-  assert(pending.get_size() > 0);
-  assert(io_promise.has_value());
-
-  state = state_t::SUBMITTING;
-  submitting_size = pending.get_size();
-  auto gsize = pending.size;
-  submitting_length = gsize.get_encoded_length();
-  submitting_mdlength = gsize.get_mdlength();
-  auto bl = encode_records(pending, committed_to, segment_nonce);
-  // Note: pending is cleared here
-  assert(bl.length() == (std::size_t)submitting_length);
-  return std::make_pair(bl, gsize);
-}
-
-void Journal::RecordBatch::set_result(
-  maybe_result_t maybe_write_result)
-{
-  maybe_promise_result_t result;
-  if (maybe_write_result.has_value()) {
-    assert(maybe_write_result->length == submitting_length);
-    result = promise_result_t{
-      *maybe_write_result,
-      submitting_mdlength
-    };
-  }
-  assert(state == state_t::SUBMITTING);
-  assert(io_promise.has_value());
-
-  state = state_t::EMPTY;
-  submitting_size = 0;
-  submitting_length = 0;
-  submitting_mdlength = 0;
-  io_promise->set_value(result);
-  io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-Journal::RecordBatch::submit_pending_fast(
-  record_t&& record,
-  extent_len_t block_size,
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  auto new_size = get_encoded_length_after(record, block_size);
-  std::ignore = new_size;
-  assert(state == state_t::EMPTY);
-  assert(can_batch(record, block_size).value() == new_size);
-
-  auto group = record_group_t(std::move(record), block_size);
-  auto size = group.size;
-  assert(size == new_size);
-  auto bl = encode_records(group, committed_to, segment_nonce);
-  assert(bl.length() == size.get_encoded_length());
-  return std::make_pair(bl, size);
-}
-
-Journal::RecordSubmitter::RecordSubmitter(
-  std::size_t io_depth,
-  std::size_t batch_capacity,
-  std::size_t batch_flush_size,
-  double preferred_fullness,
-  JournalSegmentManager& jsm)
-  : io_depth_limit{io_depth},
-    preferred_fullness{preferred_fullness},
-    journal_segment_manager{jsm},
-    batches(new RecordBatch[io_depth + 1])
-{
-  LOG_PREFIX(RecordSubmitter);
-  INFO("Journal::RecordSubmitter: io_depth_limit={}, "
-       "batch_capacity={}, batch_flush_size={}, "
-       "preferred_fullness={}",
-       io_depth, batch_capacity,
-       batch_flush_size, preferred_fullness);
-  ceph_assert(io_depth > 0);
-  ceph_assert(batch_capacity > 0);
-  ceph_assert(preferred_fullness >= 0 &&
-              preferred_fullness <= 1);
-  free_batch_ptrs.reserve(io_depth + 1);
-  for (std::size_t i = 0; i <= io_depth; ++i) {
-    batches[i].initialize(i, batch_capacity, batch_flush_size);
-    free_batch_ptrs.push_back(&batches[i]);
-  }
-  pop_free_batch();
-}
-
-Journal::RecordSubmitter::submit_ret
-Journal::RecordSubmitter::submit(
-  record_t&& record,
-  OrderingHandle& handle)
-{
-  LOG_PREFIX(RecordSubmitter::submit);
-  DEBUG("H{} {} start ...", (void*)&handle, record);
-  assert(write_pipeline);
-  auto expected_size = record_group_size_t(
-      record.size,
-      journal_segment_manager.get_block_size()
-  ).get_encoded_length();
-  auto max_record_length = journal_segment_manager.get_max_write_length();
-  if (expected_size > max_record_length) {
-    ERROR("H{} {} exceeds max record size {}",
-          (void*)&handle, record, max_record_length);
-    return crimson::ct_error::erange::make();
-  }
-
-  return do_submit(std::move(record), handle);
-}
-
-seastar::future<> Journal::RecordSubmitter::flush(OrderingHandle &handle)
-{
-  LOG_PREFIX(RecordSubmitter::flush);
-  DEBUG("H{} flush", (void*)&handle);
-  return handle.enter(write_pipeline->device_submission
-  ).then([this, &handle] {
-    return handle.enter(write_pipeline->finalize);
-  }).then([FNAME, &handle] {
-    DEBUG("H{} flush done", (void*)&handle);
-  });
-}
-
-void Journal::RecordSubmitter::update_state()
-{
-  if (num_outstanding_io == 0) {
-    state = state_t::IDLE;
-  } else if (num_outstanding_io < io_depth_limit) {
-    state = state_t::PENDING;
-  } else if (num_outstanding_io == io_depth_limit) {
-    state = state_t::FULL;
-  } else {
-    ceph_abort("fatal error: io-depth overflow");
-  }
-}
-
-void Journal::RecordSubmitter::decrement_io_with_flush()
+JournalRef make_segmented(
+  SegmentManager &sm,
+  ExtentReader &reader,
+  SegmentProvider &provider)
 {
-  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
-  assert(num_outstanding_io > 0);
-  --num_outstanding_io;
-#ifndef NDEBUG
-  auto prv_state = state;
-#endif
-  update_state();
-
-  if (wait_submit_promise.has_value()) {
-    DEBUG("wait resolved");
-    assert(prv_state == state_t::FULL);
-    wait_submit_promise->set_value();
-    wait_submit_promise.reset();
-  }
-
-  if (!p_current_batch->is_empty()) {
-    TRACE("flush");
-    flush_current_batch();
-  }
-}
-
-void Journal::RecordSubmitter::account_submission(
-  std::size_t num,
-  const record_group_size_t& size)
-{
-  stats.record_group_padding_bytes +=
-    (size.get_mdlength() - size.get_raw_mdlength());
-  stats.record_group_metadata_bytes += size.get_raw_mdlength();
-  stats.record_group_data_bytes += size.dlength;
-}
-
-void Journal::RecordSubmitter::finish_submit_batch(
-  RecordBatch* p_batch,
-  maybe_result_t maybe_result)
-{
-  assert(p_batch->is_submitting());
-  p_batch->set_result(maybe_result);
-  free_batch_ptrs.push_back(p_batch);
-  decrement_io_with_flush();
-}
-
-void Journal::RecordSubmitter::flush_current_batch()
-{
-  LOG_PREFIX(RecordSubmitter::flush_current_batch);
-  RecordBatch* p_batch = p_current_batch;
-  assert(p_batch->is_pending());
-  p_current_batch = nullptr;
-  pop_free_batch();
-
-  increment_io();
-  auto num = p_batch->get_num_records();
-  auto committed_to = journal_segment_manager.get_committed_to();
-  auto [to_write, sizes] = p_batch->encode_batch(
-    committed_to, journal_segment_manager.get_nonce());
-  DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
-        num, sizes, committed_to, num_outstanding_io);
-  account_submission(num, sizes);
-  std::ignore = journal_segment_manager.write(to_write
-  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
-    TRACE("{} records, {}, write done with {}", num, sizes, write_result);
-    finish_submit_batch(p_batch, write_result);
-  }).handle_error(
-    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-      ERROR("{} records, {}, got error {}", num, sizes, e);
-      finish_submit_batch(p_batch, std::nullopt);
-    })
-  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-    ERROR("{} records, {}, got exception {}", num, sizes, e);
-    finish_submit_batch(p_batch, std::nullopt);
-  });
-}
-
-Journal::RecordSubmitter::submit_pending_ret
-Journal::RecordSubmitter::submit_pending(
-  record_t&& record,
-  OrderingHandle& handle,
-  bool flush)
-{
-  LOG_PREFIX(RecordSubmitter::submit_pending);
-  assert(!p_current_batch->is_submitting());
-  stats.record_batch_stats.increment(
-      p_current_batch->get_num_records() + 1);
-  bool do_flush = (flush || state == state_t::IDLE);
-  auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
-    if (do_flush && p_current_batch->is_empty()) {
-      // fast path with direct write
-      increment_io();
-      auto committed_to = journal_segment_manager.get_committed_to();
-      auto [to_write, sizes] = p_current_batch->submit_pending_fast(
-        std::move(record),
-        journal_segment_manager.get_block_size(),
-        committed_to,
-        journal_segment_manager.get_nonce());
-      DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
-            (void*)&handle, sizes, committed_to, num_outstanding_io);
-      account_submission(1, sizes);
-      return journal_segment_manager.write(to_write
-      ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
-        return record_locator_t{
-          write_result.start_seq.offset.add_offset(mdlength),
-          write_result
-        };
-      }).finally([this] {
-        decrement_io_with_flush();
-      });
-    } else {
-      // indirect write with or without the existing pending records
-      auto write_fut = p_current_batch->add_pending(
-        std::move(record),
-        handle,
-        journal_segment_manager.get_block_size());
-      if (do_flush) {
-        DEBUG("H{} added pending and flush", (void*)&handle);
-        flush_current_batch();
-      } else {
-        DEBUG("H{} added with {} pending",
-              (void*)&handle, p_current_batch->get_num_records());
-      }
-      return write_fut;
-    }
-  }();
-  return handle.enter(write_pipeline->device_submission
-  ).then([write_fut=std::move(write_fut)]() mutable {
-    return std::move(write_fut);
-  }).safe_then([this, FNAME, &handle](auto submit_result) {
-    return handle.enter(write_pipeline->finalize
-    ).then([this, FNAME, submit_result, &handle] {
-      DEBUG("H{} finish with {}", (void*)&handle, submit_result);
-      journal_segment_manager.mark_committed(
-          submit_result.write_result.get_end_seq());
-      return submit_result;
-    });
-  });
-}
-
-Journal::RecordSubmitter::do_submit_ret
-Journal::RecordSubmitter::do_submit(
-  record_t&& record,
-  OrderingHandle& handle)
-{
-  LOG_PREFIX(RecordSubmitter::do_submit);
-  TRACE("H{} outstanding_io={}/{} ...",
-        (void*)&handle, num_outstanding_io, io_depth_limit);
-  assert(!p_current_batch->is_submitting());
-  if (state <= state_t::PENDING) {
-    // can increment io depth
-    assert(!wait_submit_promise.has_value());
-    auto maybe_new_size = p_current_batch->can_batch(
-        record, journal_segment_manager.get_block_size());
-    if (!maybe_new_size.has_value() ||
-        (maybe_new_size->get_encoded_length() >
-         journal_segment_manager.get_max_write_length())) {
-      TRACE("H{} flush", (void*)&handle);
-      assert(p_current_batch->is_pending());
-      flush_current_batch();
-      return do_submit(std::move(record), handle);
-    } else if (journal_segment_manager.needs_roll(
-          maybe_new_size->get_encoded_length())) {
-      if (p_current_batch->is_pending()) {
-        TRACE("H{} flush and roll", (void*)&handle);
-        flush_current_batch();
-      } else {
-        TRACE("H{} roll", (void*)&handle);
-      }
-      return journal_segment_manager.roll(
-      ).safe_then([this, record=std::move(record), &handle]() mutable {
-        return do_submit(std::move(record), handle);
-      });
-    } else {
-      bool flush = (maybe_new_size->get_fullness() > preferred_fullness ?
-                    true : false);
-      return submit_pending(std::move(record), handle, flush);
-    }
-  }
-
-  assert(state == state_t::FULL);
-  // cannot increment io depth
-  auto maybe_new_size = p_current_batch->can_batch(
-      record, journal_segment_manager.get_block_size());
-  if (!maybe_new_size.has_value() ||
-      (maybe_new_size->get_encoded_length() >
-       journal_segment_manager.get_max_write_length()) ||
-      journal_segment_manager.needs_roll(
-        maybe_new_size->get_encoded_length())) {
-    if (!wait_submit_promise.has_value()) {
-      wait_submit_promise = seastar::promise<>();
-    }
-    DEBUG("H{} wait ...", (void*)&handle);
-    return wait_submit_promise->get_future(
-    ).then([this, record=std::move(record), &handle]() mutable {
-      return do_submit(std::move(record), handle);
-    });
-  } else {
-    return submit_pending(std::move(record), handle, false);
-  }
+  return std::make_unique<SegmentedJournal>(sm, reader, provider);
 }
 
 }
index 538e8f7a973fc806512fce2a0f4f747065a5a096..904d794d4a23d03522fee7f498ad90ddfb2521a1 100644 (file)
@@ -3,57 +3,23 @@
 
 #pragma once
 
-#include <boost/intrusive_ptr.hpp>
-#include <optional>
+#include <memory>
 
-#include <seastar/core/circular_buffer.hh>
-#include <seastar/core/future.hh>
-#include <seastar/core/metrics.hh>
-#include <seastar/core/shared_future.hh>
-
-#include "include/ceph_assert.h"
-#include "include/buffer.h"
-#include "include/denc.h"
-
-#include "crimson/os/seastore/extent_reader.h"
-#include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/ordering_handle.h"
 #include "crimson/os/seastore/seastore_types.h"
-#include "crimson/osd/exceptions.h"
 
 namespace crimson::os::seastore {
 
+namespace nvme_device {
+class NVMeBlockDevice;
+}
+
+class SegmentManager;
+class ExtentReader;
 class SegmentProvider;
-class SegmentedAllocator;
 
-/**
- * Manages stream of atomically written records to a SegmentManager.
- */
 class Journal {
 public:
-  Journal(SegmentManager &segment_manager, ExtentReader& scanner);
-
-  /**
-   * Gets the current journal segment sequence.
-   */
-  segment_seq_t get_segment_seq() const {
-    return journal_segment_manager.get_segment_seq();
-  }
-
-  /**
-   * Sets the SegmentProvider.
-   *
-   * Not provided in constructor to allow the provider to not own
-   * or construct the Journal (TransactionManager).
-   *
-   * Note, Journal does not own this ptr, user must ensure that
-   * *provider outlives Journal.
-   */
-  void set_segment_provider(SegmentProvider *provider) {
-    segment_provider = provider;
-    journal_segment_manager.set_segment_provider(provider);
-  }
-
   /**
    * initializes journal for new writes -- must run prior to calls
    * to submit_record.  Should be called after replay if not a new
@@ -63,16 +29,12 @@ public:
     crimson::ct_error::input_output_error
     >;
   using open_for_write_ret = open_for_write_ertr::future<journal_seq_t>;
-  open_for_write_ret open_for_write();
+  virtual open_for_write_ret open_for_write() = 0;
 
-  /**
-   * close journal
-   *
-   * TODO: should probably flush and disallow further writes
-   */
+  /// close journal
   using close_ertr = crimson::errorator<
     crimson::ct_error::input_output_error>;
-  close_ertr::future<> close();
+  virtual close_ertr::future<> close() = 0;
 
   /**
    * submit_record
@@ -86,12 +48,10 @@ public:
   using submit_record_ret = submit_record_ertr::future<
     record_locator_t
     >;
-  submit_record_ret submit_record(
+  virtual submit_record_ret submit_record(
     record_t &&record,
     OrderingHandle &handle
-  ) {
-    return record_submitter.submit(std::move(record), handle);
-  }
+  ) = 0;
 
   /**
    * flush
@@ -100,9 +60,10 @@ public:
    * Note, flush() machinery must go through the same pipeline
    * stages and locks as submit_record.
    */
-  seastar::future<> flush(OrderingHandle &handle) {
-    return record_submitter.flush(handle);
-  }
+  virtual seastar::future<> flush(OrderingHandle &handle) = 0;
+
+  /// sets write pipeline reference
+  virtual void set_write_pipeline(WritePipeline *_write_pipeline) = 0;
 
   /**
    * Read deltas and pass to delta_handler
@@ -110,383 +71,29 @@ public:
   * record_block_start (argument to delta_handler) is the start
   * of the first block in the record
    */
-  using replay_ertr = SegmentManager::read_ertr;
+  using replay_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error,
+    crimson::ct_error::invarg,
+    crimson::ct_error::enoent,
+    crimson::ct_error::erange>;
   using replay_ret = replay_ertr::future<>;
   using delta_handler_t = std::function<
     replay_ret(const record_locator_t&,
               const delta_info_t&)>;
-  replay_ret replay(
-    std::vector<std::pair<segment_id_t, segment_header_t>>&& segment_headers,
-    delta_handler_t &&delta_handler);
-
-  void set_write_pipeline(WritePipeline* write_pipeline) {
-    record_submitter.set_write_pipeline(write_pipeline);
-  }
-
-private:
-  class JournalSegmentManager {
-  public:
-    JournalSegmentManager(SegmentManager&);
-
-    using base_ertr = crimson::errorator<
-        crimson::ct_error::input_output_error>;
-    extent_len_t get_max_write_length() const {
-      return segment_manager.get_segment_size() -
-             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
-                     size_t(segment_manager.get_block_size()));
-    }
-
-    device_id_t get_device_id() const {
-      return segment_manager.get_device_id();
-    }
-
-    seastore_off_t get_block_size() const {
-      return segment_manager.get_block_size();
-    }
-
-    segment_nonce_t get_nonce() const {
-      return current_segment_nonce;
-    }
-
-    journal_seq_t get_committed_to() const {
-      return committed_to;
-    }
-
-    segment_seq_t get_segment_seq() const {
-      return next_journal_segment_seq - 1;
-    }
-
-    void set_segment_provider(SegmentProvider* provider) {
-      segment_provider = provider;
-    }
-
-    void set_segment_seq(segment_seq_t current_seq) {
-      next_journal_segment_seq = (current_seq + 1);
-    }
-
-    using open_ertr = base_ertr;
-    using open_ret = open_ertr::future<journal_seq_t>;
-    open_ret open();
-
-    using close_ertr = base_ertr;
-    close_ertr::future<> close();
-
-    // returns true iff the current segment has insufficient space
-    bool needs_roll(std::size_t length) const {
-      auto write_capacity = current_journal_segment->get_write_capacity();
-      return length + written_to > std::size_t(write_capacity);
-    }
-
-    // close the current segment and initialize next one
-    using roll_ertr = base_ertr;
-    roll_ertr::future<> roll();
-
-    // write the buffer, return the write result
-    // May be called concurrently, writes may complete in any order.
-    using write_ertr = base_ertr;
-    using write_ret = write_ertr::future<write_result_t>;
-    write_ret write(ceph::bufferlist to_write);
-
-    // mark write committed in order
-    void mark_committed(const journal_seq_t& new_committed_to);
-
-  private:
-    journal_seq_t get_current_write_seq() const {
-      assert(current_journal_segment);
-      return journal_seq_t{
-        get_segment_seq(),
-        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
-         written_to)
-      };
-    }
-
-    void reset() {
-      next_journal_segment_seq = 0;
-      current_segment_nonce = 0;
-      current_journal_segment.reset();
-      written_to = 0;
-      committed_to = {};
-    }
-
-    // prepare segment for writes, writes out segment header
-    using initialize_segment_ertr = base_ertr;
-    initialize_segment_ertr::future<> initialize_segment(Segment&);
-
-    SegmentProvider* segment_provider;
-    SegmentManager& segment_manager;
-
-    segment_seq_t next_journal_segment_seq;
-    segment_nonce_t current_segment_nonce;
-
-    SegmentRef current_journal_segment;
-    seastore_off_t written_to;
-    // committed_to may be in a previous journal segment
-    journal_seq_t committed_to;
-  };
-
-  class RecordBatch {
-    enum class state_t {
-      EMPTY = 0,
-      PENDING,
-      SUBMITTING
-    };
-
-  public:
-    RecordBatch() = default;
-    RecordBatch(RecordBatch&&) = delete;
-    RecordBatch(const RecordBatch&) = delete;
-    RecordBatch& operator=(RecordBatch&&) = delete;
-    RecordBatch& operator=(const RecordBatch&) = delete;
-
-    bool is_empty() const {
-      return state == state_t::EMPTY;
-    }
-
-    bool is_pending() const {
-      return state == state_t::PENDING;
-    }
-
-    bool is_submitting() const {
-      return state == state_t::SUBMITTING;
-    }
+  virtual replay_ret replay(
+    delta_handler_t &&delta_handler) = 0;
 
-    std::size_t get_index() const {
-      return index;
-    }
-
-    std::size_t get_num_records() const {
-      return pending.get_size();
-    }
-
-    // return the expected write sizes if allows to batch,
-    // otherwise, return nullopt
-    std::optional<record_group_size_t> can_batch(
-        const record_t& record,
-        extent_len_t block_size) const {
-      assert(state != state_t::SUBMITTING);
-      if (pending.get_size() >= batch_capacity ||
-          (pending.get_size() > 0 &&
-           pending.size.get_encoded_length() > batch_flush_size)) {
-        assert(state == state_t::PENDING);
-        return std::nullopt;
-      }
-      return get_encoded_length_after(record, block_size);
-    }
-
-    void initialize(std::size_t i,
-                    std::size_t _batch_capacity,
-                    std::size_t _batch_flush_size) {
-      ceph_assert(_batch_capacity > 0);
-      index = i;
-      batch_capacity = _batch_capacity;
-      batch_flush_size = _batch_flush_size;
-      pending.reserve(batch_capacity);
-    }
-
-    // Add to the batch, the future will be resolved after the batch is
-    // written.
-    //
-    // Set write_result_t::write_length to 0 if the record is not the first one
-    // in the batch.
-    using add_pending_ertr = JournalSegmentManager::write_ertr;
-    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
-    add_pending_ret add_pending(
-        record_t&&,
-        OrderingHandle&,
-        extent_len_t block_size);
-
-    // Encode the batched records for write.
-    std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
-        const journal_seq_t& committed_to,
-        segment_nonce_t segment_nonce);
-
-    // Set the write result and reset for reuse
-    using maybe_result_t = std::optional<write_result_t>;
-    void set_result(maybe_result_t maybe_write_end_seq);
-
-    // The fast path that is equivalent to submit a single record as a batch.
-    //
-    // Essentially, equivalent to the combined logic of:
-    // add_pending(), encode_batch() and set_result() above without
-    // the intervention of the shared io_promise.
-    //
-    // Note the current RecordBatch can be reused afterwards.
-    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
-        record_t&&,
-        extent_len_t block_size,
-        const journal_seq_t& committed_to,
-        segment_nonce_t segment_nonce);
-
-  private:
-    record_group_size_t get_encoded_length_after(
-        const record_t& record,
-        extent_len_t block_size) const {
-      return pending.size.get_encoded_length_after(
-          record.size, block_size);
-    }
-
-    state_t state = state_t::EMPTY;
-    std::size_t index = 0;
-    std::size_t batch_capacity = 0;
-    std::size_t batch_flush_size = 0;
-
-    record_group_t pending;
-    std::size_t submitting_size = 0;
-    seastore_off_t submitting_length = 0;
-    seastore_off_t submitting_mdlength = 0;
-
-    struct promise_result_t {
-      write_result_t write_result;
-      seastore_off_t mdlength;
-    };
-    using maybe_promise_result_t = std::optional<promise_result_t>;
-    std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
-  };
-
-  class RecordSubmitter {
-    enum class state_t {
-      IDLE = 0, // outstanding_io == 0
-      PENDING,  // outstanding_io <  io_depth_limit
-      FULL      // outstanding_io == io_depth_limit
-      // OVERFLOW: outstanding_io >  io_depth_limit is impossible
-    };
-
-    struct grouped_io_stats {
-      uint64_t num_io = 0;
-      uint64_t num_io_grouped = 0;
-
-      void increment(uint64_t num_grouped_io) {
-        ++num_io;
-        num_io_grouped += num_grouped_io;
-      }
-    };
-
-  public:
-    RecordSubmitter(std::size_t io_depth,
-                    std::size_t batch_capacity,
-                    std::size_t batch_flush_size,
-                    double preferred_fullness,
-                    JournalSegmentManager&);
-
-    grouped_io_stats get_record_batch_stats() const {
-      return stats.record_batch_stats;
-    }
-
-    grouped_io_stats get_io_depth_stats() const {
-      return stats.io_depth_stats;
-    }
-
-    uint64_t get_record_group_padding_bytes() const {
-      return stats.record_group_padding_bytes;
-    }
-
-    uint64_t get_record_group_metadata_bytes() const {
-      return stats.record_group_metadata_bytes;
-    }
-
-    uint64_t get_record_group_data_bytes() const {
-      return stats.record_group_data_bytes;
-    }
-
-    void reset_stats() {
-      stats = {};
-    }
-
-    void set_write_pipeline(WritePipeline *_write_pipeline) {
-      write_pipeline = _write_pipeline;
-    }
-
-    using submit_ret = Journal::submit_record_ret;
-    submit_ret submit(record_t&&, OrderingHandle&);
-    seastar::future<> flush(OrderingHandle &handle);
-
-  private:
-    void update_state();
-
-    void increment_io() {
-      ++num_outstanding_io;
-      stats.io_depth_stats.increment(num_outstanding_io);
-      update_state();
-    }
-
-    void decrement_io_with_flush();
-
-    void pop_free_batch() {
-      assert(p_current_batch == nullptr);
-      assert(!free_batch_ptrs.empty());
-      p_current_batch = free_batch_ptrs.front();
-      assert(p_current_batch->is_empty());
-      assert(p_current_batch == &batches[p_current_batch->get_index()]);
-      free_batch_ptrs.pop_front();
-    }
-
-    void account_submission(std::size_t, const record_group_size_t&);
-
-    using maybe_result_t = RecordBatch::maybe_result_t;
-    void finish_submit_batch(RecordBatch*, maybe_result_t);
-
-    void flush_current_batch();
-
-    using submit_pending_ertr = JournalSegmentManager::write_ertr;
-    using submit_pending_ret = submit_pending_ertr::future<
-      record_locator_t>;
-    submit_pending_ret submit_pending(
-        record_t&&, OrderingHandle &handle, bool flush);
-
-    using do_submit_ret = submit_pending_ret;
-    do_submit_ret do_submit(
-        record_t&&, OrderingHandle&);
-
-    state_t state = state_t::IDLE;
-    std::size_t num_outstanding_io = 0;
-    std::size_t io_depth_limit;
-    double preferred_fullness;
-
-    WritePipeline* write_pipeline = nullptr;
-    JournalSegmentManager& journal_segment_manager;
-    std::unique_ptr<RecordBatch[]> batches;
-    std::size_t current_batch_index;
-    // should not be nullptr after constructed
-    RecordBatch* p_current_batch = nullptr;
-    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-    std::optional<seastar::promise<> > wait_submit_promise;
-
-    struct {
-      grouped_io_stats record_batch_stats;
-      grouped_io_stats io_depth_stats;
-      uint64_t record_group_padding_bytes = 0;
-      uint64_t record_group_metadata_bytes = 0;
-      uint64_t record_group_data_bytes = 0;
-    } stats;
-  };
-
-  SegmentProvider* segment_provider = nullptr;
-  JournalSegmentManager journal_segment_manager;
-  RecordSubmitter record_submitter;
-  ExtentReader& scanner;
-  seastar::metrics::metric_group metrics;
+  virtual ~Journal() {}
+};
+using JournalRef = std::unique_ptr<Journal>;
 
-  /// return ordered vector of segments to replay
-  using replay_segments_t = std::vector<
-    std::pair<journal_seq_t, segment_header_t>>;
-  using prep_replay_segments_ertr = crimson::errorator<
-    crimson::ct_error::input_output_error
-    >;
-  using prep_replay_segments_fut = prep_replay_segments_ertr::future<
-    replay_segments_t>;
-  prep_replay_segments_fut prep_replay_segments(
-    std::vector<std::pair<segment_id_t, segment_header_t>> segments);
+namespace journal {
 
-  /// replays records starting at start through end of segment
-  replay_ertr::future<>
-  replay_segment(
-    journal_seq_t start,             ///< [in] starting addr, seq
-    segment_header_t header,         ///< [in] segment header
-    delta_handler_t &delta_handler   ///< [in] processes deltas in order
-  );
+JournalRef make_segmented(
+  SegmentManager &sm,
+  ExtentReader &reader,
+  SegmentProvider &provider);
 
-  void register_metrics();
-};
-using JournalRef = std::unique_ptr<Journal>;
+}
 
 }
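
For reference, a minimal usage sketch of the abstract interface above
(a hypothetical caller; write_pipeline setup and error handling are
assumed):

  JournalRef j = journal::make_segmented(sm, reader, provider);
  j->set_write_pipeline(&write_pipeline);
  return j->open_for_write(
  ).safe_then([](journal_seq_t start_seq) {
    // start_seq is the first writeable sequence; the journal is now
    // ready for submit_record()/flush()/close()
  });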
diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc
new file mode 100644 (file)
index 0000000..232b665
--- /dev/null
@@ -0,0 +1,917 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <iostream>
+
+#include <boost/iterator/counting_iterator.hpp>
+
+#include "include/intarith.h"
+
+#include "segmented_journal.h"
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/os/seastore/logging.h"
+
+SET_SUBSYS(seastore_journal);
+
+/*
+ * format:
+ * - H<handle-addr> information
+ *
+ * levels:
+ * - INFO:  major initiation, closing, rolling and replay operations
+ * - DEBUG: INFO details, major submit operations
+ * - TRACE: DEBUG details
+ */
+
+namespace crimson::os::seastore::journal {
+
+segment_nonce_t generate_nonce(
+  segment_seq_t seq,
+  const seastore_meta_t &meta)
+{
+  return ceph_crc32c(
+    seq,
+    reinterpret_cast<const unsigned char *>(meta.seastore_id.bytes()),
+    sizeof(meta.seastore_id.uuid));
+}
+
+SegmentedJournal::SegmentedJournal(
+  SegmentManager &segment_manager,
+  ExtentReader &scanner,
+  SegmentProvider &segment_provider)
+  : segment_provider(segment_provider),
+    journal_segment_manager(segment_manager, segment_provider),
+    record_submitter(crimson::common::get_conf<uint64_t>(
+                       "seastore_journal_iodepth_limit"),
+                     crimson::common::get_conf<uint64_t>(
+                       "seastore_journal_batch_capacity"),
+                     crimson::common::get_conf<Option::size_t>(
+                       "seastore_journal_batch_flush_size"),
+                     crimson::common::get_conf<double>(
+                       "seastore_journal_batch_preferred_fullness"),
+                     journal_segment_manager),
+    scanner(scanner)
+{
+  register_metrics();
+}
+
+SegmentedJournal::open_for_write_ret SegmentedJournal::open_for_write()
+{
+  LOG_PREFIX(Journal::open_for_write);
+  INFO("device_id={}", journal_segment_manager.get_device_id());
+  return journal_segment_manager.open();
+}
+
+SegmentedJournal::close_ertr::future<> SegmentedJournal::close()
+{
+  LOG_PREFIX(Journal::close);
+  INFO("closing");
+  metrics.clear();
+  return journal_segment_manager.close();
+}
+
+SegmentedJournal::prep_replay_segments_fut
+SegmentedJournal::prep_replay_segments(
+  std::vector<std::pair<segment_id_t, segment_header_t>> segments)
+{
+  LOG_PREFIX(Journal::prep_replay_segments);
+  if (segments.empty()) {
+    ERROR("no journal segments for replay");
+    return crimson::ct_error::input_output_error::make();
+  }
+  std::sort(
+    segments.begin(),
+    segments.end(),
+    [](const auto &lt, const auto &rt) {
+      return lt.second.journal_segment_seq <
+       rt.second.journal_segment_seq;
+    });
+
+  journal_segment_manager.set_segment_seq(
+    segments.rbegin()->second.journal_segment_seq);
+  std::for_each(
+    segments.begin(),
+    segments.end(),
+    [this, FNAME](auto &seg)
+  {
+    if (seg.first != seg.second.physical_segment_id ||
+        seg.first.device_id() != journal_segment_manager.get_device_id() ||
+        seg.second.get_type() != segment_type_t::JOURNAL) {
+      ERROR("illegal journal segment for replay -- {}", seg.second);
+      ceph_abort();
+    }
+  });
+
+  auto journal_tail = segments.rbegin()->second.journal_tail;
+  segment_provider.update_journal_tail_committed(journal_tail);
+  auto replay_from = journal_tail.offset;
+  auto from = segments.begin();
+  if (replay_from != P_ADDR_NULL) {
+    from = std::find_if(
+      segments.begin(),
+      segments.end(),
+      [&replay_from](const auto &seg) -> bool {
+       auto& seg_addr = replay_from.as_seg_paddr();
+       return seg.first == seg_addr.get_segment_id();
+      });
+    if (from->second.journal_segment_seq != journal_tail.segment_seq) {
+      ERROR("journal_tail {} does not match {}",
+            journal_tail, from->second);
+      ceph_abort();
+    }
+  } else {
+    replay_from = paddr_t::make_seg_paddr(
+      from->first,
+      journal_segment_manager.get_block_size());
+  }
+
+  auto num_segments = segments.end() - from;
+  INFO("{} segments to replay, from {}",
+       num_segments, replay_from);
+  auto ret = replay_segments_t(num_segments);
+  std::transform(
+    from, segments.end(), ret.begin(),
+    [this](const auto &p) {
+      auto ret = journal_seq_t{
+       p.second.journal_segment_seq,
+       paddr_t::make_seg_paddr(
+         p.first,
+         journal_segment_manager.get_block_size())
+      };
+      return std::make_pair(ret, p.second);
+    });
+  ret[0].first.offset = replay_from;
+  return prep_replay_segments_fut(
+    prep_replay_segments_ertr::ready_future_marker{},
+    std::move(ret));
+}
+
+SegmentedJournal::replay_ertr::future<>
+SegmentedJournal::replay_segment(
+  journal_seq_t seq,
+  segment_header_t header,
+  delta_handler_t &handler)
+{
+  LOG_PREFIX(Journal::replay_segment);
+  INFO("starting at {} -- {}", seq, header);
+  return seastar::do_with(
+    scan_valid_records_cursor(seq),
+    ExtentReader::found_record_handler_t([=, &handler](
+      record_locator_t locator,
+      const record_group_header_t& header,
+      const bufferlist& mdbuf)
+      -> ExtentReader::scan_valid_records_ertr::future<>
+    {
+      auto maybe_record_deltas_list = try_decode_deltas(
+          header, mdbuf, locator.record_block_base);
+      if (!maybe_record_deltas_list) {
+        // This should be impossible; we already checked the crc on the mdbuf
+        ERROR("unable to decode deltas for record {} at {}",
+              header, locator);
+        return crimson::ct_error::input_output_error::make();
+      }
+
+      return seastar::do_with(
+        std::move(*maybe_record_deltas_list),
+        [write_result=locator.write_result,
+         this,
+         FNAME,
+         &handler](auto& record_deltas_list)
+      {
+        return crimson::do_for_each(
+          record_deltas_list,
+          [write_result,
+           this,
+           FNAME,
+           &handler](record_deltas_t& record_deltas)
+        {
+          auto locator = record_locator_t{
+            record_deltas.record_block_base,
+            write_result
+          };
+          DEBUG("processing {} deltas at block_base {}",
+                record_deltas.deltas.size(),
+                locator);
+          return crimson::do_for_each(
+            record_deltas.deltas,
+            [locator,
+             this,
+             FNAME,
+             &handler](delta_info_t& delta)
+          {
+            /* The journal may validly contain deltas for extents in
+             * since-released segments.  We can detect those cases by
+             * checking whether the segment in question currently has a
+             * sequence number > the current journal segment seq. We can
+             * safely skip these deltas because the extent must already
+             * have been rewritten.
+             */
+            if (delta.paddr != P_ADDR_NULL) {
+              auto& seg_addr = delta.paddr.as_seg_paddr();
+              auto delta_paddr_segment_seq = segment_provider.get_seq(seg_addr.get_segment_id());
+              auto delta_paddr_segment_type = segment_seq_to_type(delta_paddr_segment_seq);
+              auto locator_segment_seq = locator.write_result.start_seq.segment_seq;
+              if (delta_paddr_segment_type == segment_type_t::NULL_SEG ||
+                  (delta_paddr_segment_type == segment_type_t::JOURNAL &&
+                   delta_paddr_segment_seq > locator_segment_seq)) {
+                SUBDEBUG(seastore_cache,
+                         "delta is obsolete, delta_paddr_segment_seq={}, locator_segment_seq={} -- {}",
+                         segment_seq_printer_t{delta_paddr_segment_seq},
+                         segment_seq_printer_t{locator_segment_seq},
+                         delta);
+                return replay_ertr::now();
+              }
+            }
+            return handler(locator, delta);
+          });
+        });
+      });
+    }),
+    [=](auto &cursor, auto &dhandler) {
+      return scanner.scan_valid_records(
+       cursor,
+       header.segment_nonce,
+       std::numeric_limits<size_t>::max(),
+       dhandler).safe_then([](auto){}
+      ).handle_error(
+       replay_ertr::pass_further{},
+       crimson::ct_error::assert_all{
+         "shouldn't meet with any other error other than replay_ertr"
+       }
+      );
+    }
+  );
+}
+
+SegmentedJournal::find_journal_segments_ret
+SegmentedJournal::find_journal_segments()
+{
+  return seastar::do_with(
+    find_journal_segments_ret_bare{},
+    [this](auto &ret) -> find_journal_segments_ret {
+      return crimson::do_for_each(
+       boost::counting_iterator<device_segment_id_t>(0),
+       boost::counting_iterator<device_segment_id_t>(
+         journal_segment_manager.get_num_segments()),
+       [this, &ret](device_segment_id_t d_segment_id) {
+         segment_id_t segment_id{
+           journal_segment_manager.get_device_id(),
+           d_segment_id};
+         return scanner.read_segment_header(
+           segment_id
+         ).safe_then([segment_id, &ret](auto &&header) {
+           if (header.get_type() == segment_type_t::JOURNAL) {
+             ret.emplace_back(std::make_pair(segment_id, std::move(header)));
+           }
+         }).handle_error(
+           crimson::ct_error::enoent::handle([](auto) {
+             return find_journal_segments_ertr::now();
+           }),
+           crimson::ct_error::enodata::handle([](auto) {
+             return find_journal_segments_ertr::now();
+           }),
+           crimson::ct_error::input_output_error::pass_further{}
+         );
+       }).safe_then([&ret]() mutable {
+         return find_journal_segments_ret{
+           find_journal_segments_ertr::ready_future_marker{},
+           std::move(ret)};
+       });
+    });
+}
+
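+// Replay overview: find_journal_segments() gathers the header of every
+// JOURNAL-type segment, prep_replay_segments() orders them into a replay
+// plan, and replay_segment() feeds each segment's deltas to the handler.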
+SegmentedJournal::replay_ret SegmentedJournal::replay(
+  delta_handler_t &&delta_handler)
+{
+  LOG_PREFIX(SegmentedJournal::replay);
+  return find_journal_segments(
+  ).safe_then([this, FNAME, delta_handler=std::move(delta_handler)]
+    (auto &&segment_headers) mutable -> replay_ret {
+    INFO("got {} segments", segment_headers.size());
+    return seastar::do_with(
+      std::move(delta_handler), replay_segments_t(),
+      [this, segment_headers=std::move(segment_headers)]
+      (auto &handler, auto &segments) mutable -> replay_ret {
+       return prep_replay_segments(std::move(segment_headers)
+       ).safe_then([this, &handler, &segments](auto replay_segs) mutable {
+         segments = std::move(replay_segs);
+         return crimson::do_for_each(segments, [this, &handler](auto i) mutable {
+           return replay_segment(i.first, i.second, handler);
+         });
+       });
+      });
+  });
+}
+
+void SegmentedJournal::register_metrics()
+{
+  LOG_PREFIX(SegmentedJournal::register_metrics);
+  DEBUG("");
+  record_submitter.reset_stats();
+  namespace sm = seastar::metrics;
+  metrics.add_group(
+    "journal",
+    {
+      sm::make_counter(
+        "record_num",
+        [this] {
+          return record_submitter.get_record_batch_stats().num_io;
+        },
+        sm::description("total number of records submitted")
+      ),
+      sm::make_counter(
+        "record_batch_num",
+        [this] {
+          return record_submitter.get_record_batch_stats().num_io_grouped;
+        },
+        sm::description("total number of records batched")
+      ),
+      sm::make_counter(
+        "io_num",
+        [this] {
+          return record_submitter.get_io_depth_stats().num_io;
+        },
+        sm::description("total number of io submitted")
+      ),
+      sm::make_counter(
+        "io_depth_num",
+        [this] {
+          return record_submitter.get_io_depth_stats().num_io_grouped;
+        },
+        sm::description("total accumulated io depth")
+      ),
+      sm::make_counter(
+        "record_group_padding_bytes",
+        [this] {
+          return record_submitter.get_record_group_padding_bytes();
+        },
+        sm::description("bytes of metadata padding when writing record groups")
+      ),
+      sm::make_counter(
+        "record_group_metadata_bytes",
+        [this] {
+          return record_submitter.get_record_group_metadata_bytes();
+        },
+        sm::description("bytes of raw metadata when writing record groups")
+      ),
+      sm::make_counter(
+        "record_group_data_bytes",
+        [this] {
+          return record_submitter.get_record_group_data_bytes();
+        },
+        sm::description("bytes of data when writing record groups")
+      ),
+    }
+  );
+}
+
+SegmentedJournal::JournalSegmentManager::JournalSegmentManager(
+  SegmentManager& segment_manager,
+  SegmentProvider& segment_provider)
+  : segment_provider{segment_provider}, segment_manager{segment_manager}
+{
+  reset();
+}
+
+SegmentedJournal::JournalSegmentManager::open_ret
+SegmentedJournal::JournalSegmentManager::open()
+{
+  return roll().safe_then([this] {
+    return get_current_write_seq();
+  });
+}
+
+SegmentedJournal::JournalSegmentManager::close_ertr::future<>
+SegmentedJournal::JournalSegmentManager::close()
+{
+  LOG_PREFIX(JournalSegmentManager::close);
+  if (current_journal_segment) {
+    INFO("segment_id={}, seq={}, "
+         "written_to={}, committed_to={}, nonce={}",
+         current_journal_segment->get_segment_id(),
+         get_segment_seq(),
+         written_to,
+         committed_to,
+         current_segment_nonce);
+  } else {
+    INFO("no current journal segment");
+  }
+
+  return (
+    current_journal_segment ?
+    current_journal_segment->close() :
+    Segment::close_ertr::now()
+  ).handle_error(
+    close_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::close()"
+    }
+  ).finally([this] {
+    reset();
+  });
+}
+
+SegmentedJournal::JournalSegmentManager::roll_ertr::future<>
+SegmentedJournal::JournalSegmentManager::roll()
+{
+  LOG_PREFIX(JournalSegmentManager::roll);
+  auto old_segment_id = current_journal_segment ?
+    current_journal_segment->get_segment_id() :
+    NULL_SEG_ID;
+  if (current_journal_segment) {
+    INFO("closing segment {}, seq={}, "
+         "written_to={}, committed_to={}, nonce={}",
+         old_segment_id,
+         get_segment_seq(),
+         written_to,
+         committed_to,
+         current_segment_nonce);
+  }
+
+  return (
+    current_journal_segment ?
+    current_journal_segment->close() :
+    Segment::close_ertr::now()
+  ).safe_then([this] {
+    return segment_provider.get_segment(
+        get_device_id(), next_journal_segment_seq);
+  }).safe_then([this](auto segment) {
+    return segment_manager.open(segment);
+  }).safe_then([this](auto sref) {
+    current_journal_segment = sref;
+    return initialize_segment(*current_journal_segment);
+  }).safe_then([this, old_segment_id] {
+    if (old_segment_id != NULL_SEG_ID) {
+      segment_provider.close_segment(old_segment_id);
+    }
+  }).handle_error(
+    roll_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::roll"
+    }
+  );
+}
+
+SegmentedJournal::JournalSegmentManager::write_ret
+SegmentedJournal::JournalSegmentManager::write(ceph::bufferlist to_write)
+{
+  LOG_PREFIX(JournalSegmentManager::write);
+  auto write_length = to_write.length();
+  auto write_start_seq = get_current_write_seq();
+  TRACE("{}~{}", write_start_seq, write_length);
+  assert(write_length > 0);
+  assert((write_length % segment_manager.get_block_size()) == 0);
+  assert(!needs_roll(write_length));
+
+  auto write_start_offset = written_to;
+  written_to += write_length;
+  auto write_result = write_result_t{
+    write_start_seq,
+    static_cast<seastore_off_t>(write_length)
+  };
+  return current_journal_segment->write(
+    write_start_offset, to_write
+  ).handle_error(
+    write_ertr::pass_further{},
+    crimson::ct_error::assert_all{
+      "Invalid error in JournalSegmentManager::write"
+    }
+  ).safe_then([write_result] {
+    return write_result;
+  });
+}
+
+void SegmentedJournal::JournalSegmentManager::mark_committed(
+  const journal_seq_t& new_committed_to)
+{
+  LOG_PREFIX(JournalSegmentManager::mark_committed);
+  TRACE("{} => {}", committed_to, new_committed_to);
+  assert(committed_to == journal_seq_t() ||
+         committed_to <= new_committed_to);
+  committed_to = new_committed_to;
+}
+
+SegmentedJournal::JournalSegmentManager::initialize_segment_ertr::future<>
+SegmentedJournal::JournalSegmentManager::initialize_segment(Segment& segment)
+{
+  LOG_PREFIX(JournalSegmentManager::initialize_segment);
+  auto new_tail = segment_provider.get_journal_tail_target();
+  // write out header
+  ceph_assert(segment.get_write_ptr() == 0);
+  bufferlist bl;
+
+  segment_seq_t seq = next_journal_segment_seq++;
+  current_segment_nonce = generate_nonce(
+    seq, segment_manager.get_meta());
+  auto header = segment_header_t{
+    seq,
+    segment.get_segment_id(),
+    new_tail,
+    current_segment_nonce};
+  INFO("writing {} ...", header);
+  ceph_assert(header.get_type() == segment_type_t::JOURNAL);
+  encode(header, bl);
+
+  bufferptr bp(
+    ceph::buffer::create_page_aligned(
+      segment_manager.get_block_size()));
+  bp.zero();
+  auto iter = bl.cbegin();
+  iter.copy(bl.length(), bp.c_str());
+  bl.clear();
+  bl.append(bp);
+
+  written_to = 0;
+  return write(bl
+  ).safe_then([this, new_tail](auto) {
+    segment_provider.update_journal_tail_committed(new_tail);
+  });
+}
+
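+// All records in a batch share a single io_promise; when the batched write
+// completes, each waiter derives its own record_locator_t from the group's
+// write_result, the metadata length and its recorded dlength offset.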
+SegmentedJournal::RecordBatch::add_pending_ret
+SegmentedJournal::RecordBatch::add_pending(
+  record_t&& record,
+  OrderingHandle& handle,
+  extent_len_t block_size)
+{
+  LOG_PREFIX(RecordBatch::add_pending);
+  auto new_size = get_encoded_length_after(record, block_size);
+  auto dlength_offset = pending.size.dlength;
+  TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
+        (void*)&handle,
+        pending.get_size() + 1,
+        new_size.get_encoded_length(),
+        dlength_offset);
+  assert(state != state_t::SUBMITTING);
+  assert(can_batch(record, block_size).value() == new_size);
+
+  pending.push_back(
+      std::move(record), block_size);
+  assert(pending.size == new_size);
+  if (state == state_t::EMPTY) {
+    assert(!io_promise.has_value());
+    io_promise = seastar::shared_promise<maybe_promise_result_t>();
+  } else {
+    assert(io_promise.has_value());
+  }
+  state = state_t::PENDING;
+
+  return io_promise->get_shared_future(
+  ).then([dlength_offset, FNAME, &handle
+         ](auto maybe_promise_result) -> add_pending_ret {
+    if (!maybe_promise_result.has_value()) {
+      ERROR("H{} write failed", (void*)&handle);
+      return crimson::ct_error::input_output_error::make();
+    }
+    auto write_result = maybe_promise_result->write_result;
+    auto submit_result = record_locator_t{
+      write_result.start_seq.offset.add_offset(
+          maybe_promise_result->mdlength + dlength_offset),
+      write_result
+    };
+    TRACE("H{} write finish with {}", (void*)&handle, submit_result);
+    return add_pending_ret(
+      add_pending_ertr::ready_future_marker{},
+      submit_result);
+  });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+SegmentedJournal::RecordBatch::encode_batch(
+  const journal_seq_t& committed_to,
+  segment_nonce_t segment_nonce)
+{
+  assert(state == state_t::PENDING);
+  assert(pending.get_size() > 0);
+  assert(io_promise.has_value());
+
+  state = state_t::SUBMITTING;
+  submitting_size = pending.get_size();
+  auto gsize = pending.size;
+  submitting_length = gsize.get_encoded_length();
+  submitting_mdlength = gsize.get_mdlength();
+  auto bl = encode_records(pending, committed_to, segment_nonce);
+  // Note: pending is cleared here
+  assert(bl.length() == (std::size_t)submitting_length);
+  return std::make_pair(bl, gsize);
+}
+
+void SegmentedJournal::RecordBatch::set_result(
+  maybe_result_t maybe_write_result)
+{
+  maybe_promise_result_t result;
+  if (maybe_write_result.has_value()) {
+    assert(maybe_write_result->length == submitting_length);
+    result = promise_result_t{
+      *maybe_write_result,
+      submitting_mdlength
+    };
+  }
+  assert(state == state_t::SUBMITTING);
+  assert(io_promise.has_value());
+
+  state = state_t::EMPTY;
+  submitting_size = 0;
+  submitting_length = 0;
+  submitting_mdlength = 0;
+  io_promise->set_value(result);
+  io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+SegmentedJournal::RecordBatch::submit_pending_fast(
+  record_t&& record,
+  extent_len_t block_size,
+  const journal_seq_t& committed_to,
+  segment_nonce_t segment_nonce)
+{
+  auto new_size = get_encoded_length_after(record, block_size);
+  std::ignore = new_size;
+  assert(state == state_t::EMPTY);
+  assert(can_batch(record, block_size).value() == new_size);
+
+  auto group = record_group_t(std::move(record), block_size);
+  auto size = group.size;
+  assert(size == new_size);
+  auto bl = encode_records(group, committed_to, segment_nonce);
+  assert(bl.length() == size.get_encoded_length());
+  return std::make_pair(bl, size);
+}
+
+SegmentedJournal::RecordSubmitter::RecordSubmitter(
+  std::size_t io_depth,
+  std::size_t batch_capacity,
+  std::size_t batch_flush_size,
+  double preferred_fullness,
+  JournalSegmentManager& jsm)
+  : io_depth_limit{io_depth},
+    preferred_fullness{preferred_fullness},
+    journal_segment_manager{jsm},
+    batches(new RecordBatch[io_depth + 1])
+{
+  LOG_PREFIX(RecordSubmitter);
+  INFO("io_depth_limit={}, "
+       "batch_capacity={}, batch_flush_size={}, "
+       "preferred_fullness={}",
+       io_depth, batch_capacity,
+       batch_flush_size, preferred_fullness);
+  ceph_assert(io_depth > 0);
+  ceph_assert(batch_capacity > 0);
+  ceph_assert(preferred_fullness >= 0 &&
+              preferred_fullness <= 1);
+  free_batch_ptrs.reserve(io_depth + 1);
+  for (std::size_t i = 0; i <= io_depth; ++i) {
+    batches[i].initialize(i, batch_capacity, batch_flush_size);
+    free_batch_ptrs.push_back(&batches[i]);
+  }
+  pop_free_batch();
+}
+
+SegmentedJournal::RecordSubmitter::submit_ret
+SegmentedJournal::RecordSubmitter::submit(
+  record_t&& record,
+  OrderingHandle& handle)
+{
+  LOG_PREFIX(RecordSubmitter::submit);
+  DEBUG("H{} {} start ...", (void*)&handle, record);
+  assert(write_pipeline);
+  auto expected_size = record_group_size_t(
+      record.size,
+      journal_segment_manager.get_block_size()
+  ).get_encoded_length();
+  auto max_record_length = journal_segment_manager.get_max_write_length();
+  if (expected_size > max_record_length) {
+    ERROR("H{} {} exceeds max record size {}",
+          (void*)&handle, record, max_record_length);
+    return crimson::ct_error::erange::make();
+  }
+
+  return do_submit(std::move(record), handle);
+}
+
+seastar::future<> SegmentedJournal::RecordSubmitter::flush(OrderingHandle &handle)
+{
+  LOG_PREFIX(RecordSubmitter::flush);
+  DEBUG("H{} flush", (void*)&handle);
+  return handle.enter(write_pipeline->device_submission
+  ).then([this, &handle] {
+    return handle.enter(write_pipeline->finalize);
+  }).then([FNAME, &handle] {
+    DEBUG("H{} flush done", (void*)&handle);
+  });
+}
+
+void SegmentedJournal::RecordSubmitter::update_state()
+{
+  if (num_outstanding_io == 0) {
+    state = state_t::IDLE;
+  } else if (num_outstanding_io < io_depth_limit) {
+    state = state_t::PENDING;
+  } else if (num_outstanding_io == io_depth_limit) {
+    state = state_t::FULL;
+  } else {
+    ceph_abort("fatal error: io-depth overflow");
+  }
+}
+
+void SegmentedJournal::RecordSubmitter::decrement_io_with_flush()
+{
+  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+  assert(num_outstanding_io > 0);
+  --num_outstanding_io;
+#ifndef NDEBUG
+  auto prv_state = state;
+#endif
+  update_state();
+
+  if (wait_submit_promise.has_value()) {
+    DEBUG("wait resolved");
+    assert(prv_state == state_t::FULL);
+    wait_submit_promise->set_value();
+    wait_submit_promise.reset();
+  }
+
+  if (!p_current_batch->is_empty()) {
+    TRACE("flush");
+    flush_current_batch();
+  }
+}
+
+void SegmentedJournal::RecordSubmitter::account_submission(
+  std::size_t num,
+  const record_group_size_t& size)
+{
+  stats.record_group_padding_bytes +=
+    (size.get_mdlength() - size.get_raw_mdlength());
+  stats.record_group_metadata_bytes += size.get_raw_mdlength();
+  stats.record_group_data_bytes += size.dlength;
+}
+
+void SegmentedJournal::RecordSubmitter::finish_submit_batch(
+  RecordBatch* p_batch,
+  maybe_result_t maybe_result)
+{
+  assert(p_batch->is_submitting());
+  p_batch->set_result(maybe_result);
+  free_batch_ptrs.push_back(p_batch);
+  decrement_io_with_flush();
+}
+
+void SegmentedJournal::RecordSubmitter::flush_current_batch()
+{
+  LOG_PREFIX(RecordSubmitter::flush_current_batch);
+  RecordBatch* p_batch = p_current_batch;
+  assert(p_batch->is_pending());
+  p_current_batch = nullptr;
+  pop_free_batch();
+
+  increment_io();
+  auto num = p_batch->get_num_records();
+  auto committed_to = journal_segment_manager.get_committed_to();
+  auto [to_write, sizes] = p_batch->encode_batch(
+    committed_to, journal_segment_manager.get_nonce());
+  DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
+        num, sizes, committed_to, num_outstanding_io);
+  account_submission(num, sizes);
+  std::ignore = journal_segment_manager.write(to_write
+  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+    TRACE("{} records, {}, write done with {}", num, sizes, write_result);
+    finish_submit_batch(p_batch, write_result);
+  }).handle_error(
+    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+      ERROR("{} records, {}, got error {}", num, sizes, e);
+      finish_submit_batch(p_batch, std::nullopt);
+    })
+  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+    ERROR("{} records, {}, got exception {}", num, sizes, e);
+    finish_submit_batch(p_batch, std::nullopt);
+  });
+}
+
+SegmentedJournal::RecordSubmitter::submit_pending_ret
+SegmentedJournal::RecordSubmitter::submit_pending(
+  record_t&& record,
+  OrderingHandle& handle,
+  bool flush)
+{
+  LOG_PREFIX(RecordSubmitter::submit_pending);
+  assert(!p_current_batch->is_submitting());
+  stats.record_batch_stats.increment(
+      p_current_batch->get_num_records() + 1);
+  bool do_flush = (flush || state == state_t::IDLE);
+  auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
+    if (do_flush && p_current_batch->is_empty()) {
+      // fast path with direct write
+      increment_io();
+      auto committed_to = journal_segment_manager.get_committed_to();
+      auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+        std::move(record),
+        journal_segment_manager.get_block_size(),
+        committed_to,
+        journal_segment_manager.get_nonce());
+      DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
+            (void*)&handle, sizes, committed_to, num_outstanding_io);
+      account_submission(1, sizes);
+      return journal_segment_manager.write(to_write
+      ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+        return record_locator_t{
+          write_result.start_seq.offset.add_offset(mdlength),
+          write_result
+        };
+      }).finally([this] {
+        decrement_io_with_flush();
+      });
+    } else {
+      // indirect write with or without the existing pending records
+      auto write_fut = p_current_batch->add_pending(
+        std::move(record),
+        handle,
+        journal_segment_manager.get_block_size());
+      if (do_flush) {
+        DEBUG("H{} added pending and flush", (void*)&handle);
+        flush_current_batch();
+      } else {
+        DEBUG("H{} added with {} pending",
+              (void*)&handle, p_current_batch->get_num_records());
+      }
+      return write_fut;
+    }
+  }();
+  return handle.enter(write_pipeline->device_submission
+  ).then([write_fut=std::move(write_fut)]() mutable {
+    return std::move(write_fut);
+  }).safe_then([this, FNAME, &handle](auto submit_result) {
+    return handle.enter(write_pipeline->finalize
+    ).then([this, FNAME, submit_result, &handle] {
+      DEBUG("H{} finish with {}", (void*)&handle, submit_result);
+      journal_segment_manager.mark_committed(
+          submit_result.write_result.get_end_seq());
+      return submit_result;
+    });
+  });
+}
+
+SegmentedJournal::RecordSubmitter::do_submit_ret
+SegmentedJournal::RecordSubmitter::do_submit(
+  record_t&& record,
+  OrderingHandle& handle)
+{
+  LOG_PREFIX(RecordSubmitter::do_submit);
+  TRACE("H{} outstanding_io={}/{} ...",
+        (void*)&handle, num_outstanding_io, io_depth_limit);
+  assert(!p_current_batch->is_submitting());
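+  // Decision sketch:
+  //  - IDLE/PENDING (io depth available):
+  //      record can't be batched     -> flush current batch and retry;
+  //      segment needs roll          -> flush if pending, roll, retry;
+  //      otherwise                   -> batch it, flushing early once
+  //                                     preferred_fullness is exceeded.
+  //  - FULL: wait for an outstanding write to complete, unless the record
+  //    still fits into the current batch and segment.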
+  if (state <= state_t::PENDING) {
+    // can increment io depth
+    assert(!wait_submit_promise.has_value());
+    auto maybe_new_size = p_current_batch->can_batch(
+        record, journal_segment_manager.get_block_size());
+    if (!maybe_new_size.has_value() ||
+        (maybe_new_size->get_encoded_length() >
+         journal_segment_manager.get_max_write_length())) {
+      TRACE("H{} flush", (void*)&handle);
+      assert(p_current_batch->is_pending());
+      flush_current_batch();
+      return do_submit(std::move(record), handle);
+    } else if (journal_segment_manager.needs_roll(
+          maybe_new_size->get_encoded_length())) {
+      if (p_current_batch->is_pending()) {
+        TRACE("H{} flush and roll", (void*)&handle);
+        flush_current_batch();
+      } else {
+        TRACE("H{} roll", (void*)&handle);
+      }
+      return journal_segment_manager.roll(
+      ).safe_then([this, record=std::move(record), &handle]() mutable {
+        return do_submit(std::move(record), handle);
+      });
+    } else {
+      bool flush = (maybe_new_size->get_fullness() > preferred_fullness);
+      return submit_pending(std::move(record), handle, flush);
+    }
+  }
+
+  assert(state == state_t::FULL);
+  // cannot increment io depth
+  auto maybe_new_size = p_current_batch->can_batch(
+      record, journal_segment_manager.get_block_size());
+  if (!maybe_new_size.has_value() ||
+      (maybe_new_size->get_encoded_length() >
+       journal_segment_manager.get_max_write_length()) ||
+      journal_segment_manager.needs_roll(
+        maybe_new_size->get_encoded_length())) {
+    if (!wait_submit_promise.has_value()) {
+      wait_submit_promise = seastar::promise<>();
+    }
+    DEBUG("H{} wait ...", (void*)&handle);
+    return wait_submit_promise->get_future(
+    ).then([this, record=std::move(record), &handle]() mutable {
+      return do_submit(std::move(record), handle);
+    });
+  } else {
+    return submit_pending(std::move(record), handle, false);
+  }
+}
+
+}
diff --git a/src/crimson/os/seastore/journal/segmented_journal.h b/src/crimson/os/seastore/journal/segmented_journal.h
new file mode 100644 (file)
index 0000000..79ca359
--- /dev/null
@@ -0,0 +1,434 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <boost/intrusive_ptr.hpp>
+#include <optional>
+
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/ceph_assert.h"
+#include "include/buffer.h"
+#include "include/denc.h"
+
+#include "crimson/os/seastore/segment_cleaner.h"
+#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/extent_reader.h"
+#include "crimson/os/seastore/segment_manager.h"
+#include "crimson/os/seastore/ordering_handle.h"
+#include "crimson/os/seastore/seastore_types.h"
+#include "crimson/osd/exceptions.h"
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * Manages stream of atomically written records to a SegmentManager.
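+ *
+ * Typical lifecycle (a sketch; `sm`, `reader` and `cleaner` stand for an
+ * existing SegmentManager, ExtentReader and SegmentProvider):
+ *
+ *   JournalRef journal = journal::make_segmented(sm, reader, cleaner);
+ *   journal->set_write_pipeline(&pipeline);
+ *   // mount:    journal->replay(handler), then journal->open_for_write()
+ *   // steady:   journal->submit_record(std::move(record), handle)
+ *   // shutdown: journal->close()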
+ */
+class SegmentedJournal : public Journal {
+public:
+  SegmentedJournal(
+    SegmentManager &segment_manager,
+    ExtentReader& scanner,
+    SegmentProvider& cleaner);
+  ~SegmentedJournal() {}
+
+  open_for_write_ret open_for_write() final;
+
+  close_ertr::future<> close() final;
+
+  submit_record_ret submit_record(
+    record_t &&record,
+    OrderingHandle &handle
+  ) final {
+    return record_submitter.submit(std::move(record), handle);
+  }
+
+  seastar::future<> flush(OrderingHandle &handle) final {
+    return record_submitter.flush(handle);
+  }
+
+  replay_ret replay(delta_handler_t &&delta_handler) final;
+
+  void set_write_pipeline(WritePipeline* write_pipeline) final {
+    record_submitter.set_write_pipeline(write_pipeline);
+  }
+
+private:
+  class JournalSegmentManager {
+  public:
+    JournalSegmentManager(SegmentManager&, SegmentProvider&);
+
+    using base_ertr = crimson::errorator<
+        crimson::ct_error::input_output_error>;
+    extent_len_t get_max_write_length() const {
+      return segment_manager.get_segment_size() -
+             p2align(ceph::encoded_sizeof_bounded<segment_header_t>(),
+                     size_t(segment_manager.get_block_size()));
+    }
+
+    device_id_t get_device_id() const {
+      return segment_manager.get_device_id();
+    }
+
+    device_segment_id_t get_num_segments() const {
+      return segment_manager.get_num_segments();
+    }
+
+    seastore_off_t get_block_size() const {
+      return segment_manager.get_block_size();
+    }
+
+    segment_nonce_t get_nonce() const {
+      return current_segment_nonce;
+    }
+
+    journal_seq_t get_committed_to() const {
+      return committed_to;
+    }
+
+    segment_seq_t get_segment_seq() const {
+      return next_journal_segment_seq - 1;
+    }
+
+    void set_segment_seq(segment_seq_t current_seq) {
+      next_journal_segment_seq = (current_seq + 1);
+    }
+
+    using open_ertr = base_ertr;
+    using open_ret = open_ertr::future<journal_seq_t>;
+    open_ret open();
+
+    using close_ertr = base_ertr;
+    close_ertr::future<> close();
+
+    // returns true iff the current segment has insufficient space
+    bool needs_roll(std::size_t length) const {
+      auto write_capacity = current_journal_segment->get_write_capacity();
+      return length + written_to > std::size_t(write_capacity);
+    }
+
+    // close the current segment and initialize next one
+    using roll_ertr = base_ertr;
+    roll_ertr::future<> roll();
+
+    // write the buffer, return the write result
+    // May be called concurrently; writes may complete in any order.
+    using write_ertr = base_ertr;
+    using write_ret = write_ertr::future<write_result_t>;
+    write_ret write(ceph::bufferlist to_write);
+
+    // mark write committed in order
+    void mark_committed(const journal_seq_t& new_committed_to);
+
+  private:
+    journal_seq_t get_current_write_seq() const {
+      assert(current_journal_segment);
+      return journal_seq_t{
+        get_segment_seq(),
+        paddr_t::make_seg_paddr(current_journal_segment->get_segment_id(),
+         written_to)
+      };
+    }
+
+    void reset() {
+      next_journal_segment_seq = 0;
+      current_segment_nonce = 0;
+      current_journal_segment.reset();
+      written_to = 0;
+      committed_to = {};
+    }
+
+    // prepare segment for writes, writes out segment header
+    using initialize_segment_ertr = base_ertr;
+    initialize_segment_ertr::future<> initialize_segment(Segment&);
+
+    SegmentProvider& segment_provider;
+    SegmentManager& segment_manager;
+
+    segment_seq_t next_journal_segment_seq;
+    segment_nonce_t current_segment_nonce;
+
+    SegmentRef current_journal_segment;
+    seastore_off_t written_to;
+    // committed_to may be in a previous journal segment
+    journal_seq_t committed_to;
+  };
+
+  class RecordBatch {
+    enum class state_t {
+      EMPTY = 0,
+      PENDING,
+      SUBMITTING
+    };
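+    // State transitions:
+    //   EMPTY --add_pending()--> PENDING --encode_batch()--> SUBMITTING
+    //   --set_result()--> EMPTY (the batch is then reusable)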
+
+  public:
+    RecordBatch() = default;
+    RecordBatch(RecordBatch&&) = delete;
+    RecordBatch(const RecordBatch&) = delete;
+    RecordBatch& operator=(RecordBatch&&) = delete;
+    RecordBatch& operator=(const RecordBatch&) = delete;
+
+    bool is_empty() const {
+      return state == state_t::EMPTY;
+    }
+
+    bool is_pending() const {
+      return state == state_t::PENDING;
+    }
+
+    bool is_submitting() const {
+      return state == state_t::SUBMITTING;
+    }
+
+    std::size_t get_index() const {
+      return index;
+    }
+
+    std::size_t get_num_records() const {
+      return pending.get_size();
+    }
+
+    // returns the expected write sizes if the record can be batched,
+    // otherwise returns std::nullopt
+    std::optional<record_group_size_t> can_batch(
+        const record_t& record,
+        extent_len_t block_size) const {
+      assert(state != state_t::SUBMITTING);
+      if (pending.get_size() >= batch_capacity ||
+          (pending.get_size() > 0 &&
+           pending.size.get_encoded_length() > batch_flush_size)) {
+        assert(state == state_t::PENDING);
+        return std::nullopt;
+      }
+      return get_encoded_length_after(record, block_size);
+    }
+
+    void initialize(std::size_t i,
+                    std::size_t _batch_capacity,
+                    std::size_t _batch_flush_size) {
+      ceph_assert(_batch_capacity > 0);
+      index = i;
+      batch_capacity = _batch_capacity;
+      batch_flush_size = _batch_flush_size;
+      pending.reserve(batch_capacity);
+    }
+
+    // Add to the batch; the future will be resolved after the batch is
+    // written.
+    //
+    // Set write_result_t::write_length to 0 if the record is not the first one
+    // in the batch.
+    using add_pending_ertr = JournalSegmentManager::write_ertr;
+    using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+    add_pending_ret add_pending(
+        record_t&&,
+        OrderingHandle&,
+        extent_len_t block_size);
+
+    // Encode the batched records for write.
+    std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+        const journal_seq_t& committed_to,
+        segment_nonce_t segment_nonce);
+
+    // Set the write result and reset for reuse
+    using maybe_result_t = std::optional<write_result_t>;
+    void set_result(maybe_result_t maybe_write_result);
+
+    // The fast path that is equivalent to submitting a single record as a batch.
+    //
+    // Essentially, equivalent to the combined logic of:
+    // add_pending(), encode_batch() and set_result() above without
+    // the intervention of the shared io_promise.
+    //
+    // Note the current RecordBatch can be reused afterwards.
+    std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+        record_t&&,
+        extent_len_t block_size,
+        const journal_seq_t& committed_to,
+        segment_nonce_t segment_nonce);
+
+  private:
+    record_group_size_t get_encoded_length_after(
+        const record_t& record,
+        extent_len_t block_size) const {
+      return pending.size.get_encoded_length_after(
+          record.size, block_size);
+    }
+
+    state_t state = state_t::EMPTY;
+    std::size_t index = 0;
+    std::size_t batch_capacity = 0;
+    std::size_t batch_flush_size = 0;
+
+    record_group_t pending;
+    std::size_t submitting_size = 0;
+    seastore_off_t submitting_length = 0;
+    seastore_off_t submitting_mdlength = 0;
+
+    struct promise_result_t {
+      write_result_t write_result;
+      seastore_off_t mdlength;
+    };
+    using maybe_promise_result_t = std::optional<promise_result_t>;
+    std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
+  };
+
+  class RecordSubmitter {
+    enum class state_t {
+      IDLE = 0, // outstanding_io == 0
+      PENDING,  // outstanding_io <  io_depth_limit
+      FULL      // outstanding_io == io_depth_limit
+      // OVERFLOW: outstanding_io >  io_depth_limit is impossible
+    };
+
+    struct grouped_io_stats {
+      uint64_t num_io = 0;
+      uint64_t num_io_grouped = 0;
+
+      void increment(uint64_t num_grouped_io) {
+        ++num_io;
+        num_io_grouped += num_grouped_io;
+      }
+    };
+
+  public:
+    RecordSubmitter(std::size_t io_depth,
+                    std::size_t batch_capacity,
+                    std::size_t batch_flush_size,
+                    double preferred_fullness,
+                    JournalSegmentManager&);
+
+    grouped_io_stats get_record_batch_stats() const {
+      return stats.record_batch_stats;
+    }
+
+    grouped_io_stats get_io_depth_stats() const {
+      return stats.io_depth_stats;
+    }
+
+    uint64_t get_record_group_padding_bytes() const {
+      return stats.record_group_padding_bytes;
+    }
+
+    uint64_t get_record_group_metadata_bytes() const {
+      return stats.record_group_metadata_bytes;
+    }
+
+    uint64_t get_record_group_data_bytes() const {
+      return stats.record_group_data_bytes;
+    }
+
+    void reset_stats() {
+      stats = {};
+    }
+
+    void set_write_pipeline(WritePipeline *_write_pipeline) {
+      write_pipeline = _write_pipeline;
+    }
+
+    using submit_ret = Journal::submit_record_ret;
+    submit_ret submit(record_t&&, OrderingHandle&);
+    seastar::future<> flush(OrderingHandle &handle);
+
+  private:
+    void update_state();
+
+    void increment_io() {
+      ++num_outstanding_io;
+      stats.io_depth_stats.increment(num_outstanding_io);
+      update_state();
+    }
+
+    void decrement_io_with_flush();
+
+    void pop_free_batch() {
+      assert(p_current_batch == nullptr);
+      assert(!free_batch_ptrs.empty());
+      p_current_batch = free_batch_ptrs.front();
+      assert(p_current_batch->is_empty());
+      assert(p_current_batch == &batches[p_current_batch->get_index()]);
+      free_batch_ptrs.pop_front();
+    }
+
+    void account_submission(std::size_t, const record_group_size_t&);
+
+    using maybe_result_t = RecordBatch::maybe_result_t;
+    void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+    void flush_current_batch();
+
+    using submit_pending_ertr = JournalSegmentManager::write_ertr;
+    using submit_pending_ret = submit_pending_ertr::future<
+      record_locator_t>;
+    submit_pending_ret submit_pending(
+        record_t&&, OrderingHandle &handle, bool flush);
+
+    using do_submit_ret = submit_pending_ret;
+    do_submit_ret do_submit(
+        record_t&&, OrderingHandle&);
+
+    state_t state = state_t::IDLE;
+    std::size_t num_outstanding_io = 0;
+    std::size_t io_depth_limit;
+    double preferred_fullness;
+
+    WritePipeline* write_pipeline = nullptr;
+    JournalSegmentManager& journal_segment_manager;
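+    // io_depth_limit batches may be submitting while one more is being
+    // filled (p_current_batch), hence io_depth + 1 slots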
+    std::unique_ptr<RecordBatch[]> batches;
+    std::size_t current_batch_index;
+    // should not be nullptr after construction
+    RecordBatch* p_current_batch = nullptr;
+    seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+    std::optional<seastar::promise<> > wait_submit_promise;
+
+    struct {
+      grouped_io_stats record_batch_stats;
+      grouped_io_stats io_depth_stats;
+      uint64_t record_group_padding_bytes = 0;
+      uint64_t record_group_metadata_bytes = 0;
+      uint64_t record_group_data_bytes = 0;
+    } stats;
+  };
+
+  SegmentProvider& segment_provider;
+  JournalSegmentManager journal_segment_manager;
+  RecordSubmitter record_submitter;
+  ExtentReader& scanner;
+  seastar::metrics::metric_group metrics;
+
+  /// read journal segment headers from scanner
+  using find_journal_segments_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error>;
+  using find_journal_segments_ret_bare = std::vector<
+    std::pair<segment_id_t, segment_header_t>>;
+  using find_journal_segments_ret = find_journal_segments_ertr::future<
+    find_journal_segments_ret_bare>;
+  find_journal_segments_ret find_journal_segments();
+
+  /// return ordered vector of segments to replay
+  using replay_segments_t = std::vector<
+    std::pair<journal_seq_t, segment_header_t>>;
+  using prep_replay_segments_ertr = crimson::errorator<
+    crimson::ct_error::input_output_error
+    >;
+  using prep_replay_segments_fut = prep_replay_segments_ertr::future<
+    replay_segments_t>;
+  prep_replay_segments_fut prep_replay_segments(
+    std::vector<std::pair<segment_id_t, segment_header_t>> segments);
+
+  /// replays records starting at start through end of segment
+  replay_ertr::future<>
+  replay_segment(
+    journal_seq_t start,             ///< [in] starting addr, seq
+    segment_header_t header,         ///< [in] segment header
+    delta_handler_t &delta_handler   ///< [in] processes deltas in order
+  );
+
+  void register_metrics();
+};
+
+}
index 1ff54fca9fd77010645c9b1f6b86f2bba0718a1e..42f3f551f4cd39b37f503e5d006c4363aa7af13a 100644 (file)
@@ -1414,13 +1414,11 @@ seastar::future<std::unique_ptr<SeaStore>> make_seastore(
       std::move(scanner),
       false /* detailed */);
 
-    auto journal = std::make_unique<Journal>(*sm, scanner_ref);
+    auto journal = journal::make_segmented(*sm, scanner_ref, *segment_cleaner);
     auto epm = std::make_unique<ExtentPlacementManager>();
     auto cache = std::make_unique<Cache>(scanner_ref, *epm);
     auto lba_manager = lba_manager::create_lba_manager(*sm, *cache);
 
-    journal->set_segment_provider(&*segment_cleaner);
-
     auto tm = std::make_unique<TransactionManager>(
       *sm,
       std::move(segment_cleaner),
index 8b87dc97f71c3b4ba81a77e953a2134ae54f7272..f0907b1cca664375521056765107731ee344ba9f 100644 (file)
@@ -432,49 +432,71 @@ SegmentCleaner::gc_reclaim_space_ret SegmentCleaner::gc_reclaim_space()
   });
 }
 
-SegmentCleaner::init_segments_ret SegmentCleaner::init_segments() {
-  logger().debug("SegmentCleaner::init_segments: {} segments", segments.size());
-  return seastar::do_with(
-    std::vector<std::pair<segment_id_t, segment_header_t>>(),
-    [this](auto& segment_set) {
-    return crimson::do_for_each(
-      segments.begin(),
-      segments.end(),
-      [this, &segment_set](auto& it) {
-       auto segment_id = it.first;
-       return scanner->read_segment_header(
-         segment_id
-       ).safe_then([&segment_set, segment_id, this](auto header) {
-         logger().debug(
-           "ExtentReader::init_segments: segment_id={} -- {}",
+SegmentCleaner::mount_ret SegmentCleaner::mount(
+  device_id_t pdevice_id,
+  std::vector<SegmentManager*>& sms)
+{
+  logger().debug(
+    "SegmentCleaner::mount: {} segment managers", sms.size());
+  init_complete = false;
+  stats = {};
+  journal_tail_target = journal_seq_t{};
+  journal_tail_committed = journal_seq_t{};
+  journal_head = journal_seq_t{};
+  journal_device_id = pdevice_id;
+
+  space_tracker.reset(
+    detailed ?
+    (SpaceTrackerI*)new SpaceTrackerDetailed(
+      sms) :
+    (SpaceTrackerI*)new SpaceTrackerSimple(
+      sms));
+
+  segments.clear();
+  for (auto sm : sms) {
+    // sms is a vector that is indexed by device id and
+    // always has "max_device" elements, some of which
+    // may be null.
+    if (!sm) {
+      continue;
+    }
+    segments.add_segment_manager(*sm);
+    stats.empty_segments += sm->get_num_segments();
+  }
+  metrics.clear();
+  register_metrics();
+
+  logger().debug("SegmentCleaner::mount: {} segments", segments.size());
+  return crimson::do_for_each(
+    segments.begin(),
+    segments.end(),
+    [this](auto& it) {
+      auto segment_id = it.first;
+      return scanner->read_segment_header(
+       segment_id
+      ).safe_then([segment_id, this](auto header) {
+       logger().debug(
+         "SegmentCleaner::mount: segment_id={} -- {}",
+         segment_id, header);
+       auto s_type = header.get_type();
+       if (s_type == segment_type_t::NULL_SEG) {
+         logger().error(
+           "SegmentCleaner::mount: got null segment, segment_id={} -- {}",
            segment_id, header);
-         auto s_type = header.get_type();
-         if (s_type == segment_type_t::NULL_SEG) {
-           logger().error(
-             "ExtentReader::init_segments: got null segment, segment_id={} -- {}",
-             segment_id, header);
-           ceph_abort();
-         }
-         if (s_type == segment_type_t::JOURNAL) {
-           segment_set.emplace_back(std::make_pair(segment_id, std::move(header)));
-         }
-         init_mark_segment_closed(
-           segment_id,
-           header.journal_segment_seq);
-       }).handle_error(
-         crimson::ct_error::enoent::handle([](auto) {
-           return init_segments_ertr::now();
-         }),
-         crimson::ct_error::enodata::handle([](auto) {
-           return init_segments_ertr::now();
-         }),
-         crimson::ct_error::input_output_error::pass_further{}
-       );
-      }).safe_then([&segment_set] {
-       return seastar::make_ready_future<
-         std::vector<std::pair<segment_id_t, segment_header_t>>>(
-           std::move(segment_set));
-      });
+         ceph_abort();
+       }
+       init_mark_segment_closed(
+         segment_id,
+         header.journal_segment_seq);
+      }).handle_error(
+       crimson::ct_error::enoent::handle([](auto) {
+         return mount_ertr::now();
+       }),
+       crimson::ct_error::enodata::handle([](auto) {
+         return mount_ertr::now();
+       }),
+       crimson::ct_error::input_output_error::pass_further{}
+      );
     });
 }
 
index e8b917d7533b5d3e7ad87f447f70ef4876a715da..6e0e774ea9de15a9ffefdcf55a7b5e43593afcfc 100644 (file)
@@ -11,7 +11,7 @@
 
 #include "crimson/common/log.h"
 #include "crimson/os/seastore/cached_extent.h"
-#include "crimson/os/seastore/journal.h"
+#include "crimson/os/seastore/extent_reader.h"
 #include "crimson/os/seastore/seastore_types.h"
 #include "crimson/os/seastore/segment_manager.h"
 #include "crimson/os/seastore/transaction.h"
@@ -676,44 +676,10 @@ public:
     ExtentReaderRef&& scanner,
     bool detailed = false);
 
-  void mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms) {
-    crimson::get_logger(ceph_subsys_seastore_cleaner).debug(
-      "SegmentCleaner::mount: {} segment managers", sms.size());
-    init_complete = false;
-    stats = {};
-    journal_tail_target = journal_seq_t{};
-    journal_tail_committed = journal_seq_t{};
-    journal_head = journal_seq_t{};
-    journal_device_id = pdevice_id;
-
-    space_tracker.reset(
-      detailed ?
-      (SpaceTrackerI*)new SpaceTrackerDetailed(
-       sms) :
-      (SpaceTrackerI*)new SpaceTrackerSimple(
-       sms));
-
-    segments.clear();
-    for (auto sm : sms) {
-      // sms is a vector that is indexed by device id and
-      // always has "max_device" elements, some of which
-      // may be null.
-      if (!sm) {
-       continue;
-      }
-      segments.add_segment_manager(*sm);
-      stats.empty_segments += sm->get_num_segments();
-    }
-    metrics.clear();
-    register_metrics();
-  }
-
-  using init_segments_ertr = crimson::errorator<
+  using mount_ertr = crimson::errorator<
     crimson::ct_error::input_output_error>;
-  using init_segments_ret_bare =
-    std::vector<std::pair<segment_id_t, segment_header_t>>;
-  using init_segments_ret = init_segments_ertr::future<init_segments_ret_bare>;
-  init_segments_ret init_segments();
+  using mount_ret = mount_ertr::future<>;
+  mount_ret mount(device_id_t pdevice_id, std::vector<SegmentManager*>& sms);
 
   get_segment_ret get_segment(
       device_id_t id, segment_seq_t seq) final;
index ec80eb92341773d58a83526cd20f46da3c6ebf61..00c2863a92f4e35491ee6d99e3c9b7469e693bfe 100644 (file)
@@ -46,10 +46,12 @@ TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
 {
   LOG_PREFIX(TransactionManager::mkfs);
   INFO("enter");
-  segment_cleaner->mount(
+  return segment_cleaner->mount(
     segment_manager.get_device_id(),
-    scanner.get_segment_managers());
-  return journal->open_for_write().safe_then([this, FNAME](auto addr) {
+    scanner.get_segment_managers()
+  ).safe_then([this] {
+    return journal->open_for_write();
+  }).safe_then([this, FNAME](auto addr) {
     segment_cleaner->init_mkfs(addr);
     return with_transaction_intr(
       Transaction::src_t::MUTATE,
@@ -83,22 +85,20 @@ TransactionManager::mount_ertr::future<> TransactionManager::mount()
   LOG_PREFIX(TransactionManager::mount);
   INFO("enter");
   cache->init();
-  segment_cleaner->mount(
+  return segment_cleaner->mount(
     segment_manager.get_device_id(),
-    scanner.get_segment_managers());
-  return segment_cleaner->init_segments().safe_then(
-    [this](auto&& segments) {
+    scanner.get_segment_managers()
+  ).safe_then([this] {
     return journal->replay(
-      std::move(segments),
       [this](const auto &offsets, const auto &e) {
-      auto start_seq = offsets.write_result.start_seq;
-      segment_cleaner->update_journal_tail_target(
-          cache->get_oldest_dirty_from().value_or(start_seq));
-      return cache->replay_delta(
-          start_seq,
-          offsets.record_block_base,
-          e);
-    });
+       auto start_seq = offsets.write_result.start_seq;
+       segment_cleaner->update_journal_tail_target(
+         cache->get_oldest_dirty_from().value_or(start_seq));
+       return cache->replay_delta(
+         start_seq,
+         offsets.record_block_base,
+         e);
+      });
   }).safe_then([this] {
     return journal->open_for_write();
   }).safe_then([this, FNAME](auto addr) {
index 28b070bf598b69fd0799e322e2450fbedc8cce50..d49f0e575a892968b555962535af0588433863db 100644 (file)
@@ -138,9 +138,8 @@ void TMDriver::init()
     SegmentCleaner::config_t::get_default(),
     std::move(scanner),
     false /* detailed */);
-  std::vector<SegmentManager*> sms;
-  segment_cleaner->mount(segment_manager->get_device_id(), sms);
-  auto journal = std::make_unique<Journal>(*segment_manager, *scanner);
+  auto journal = journal::make_segmented(
+    *segment_manager, *scanner, *segment_cleaner);
   auto epm = std::make_unique<ExtentPlacementManager>();
   auto cache = std::make_unique<Cache>(scanner_ref, *epm);
   auto lba_manager = lba_manager::create_lba_manager(*segment_manager, *cache);
@@ -151,8 +150,6 @@ void TMDriver::init()
       *segment_cleaner,
       *segment_manager));
 
-  journal->set_segment_provider(&*segment_cleaner);
-
   tm = std::make_unique<TransactionManager>(
     *segment_manager,
     std::move(segment_cleaner),
index 9e2fe06692cbf048df641a82f8dc02318d4864cf..58ac9393535b7ed19aa4bcc5fb03a5df8f364484 100644 (file)
@@ -74,14 +74,15 @@ struct btree_test_base :
   seastar::future<> set_up_fut() final {
     segment_manager = segment_manager::create_test_ephemeral();
     scanner.reset(new ExtentReader());
-    journal.reset(new Journal(*segment_manager, *scanner));
+    auto& scanner_ref = *scanner.get();
+    journal = journal::make_segmented(
+      *segment_manager, scanner_ref, *this);
     epm.reset(new ExtentPlacementManager());
-    cache.reset(new Cache(*scanner, *epm));
+    cache.reset(new Cache(scanner_ref, *epm));
 
     block_size = segment_manager->get_block_size();
     next = segment_id_t{segment_manager->get_device_id(), 0};
-    scanner->add_segment_manager(segment_manager.get());
-    journal->set_segment_provider(this);
+    scanner_ref.add_segment_manager(segment_manager.get());
     journal->set_write_pipeline(&pipeline);
 
     return segment_manager->init(
index f1271e2c8229fbc2fbe8c81f6ec3a9690992b586..1a25392974c71f1b422236bb2fdf46c163bf93f5 100644 (file)
@@ -68,7 +68,7 @@ struct record_validator_t {
 struct journal_test_t : seastar_test_suite_t, SegmentProvider {
   segment_manager::EphemeralSegmentManagerRef segment_manager;
   WritePipeline pipeline;
-  std::unique_ptr<Journal> journal;
+  JournalRef journal;
 
   std::vector<record_validator_t> records;
 
@@ -102,9 +102,7 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
     block_size = segment_manager->get_block_size();
     scanner.reset(new ExtentReader());
     next = segment_id_t(segment_manager->get_device_id(), 0);
-    journal.reset(new Journal(*segment_manager, *scanner));
-
-    journal->set_segment_provider(this);
+    journal = journal::make_segmented(*segment_manager, *scanner, *this);
     journal->set_write_pipeline(&pipeline);
     scanner->add_segment_manager(segment_manager.get());
     return segment_manager->init(
@@ -134,48 +132,12 @@ struct journal_test_t : seastar_test_suite_t, SegmentProvider {
   auto replay(T &&f) {
     return journal->close(
     ).safe_then([this, f=std::move(f)]() mutable {
-      journal.reset(new Journal(*segment_manager, *scanner));
-      journal->set_segment_provider(this);
+      journal = journal::make_segmented(
+       *segment_manager, *scanner, *this);
       journal->set_write_pipeline(&pipeline);
-      return seastar::do_with(
-       std::vector<std::pair<segment_id_t, segment_header_t>>(),
-       [this](auto& segments) {
-       return crimson::do_for_each(
-         boost::make_counting_iterator(device_segment_id_t{0}),
-         boost::make_counting_iterator(device_segment_id_t{
-           segment_manager->get_num_segments()}),
-         [this, &segments](auto segment_id) {
-         return scanner->read_segment_header(segment_id_t{0, segment_id})
-         .safe_then([&segments, segment_id](auto header) {
-           if (header.get_type() == segment_type_t::JOURNAL) {
-             segments.emplace_back(
-               std::make_pair(
-                 segment_id_t{0, segment_id},
-                 std::move(header)
-               ));
-           }
-           return seastar::now();
-         }).handle_error(
-           crimson::ct_error::enoent::handle([](auto) {
-             return SegmentCleaner::init_segments_ertr::now();
-           }),
-           crimson::ct_error::enodata::handle([](auto) {
-             return SegmentCleaner::init_segments_ertr::now();
-           }),
-           crimson::ct_error::input_output_error::pass_further{}
-         );
-       }).safe_then([&segments] {
-         return seastar::make_ready_future<
-           std::vector<std::pair<segment_id_t, segment_header_t>>>(
-             std::move(segments));
-       });
-      }).safe_then([this, f=std::move(f)](auto&& segments) mutable {
-       return journal->replay(
-         std::move(segments),
-         std::forward<T>(std::move(f)));
-      }).safe_then([this] {
-       return journal->open_for_write();
-      });
+      return journal->replay(std::forward<T>(std::move(f)));
+    }).safe_then([this] {
+      return journal->open_for_write();
     });
   }
 
index 4fdf7895410a3ce875c8a59be032e0336d04157d..9f7d3aebb177795d88d21faf01bf0892db65543b 100644 (file)
@@ -79,7 +79,10 @@ auto get_transaction_manager(
     SegmentCleaner::config_t::get_default(),
     std::move(scanner),
     true);
-  auto journal = std::make_unique<Journal>(segment_manager, scanner_ref);
+  auto journal = journal::make_segmented(
+    segment_manager,
+    scanner_ref,
+    *segment_cleaner);
   auto epm = std::make_unique<ExtentPlacementManager>();
   auto cache = std::make_unique<Cache>(scanner_ref, *epm);
   auto lba_manager = lba_manager::create_lba_manager(segment_manager, *cache);
@@ -90,8 +93,6 @@ auto get_transaction_manager(
       *segment_cleaner,
       segment_manager));
 
-  journal->set_segment_provider(&*segment_cleaner);
-
   return std::make_unique<TransactionManager>(
     segment_manager,
     std::move(segment_cleaner),