From 96ba8abd48a57fc6be12c266f5fe1e78478bbf81 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Mon, 27 Feb 2023 16:14:16 +0800 Subject: [PATCH] crimson/os/seastore/journal: allow pending i/o in a full record_submitter when rolling the segment Fixes: https://tracker.ceph.com/issues/58824 Signed-off-by: Xuehan Xu Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 10 +++-- .../os/seastore/extent_placement_manager.h | 3 +- .../os/seastore/journal/segment_allocator.cc | 37 ++++++++++++------- .../os/seastore/journal/segment_allocator.h | 2 +- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 518a64be59e26..6805bde616a89 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -32,7 +32,8 @@ SegmentedOolWriter::alloc_write_ertr::future<> SegmentedOolWriter::write_record( Transaction& t, record_t&& record, - std::list&& extents) + std::list&& extents, + bool with_atomic_roll_segment) { LOG_PREFIX(SegmentedOolWriter::write_record); assert(extents.size()); @@ -46,7 +47,9 @@ SegmentedOolWriter::write_record( stats.md_bytes += record.size.get_raw_mdlength(); stats.num_records += 1; - return record_submitter.submit(std::move(record) + return record_submitter.submit( + std::move(record), + with_atomic_roll_segment ).safe_then([this, FNAME, &t, extents=std::move(extents) ](record_locator_t ret) mutable { DEBUGT("{} finish with {} and {} extents", @@ -101,7 +104,8 @@ SegmentedOolWriter::do_write( assert(record_submitter.check_action(record.size) != action_t::ROLL); fut_write = write_record( - t, std::move(record), std::move(pending_extents)); + t, std::move(record), std::move(pending_extents), + true/* with_atomic_roll_segment */); } return trans_intr::make_interruptible( record_submitter.roll_segment( diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index e4e59de06151c..7393dff3689e9 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -84,7 +84,8 @@ private: alloc_write_ertr::future<> write_record( Transaction& t, record_t&& record, - std::list &&extents); + std::list &&extents, + bool with_atomic_roll_segment=false); journal::SegmentAllocator segment_allocator; journal::RecordSubmitter record_submitter; diff --git a/src/crimson/os/seastore/journal/segment_allocator.cc b/src/crimson/os/seastore/journal/segment_allocator.cc index f2cf9fd2dccc3..05d29d7532463 100644 --- a/src/crimson/os/seastore/journal/segment_allocator.cc +++ b/src/crimson/os/seastore/journal/segment_allocator.cc @@ -412,16 +412,18 @@ bool RecordSubmitter::is_available() const !has_io_error; #ifndef NDEBUG if (ret) { - // invariants when available + // unconditional invariants ceph_assert(segment_allocator.can_write()); ceph_assert(p_current_batch != nullptr); ceph_assert(!p_current_batch->is_submitting()); + // the current batch accepts a further write ceph_assert(!p_current_batch->needs_flush()); if (!p_current_batch->is_empty()) { auto submit_length = p_current_batch->get_submit_size().get_encoded_length(); ceph_assert(!segment_allocator.needs_roll(submit_length)); } + // I'm not rolling } #endif return ret; @@ -466,7 +468,8 @@ RecordSubmitter::roll_segment_ertr::future<> RecordSubmitter::roll_segment() { LOG_PREFIX(RecordSubmitter::roll_segment); - assert(is_available()); + assert(p_current_batch->needs_flush() || + is_available()); // #1 block concurrent submissions due to rolling wait_available_promise = seastar::shared_promise<>(); assert(!wait_unfull_flush_promise.has_value()); @@ -521,7 +524,9 @@ RecordSubmitter::roll_segment() } RecordSubmitter::submit_ret -RecordSubmitter::submit(record_t&& record) +RecordSubmitter::submit( + record_t&& record, + bool with_atomic_roll_segment) { LOG_PREFIX(RecordSubmitter::submit); assert(is_available()); @@ -574,16 +579,22 @@ RecordSubmitter::submit(record_t&& record) get_name(), p_current_batch->get_num_records(), num_outstanding_io); - wait_available_promise = seastar::shared_promise<>(); - assert(!wait_unfull_flush_promise.has_value()); - wait_unfull_flush_promise = seastar::promise<>(); - // flush and mark available in background - std::ignore = wait_unfull_flush_promise->get_future( - ).finally([FNAME, this] { - DEBUG("{} flush done, available", get_name()); - wait_available_promise->set_value(); - wait_available_promise.reset(); - }); + if (with_atomic_roll_segment) { + // wait_available_promise and wait_unfull_flush_promise + // need to be delegated to the follow-up atomic roll_segment(); + assert(p_current_batch->is_pending()); + } else { + wait_available_promise = seastar::shared_promise<>(); + assert(!wait_unfull_flush_promise.has_value()); + wait_unfull_flush_promise = seastar::promise<>(); + // flush and mark available in background + std::ignore = wait_unfull_flush_promise->get_future( + ).finally([FNAME, this] { + DEBUG("{} flush done, available", get_name()); + wait_available_promise->set_value(); + wait_available_promise.reset(); + }); + } } else { DEBUG("{} added pending, flush", get_name()); flush_current_batch(); diff --git a/src/crimson/os/seastore/journal/segment_allocator.h b/src/crimson/os/seastore/journal/segment_allocator.h index 7faf2e1c45973..e113cd2aecf7c 100644 --- a/src/crimson/os/seastore/journal/segment_allocator.h +++ b/src/crimson/os/seastore/journal/segment_allocator.h @@ -348,7 +348,7 @@ public: // when available, submit the record if possible using submit_ertr = base_ertr; using submit_ret = submit_ertr::future; - submit_ret submit(record_t&&); + submit_ret submit(record_t&&, bool with_atomic_roll_segment=false); void update_committed_to(const journal_seq_t& new_committed_to) { assert(new_committed_to != JOURNAL_SEQ_NULL); -- 2.39.5