]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/os/seastore/journal: rework RecordSubmitter for reuse
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 9 Mar 2022 13:44:30 +0000 (21:44 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Fri, 18 Mar 2022 02:15:47 +0000 (10:15 +0800)
* Redesign submission logic to support concurrent submits;
* Extract out journal specific write_pipeline and flush;
* Distinguish logs by name;

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/os/seastore/journal/segmented_journal.cc
src/crimson/os/seastore/journal/segmented_journal.h

index db4596d22296e43018e89aa18485e6ad1a316e0d..f60cd4fd5fbc4eb7ca085c117f3964236c03cb26 100644 (file)
@@ -304,6 +304,82 @@ SegmentedJournal::replay_ret SegmentedJournal::replay(
   });
 }
 
+seastar::future<> SegmentedJournal::flush(OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::flush);
+  DEBUG("H{} flush ...", (void*)&handle);
+  assert(write_pipeline);
+  return handle.enter(write_pipeline->device_submission
+  ).then([this, &handle] {
+    return handle.enter(write_pipeline->finalize);
+  }).then([FNAME, &handle] {
+    DEBUG("H{} flush done", (void*)&handle);
+  });
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::do_submit_record(
+  record_t &&record,
+  OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::do_submit_record);
+  if (!record_submitter.is_available()) {
+    DEBUG("H{} wait ...", (void*)&handle);
+    return record_submitter.wait_available(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
+  }
+  auto action = record_submitter.check_action(record.size);
+  if (action == RecordSubmitter::action_t::ROLL) {
+    DEBUG("H{} roll, unavailable ...", (void*)&handle);
+    return record_submitter.roll_segment(
+    ).safe_then([this, record=std::move(record), &handle]() mutable {
+      return do_submit_record(std::move(record), handle);
+    });
+  } else { // SUBMIT_FULL/NOT_FULL
+    DEBUG("H{} submit {} ...",
+          (void*)&handle,
+          action == RecordSubmitter::action_t::SUBMIT_FULL ?
+          "FULL" : "NOT_FULL");
+    auto submit_fut = record_submitter.submit(std::move(record));
+    return handle.enter(write_pipeline->device_submission
+    ).then([submit_fut=std::move(submit_fut)]() mutable {
+      return std::move(submit_fut);
+    }).safe_then([FNAME, this, &handle](record_locator_t result) {
+      return handle.enter(write_pipeline->finalize
+      ).then([FNAME, this, result, &handle] {
+        DEBUG("H{} finish with {}", (void*)&handle, result);
+        auto new_committed_to = result.write_result.get_end_seq();
+        record_submitter.update_committed_to(new_committed_to);
+        return result;
+      });
+    });
+  }
+}
+
+SegmentedJournal::submit_record_ret
+SegmentedJournal::submit_record(
+    record_t &&record,
+    OrderingHandle &handle)
+{
+  LOG_PREFIX(SegmentedJournal::submit_record);
+  DEBUG("H{} {} start ...", (void*)&handle, record);
+  assert(write_pipeline);
+  auto expected_size = record_group_size_t(
+      record.size,
+      journal_segment_allocator.get_block_size()
+  ).get_encoded_length();
+  auto max_record_length = journal_segment_allocator.get_max_write_length();
+  if (expected_size > max_record_length) {
+    ERROR("H{} {} exceeds max record size {}",
+          (void*)&handle, record, max_record_length);
+    return crimson::ct_error::erange::make();
+  }
+
+  return do_submit_record(std::move(record), handle);
+}
+
 void SegmentedJournal::register_metrics()
 {
   LOG_PREFIX(Journal::register_metrics);
@@ -368,20 +444,20 @@ void SegmentedJournal::register_metrics()
 
 SegmentedJournal::RecordBatch::add_pending_ret
 SegmentedJournal::RecordBatch::add_pending(
+  const std::string& name,
   record_t&& record,
-  OrderingHandle& handle,
   extent_len_t block_size)
 {
   LOG_PREFIX(RecordBatch::add_pending);
   auto new_size = get_encoded_length_after(record, block_size);
   auto dlength_offset = pending.size.dlength;
-  TRACE("H{} batches={}, write_size={}, dlength_offset={} ...",
-        (void*)&handle,
+  TRACE("{} batches={}, write_size={}, dlength_offset={} ...",
+        name,
         pending.get_size() + 1,
         new_size.get_encoded_length(),
         dlength_offset);
   assert(state != state_t::SUBMITTING);
-  assert(can_batch(record, block_size).value() == new_size);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
 
   pending.push_back(
       std::move(record), block_size);
@@ -395,10 +471,10 @@ SegmentedJournal::RecordBatch::add_pending(
   state = state_t::PENDING;
 
   return io_promise->get_shared_future(
-  ).then([dlength_offset, FNAME, &handle
+  ).then([dlength_offset, FNAME, &name
          ](auto maybe_promise_result) -> add_pending_ret {
     if (!maybe_promise_result.has_value()) {
-      ERROR("H{} write failed", (void*)&handle);
+      ERROR("{} write failed", name);
       return crimson::ct_error::input_output_error::make();
     }
     auto write_result = maybe_promise_result->write_result;
@@ -407,7 +483,7 @@ SegmentedJournal::RecordBatch::add_pending(
           maybe_promise_result->mdlength + dlength_offset),
       write_result
     };
-    TRACE("H{} write finish with {}", (void*)&handle, submit_result);
+    TRACE("{} write finish with {}", name, submit_result);
     return add_pending_ret(
       add_pending_ertr::ready_future_marker{},
       submit_result);
@@ -466,7 +542,7 @@ SegmentedJournal::RecordBatch::submit_pending_fast(
   auto new_size = get_encoded_length_after(record, block_size);
   std::ignore = new_size;
   assert(state == state_t::EMPTY);
-  assert(can_batch(record, block_size).value() == new_size);
+  assert(evaluate_submit(record.size, block_size).submit_size == new_size);
 
   auto group = record_group_t(std::move(record), block_size);
   auto size = group.size;
@@ -481,17 +557,16 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
   std::size_t batch_capacity,
   std::size_t batch_flush_size,
   double preferred_fullness,
-  SegmentAllocator& jsa)
+  SegmentAllocator& sa)
   : io_depth_limit{io_depth},
     preferred_fullness{preferred_fullness},
-    journal_segment_allocator{jsa},
+    segment_allocator{sa},
     batches(new RecordBatch[io_depth + 1])
 {
   LOG_PREFIX(RecordSubmitter);
-  INFO("Journal::RecordSubmitter: io_depth_limit={}, "
-       "batch_capacity={}, batch_flush_size={}, "
+  INFO("{} io_depth_limit={}, batch_capacity={}, batch_flush_size={}, "
        "preferred_fullness={}",
-       io_depth, batch_capacity,
+       get_name(), io_depth, batch_capacity,
        batch_flush_size, preferred_fullness);
   ceph_assert(io_depth > 0);
   ceph_assert(batch_capacity > 0);
@@ -505,40 +580,195 @@ SegmentedJournal::RecordSubmitter::RecordSubmitter(
   pop_free_batch();
 }
 
-SegmentedJournal::RecordSubmitter::submit_ret
-SegmentedJournal::RecordSubmitter::submit(
-  record_t&& record,
-  OrderingHandle& handle)
+bool SegmentedJournal::RecordSubmitter::is_available() const
 {
-  LOG_PREFIX(RecordSubmitter::submit);
-  DEBUG("H{} {} start ...", (void*)&handle, record);
-  assert(write_pipeline);
-  auto expected_size = record_group_size_t(
-      record.size,
-      journal_segment_allocator.get_block_size()
-  ).get_encoded_length();
-  auto max_record_length = journal_segment_allocator.get_max_write_length();
-  if (expected_size > max_record_length) {
-    ERROR("H{} {} exceeds max record size {}",
-          (void*)&handle, record, max_record_length);
-    return crimson::ct_error::erange::make();
+  auto ret = !wait_available_promise.has_value() &&
+             !has_io_error;
+#ifndef NDEBUG
+  if (ret) {
+    // invariants when available
+    ceph_assert(segment_allocator.can_write());
+    ceph_assert(p_current_batch != nullptr);
+    ceph_assert(!p_current_batch->is_submitting());
+    ceph_assert(!p_current_batch->needs_flush());
+    if (!p_current_batch->is_empty()) {
+      auto submit_length =
+        p_current_batch->get_submit_size().get_encoded_length();
+      ceph_assert(!segment_allocator.needs_roll(submit_length));
+    }
   }
+#endif
+  return ret;
+}
 
-  return do_submit(std::move(record), handle);
+SegmentedJournal::RecordSubmitter::wa_ertr::future<>
+SegmentedJournal::RecordSubmitter::wait_available()
+{
+  LOG_PREFIX(RecordSubmitter::wait_available);
+  assert(!is_available());
+  if (has_io_error) {
+    ERROR("{} I/O is failed before wait", get_name());
+    return crimson::ct_error::input_output_error::make();
+  }
+  return wait_available_promise->get_shared_future(
+  ).then([FNAME, this]() -> wa_ertr::future<> {
+    if (has_io_error) {
+      ERROR("{} I/O is failed after wait", get_name());
+      return crimson::ct_error::input_output_error::make();
+    }
+    return wa_ertr::now();
+  });
 }
 
-seastar::future<> SegmentedJournal::RecordSubmitter::flush(OrderingHandle &handle)
+SegmentedJournal::RecordSubmitter::action_t
+SegmentedJournal::RecordSubmitter::check_action(
+  const record_size_t& rsize) const
 {
-  LOG_PREFIX(RecordSubmitter::flush);
-  DEBUG("H{} flush", (void*)&handle);
-  return handle.enter(write_pipeline->device_submission
-  ).then([this, &handle] {
-    return handle.enter(write_pipeline->finalize);
-  }).then([FNAME, &handle] {
-    DEBUG("H{} flush done", (void*)&handle);
+  assert(is_available());
+  auto eval = p_current_batch->evaluate_submit(
+      rsize, segment_allocator.get_block_size());
+  if (segment_allocator.needs_roll(eval.submit_size.get_encoded_length())) {
+    return action_t::ROLL;
+  } else if (eval.is_full) {
+    return action_t::SUBMIT_FULL;
+  } else {
+    return action_t::SUBMIT_NOT_FULL;
+  }
+}
+
+SegmentedJournal::RecordSubmitter::roll_segment_ertr::future<>
+SegmentedJournal::RecordSubmitter::roll_segment()
+{
+  LOG_PREFIX(RecordSubmitter::roll_segment);
+  assert(is_available());
+  // #1 block concurrent submissions due to rolling
+  wait_available_promise = seastar::shared_promise<>();
+  assert(!wait_unfull_flush_promise.has_value());
+  return [FNAME, this] {
+    if (p_current_batch->is_pending()) {
+      if (state == state_t::FULL) {
+        DEBUG("{} wait flush ...", get_name());
+        wait_unfull_flush_promise = seastar::promise<>();
+        return wait_unfull_flush_promise->get_future();
+      } else { // IDLE/PENDING
+        DEBUG("{} flush", get_name());
+        flush_current_batch();
+        return seastar::now();
+      }
+    } else {
+      assert(p_current_batch->is_empty());
+      return seastar::now();
+    }
+  }().then_wrapped([FNAME, this](auto fut) {
+    if (fut.failed()) {
+      ERROR("{} rolling is skipped unexpectedly, available", get_name());
+      has_io_error = true;
+      wait_available_promise->set_value();
+      wait_available_promise.reset();
+      return roll_segment_ertr::now();
+    } else {
+      // start rolling in background
+      std::ignore = segment_allocator.roll(
+      ).safe_then([FNAME, this] {
+        // good
+        DEBUG("{} rolling done, available", get_name());
+        assert(!has_io_error);
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      }).handle_error(
+        crimson::ct_error::all_same_way([FNAME, this](auto e) {
+          ERROR("{} got error {}, available", get_name(), e);
+          has_io_error = true;
+          wait_available_promise->set_value();
+          wait_available_promise.reset();
+        })
+      ).handle_exception([FNAME, this](auto e) {
+        ERROR("{} got exception {}, available", get_name(), e);
+        has_io_error = true;
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      });
+      // wait for background rolling
+      return wait_available();
+    }
   });
 }
 
+SegmentedJournal::RecordSubmitter::submit_ret
+SegmentedJournal::RecordSubmitter::submit(record_t&& record)
+{
+  LOG_PREFIX(RecordSubmitter::submit);
+  assert(is_available());
+  assert(check_action(record.size) != action_t::ROLL);
+  auto eval = p_current_batch->evaluate_submit(
+      record.size, segment_allocator.get_block_size());
+  bool needs_flush = (
+      state == state_t::IDLE ||
+      eval.submit_size.get_fullness() > preferred_fullness ||
+      // RecordBatch::needs_flush()
+      eval.is_full ||
+      p_current_batch->get_num_records() + 1 >=
+        p_current_batch->get_batch_capacity());
+  if (p_current_batch->is_empty() &&
+      needs_flush &&
+      state != state_t::FULL) {
+    // fast path with direct write
+    increment_io();
+    auto [to_write, sizes] = p_current_batch->submit_pending_fast(
+      std::move(record),
+      segment_allocator.get_block_size(),
+      committed_to,
+      segment_allocator.get_nonce());
+    DEBUG("{} fast submit {}, committed_to={}, outstanding_io={} ...",
+          get_name(), sizes, committed_to, num_outstanding_io);
+    account_submission(1, sizes);
+    return segment_allocator.write(to_write
+    ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
+      return record_locator_t{
+        write_result.start_seq.offset.add_offset(mdlength),
+        write_result
+      };
+    }).finally([this] {
+      decrement_io_with_flush();
+    });
+  }
+  // indirect batched write
+  auto write_fut = p_current_batch->add_pending(
+    get_name(),
+    std::move(record),
+    segment_allocator.get_block_size());
+  if (needs_flush) {
+    if (state == state_t::FULL) {
+      // #2 block concurrent submissions due to lack of resource
+      DEBUG("{} added with {} pending, outstanding_io={}, unavailable, wait flush ...",
+            get_name(),
+            p_current_batch->get_num_records(),
+            num_outstanding_io);
+      wait_available_promise = seastar::shared_promise<>();
+      assert(!wait_unfull_flush_promise.has_value());
+      wait_unfull_flush_promise = seastar::promise<>();
+      // flush and mark available in background
+      std::ignore = wait_unfull_flush_promise->get_future(
+      ).finally([FNAME, this] {
+        DEBUG("{} flush done, available", get_name());
+        wait_available_promise->set_value();
+        wait_available_promise.reset();
+      });
+    } else {
+      DEBUG("{} added pending, flush", get_name());
+      flush_current_batch();
+    }
+  } else {
+    // will flush later
+    DEBUG("{} added with {} pending, outstanding_io={}",
+          get_name(),
+          p_current_batch->get_num_records(),
+          num_outstanding_io);
+    assert(!p_current_batch->needs_flush());
+  }
+  return write_fut;
+}
+
 void SegmentedJournal::RecordSubmitter::update_state()
 {
   if (num_outstanding_io == 0) {
@@ -556,21 +786,32 @@ void SegmentedJournal::RecordSubmitter::decrement_io_with_flush()
 {
   LOG_PREFIX(RecordSubmitter::decrement_io_with_flush);
   assert(num_outstanding_io > 0);
-  --num_outstanding_io;
-#ifndef NDEBUG
   auto prv_state = state;
-#endif
+  --num_outstanding_io;
   update_state();
 
-  if (wait_submit_promise.has_value()) {
-    DEBUG("wait resolved");
-    assert(prv_state == state_t::FULL);
-    wait_submit_promise->set_value();
-    wait_submit_promise.reset();
+  if (prv_state == state_t::FULL) {
+    if (wait_unfull_flush_promise.has_value()) {
+      DEBUG("{} flush, resolve wait_unfull_flush_promise", get_name());
+      assert(!p_current_batch->is_empty());
+      assert(wait_available_promise.has_value());
+      flush_current_batch();
+      wait_unfull_flush_promise->set_value();
+      wait_unfull_flush_promise.reset();
+      return;
+    }
+  } else {
+    assert(!wait_unfull_flush_promise.has_value());
   }
 
-  if (!p_current_batch->is_empty()) {
-    TRACE("flush");
+  auto needs_flush = (
+      !p_current_batch->is_empty() && (
+        state == state_t::IDLE ||
+        p_current_batch->get_submit_size().get_fullness() > preferred_fullness ||
+        p_current_batch->needs_flush()
+      ));
+  if (needs_flush) {
+    DEBUG("{} flush", get_name());
     flush_current_batch();
   }
 }
@@ -606,149 +847,26 @@ void SegmentedJournal::RecordSubmitter::flush_current_batch()
   increment_io();
   auto num = p_batch->get_num_records();
   auto [to_write, sizes] = p_batch->encode_batch(
-    journal_committed_to, journal_segment_allocator.get_nonce());
-  DEBUG("{} records, {}, committed_to={}, outstanding_io={} ...",
-        num, sizes, journal_committed_to, num_outstanding_io);
+    committed_to, segment_allocator.get_nonce());
+  DEBUG("{} {} records, {}, committed_to={}, outstanding_io={} ...",
+        get_name(), num, sizes, committed_to, num_outstanding_io);
   account_submission(num, sizes);
-  std::ignore = journal_segment_allocator.write(to_write
+  std::ignore = segment_allocator.write(to_write
   ).safe_then([this, p_batch, FNAME, num, sizes=sizes](auto write_result) {
-    TRACE("{} records, {}, write done with {}", num, sizes, write_result);
+    TRACE("{} {} records, {}, write done with {}",
+          get_name(), num, sizes, write_result);
     finish_submit_batch(p_batch, write_result);
   }).handle_error(
     crimson::ct_error::all_same_way([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-      ERROR("{} records, {}, got error {}", num, sizes, e);
+      ERROR("{} {} records, {}, got error {}",
+            get_name(), num, sizes, e);
       finish_submit_batch(p_batch, std::nullopt);
     })
   ).handle_exception([this, p_batch, FNAME, num, sizes=sizes](auto e) {
-    ERROR("{} records, {}, got exception {}", num, sizes, e);
+    ERROR("{} {} records, {}, got exception {}",
+          get_name(), num, sizes, e);
     finish_submit_batch(p_batch, std::nullopt);
   });
 }
 
-SegmentedJournal::RecordSubmitter::submit_pending_ret
-SegmentedJournal::RecordSubmitter::submit_pending(
-  record_t&& record,
-  OrderingHandle& handle,
-  bool flush)
-{
-  LOG_PREFIX(RecordSubmitter::submit_pending);
-  assert(!p_current_batch->is_submitting());
-  stats.record_batch_stats.increment(
-      p_current_batch->get_num_records() + 1);
-  bool do_flush = (flush || state == state_t::IDLE);
-  auto write_fut = [this, do_flush, FNAME, record=std::move(record), &handle]() mutable {
-    if (do_flush && p_current_batch->is_empty()) {
-      // fast path with direct write
-      increment_io();
-      auto [to_write, sizes] = p_current_batch->submit_pending_fast(
-        std::move(record),
-        journal_segment_allocator.get_block_size(),
-        journal_committed_to,
-        journal_segment_allocator.get_nonce());
-      DEBUG("H{} fast submit {}, committed_to={}, outstanding_io={} ...",
-            (void*)&handle, sizes, journal_committed_to, num_outstanding_io);
-      account_submission(1, sizes);
-      return journal_segment_allocator.write(to_write
-      ).safe_then([mdlength = sizes.get_mdlength()](auto write_result) {
-        return record_locator_t{
-          write_result.start_seq.offset.add_offset(mdlength),
-          write_result
-        };
-      }).finally([this] {
-        decrement_io_with_flush();
-      });
-    } else {
-      // indirect write with or without the existing pending records
-      auto write_fut = p_current_batch->add_pending(
-        std::move(record),
-        handle,
-        journal_segment_allocator.get_block_size());
-      if (do_flush) {
-        DEBUG("H{} added pending and flush", (void*)&handle);
-        flush_current_batch();
-      } else {
-        DEBUG("H{} added with {} pending",
-              (void*)&handle, p_current_batch->get_num_records());
-      }
-      return write_fut;
-    }
-  }();
-  return handle.enter(write_pipeline->device_submission
-  ).then([write_fut=std::move(write_fut)]() mutable {
-    return std::move(write_fut);
-  }).safe_then([this, FNAME, &handle](auto submit_result) {
-    return handle.enter(write_pipeline->finalize
-    ).then([this, FNAME, submit_result, &handle] {
-      DEBUG("H{} finish with {}", (void*)&handle, submit_result);
-      auto new_committed_to = submit_result.write_result.get_end_seq();
-      assert(journal_committed_to == JOURNAL_SEQ_NULL ||
-             journal_committed_to <= new_committed_to);
-      journal_committed_to = new_committed_to;
-      return submit_result;
-    });
-  });
-}
-
-SegmentedJournal::RecordSubmitter::do_submit_ret
-SegmentedJournal::RecordSubmitter::do_submit(
-  record_t&& record,
-  OrderingHandle& handle)
-{
-  LOG_PREFIX(RecordSubmitter::do_submit);
-  TRACE("H{} outstanding_io={}/{} ...",
-        (void*)&handle, num_outstanding_io, io_depth_limit);
-  assert(!p_current_batch->is_submitting());
-  if (state <= state_t::PENDING) {
-    // can increment io depth
-    assert(!wait_submit_promise.has_value());
-    auto maybe_new_size = p_current_batch->can_batch(
-        record, journal_segment_allocator.get_block_size());
-    if (!maybe_new_size.has_value() ||
-        (maybe_new_size->get_encoded_length() >
-         journal_segment_allocator.get_max_write_length())) {
-      TRACE("H{} flush", (void*)&handle);
-      assert(p_current_batch->is_pending());
-      flush_current_batch();
-      return do_submit(std::move(record), handle);
-    } else if (journal_segment_allocator.needs_roll(
-          maybe_new_size->get_encoded_length())) {
-      if (p_current_batch->is_pending()) {
-        TRACE("H{} flush and roll", (void*)&handle);
-        flush_current_batch();
-      } else {
-        TRACE("H{} roll", (void*)&handle);
-      }
-      return journal_segment_allocator.roll(
-      ).safe_then([this, record=std::move(record), &handle]() mutable {
-        return do_submit(std::move(record), handle);
-      });
-    } else {
-      bool flush = (maybe_new_size->get_fullness() > preferred_fullness ?
-                    true : false);
-      return submit_pending(std::move(record), handle, flush);
-    }
-  }
-
-  assert(state == state_t::FULL);
-  // cannot increment io depth
-  auto maybe_new_size = p_current_batch->can_batch(
-      record, journal_segment_allocator.get_block_size());
-  if (!maybe_new_size.has_value() ||
-      (maybe_new_size->get_encoded_length() >
-       journal_segment_allocator.get_max_write_length()) ||
-      journal_segment_allocator.needs_roll(
-        maybe_new_size->get_encoded_length())) {
-    if (!wait_submit_promise.has_value()) {
-      wait_submit_promise = seastar::promise<>();
-    }
-    DEBUG("H{} wait ...", (void*)&handle);
-    return wait_submit_promise->get_future(
-    ).then([this, record=std::move(record), &handle]() mutable {
-      return do_submit(std::move(record), handle);
-    });
-  } else {
-    return submit_pending(std::move(record), handle, false);
-  }
-}
-
 }
index 17e4056e7156936ab5de84c39c5a21f6cab84dca..d8cf773e503245474b70aae8c7b012b3acb6df80 100644 (file)
@@ -42,22 +42,22 @@ public:
 
   submit_record_ret submit_record(
     record_t &&record,
-    OrderingHandle &handle
-  ) final {
-    return record_submitter.submit(std::move(record), handle);
-  }
+    OrderingHandle &handle) final;
 
-  seastar::future<> flush(OrderingHandle &handle) final {
-    return record_submitter.flush(handle);
-  }
+  seastar::future<> flush(OrderingHandle &handle) final;
 
   replay_ret replay(delta_handler_t &&delta_handler) final;
 
-  void set_write_pipeline(WritePipelinewrite_pipeline) final {
-    record_submitter.set_write_pipeline(write_pipeline);
+  void set_write_pipeline(WritePipeline *_write_pipeline) final {
+    write_pipeline = _write_pipeline;
   }
 
 private:
+  submit_record_ret do_submit_record(
+    record_t &&record,
+    OrderingHandle &handle
+  );
+
   class RecordBatch {
     enum class state_t {
       EMPTY = 0,
@@ -92,19 +92,39 @@ private:
       return pending.get_size();
     }
 
-    // return the expected write sizes if allows to batch,
-    // otherwise, return nullopt
-    std::optional<record_group_size_t> can_batch(
-        const record_t& record,
-        extent_len_t block_size) const {
+    std::size_t get_batch_capacity() const {
+      return batch_capacity;
+    }
+
+    const record_group_size_t& get_submit_size() const {
+      assert(state != state_t::EMPTY);
+      return pending.size;
+    }
+
+    bool needs_flush() const {
       assert(state != state_t::SUBMITTING);
-      if (pending.get_size() >= batch_capacity ||
-          (pending.get_size() > 0 &&
-           pending.size.get_encoded_length() > batch_flush_size)) {
+      assert(pending.get_size() <= batch_capacity);
+      if (state == state_t::EMPTY) {
+        return false;
+      } else {
         assert(state == state_t::PENDING);
-        return std::nullopt;
+        return (pending.get_size() >= batch_capacity ||
+                pending.size.get_encoded_length() > batch_flush_size);
       }
-      return get_encoded_length_after(record, block_size);
+    }
+
+    struct evaluation_t {
+      record_group_size_t submit_size;
+      bool is_full;
+    };
+    evaluation_t evaluate_submit(
+        const record_size_t& rsize,
+        extent_len_t block_size) const {
+      assert(!needs_flush());
+      auto submit_size = pending.size.get_encoded_length_after(
+          rsize, block_size);
+      bool is_full = submit_size.get_encoded_length() > batch_flush_size;
+      return {submit_size, is_full};
     }
 
     void initialize(std::size_t i,
@@ -125,8 +145,8 @@ private:
     using add_pending_ertr = SegmentAllocator::write_ertr;
     using add_pending_ret = add_pending_ertr::future<record_locator_t>;
     add_pending_ret add_pending(
+        const std::string& name,
         record_t&&,
-        OrderingHandle&,
         extent_len_t block_size);
 
     // Encode the batched records for write.
@@ -195,6 +215,9 @@ private:
       }
     };
 
+    using base_ertr = crimson::errorator<
+        crimson::ct_error::input_output_error>;
+
   public:
     RecordSubmitter(std::size_t io_depth,
                     std::size_t batch_capacity,
@@ -202,6 +225,10 @@ private:
                     double preferred_fullness,
                     SegmentAllocator&);
 
+    const std::string& get_name() const {
+      return segment_allocator.get_name();
+    }
+
     grouped_io_stats get_record_batch_stats() const {
       return stats.record_batch_stats;
     }
@@ -223,20 +250,45 @@ private:
     }
 
     journal_seq_t get_committed_to() const {
-      return journal_committed_to;
+      return committed_to;
     }
 
     void reset_stats() {
       stats = {};
     }
 
-    void set_write_pipeline(WritePipeline *_write_pipeline) {
-      write_pipeline = _write_pipeline;
-    }
+    // whether is available to submit a record
+    bool is_available() const;
 
-    using submit_ret = Journal::submit_record_ret;
-    submit_ret submit(record_t&&, OrderingHandle&);
-    seastar::future<> flush(OrderingHandle &handle);
+    // wait for available if cannot submit, should check is_available() again
+    // when the future is resolved.
+    using wa_ertr = base_ertr;
+    wa_ertr::future<> wait_available();
+
+    // when available, check for the submit action
+    // according to the pending record size
+    enum class action_t {
+      ROLL,
+      SUBMIT_FULL,
+      SUBMIT_NOT_FULL
+    };
+    action_t check_action(const record_size_t&) const;
+
+    // when available, roll the segment if needed
+    using roll_segment_ertr = base_ertr;
+    roll_segment_ertr::future<> roll_segment();
+
+    // when available, submit the record if possible
+    using submit_ertr = base_ertr;
+    using submit_ret = submit_ertr::future<record_locator_t>;
+    submit_ret submit(record_t&&);
+
+    void update_committed_to(const journal_seq_t& new_committed_to) {
+      assert(new_committed_to != JOURNAL_SEQ_NULL);
+      assert(committed_to == JOURNAL_SEQ_NULL ||
+             committed_to <= new_committed_to);
+      committed_to = new_committed_to;
+    }
 
   private:
     void update_state();
@@ -265,32 +317,26 @@ private:
 
     void flush_current_batch();
 
-    using submit_pending_ertr = SegmentAllocator::write_ertr;
-    using submit_pending_ret = submit_pending_ertr::future<
-      record_locator_t>;
-    submit_pending_ret submit_pending(
-        record_t&&, OrderingHandle &handle, bool flush);
-
-    using do_submit_ret = submit_pending_ret;
-    do_submit_ret do_submit(
-        record_t&&, OrderingHandle&);
-
     state_t state = state_t::IDLE;
     std::size_t num_outstanding_io = 0;
     std::size_t io_depth_limit;
     double preferred_fullness;
 
-    WritePipeline* write_pipeline = nullptr;
-    SegmentAllocator& journal_segment_allocator;
+    SegmentAllocator& segment_allocator;
     // committed_to may be in a previous journal segment
-    journal_seq_t journal_committed_to = JOURNAL_SEQ_NULL;
+    journal_seq_t committed_to = JOURNAL_SEQ_NULL;
 
     std::unique_ptr<RecordBatch[]> batches;
-    std::size_t current_batch_index;
     // should not be nullptr after constructed
     RecordBatch* p_current_batch = nullptr;
     seastar::circular_buffer<RecordBatch*> free_batch_ptrs;
-    std::optional<seastar::promise<> > wait_submit_promise;
+
+    // blocked for rolling or lack of resource
+    std::optional<seastar::shared_promise<> > wait_available_promise;
+    bool has_io_error = false;
+    // when needs flush but io depth is full,
+    // wait for decrement_io_with_flush()
+    std::optional<seastar::promise<> > wait_unfull_flush_promise;
 
     struct {
       grouped_io_stats record_batch_stats;
@@ -306,6 +352,7 @@ private:
   RecordSubmitter record_submitter;
   ExtentReader& scanner;
   seastar::metrics::metric_group metrics;
+  WritePipeline* write_pipeline = nullptr;
 
   /// read journal segment headers from scanner
   using find_journal_segments_ertr = crimson::errorator<