crimson/os/seastore/journal: relocate RecordSubmitter related classes
author    Myoungwon Oh <ohmyoungwon@gmail.com>  Mon, 27 Mar 2023 00:49:18 +0000 (00:49 +0000)
committer Myoungwon Oh <ohmyoungwon@gmail.com>  Thu, 20 Apr 2023 13:01:21 +0000 (13:01 +0000)
Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
src/crimson/os/seastore/CMakeLists.txt
src/crimson/os/seastore/extent_placement_manager.h
src/crimson/os/seastore/journal/record_submitter.cc [new file with mode: 0644]
src/crimson/os/seastore/journal/record_submitter.h [new file with mode: 0644]
src/crimson/os/seastore/journal/segment_allocator.cc
src/crimson/os/seastore/journal/segment_allocator.h
src/crimson/os/seastore/journal/segmented_journal.h

index 0e998674c3e4fefdd018133e7d7efad40a0f1345..d26ae813afe68b117b2221e18dcf5aefd1df3381 100644 (file)
@@ -43,6 +43,7 @@ set(crimson_seastore_srcs
   random_block_manager/avlallocator.cc
   journal/segmented_journal.cc
   journal/segment_allocator.cc
+  journal/record_submitter.cc
   journal.cc
   device.cc
   segment_manager_group.cc
index 6664c86aff6546f3ff881bfe587a0dbc5f07dfc4..9ab9ce7fe9f3ea0d052e438745b683d3003864da 100644 (file)
@@ -8,6 +8,7 @@
 #include "crimson/os/seastore/async_cleaner.h"
 #include "crimson/os/seastore/cached_extent.h"
 #include "crimson/os/seastore/journal/segment_allocator.h"
+#include "crimson/os/seastore/journal/record_submitter.h"
 #include "crimson/os/seastore/transaction.h"
 #include "crimson/os/seastore/random_block_manager.h"
 #include "crimson/os/seastore/random_block_manager/block_rb_manager.h"
diff --git a/src/crimson/os/seastore/journal/record_submitter.cc b/src/crimson/os/seastore/journal/record_submitter.cc
new file mode 100644 (file)
index 0000000..09cac5b
--- /dev/null
@@ -0,0 +1,551 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#include "record_submitter.h"
+
+#include <fmt/format.h>
+#include <fmt/os.h>
+
+#include "crimson/os/seastore/logging.h"
+#include "crimson/os/seastore/async_cleaner.h"
+
+SET_SUBSYS(seastore_journal);
+
+namespace crimson::os::seastore::journal {
+
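+// All records in one batch share a single io_promise.  Each caller captures
+// its dlength offset at add time so that, once the one group write resolves,
+// it can derive the locator of its own record within the group.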
+RecordBatch::add_pending_ret
+RecordBatch::add_pending(
+  const std::string& name,
+  record_t&& record,
+  extent_len_t block_size)
+{
+  LOG_PREFIX(RecordBatch::add_pending);
+  auto new_size = get_encoded_length_after(record, block_size);
+  auto dlength_offset = pending.size.dlength;
+  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+        name,
+        pending.get_size() + 1,
+        new_size.get_encoded_length(),
+        dlength_offset);
+  assert(state != state_t::SUBMITTING);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+  pending.push_back(
+      std::move(record), block_size);
+  assert(pending.size == new_size);
+  if (state == state_t::EMPTY) {
+    assert(!io_promise.has_value());
+    io_promise = seastar::shared_promise<maybe_promise_result_t>();
+  } else {
+    assert(io_promise.has_value());
+  }
+  state = state_t::PENDING;
+
+  return io_promise->get_shared_future(
+  ).then([dlength_offset, FNAME, &name
+         ](auto maybe_promise_result) -> add_pending_ret {
+    if (!maybe_promise_result.has_value()) {
+      ERROR("{} write failed", name);
+      return crimson::ct_error::input_output_error::make();
+    }
+    auto write_result = maybe_promise_result->write_result;
+    auto submit_result = record_locator_t{
+      write_result.start_seq.offset.add_offset(
+          maybe_promise_result->mdlength + dlength_offset),
+      write_result
+    };
+    TRACE("{} write finish with {}", name, submit_result);
+    return add_pending_ret(
+      add_pending_ertr::ready_future_marker{},
+      submit_result);
+  });
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::encode_batch(
+  const journal_seq_t& committed_to,
+  segment_nonce_t segment_nonce)
+{
+  assert(state == state_t::PENDING);
+  assert(pending.get_size() > 0);
+  assert(io_promise.has_value());
+
+  state = state_t::SUBMITTING;
+  submitting_size = pending.get_size();
+  auto gsize = pending.size;
+  submitting_length = gsize.get_encoded_length();
+  submitting_mdlength = gsize.get_mdlength();
+  auto bl = encode_records(pending, committed_to, segment_nonce);
+  // Note: pending is cleared here
+  assert(bl.length() == submitting_length);
+  return std::make_pair(bl, gsize);
+}
+
+void RecordBatch::set_result(
+  maybe_result_t maybe_write_result)
+{
+  maybe_promise_result_t result;
+  if (maybe_write_result.has_value()) {
+    assert(maybe_write_result->length == submitting_length);
+    result = promise_result_t{
+      *maybe_write_result,
+      submitting_mdlength
+    };
+  }
+  assert(state == state_t::SUBMITTING);
+  assert(io_promise.has_value());
+
+  state = state_t::EMPTY;
+  submitting_size = 0;
+  submitting_length = 0;
+  submitting_mdlength = 0;
+  io_promise->set_value(result);
+  io_promise.reset();
+}
+
+std::pair<ceph::bufferlist, record_group_size_t>
+RecordBatch::submit_pending_fast(
+  record_t&& record,
+  extent_len_t block_size,
+  const journal_seq_t& committed_to,
+  segment_nonce_t segment_nonce)
+{
+  auto new_size = get_encoded_length_after(record, block_size);
+  std::ignore = new_size;
+  assert(state == state_t::EMPTY);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
+
+  auto group = record_group_t(std::move(record), block_size);
+  auto size = group.size;
+  assert(size == new_size);
+  auto bl = encode_records(group, committed_to, segment_nonce);
+  assert(bl.length() == size.get_encoded_length());
+  return std::make_pair(std::move(bl), size);
+}
+
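+// io_depth + 1 batches are allocated: up to io_depth may be in flight while
+// one more serves as the current batch accepting new records.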
+RecordSubmitter::RecordSubmitter(
+  std::size_t io_depth,
+  std::size_t batch_capacity,
+  std::size_t batch_flush_size,
+  double preferred_fullness,
+  SegmentAllocator& sa)
+  : io_depth_limit{io_depth},
+    preferred_fullness{preferred_fullness},
+    segment_allocator{sa},
+    batches(new RecordBatch[io_depth + 1])
+{
+  LOG_PREFIX(RecordSubmitter);
+  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
+       "preferred_fullness={}",
+       get_name(), io_depth, batch_capacity,
+       batch_flush_size, preferred_fullness);
+  ceph_assert(io_depth > 0);
+  ceph_assert(batch_capacity > 0);
+  ceph_assert(preferred_fullness >= 0 &&
+              preferred_fullness <= 1);
+  free_batch_ptrs.reserve(io_depth + 1);
+  for (std::size_t i = 0; i <= io_depth; ++i) {
+    batches[i].initialize(i, batch_capacity, batch_flush_size);
+    free_batch_ptrs.push_back(&batches[i]);
+  }
+  pop_free_batch();
+}
+
+bool RecordSubmitter::is_available() const
+{
+  auto ret = !wait_available_promise.has_value() &&
+             !has_io_error;
+#ifndef NDEBUG
+  if (ret) {
+    // unconditional invariants
+    ceph_assert(segment_allocator.can_write());
+    ceph_assert(p_current_batch != nullptr);
+    ceph_assert(!p_current_batch->is_submitting());
+    // the current batch accepts a further write
+    ceph_assert(!p_current_batch->needs_flush());
+    if (!p_current_batch->is_empty()) {
+      auto submit_length =
+        p_current_batch->get_submit_size().get_encoded_length();
+      ceph_assert(!segment_allocator.needs_roll(submit_length));
+    }
+    // I'm not rolling
+  }
+#endif
+  return ret;
+}
+
+RecordSubmitter::wa_ertr::future<>
+RecordSubmitter::wait_available()
+{
+  LOG_PREFIX(RecordSubmitter::wait_available);
+  assert(!is_available());
+  if (has_io_error) {
+    ERROR("{} I/O is failed before wait", get_name());
+    return crimson::ct_error::input_output_error::make();
+  }
+  return wait_available_promise->get_shared_future(
+  ).then([FNAME, this]() -> wa_ertr::future<> {
+    if (has_io_error) {
+      ERROR("{} I/O is failed after wait", get_name());
+      return crimson::ct_error::input_output_error::make();
+    }
+    return wa_ertr::now();
+  });
+}
+
+RecordSubmitter::action_t
+RecordSubmitter::check_action(
+  const record_size_t& rsize) const
+{
+  assert(is_available());
+  auto eval = p_current_batch->evaluate_submit(
+      rsize, segment_allocator.get_block_size());
+  if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+    return action_t::ROLL;
+  } else if (eval.is_full) {
+    return action_t::SUBMIT_FULL;
+  } else {
+    return action_t::SUBMIT_NOT_FULL;
+  }
+}
+
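+// Rolling is two-phase: the current batch is flushed first (or waited upon
+// if the io depth is FULL), then the roll itself runs in the background
+// while new submissions block on wait_available_promise.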
+RecordSubmitter::roll_segment_ertr::future<>
+RecordSubmitter::roll_segment()
+{
+  LOG_PREFIX(RecordSubmitter::roll_segment);
+  ceph_assert(p_current_batch->needs_flush() ||
+              is_available());
+  // #1 block concurrent submissions due to rolling
+  wait_available_promise = seastar::shared_promise<>();
+  ceph_assert(!wait_unfull_flush_promise.has_value());
+  return [FNAME, this] {
+    if (p_current_batch->is_pending()) {
+      if (state == state_t::FULL) {
+        DEBUG("{} wait flush ...", get_name());
+        wait_unfull_flush_promise = seastar::promise<>();
+        return wait_unfull_flush_promise->get_future();
+      } else { // IDLE/PENDING
+        DEBUG("{} flush", get_name());
+        flush_current_batch();
+        return seastar::now();
+      }
+    } else {
+      assert(p_current_batch->is_empty());
+      return seastar::now();
+    }
+  }().then_wrapped([FNAME, this](auto fut) {
+    if (fut.failed()) {
+      ERROR("{} rolling is skipped unexpectedly, available", get_name());
+      has_io_error = true;
+      wait_available_promise->set_value();
+      wait_available_promise.reset();
+      return roll_segment_ertr::now();
+    } else {
+      // start rolling in background
+      std::ignore = segment_allocator.roll(
+      ).safe_then([FNAME, this] {
+        // good
+        DEBUG("{} rolling done, available", get_name());
+        assert(!has_io_error);
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      }).handle_error(
+        crimson::ct_error::all_same_way([FNAME, this](auto e) {
+          ERROR("{} got error {}, available", get_name(), e);
+          has_io_error = true;
+          wait_available_promise->set_value();
+          wait_available_promise.reset();
+        })
+      ).handle_exception([FNAME, this](auto e) {
+        ERROR("{} got exception {}, available", get_name(), e);
+        has_io_error = true;
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      });
+      // wait for background rolling
+      return wait_available();
+    }
+  });
+}
+
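+// Two write paths: if the current batch is empty but would be flushed
+// immediately anyway, encode and write the record directly (fast path);
+// otherwise queue it into the current batch and flush according to the
+// io-depth state.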
+RecordSubmitter::submit_ret
+RecordSubmitter::submit(
+    record_t&& record,
+    bool with_atomic_roll_segment)
+{
+  LOG_PREFIX(RecordSubmitter::submit);
+  ceph_assert(is_available());
+  assert(check_action(record.size) != action_t::ROLL);
+  segment_allocator.get_provider().update_modify_time(
+      segment_allocator.get_segment_id(),
+      record.modify_time,
+      record.extents.size());
+  auto eval = p_current_batch->evaluate_submit(
+      record.size, segment_allocator.get_block_size());
+  bool needs_flush = (
+      state == state_t::IDLE ||
+      eval.submit_size.get_fullness() > preferred_fullness ||
+      // RecordBatch::needs_flush()
+      eval.is_full ||
+      p_current_batch->get_num_records() + 1 >=
+        p_current_batch->get_batch_capacity());
+  if (p_current_batch->is_empty() &&
+      needs_flush &&
+      state != state_t::FULL) {
+    // fast path with direct write
+    increment_io();
+    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+      std::move(record),
+      segment_allocator.get_block_size(),
+      committed_to,
+      segment_allocator.get_nonce());
+    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+          get_name(), sizes, committed_to, num_outstanding_io);
+    account_submission(1, sizes);
+    return segment_allocator.write(std::move(to_write)
+    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+      return record_locator_t{
+        write_result.start_seq.offset.add_offset(mdlength),
+        write_result
+      };
+    }).finally([this] {
+      decrement_io_with_flush();
+    });
+  }
+  // indirect batched write
+  auto write_fut = p_current_batch->add_pending(
+    get_name(),
+    std::move(record),
+    segment_allocator.get_block_size());
+  if (needs_flush) {
+    if (state == state_t::FULL) {
+      // #2 block concurrent submissions due to lack of resource
+      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+            get_name(),
+            p_current_batch->get_num_records(),
+            num_outstanding_io);
+      if (with_atomic_roll_segment) {
+        // wait_available_promise and wait_unfull_flush_promise
+        // need to be delegated to the follow-up atomic roll_segment();
+        assert(p_current_batch->is_pending());
+      } else {
+        wait_available_promise = seastar::shared_promise<>();
+        ceph_assert(!wait_unfull_flush_promise.has_value());
+        wait_unfull_flush_promise = seastar::promise<>();
+        // flush and mark available in background
+        std::ignore = wait_unfull_flush_promise->get_future(
+        ).finally([FNAME, this] {
+          DEBUG("{} flush done, available", get_name());
+          wait_available_promise->set_value();
+          wait_available_promise.reset();
+        });
+      }
+    } else {
+      DEBUG("{} added pending, flush", get_name());
+      flush_current_batch();
+    }
+  } else {
+    // will flush later
+    DEBUG("{} added with {} pending, outstanding_io={}",
+          get_name(),
+          p_current_batch->get_num_records(),
+          num_outstanding_io);
+    assert(!p_current_batch->needs_flush());
+  }
+  return write_fut;
+}
+
+RecordSubmitter::open_ret
+RecordSubmitter::open(bool is_mkfs)
+{
+  return segment_allocator.open(is_mkfs
+  ).safe_then([this](journal_seq_t ret) {
+    LOG_PREFIX(RecordSubmitter::open);
+    DEBUG("{} register metrics", get_name());
+    stats = {};
+    namespace sm = seastar::metrics;
+    std::vector<sm::label_instance> label_instances;
+    label_instances.push_back(sm::label_instance("submitter", get_name()));
+    metrics.add_group(
+      "journal",
+      {
+        sm::make_counter(
+          "record_num",
+          stats.record_batch_stats.num_io,
+          sm::description("total number of records submitted"),
+          label_instances
+        ),
+        sm::make_counter(
+          "record_batch_num",
+          stats.record_batch_stats.num_io_grouped,
+          sm::description("total number of records batched"),
+          label_instances
+        ),
+        sm::make_counter(
+          "io_num",
+          stats.io_depth_stats.num_io,
+          sm::description("total number of io submitted"),
+          label_instances
+        ),
+        sm::make_counter(
+          "io_depth_num",
+          stats.io_depth_stats.num_io_grouped,
+          sm::description("total number of io depth"),
+          label_instances
+        ),
+        sm::make_counter(
+          "record_group_padding_bytes",
+          stats.record_group_padding_bytes,
+          sm::description("bytes of metadata padding when write record groups"),
+          label_instances
+        ),
+        sm::make_counter(
+          "record_group_metadata_bytes",
+          stats.record_group_metadata_bytes,
+          sm::description("bytes of raw metadata when write record groups"),
+          label_instances
+        ),
+        sm::make_counter(
+          "record_group_data_bytes",
+          stats.record_group_data_bytes,
+          sm::description("bytes of data when write record groups"),
+          label_instances
+        ),
+      }
+    );
+    return ret;
+  });
+}
+
+RecordSubmitter::close_ertr::future<>
+RecordSubmitter::close()
+{
+  ceph_assert(state == state_t::IDLE);
+  ceph_assert(num_outstanding_io == 0);
+  committed_to = JOURNAL_SEQ_NULL;
+  ceph_assert(p_current_batch != nullptr);
+  ceph_assert(p_current_batch->is_empty());
+  ceph_assert(!wait_available_promise.has_value());
+  has_io_error = false;
+  ceph_assert(!wait_unfull_flush_promise.has_value());
+  metrics.clear();
+  return segment_allocator.close();
+}
+
+void RecordSubmitter::update_state()
+{
+  if (num_outstanding_io == 0) {
+    state = state_t::IDLE;
+  } else if (num_outstanding_io < io_depth_limit) {
+    state = state_t::PENDING;
+  } else if (num_outstanding_io == io_depth_limit) {
+    state = state_t::FULL;
+  } else {
+    ceph_abort("fatal error: io-depth overflow");
+  }
+}
+
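+// Called when an in-flight write completes: releases one io slot, resolves
+// a flush that was blocked on io depth (FULL) if any, and otherwise flushes
+// the current batch once it crosses the fullness or flush thresholds.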
+void RecordSubmitter::decrement_io_with_flush()
+{
+  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
+  assert(num_outstanding_io > 0);
+  auto prv_state = state;
+  --num_outstanding_io;
+  update_state();
+
+  if (prv_state == state_t::FULL) {
+    if (wait_unfull_flush_promise.has_value()) {
+      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+      assert(!p_current_batch->is_empty());
+      assert(wait_available_promise.has_value());
+      flush_current_batch();
+      wait_unfull_flush_promise->set_value();
+      wait_unfull_flush_promise.reset();
+      return;
+    }
+  } else {
+    ceph_assert(!wait_unfull_flush_promise.has_value());
+  }
+
+  auto needs_flush = (
+      !p_current_batch->is_empty() && (
+        state == state_t::IDLE ||
+        p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+        p_current_batch->needs_flush()
+      ));
+  if (needs_flush) {
+    DEBUG("{} flush", get_name());
+    flush_current_batch();
+  }
+}
+
+void RecordSubmitter::account_submission(
+  std::size_t num,
+  const record_group_size_t& size)
+{
+  stats.record_group_padding_bytes +=
+    (size.get_mdlength() - size.get_raw_mdlength());
+  stats.record_group_metadata_bytes += size.get_raw_mdlength();
+  stats.record_group_data_bytes += size.dlength;
+  stats.record_batch_stats.increment(num);
+}
+
+void RecordSubmitter::finish_submit_batch(
+  RecordBatch* p_batch,
+  maybe_result_t maybe_result)
+{
+  assert(p_batch->is_submitting());
+  p_batch->set_result(maybe_result);
+  free_batch_ptrs.push_back(p_batch);
+  decrement_io_with_flush();
+}
+
+void RecordSubmitter::flush_current_batch()
+{
+  LOG_PREFIX(RecordSubmitter::flush_current_batch);
+  RecordBatch* p_batch = p_current_batch;
+  assert(p_batch->is_pending());
+  p_current_batch = nullptr;
+  pop_free_batch();
+
+  increment_io();
+  auto num = p_batch->get_num_records();
+  auto [to_write, sizes] = p_batch->encode_batch(
+    committed_to, segment_allocator.get_nonce());
+  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+        get_name(), num, sizes, committed_to, num_outstanding_io);
+  account_submission(num, sizes);
+  std::ignore = segment_allocator.write(std::move(to_write)
+  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
+    TRACE("{} {} records, {}, write done with {}",
+          get_name(), num, sizes, write_result);
+    finish_submit_batch(p_batch, write_result);
+  }).handle_error(
+    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+      ERROR("{} {} records, {}, got error {}",
+            get_name(), num, sizes, e);
+      finish_submit_batch(p_batch, std::nullopt);
+    })
+  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
+    ERROR("{} {} records, {}, got exception {}",
+          get_name(), num, sizes, e);
+    finish_submit_batch(p_batch, std::nullopt);
+  });
+}
+
+}
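
For orientation, the RecordBatch lifecycle implemented above (EMPTY -> PENDING -> SUBMITTING -> EMPTY) condenses to the sequence below. This is a minimal sketch of the flow that RecordSubmitter::flush_current_batch() and finish_submit_batch() drive; batch_capacity, batch_flush_size, block_size, committed_to, nonce and segment_allocator are assumed to be in scope, and make_record() is a hypothetical helper, not part of this patch.

  // Sketch only: condensed from flush_current_batch()/finish_submit_batch().
  RecordBatch batch;
  batch.initialize(0 /* index */, batch_capacity, batch_flush_size);

  // EMPTY -> PENDING: queue a record; the future resolves with this
  // record's locator once the whole batch has been written.
  std::string name = "sketch";
  auto fut = batch.add_pending(name, make_record(), block_size);

  // PENDING -> SUBMITTING: encode all queued records into a single buffer.
  auto [bl, group_size] = batch.encode_batch(committed_to, nonce);

  // SUBMITTING -> EMPTY: write the group, then publish the result, which
  // resolves every future handed out by add_pending().
  std::ignore = segment_allocator.write(std::move(bl)
  ).safe_then([&batch](auto write_result) {
    batch.set_result(write_result);
  });
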
diff --git a/src/crimson/os/seastore/journal/record_submitter.h b/src/crimson/os/seastore/journal/record_submitter.h
new file mode 100644 (file)
index 0000000..118d5a7
--- /dev/null
@@ -0,0 +1,317 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
+
+#pragma once
+
+#include <optional>
+#include <seastar/core/circular_buffer.hh>
+#include <seastar/core/metrics.hh>
+#include <seastar/core/shared_future.hh>
+
+#include "include/buffer.h"
+
+#include "crimson/common/errorator.h"
+#include "crimson/os/seastore/segment_manager_group.h"
+#include "crimson/os/seastore/segment_seq_allocator.h"
+#include "crimson/os/seastore/journal/segment_allocator.h"
+
+namespace crimson::os::seastore {
+  class SegmentProvider;
+  class JournalTrimmer;
+}
+
+namespace crimson::os::seastore::journal {
+
+/**
+ * RecordBatch
+ *
+ * Maintain a batch of records for submission.
+ */
+class RecordBatch {
+  enum class state_t {
+    EMPTY = 0,
+    PENDING,
+    SUBMITTING
+  };
+
+public:
+  RecordBatch() = default;
+  RecordBatch(RecordBatch&&) = delete;
+  RecordBatch(const RecordBatch&) = delete;
+  RecordBatch& operator=(RecordBatch&&) = delete;
+  RecordBatch& operator=(const RecordBatch&) = delete;
+
+  bool is_empty() const {
+    return state == state_t::EMPTY;
+  }
+
+  bool is_pending() const {
+    return state == state_t::PENDING;
+  }
+
+  bool is_submitting() const {
+    return state == state_t::SUBMITTING;
+  }
+
+  std::size_t get_index() const {
+    return index;
+  }
+
+  std::size_t get_num_records() const {
+    return pending.get_size();
+  }
+
+  std::size_t get_batch_capacity() const {
+    return batch_capacity;
+  }
+
+  const record_group_size_t& get_submit_size() const {
+    assert(state != state_t::EMPTY);
+    return pending.size;
+  }
+
+  bool needs_flush() const {
+    assert(state != state_t::SUBMITTING);
+    assert(pending.get_size() <= batch_capacity);
+    if (state == state_t::EMPTY) {
+      return false;
+    } else {
+      assert(state == state_t::PENDING);
+      return (pending.get_size() >= batch_capacity ||
+              pending.size.get_encoded_length() > batch_flush_size);
+    }
+  }
+
+  struct evaluation_t {
+    record_group_size_t submit_size;
+    bool is_full;
+  };
+  evaluation_t evaluate_submit(
+      const record_size_t& rsize,
+      extent_len_t block_size) const {
+    assert(!needs_flush());
+    auto submit_size = pending.size.get_encoded_length_after(
+        rsize, block_size);
+    bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+    return {submit_size, is_full};
+  }
+
+  void initialize(std::size_t i,
+                  std::size_t _batch_capacity,
+                  std::size_t _batch_flush_size) {
+    ceph_assert(_batch_capacity > 0);
+    index = i;
+    batch_capacity = _batch_capacity;
+    batch_flush_size = _batch_flush_size;
+    pending.reserve(batch_capacity);
+  }
+
+  // Add to the batch; the returned future resolves after the batch is
+  // written.
+  //
+  // Set write_result_t::write_length to 0 if the record is not the first one
+  // in the batch.
+  using add_pending_ertr = SegmentAllocator::write_ertr;
+  using add_pending_ret = add_pending_ertr::future<record_locator_t>;
+  add_pending_ret add_pending(
+      const std::string& name,
+      record_t&&,
+      extent_len_t block_size);
+
+  // Encode the batched records for write.
+  std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
+      const journal_seq_t& committed_to,
+      segment_nonce_t segment_nonce);
+
+  // Set the write result and reset for reuse
+  using maybe_result_t = std::optional<write_result_t>;
+  void set_result(maybe_result_t maybe_write_end_seq);
+
+  // The fast path that is equivalent to submitting a single record as a batch.
+  //
+  // Essentially, equivalent to the combined logic of:
+  // add_pending(), encode_batch() and set_result() above without
+  // the intervention of the shared io_promise.
+  //
+  // Note the current RecordBatch can be reused afterwards.
+  std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
+      record_t&&,
+      extent_len_t block_size,
+      const journal_seq_t& committed_to,
+      segment_nonce_t segment_nonce);
+
+private:
+  record_group_size_t get_encoded_length_after(
+      const record_t& record,
+      extent_len_t block_size) const {
+    return pending.size.get_encoded_length_after(
+        record.size, block_size);
+  }
+
+  state_t state = state_t::EMPTY;
+  std::size_t index = 0;
+  std::size_t batch_capacity = 0;
+  std::size_t batch_flush_size = 0;
+
+  record_group_t pending;
+  std::size_t submitting_size = 0;
+  extent_len_t submitting_length = 0;
+  extent_len_t submitting_mdlength = 0;
+
+  struct promise_result_t {
+    write_result_t write_result;
+    extent_len_t mdlength;
+  };
+  using maybe_promise_result_t = std::optional<promise_result_t>;
+  std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
+};
+
+/**
+ * RecordSubmitter
+ *
+ * Submit records concurrently, batching them with RecordBatch and writing
+ * through SegmentAllocator.
+ *
+ * Configurations and controls:
+ * - io_depth: the limit on concurrent writes to SegmentAllocator;
+ * - batch_capacity: the number limit of records in a RecordBatch;
+ * - batch_flush_size: the bytes threshold to force flush a RecordBatch to
+ *   control the maximum latency;
+ * - preferred_fullness: the fullness threshold to flush a RecordBatch;
+ */
+class RecordSubmitter {
+  enum class state_t {
+    IDLE = 0, // outstanding_io == 0
+    PENDING,  // outstanding_io <  io_depth_limit
+    FULL      // outstanding_io == io_depth_limit
+    // OVERFLOW: outstanding_io >  io_depth_limit is impossible
+  };
+
+  struct grouped_io_stats {
+    uint64_t num_io = 0;
+    uint64_t num_io_grouped = 0;
+
+    void increment(uint64_t num_grouped_io) {
+      ++num_io;
+      num_io_grouped += num_grouped_io;
+    }
+  };
+
+  using base_ertr = crimson::errorator<
+      crimson::ct_error::input_output_error>;
+
+public:
+  RecordSubmitter(std::size_t io_depth,
+                  std::size_t batch_capacity,
+                  std::size_t batch_flush_size,
+                  double preferred_fullness,
+                  SegmentAllocator&);
+
+  const std::string& get_name() const {
+    return segment_allocator.get_name();
+  }
+
+  journal_seq_t get_committed_to() const {
+    return committed_to;
+  }
+
+  // whether it is available to submit a record
+  bool is_available() const;
+
+  // wait until available if submission is blocked; check is_available() again
+  // when the future resolves.
+  using wa_ertr = base_ertr;
+  wa_ertr::future<> wait_available();
+
+  // when available, check for the submit action
+  // according to the pending record size
+  enum class action_t {
+    ROLL,
+    SUBMIT_FULL,
+    SUBMIT_NOT_FULL
+  };
+  action_t check_action(const record_size_t&) const;
+
+  // when available, roll the segment if needed
+  using roll_segment_ertr = base_ertr;
+  roll_segment_ertr::future<> roll_segment();
+
+  // when available, submit the record if possible
+  using submit_ertr = base_ertr;
+  using submit_ret = submit_ertr::future<record_locator_t>;
+  submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
+
+  void update_committed_to(const journal_seq_t& new_committed_to) {
+    assert(new_committed_to != JOURNAL_SEQ_NULL);
+    assert(committed_to == JOURNAL_SEQ_NULL ||
+           committed_to <= new_committed_to);
+    committed_to = new_committed_to;
+  }
+
+  // open for write, generate the correct print name, and register metrics
+  using open_ertr = base_ertr;
+  using open_ret = open_ertr::future<journal_seq_t>;
+  open_ret open(bool is_mkfs);
+
+  using close_ertr = base_ertr;
+  close_ertr::future<> close();
+
+private:
+  void update_state();
+
+  void increment_io() {
+    ++num_outstanding_io;
+    stats.io_depth_stats.increment(num_outstanding_io);
+    update_state();
+  }
+
+  void decrement_io_with_flush();
+
+  void pop_free_batch() {
+    assert(p_current_batch == nullptr);
+    assert(!free_batch_ptrs.empty());
+    p_current_batch = free_batch_ptrs.front();
+    assert(p_current_batch->is_empty());
+    assert(p_current_batch == &batches[p_current_batch->get_index()]);
+    free_batch_ptrs.pop_front();
+  }
+
+  void account_submission(std::size_t, const record_group_size_t&);
+
+  using maybe_result_t = RecordBatch::maybe_result_t;
+  void finish_submit_batch(RecordBatch*, maybe_result_t);
+
+  void flush_current_batch();
+
+  state_t state = state_t::IDLE;
+  std::size_t num_outstanding_io = 0;
+  std::size_t io_depth_limit;
+  double preferred_fullness;
+
+  SegmentAllocator& segment_allocator;
+  // committed_to may be in a previous journal segment
+  journal_seq_t committed_to = JOURNAL_SEQ_NULL;
+
+  std::unique_ptr<RecordBatch[]> batches;
+  // should not be nullptr after construction
+  RecordBatch* p_current_batch = nullptr;
+  seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
+
+  // blocked for rolling or lack of resource
+  std::optional<seastar::shared_promise<> > wait_available_promise;
+  bool has_io_error = false;
+  // when needs flush but io depth is full,
+  // wait for decrement_io_with_flush()
+  std::optional<seastar::promise<> > wait_unfull_flush_promise;
+
+  struct {
+    grouped_io_stats record_batch_stats;
+    grouped_io_stats io_depth_stats;
+    uint64_t record_group_padding_bytes = 0;
+    uint64_t record_group_metadata_bytes = 0;
+    uint64_t record_group_data_bytes = 0;
+  } stats;
+  seastar::metrics::metric_group metrics;
+};
+
+}
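
The declarations above imply a caller-side protocol: check availability, check whether the segment must roll, and only then submit. A hedged sketch follows; do_submit() and its retry structure are illustrative stand-ins for what the journal implementation does, not code from this patch.

  // Illustrative only: retry-based driver for the RecordSubmitter API.
  RecordSubmitter::submit_ret do_submit(RecordSubmitter& rs, record_t&& record)
  {
    if (!rs.is_available()) {
      // Blocked on a segment roll or a full io depth; wait, then re-evaluate.
      return rs.wait_available(
      ).safe_then([&rs, record=std::move(record)]() mutable {
        return do_submit(rs, std::move(record));
      });
    }
    if (rs.check_action(record.size) == RecordSubmitter::action_t::ROLL) {
      // The grouped size would overflow the current segment; roll and retry.
      return rs.roll_segment(
      ).safe_then([&rs, record=std::move(record)]() mutable {
        return do_submit(rs, std::move(record));
      });
    }
    // SUBMIT_FULL or SUBMIT_NOT_FULL: hand the record to the submitter.
    return rs.submit(std::move(record));
  }
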
index 7ecbf486763125ef5c2df521226f681d3289926a..61e1be585c8eec4d62d46d8593c2ccf6ff110acd 100644 (file)
@@ -280,524 +280,4 @@ SegmentAllocator::close_segment()
 
 }
 
-RecordBatch::add_pending_ret
-RecordBatch::add_pending(
-  const std::string& name,
-  record_t&& record,
-  extent_len_t block_size)
-{
-  LOG_PREFIX(RecordBatch::add_pending);
-  auto new_size = get_encoded_length_after(record, block_size);
-  auto dlength_offset = pending.size.dlength;
-  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
-        name,
-        pending.get_size() + 1,
-        new_size.get_encoded_length(),
-        dlength_offset);
-  assert(state != state_t::SUBMITTING);
-  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
-  pending.push_back(
-      std::move(record), block_size);
-  assert(pending.size == new_size);
-  if (state == state_t::EMPTY) {
-    assert(!io_promise.has_value());
-    io_promise = seastar::shared_promise<maybe_promise_result_t>();
-  } else {
-    assert(io_promise.has_value());
-  }
-  state = state_t::PENDING;
-
-  return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &name
-         ](auto maybe_promise_result) -> add_pending_ret {
-    if (!maybe_promise_result.has_value()) {
-      ERROR("{} write failed", name);
-      return crimson::ct_error::input_output_error::make();
-    }
-    auto write_result = maybe_promise_result->write_result;
-    auto submit_result = record_locator_t{
-      write_result.start_seq.offset.add_offset(
-          maybe_promise_result->mdlength + dlength_offset),
-      write_result
-    };
-    TRACE("{} write finish with {}", name, submit_result);
-    return add_pending_ret(
-      add_pending_ertr::ready_future_marker{},
-      submit_result);
-  });
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::encode_batch(
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  assert(state == state_t::PENDING);
-  assert(pending.get_size() > 0);
-  assert(io_promise.has_value());
-
-  state = state_t::SUBMITTING;
-  submitting_size = pending.get_size();
-  auto gsize = pending.size;
-  submitting_length = gsize.get_encoded_length();
-  submitting_mdlength = gsize.get_mdlength();
-  auto bl = encode_records(pending, committed_to, segment_nonce);
-  // Note: pending is cleared here
-  assert(bl.length() == submitting_length);
-  return std::make_pair(bl, gsize);
-}
-
-void RecordBatch::set_result(
-  maybe_result_t maybe_write_result)
-{
-  maybe_promise_result_t result;
-  if (maybe_write_result.has_value()) {
-    assert(maybe_write_result->length == submitting_length);
-    result = promise_result_t{
-      *maybe_write_result,
-      submitting_mdlength
-    };
-  }
-  assert(state == state_t::SUBMITTING);
-  assert(io_promise.has_value());
-
-  state = state_t::EMPTY;
-  submitting_size = 0;
-  submitting_length = 0;
-  submitting_mdlength = 0;
-  io_promise->set_value(result);
-  io_promise.reset();
-}
-
-std::pair<ceph::bufferlist, record_group_size_t>
-RecordBatch::submit_pending_fast(
-  record_t&& record,
-  extent_len_t block_size,
-  const journal_seq_t& committed_to,
-  segment_nonce_t segment_nonce)
-{
-  auto new_size = get_encoded_length_after(record, block_size);
-  std::ignore = new_size;
-  assert(state == state_t::EMPTY);
-  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
-
-  auto group = record_group_t(std::move(record), block_size);
-  auto size = group.size;
-  assert(size == new_size);
-  auto bl = encode_records(group, committed_to, segment_nonce);
-  assert(bl.length() == size.get_encoded_length());
-  return std::make_pair(std::move(bl), size);
-}
-
-RecordSubmitter::RecordSubmitter(
-  std::size_t io_depth,
-  std::size_t batch_capacity,
-  std::size_t batch_flush_size,
-  double preferred_fullness,
-  SegmentAllocator& sa)
-  : io_depth_limit{io_depth},
-    preferred_fullness{preferred_fullness},
-    segment_allocator{sa},
-    batches(new RecordBatch[io_depth + 1])
-{
-  LOG_PREFIX(RecordSubmitter);
-  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
-       "preferred_fullness={}",
-       get_name(), io_depth, batch_capacity,
-       batch_flush_size, preferred_fullness);
-  ceph_assert(io_depth > 0);
-  ceph_assert(batch_capacity > 0);
-  ceph_assert(preferred_fullness >= 0 &&
-              preferred_fullness <= 1);
-  free_batch_ptrs.reserve(io_depth + 1);
-  for (std::size_t i = 0; i <= io_depth; ++i) {
-    batches[i].initialize(i, batch_capacity, batch_flush_size);
-    free_batch_ptrs.push_back(&batches[i]);
-  }
-  pop_free_batch();
-}
-
-bool RecordSubmitter::is_available() const
-{
-  auto ret = !wait_available_promise.has_value() &&
-             !has_io_error;
-#ifndef NDEBUG
-  if (ret) {
-    // unconditional invariants
-    ceph_assert(segment_allocator.can_write());
-    ceph_assert(p_current_batch != nullptr);
-    ceph_assert(!p_current_batch->is_submitting());
-    // the current batch accepts a further write
-    ceph_assert(!p_current_batch->needs_flush());
-    if (!p_current_batch->is_empty()) {
-      auto submit_length =
-        p_current_batch->get_submit_size().get_encoded_length();
-      ceph_assert(!segment_allocator.needs_roll(submit_length));
-    }
-    // I'm not rolling
-  }
-#endif
-  return ret;
-}
-
-RecordSubmitter::wa_ertr::future<>
-RecordSubmitter::wait_available()
-{
-  LOG_PREFIX(RecordSubmitter::wait_available);
-  assert(!is_available());
-  if (has_io_error) {
-    ERROR("{} I/O is failed before wait", get_name());
-    return crimson::ct_error::input_output_error::make();
-  }
-  return wait_available_promise->get_shared_future(
-  ).then([FNAME, this]() -> wa_ertr::future<> {
-    if (has_io_error) {
-      ERROR("{} I/O is failed after wait", get_name());
-      return crimson::ct_error::input_output_error::make();
-    }
-    return wa_ertr::now();
-  });
-}
-
-RecordSubmitter::action_t
-RecordSubmitter::check_action(
-  const record_size_t& rsize) const
-{
-  assert(is_available());
-  auto eval = p_current_batch->evaluate_submit(
-      rsize, segment_allocator.get_block_size());
-  if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
-    return action_t::ROLL;
-  } else if (eval.is_full) {
-    return action_t::SUBMIT_FULL;
-  } else {
-    return action_t::SUBMIT_NOT_FULL;
-  }
-}
-
-RecordSubmitter::roll_segment_ertr::future<>
-RecordSubmitter::roll_segment()
-{
-  LOG_PREFIX(RecordSubmitter::roll_segment);
-  ceph_assert(p_current_batch->needs_flush() ||
-              is_available());
-  // #1 block concurrent submissions due to rolling
-  wait_available_promise = seastar::shared_promise<>();
-  ceph_assert(!wait_unfull_flush_promise.has_value());
-  return [FNAME, this] {
-    if (p_current_batch->is_pending()) {
-      if (state == state_t::FULL) {
-        DEBUG("{} wait flush ...", get_name());
-        wait_unfull_flush_promise = seastar::promise<>();
-        return wait_unfull_flush_promise->get_future();
-      } else { // IDLE/PENDING
-        DEBUG("{} flush", get_name());
-        flush_current_batch();
-        return seastar::now();
-      }
-    } else {
-      assert(p_current_batch->is_empty());
-      return seastar::now();
-    }
-  }().then_wrapped([FNAME, this](auto fut) {
-    if (fut.failed()) {
-      ERROR("{} rolling is skipped unexpectedly, available", get_name());
-      has_io_error = true;
-      wait_available_promise->set_value();
-      wait_available_promise.reset();
-      return roll_segment_ertr::now();
-    } else {
-      // start rolling in background
-      std::ignore = segment_allocator.roll(
-      ).safe_then([FNAME, this] {
-        // good
-        DEBUG("{} rolling done, available", get_name());
-        assert(!has_io_error);
-        wait_available_promise->set_value();
-        wait_available_promise.reset();
-      }).handle_error(
-        crimson::ct_error::all_same_way([FNAME, this](auto e) {
-          ERROR("{} got error {}, available", get_name(), e);
-          has_io_error = true;
-          wait_available_promise->set_value();
-          wait_available_promise.reset();
-        })
-      ).handle_exception([FNAME, this](auto e) {
-        ERROR("{} got exception {}, available", get_name(), e);
-        has_io_error = true;
-        wait_available_promise->set_value();
-        wait_available_promise.reset();
-      });
-      // wait for background rolling
-      return wait_available();
-    }
-  });
-}
-
-RecordSubmitter::submit_ret
-RecordSubmitter::submit(
-    record_t&& record,
-    bool with_atomic_roll_segment)
-{
-  LOG_PREFIX(RecordSubmitter::submit);
-  ceph_assert(is_available());
-  assert(check_action(record.size) != action_t::ROLL);
-  segment_allocator.get_provider().update_modify_time(
-      segment_allocator.get_segment_id(),
-      record.modify_time,
-      record.extents.size());
-  auto eval = p_current_batch->evaluate_submit(
-      record.size, segment_allocator.get_block_size());
-  bool needs_flush = (
-      state == state_t::IDLE ||
-      eval.submit_size.get_fullness() > preferred_fullness ||
-      // RecordBatch::needs_flush()
-      eval.is_full ||
-      p_current_batch->get_num_records() + 1 >=
-        p_current_batch->get_batch_capacity());
-  if (p_current_batch->is_empty() &&
-      needs_flush &&
-      state != state_t::FULL) {
-    // fast path with direct write
-    increment_io();
-    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
-      std::move(record),
-      segment_allocator.get_block_size(),
-      committed_to,
-      segment_allocator.get_nonce());
-    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
-          get_name(), sizes, committed_to, num_outstanding_io);
-    account_submission(1, sizes);
-    return segment_allocator.write(std::move(to_write)
-    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
-      return record_locator_t{
-        write_result.start_seq.offset.add_offset(mdlength),
-        write_result
-      };
-    }).finally([this] {
-      decrement_io_with_flush();
-    });
-  }
-  // indirect batched write
-  auto write_fut = p_current_batch->add_pending(
-    get_name(),
-    std::move(record),
-    segment_allocator.get_block_size());
-  if (needs_flush) {
-    if (state == state_t::FULL) {
-      // #2 block concurrent submissions due to lack of resource
-      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
-            get_name(),
-            p_current_batch->get_num_records(),
-            num_outstanding_io);
-      if (with_atomic_roll_segment) {
-        // wait_available_promise and wait_unfull_flush_promise
-        // need to be delegated to the follow-up atomic roll_segment();
-        assert(p_current_batch->is_pending());
-      } else {
-        wait_available_promise = seastar::shared_promise<>();
-        ceph_assert(!wait_unfull_flush_promise.has_value());
-        wait_unfull_flush_promise = seastar::promise<>();
-        // flush and mark available in background
-        std::ignore = wait_unfull_flush_promise->get_future(
-        ).finally([FNAME, this] {
-          DEBUG("{} flush done, available", get_name());
-          wait_available_promise->set_value();
-          wait_available_promise.reset();
-        });
-      }
-    } else {
-      DEBUG("{} added pending, flush", get_name());
-      flush_current_batch();
-    }
-  } else {
-    // will flush later
-    DEBUG("{} added with {} pending, outstanding_io={}",
-          get_name(),
-          p_current_batch->get_num_records(),
-          num_outstanding_io);
-    assert(!p_current_batch->needs_flush());
-  }
-  return write_fut;
-}
-
-RecordSubmitter::open_ret
-RecordSubmitter::open(bool is_mkfs)
-{
-  return segment_allocator.open(is_mkfs
-  ).safe_then([this](journal_seq_t ret) {
-    LOG_PREFIX(RecordSubmitter::open);
-    DEBUG("{} register metrics", get_name());
-    stats = {};
-    namespace sm = seastar::metrics;
-    std::vector<sm::label_instance> label_instances;
-    label_instances.push_back(sm::label_instance("submitter", get_name()));
-    metrics.add_group(
-      "journal",
-      {
-        sm::make_counter(
-          "record_num",
-          stats.record_batch_stats.num_io,
-          sm::description("total number of records submitted"),
-          label_instances
-        ),
-        sm::make_counter(
-          "record_batch_num",
-          stats.record_batch_stats.num_io_grouped,
-          sm::description("total number of records batched"),
-          label_instances
-        ),
-        sm::make_counter(
-          "io_num",
-          stats.io_depth_stats.num_io,
-          sm::description("total number of io submitted"),
-          label_instances
-        ),
-        sm::make_counter(
-          "io_depth_num",
-          stats.io_depth_stats.num_io_grouped,
-          sm::description("total number of io depth"),
-          label_instances
-        ),
-        sm::make_counter(
-          "record_group_padding_bytes",
-          stats.record_group_padding_bytes,
-          sm::description("bytes of metadata padding when write record groups"),
-          label_instances
-        ),
-        sm::make_counter(
-          "record_group_metadata_bytes",
-          stats.record_group_metadata_bytes,
-          sm::description("bytes of raw metadata when write record groups"),
-          label_instances
-        ),
-        sm::make_counter(
-          "record_group_data_bytes",
-          stats.record_group_data_bytes,
-          sm::description("bytes of data when write record groups"),
-          label_instances
-        ),
-      }
-    );
-    return ret;
-  });
-}
-
-RecordSubmitter::close_ertr::future<>
-RecordSubmitter::close()
-{
-  ceph_assert(state == state_t::IDLE);
-  ceph_assert(num_outstanding_io == 0);
-  committed_to = JOURNAL_SEQ_NULL;
-  ceph_assert(p_current_batch != nullptr);
-  ceph_assert(p_current_batch->is_empty());
-  ceph_assert(!wait_available_promise.has_value());
-  has_io_error = false;
-  ceph_assert(!wait_unfull_flush_promise.has_value());
-  metrics.clear();
-  return segment_allocator.close();
-}
-
-void RecordSubmitter::update_state()
-{
-  if (num_outstanding_io == 0) {
-    state = state_t::IDLE;
-  } else if (num_outstanding_io < io_depth_limit) {
-    state = state_t::PENDING;
-  } else if (num_outstanding_io == io_depth_limit) {
-    state = state_t::FULL;
-  } else {
-    ceph_abort("fatal error: io-depth overflow");
-  }
-}
-
-void RecordSubmitter::decrement_io_with_flush()
-{
-  LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
-  assert(num_outstanding_io > 0);
-  auto prv_state = state;
-  --num_outstanding_io;
-  update_state();
-
-  if (prv_state == state_t::FULL) {
-    if (wait_unfull_flush_promise.has_value()) {
-      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
-      assert(!p_current_batch->is_empty());
-      assert(wait_available_promise.has_value());
-      flush_current_batch();
-      wait_unfull_flush_promise->set_value();
-      wait_unfull_flush_promise.reset();
-      return;
-    }
-  } else {
-    ceph_assert(!wait_unfull_flush_promise.has_value());
-  }
-
-  auto needs_flush = (
-      !p_current_batch->is_empty() && (
-        state == state_t::IDLE ||
-        p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
-        p_current_batch->needs_flush()
-      ));
-  if (needs_flush) {
-    DEBUG("{} flush", get_name());
-    flush_current_batch();
-  }
-}
-
-void RecordSubmitter::account_submission(
-  std::size_t num,
-  const record_group_size_t& size)
-{
-  stats.record_group_padding_bytes +=
-    (size.get_mdlength() - size.get_raw_mdlength());
-  stats.record_group_metadata_bytes += size.get_raw_mdlength();
-  stats.record_group_data_bytes += size.dlength;
-  stats.record_batch_stats.increment(num);
-}
-
-void RecordSubmitter::finish_submit_batch(
-  RecordBatch* p_batch,
-  maybe_result_t maybe_result)
-{
-  assert(p_batch->is_submitting());
-  p_batch->set_result(maybe_result);
-  free_batch_ptrs.push_back(p_batch);
-  decrement_io_with_flush();
-}
-
-void RecordSubmitter::flush_current_batch()
-{
-  LOG_PREFIX(RecordSubmitter::flush_current_batch);
-  RecordBatch* p_batch = p_current_batch;
-  assert(p_batch->is_pending());
-  p_current_batch = nullptr;
-  pop_free_batch();
-
-  increment_io();
-  auto num = p_batch->get_num_records();
-  auto [to_write, sizes] = p_batch->encode_batch(
-    committed_to, segment_allocator.get_nonce());
-  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
-        get_name(), num, sizes, committed_to, num_outstanding_io);
-  account_submission(num, sizes);
-  std::ignore = segment_allocator.write(std::move(to_write)
-  ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
-    TRACE("{} {} records, {}, write done with {}",
-          get_name(), num, sizes, write_result);
-    finish_submit_batch(p_batch, write_result);
-  }).handle_error(
-    crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-      ERROR("{} {} records, {}, got error {}",
-            get_name(), num, sizes, e);
-      finish_submit_batch(p_batch, std::nullopt);
-    })
-  ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-    ERROR("{} {} records, {}, got exception {}",
-          get_name(), num, sizes, e);
-    finish_submit_batch(p_batch, std::nullopt);
-  });
-}
-
 }
index e113cd2aecf7c5e2caa797762d0144fc43ecfbe2..8cab895f8c76e0b4c384a6bc8b3c892285371839 100644 (file)
@@ -132,295 +132,4 @@ class SegmentAllocator {
   JournalTrimmer *trimmer;
 };
 
-/**
- * RecordBatch
- *
- * Maintain a batch of records for submit.
- */
-class RecordBatch {
-  enum class state_t {
-    EMPTY = 0,
-    PENDING,
-    SUBMITTING
-  };
-
-public:
-  RecordBatch() = default;
-  RecordBatch(RecordBatch&&) = delete;
-  RecordBatch(const RecordBatch&) = delete;
-  RecordBatch& operator=(RecordBatch&&) = delete;
-  RecordBatch& operator=(const RecordBatch&) = delete;
-
-  bool is_empty() const {
-    return state == state_t::EMPTY;
-  }
-
-  bool is_pending() const {
-    return state == state_t::PENDING;
-  }
-
-  bool is_submitting() const {
-    return state == state_t::SUBMITTING;
-  }
-
-  std::size_t get_index() const {
-    return index;
-  }
-
-  std::size_t get_num_records() const {
-    return pending.get_size();
-  }
-
-  std::size_t get_batch_capacity() const {
-    return batch_capacity;
-  }
-
-  const record_group_size_t& get_submit_size() const {
-    assert(state != state_t::EMPTY);
-    return pending.size;
-  }
-
-  bool needs_flush() const {
-    assert(state != state_t::SUBMITTING);
-    assert(pending.get_size() <= batch_capacity);
-    if (state == state_t::EMPTY) {
-      return false;
-    } else {
-      assert(state == state_t::PENDING);
-      return (pending.get_size() >= batch_capacity ||
-              pending.size.get_encoded_length() > batch_flush_size);
-    }
-  }
-
-  struct evaluation_t {
-    record_group_size_t submit_size;
-    bool is_full;
-  };
-  evaluation_t evaluate_submit(
-      const record_size_t& rsize,
-      extent_len_t block_size) const {
-    assert(!needs_flush());
-    auto submit_size = pending.size.get_encoded_length_after(
-        rsize, block_size);
-    bool is_full = submit_size.get_encoded_length() > batch_flush_size;
-    return {submit_size, is_full};
-  }
-
-  void initialize(std::size_t i,
-                  std::size_t _batch_capacity,
-                  std::size_t _batch_flush_size) {
-    ceph_assert(_batch_capacity > 0);
-    index = i;
-    batch_capacity = _batch_capacity;
-    batch_flush_size = _batch_flush_size;
-    pending.reserve(batch_capacity);
-  }
-
-  // Add to the batch, the future will be resolved after the batch is
-  // written.
-  //
-  // Set write_result_t::write_length to 0 if the record is not the first one
-  // in the batch.
-  using add_pending_ertr = SegmentAllocator::write_ertr;
-  using add_pending_ret = add_pending_ertr::future<record_locator_t>;
-  add_pending_ret add_pending(
-      const std::string& name,
-      record_t&&,
-      extent_len_t block_size);
-
-  // Encode the batched records for write.
-  std::pair<ceph::bufferlist, record_group_size_t> encode_batch(
-      const journal_seq_t& committed_to,
-      segment_nonce_t segment_nonce);
-
-  // Set the write result and reset for reuse
-  using maybe_result_t = std::optional<write_result_t>;
-  void set_result(maybe_result_t maybe_write_end_seq);
-
-  // The fast path that is equivalent to submit a single record as a batch.
-  //
-  // Essentially, equivalent to the combined logic of:
-  // add_pending(), encode_batch() and set_result() above without
-  // the intervention of the shared io_promise.
-  //
-  // Note the current RecordBatch can be reused afterwards.
-  std::pair<ceph::bufferlist, record_group_size_t> submit_pending_fast(
-      record_t&&,
-      extent_len_t block_size,
-      const journal_seq_t& committed_to,
-      segment_nonce_t segment_nonce);
-
-private:
-  record_group_size_t get_encoded_length_after(
-      const record_t& record,
-      extent_len_t block_size) const {
-    return pending.size.get_encoded_length_after(
-        record.size, block_size);
-  }
-
-  state_t state = state_t::EMPTY;
-  std::size_t index = 0;
-  std::size_t batch_capacity = 0;
-  std::size_t batch_flush_size = 0;
-
-  record_group_t pending;
-  std::size_t submitting_size = 0;
-  extent_len_t submitting_length = 0;
-  extent_len_t submitting_mdlength = 0;
-
-  struct promise_result_t {
-    write_result_t write_result;
-    extent_len_t mdlength;
-  };
-  using maybe_promise_result_t = std::optional<promise_result_t>;
-  std::optional<seastar::shared_promise<maybe_promise_result_t> > io_promise;
-};
-
-/**
- * RecordSubmitter
- *
- * Submit records concurrently with RecordBatch with SegmentAllocator.
- *
- * Configurations and controls:
- * - io_depth: the io-depth limit to SegmentAllocator;
- * - batch_capacity: the number limit of records in a RecordBatch;
- * - batch_flush_size: the bytes threshold to force flush a RecordBatch to
- *   control the maximum latency;
- * - preferred_fullness: the fullness threshold to flush a RecordBatch;
- */
-class RecordSubmitter {
-  enum class state_t {
-    IDLE = 0, // outstanding_io == 0
-    PENDING,  // outstanding_io <  io_depth_limit
-    FULL      // outstanding_io == io_depth_limit
-    // OVERFLOW: outstanding_io >  io_depth_limit is impossible
-  };
-
-  struct grouped_io_stats {
-    uint64_t num_io = 0;
-    uint64_t num_io_grouped = 0;
-
-    void increment(uint64_t num_grouped_io) {
-      ++num_io;
-      num_io_grouped += num_grouped_io;
-    }
-  };
-
-  using base_ertr = crimson::errorator<
-      crimson::ct_error::input_output_error>;
-
-public:
-  RecordSubmitter(std::size_t io_depth,
-                  std::size_t batch_capacity,
-                  std::size_t batch_flush_size,
-                  double preferred_fullness,
-                  SegmentAllocator&);
-
-  const std::string& get_name() const {
-    return segment_allocator.get_name();
-  }
-
-  journal_seq_t get_committed_to() const {
-    return committed_to;
-  }
-
-  // whether is available to submit a record
-  bool is_available() const;
-
-  // wait for available if cannot submit, should check is_available() again
-  // when the future is resolved.
-  using wa_ertr = base_ertr;
-  wa_ertr::future<> wait_available();
-
-  // when available, check for the submit action
-  // according to the pending record size
-  enum class action_t {
-    ROLL,
-    SUBMIT_FULL,
-    SUBMIT_NOT_FULL
-  };
-  action_t check_action(const record_size_t&) const;
-
-  // when available, roll the segment if needed
-  using roll_segment_ertr = base_ertr;
-  roll_segment_ertr::future<> roll_segment();
-
-  // when available, submit the record if possible
-  using submit_ertr = base_ertr;
-  using submit_ret = submit_ertr::future<record_locator_t>;
-  submit_ret submit(record_t&&, bool with_atomic_roll_segment=false);
-
-  void update_committed_to(const journal_seq_t& new_committed_to) {
-    assert(new_committed_to != JOURNAL_SEQ_NULL);
-    assert(committed_to == JOURNAL_SEQ_NULL ||
-           committed_to <= new_committed_to);
-    committed_to = new_committed_to;
-  }
-
-  // open for write, generate the correct print name, and register metrics
-  using open_ertr = base_ertr;
-  using open_ret = open_ertr::future<journal_seq_t>;
-  open_ret open(bool is_mkfs);
-
-  using close_ertr = base_ertr;
-  close_ertr::future<> close();
-
-private:
-  void update_state();
-
-  void increment_io() {
-    ++num_outstanding_io;
-    stats.io_depth_stats.increment(num_outstanding_io);
-    update_state();
-  }
-
-  void decrement_io_with_flush();
-
-  void pop_free_batch() {
-    assert(p_current_batch == nullptr);
-    assert(!free_batch_ptrs.empty());
-    p_current_batch = free_batch_ptrs.front();
-    assert(p_current_batch->is_empty());
-    assert(p_current_batch == &batches[p_current_batch->get_index()]);
-    free_batch_ptrs.pop_front();
-  }
-
-  void account_submission(std::size_t, const record_group_size_t&);
-
-  using maybe_result_t = RecordBatch::maybe_result_t;
-  void finish_submit_batch(RecordBatch*, maybe_result_t);
-
-  void flush_current_batch();
-
-  state_t state = state_t::IDLE;
-  std::size_t num_outstanding_io = 0;
-  std::size_t io_depth_limit;
-  double preferred_fullness;
-
-  SegmentAllocator& segment_allocator;
-  // committed_to may be in a previous journal segment
-  journal_seq_t committed_to = JOURNAL_SEQ_NULL;
-
-  std::unique_ptr<RecordBatch[]> batches;
-  // should not be nullptr after constructed
-  RecordBatch* p_current_batch = nullptr;
-  seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-
-  // blocked for rolling or lack of resource
-  std::optional<seastar::shared_promise<> > wait_available_promise;
-  bool has_io_error = false;
-  // when needs flush but io depth is full,
-  // wait for decrement_io_with_flush()
-  std::optional<seastar::promise<> > wait_unfull_flush_promise;
-
-  struct {
-    grouped_io_stats record_batch_stats;
-    grouped_io_stats io_depth_stats;
-    uint64_t record_group_padding_bytes = 0;
-    uint64_t record_group_metadata_bytes = 0;
-    uint64_t record_group_data_bytes = 0;
-  } stats;
-  seastar::metrics::metric_group metrics;
-};
-
 }
index dbeede8944e382b6448577715db29ac30fd7d351..3d580817c0f493faf0fa500c543b020750f53caf 100644 (file)
@@ -17,6 +17,7 @@
 #include "crimson/osd/exceptions.h"
 #include "segment_allocator.h"
 #include "crimson/os/seastore/segment_seq_allocator.h"
+#include "record_submitter.h"
 
 namespace crimson::os::seastore::journal {
 /**