From c9e423facea79d42f0496264f267adee5d911b87 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 23 Jul 2024 17:11:44 +0800 Subject: [PATCH] crimson/os/seastore: wait ool writes in DeviceSubmission phase So that it is moved out of the collection lock. Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 70 +++++++++---------- .../os/seastore/extent_placement_manager.h | 4 +- .../journal/circular_bounded_journal.cc | 4 +- .../os/seastore/journal/segmented_journal.cc | 4 +- src/crimson/os/seastore/ordering_handle.h | 26 +++++++ 5 files changed, 68 insertions(+), 40 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 1ac7c68484b..a958fbdfd02 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -28,8 +28,7 @@ SegmentedOolWriter::SegmentedOolWriter( { } -SegmentedOolWriter::alloc_write_ertr::future<> -SegmentedOolWriter::write_record( +void SegmentedOolWriter::write_record( Transaction& t, record_t&& record, std::list&& extents, @@ -63,15 +62,20 @@ SegmentedOolWriter::write_record( extent_addr = extent_addr.as_seg_paddr().add_offset( extent->get_length()); } - return std::move(ret.future - ).safe_then([this, FNAME, &t, - record_base=ret.record_base_regardless_md - ](record_locator_t ret) { - TRACET("{} finish {}=={}", - t, segment_allocator.get_name(), ret, record_base); - // ool won't write metadata, so the paddrs must be equal - assert(ret.record_block_base == record_base.offset); + // t might be destructed inside write_future + auto write_future = seastar::with_gate(write_guard, + [this, FNAME, tid=t.get_trans_id(), + record_base=ret.record_base_regardless_md, + submit_fut=std::move(ret.future)]() mutable { + return std::move(submit_fut + ).safe_then([this, FNAME, tid, record_base](record_locator_t ret) { + TRACE("trans.{} {} finish {}=={}", + tid, segment_allocator.get_name(), ret, record_base); + // ool won't write metadata, so the paddrs must be equal + assert(ret.record_block_base == record_base.offset); + }); }); + t.get_handle().add_write_future(std::move(write_future)); } SegmentedOolWriter::alloc_write_iertr::future<> @@ -108,19 +112,15 @@ SegmentedOolWriter::do_write( DEBUGT("{} extents={} submit {} extents and roll, unavailable ...", t, segment_allocator.get_name(), extents.size(), num_extents); - auto fut_write = alloc_write_ertr::now(); if (num_extents > 0) { assert(record_submitter.check_action(record.size) != action_t::ROLL); - fut_write = write_record( + write_record( t, std::move(record), std::move(pending_extents), true/* with_atomic_roll_segment */); } return trans_intr::make_interruptible( - record_submitter.roll_segment( - ).safe_then([fut_write=std::move(fut_write)]() mutable { - return std::move(fut_write); - }) + record_submitter.roll_segment() ).si_then([this, &t, &extents] { return do_write(t, extents); }); @@ -151,15 +151,12 @@ SegmentedOolWriter::do_write( DEBUGT("{} extents={} submit {} extents ...", t, segment_allocator.get_name(), extents.size(), pending_extents.size()); - return trans_intr::make_interruptible( - write_record(t, std::move(record), std::move(pending_extents)) - ).si_then([this, &t, &extents] { - if (!extents.empty()) { - return do_write(t, extents); - } else { - return alloc_write_iertr::now(); - } - }); + write_record(t, std::move(record), std::move(pending_extents)); + if (!extents.empty()) { + return do_write(t, extents); + } else { + return alloc_write_iertr::now(); + } } // SUBMIT_NOT_FULL: evaluate the next extent } @@ -169,8 +166,8 @@ SegmentedOolWriter::do_write( t, segment_allocator.get_name(), num_extents); assert(num_extents > 0); - return trans_intr::make_interruptible( - write_record(t, std::move(record), std::move(pending_extents))); + write_record(t, std::move(record), std::move(pending_extents)); + return alloc_write_iertr::now(); } SegmentedOolWriter::alloc_write_iertr::future<> @@ -994,13 +991,11 @@ RandomBlockOolWriter::alloc_write_ool_extents( if (extents.empty()) { return alloc_write_iertr::now(); } - return seastar::with_gate(write_guard, [this, &t, &extents] { - return do_write(t, extents); - }); + do_write(t, extents); + return alloc_write_iertr::now(); } -RandomBlockOolWriter::alloc_write_iertr::future<> -RandomBlockOolWriter::do_write( +void RandomBlockOolWriter::do_write( Transaction& t, std::list& extents) { @@ -1053,8 +1048,10 @@ RandomBlockOolWriter::do_write( } } - return trans_intr::make_interruptible( - seastar::do_with(std::move(writes), + // t might be destructed inside write_future + auto write_future = seastar::with_gate(write_guard, + [writes=std::move(writes)]() mutable { + return seastar::do_with(std::move(writes), [](auto& writes) { return crimson::do_for_each(writes, [](auto& info) { @@ -1065,8 +1062,9 @@ RandomBlockOolWriter::do_write( "Invalid error when writing record"} ); }); - }) - ); + }); + }); + t.get_handle().add_write_future(std::move(write_future)); } } diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 7c4110c053e..0f2d55ef04a 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -115,7 +115,7 @@ private: Transaction& t, std::list &extent); - alloc_write_ertr::future<> write_record( + void write_record( Transaction& t, record_t&& record, std::list &&extents, @@ -195,7 +195,7 @@ private: ceph::bufferptr bp; RandomBlockManager* rbm; }; - alloc_write_iertr::future<> do_write( + void do_write( Transaction& t, std::list &extent); diff --git a/src/crimson/os/seastore/journal/circular_bounded_journal.cc b/src/crimson/os/seastore/journal/circular_bounded_journal.cc index 9ee8b1b997f..4da70f72c4c 100644 --- a/src/crimson/os/seastore/journal/circular_bounded_journal.cc +++ b/src/crimson/os/seastore/journal/circular_bounded_journal.cc @@ -97,7 +97,9 @@ CircularBoundedJournal::do_submit_record( auto submit_ret = record_submitter.submit(std::move(record)); // submit_ret.record_base_regardless_md is wrong for journaling return handle.enter(write_pipeline->device_submission - ).then([submit_fut=std::move(submit_ret.future)]() mutable { + ).then([&handle] { + return handle.take_write_future(); + }).safe_then([submit_fut=std::move(submit_ret.future)]() mutable { return std::move(submit_fut); }).safe_then([FNAME, this, &handle](record_locator_t result) { return handle.enter(write_pipeline->finalize diff --git a/src/crimson/os/seastore/journal/segmented_journal.cc b/src/crimson/os/seastore/journal/segmented_journal.cc index eca45f113c2..81e8c5a62c7 100644 --- a/src/crimson/os/seastore/journal/segmented_journal.cc +++ b/src/crimson/os/seastore/journal/segmented_journal.cc @@ -396,7 +396,9 @@ SegmentedJournal::do_submit_record( auto submit_ret = record_submitter.submit(std::move(record)); // submit_ret.record_base_regardless_md is wrong for journaling return handle.enter(write_pipeline->device_submission - ).then([submit_fut=std::move(submit_ret.future)]() mutable { + ).then([&handle] { + return handle.take_write_future(); + }).safe_then([submit_fut=std::move(submit_ret.future)]() mutable { return std::move(submit_fut); }).safe_then([FNAME, this, &handle](record_locator_t result) { return handle.enter(write_pipeline->finalize diff --git a/src/crimson/os/seastore/ordering_handle.h b/src/crimson/os/seastore/ordering_handle.h index 8ab8442acd9..cfa86205875 100644 --- a/src/crimson/os/seastore/ordering_handle.h +++ b/src/crimson/os/seastore/ordering_handle.h @@ -122,6 +122,11 @@ struct OrderingHandle { std::unique_ptr op; seastar::shared_mutex *collection_ordering_lock = nullptr; + using write_ertr = crimson::errorator< + crimson::ct_error::input_output_error>; + // the pending writes that should complete at DeviceSubmission phase + write_ertr::future<> write_future = write_ertr::now(); + // in the future we might add further constructors / template to type // erasure while extracting the location of tracking events. OrderingHandle(std::unique_ptr op) : op(std::move(op)) {} @@ -144,6 +149,20 @@ struct OrderingHandle { } } + void add_write_future(write_ertr::future<>&& fut) { + auto appended = std::move(write_future + ).safe_then([fut=std::move(fut)]() mutable { + return std::move(fut); + }); + write_future = std::move(appended); + } + + write_ertr::future<> take_write_future() { + auto ret = std::move(write_future); + write_future = write_ertr::now(); + return ret; + } + template seastar::future<> enter(T &t) { return op->enter(t); @@ -151,6 +170,10 @@ struct OrderingHandle { void exit() { op->exit(); + + auto ignore_writes = std::move(write_future); + std::ignore = ignore_writes; + write_future = write_ertr::now(); } seastar::future<> complete() { @@ -159,6 +182,9 @@ struct OrderingHandle { ~OrderingHandle() { maybe_release_collection_lock(); + + assert(write_future.available()); + assert(!write_future.failed()); } }; -- 2.39.5