From d2e3bb6f7aa568e9d452acd7625d18c66a0d34ce Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 15 Feb 2022 22:26:18 +0800 Subject: [PATCH] crimson/os/seastore/epm: replace condition_variable by a shared_promise Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 192 ++++++++---------- .../os/seastore/extent_placement_manager.h | 15 +- 2 files changed, 93 insertions(+), 114 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 3d92d4b329e6..5ba17f6038bb 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -38,6 +38,7 @@ SegmentedAllocator::Writer::_write( ool_record_t& record) { LOG_PREFIX(SegmentedAllocator::Writer::_write); + record.set_base(allocated_to); auto record_size = record.get_encoded_record_length(); allocated_to += record_size.get_encoded_length(); segment_provider.update_segment_avail_bytes( @@ -65,119 +66,103 @@ SegmentedAllocator::Writer::_write( stats.data_bytes += record_size.dlength; stats.num_records += 1; + for (auto& ool_extent : record.get_extents()) { + auto& lextent = ool_extent.get_lextent(); + auto paddr = ool_extent.get_ool_paddr(); + TRACET("ool extent written at {} -- {}", t, *lextent, paddr); + lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint + t.mark_delayed_extent_ool(lextent, paddr); + } + return trans_intr::make_interruptible( current_segment->write(record.get_base(), bl - ).safe_then([FNAME, &t, &record, cs=current_segment] { - DEBUGT("written {} {}", - t, cs->get_segment_id(), record.get_base()); + ).safe_then([FNAME, &t, base=record.get_base(), cs=current_segment] { + DEBUGT("written {} {}", t, cs->get_segment_id(), base); }) - ).si_then([FNAME, &record, &t] { - for (auto& ool_extent : record.get_extents()) { - auto& lextent = ool_extent.get_lextent(); - auto paddr = ool_extent.get_ool_paddr(); - TRACET("ool extent written at {} -- {}", t, *lextent, paddr); - lextent->hint = placement_hint_t::NUM_HINTS; // invalidate hint - t.mark_delayed_extent_ool(lextent, paddr); - } - record.clear(); - }); -} - -void SegmentedAllocator::Writer::add_extent_to_write( - ool_record_t& record, - LogicalCachedExtentRef& extent) { - logger().debug( - "SegmentedAllocator::Writer::add_extent_to_write: " - "add extent {} to record", - extent); - extent->prepare_write(); - record.add_extent(extent); + ); } SegmentedAllocator::Writer::write_iertr::future<> -SegmentedAllocator::Writer::write( +SegmentedAllocator::Writer::do_write( Transaction& t, std::list& extents) { - auto write_func = [this, &extents, &t] { - return seastar::do_with(ool_record_t(segment_manager.get_block_size()), - [this, &extents, &t](auto& record) { - return trans_intr::repeat([this, &record, &t, &extents]() - -> write_iertr::future { - if (extents.empty()) { - return seastar::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::yes); + LOG_PREFIX(SegmentedAllocator::Writer::do_write); + assert(!extents.empty()); + if (roll_promise.has_value()) { + return roll_promise->get_shared_future( + ).then([this, &t, &extents] { + return do_write(t, extents); + }); + } + assert(current_segment); + + ool_record_t record(segment_manager.get_block_size()); + for (auto it = extents.begin(); it != extents.end();) { + auto& extent = *it; + auto wouldbe_length = record.get_wouldbe_encoded_record_length(extent); + if (_needs_roll(wouldbe_length)) { + // reached the segment end, write and roll + assert(!roll_promise.has_value()); + roll_promise = seastar::shared_promise<>(); + auto num_extents = record.get_num_extents(); + DEBUGT( + "end of segment, writing {} extents to segment {} at {}", + t, + num_extents, + current_segment->get_segment_id(), + allocated_to); + return (num_extents ? + _write(t, record) : + write_iertr::now() + ).si_then([this] { + return roll_segment(); + }).finally([this] { + roll_promise->set_value(); + roll_promise.reset(); + }).si_then([this, &t, &extents] { + if (!extents.empty()) { + return do_write(t, extents); } - - return segment_rotation_guard.wait( - [this] { - return !rolling_segment; - }, - [this, &record, &extents, &t]() -> write_iertr::future<> { - LOG_PREFIX(SegmentedAllocator::Writer::write); - record.set_base(allocated_to); - for (auto it = extents.begin(); - it != extents.end();) { - auto& extent = *it; - auto wouldbe_length = - record.get_wouldbe_encoded_record_length(extent); - if (_needs_roll(wouldbe_length)) { - // reached the segment end, write and roll - assert(!rolling_segment); - rolling_segment = true; - auto num_extents = record.get_num_extents(); - DEBUGT( - "end of segment, writing {} extents to segment {} at {}", - t, - num_extents, - current_segment->get_segment_id(), - allocated_to); - return (num_extents ? - _write(t, record) : - write_iertr::now() - ).si_then([this]() mutable { - return roll_segment(false); - }).finally([this] { - rolling_segment = false; - segment_rotation_guard.broadcast(); - }); - } - add_extent_to_write(record, extent); - it = extents.erase(it); - } - - DEBUGT( - "writing {} extents to segment {} at {}", - t, - record.get_num_extents(), - current_segment->get_segment_id(), - allocated_to); - return _write(t, record); - } - ).si_then([]() - -> write_iertr::future { - return seastar::make_ready_future< - seastar::stop_iteration>(seastar::stop_iteration::no); - }); + return write_iertr::now(); }); - }); - }; + } + DEBUGT("add extent to record -- {}", t, *extent); + extent->prepare_write(); + record.add_extent(extent); + it = extents.erase(it); + } - return seastar::with_gate(write_guard, - [this, write_func=std::move(write_func)]() mutable - { - if (rolling_segment) { - return segment_rotation_guard.wait([this] { - return !rolling_segment; - }, std::move(write_func)); + DEBUGT( + "writing {} extents to segment {} at {}", + t, + record.get_num_extents(), + current_segment->get_segment_id(), + allocated_to); + return _write(t, record); +} - } else if (!current_segment) { - return trans_intr::make_interruptible(roll_segment(true)).si_then( - [write_func=std::move(write_func)] { - return write_func(); +SegmentedAllocator::Writer::write_iertr::future<> +SegmentedAllocator::Writer::write( + Transaction& t, + std::list& extents) +{ + if (extents.empty()) { + return write_iertr::now(); + } + return seastar::with_gate(write_guard, [this, &t, &extents] { + if (!roll_promise.has_value() && !current_segment) { + roll_promise = seastar::shared_promise<>(); + return trans_intr::make_interruptible( + roll_segment().finally([this] { + roll_promise->set_value(); + roll_promise.reset(); + }) + ).si_then([this, &t, &extents] { + return do_write(t, extents); }); } - return write_func(); + return do_write(t, extents); }); } @@ -212,13 +197,10 @@ SegmentedAllocator::Writer::init_segment(Segment& segment) { } SegmentedAllocator::Writer::roll_segment_ertr::future<> -SegmentedAllocator::Writer::roll_segment(bool set_rolling) { +SegmentedAllocator::Writer::roll_segment() { LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - DEBUG("set_rolling {}", set_rolling); - if (set_rolling) { - rolling_segment = true; - } - assert(rolling_segment); + DEBUG("start"); + assert(roll_promise.has_value()); return [this, FNAME] { if (current_segment) { auto seg_to_close = std::move(current_segment); @@ -252,8 +234,6 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) { ).safe_then([segref=std::move(segref), this, FNAME] { assert(!current_segment); current_segment = segref; - rolling_segment = false; - segment_rotation_guard.broadcast(); DEBUG("inited new segment: {}", segref->get_segment_id()); }); }).handle_error( diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 5d97534f8588..c582f04e5892 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -4,8 +4,8 @@ #pragma once #include "seastar/core/gate.hh" +#include "seastar/core/shared_future.hh" -#include "crimson/common/condition_variable.h" #include "crimson/os/seastore/cached_extent.h" #include "crimson/os/seastore/logging.h" #include "crimson/os/seastore/segment_manager.h" @@ -191,6 +191,10 @@ class SegmentedAllocator : public ExtentAllocator { }); } private: + write_iertr::future<> do_write( + Transaction& t, + std::list& extent); + bool _needs_roll(seastore_off_t length) const; write_iertr::future<> _write( @@ -199,23 +203,18 @@ class SegmentedAllocator : public ExtentAllocator { using roll_segment_ertr = crimson::errorator< crimson::ct_error::input_output_error>; - roll_segment_ertr::future<> roll_segment(bool); + roll_segment_ertr::future<> roll_segment(); using init_segment_ertr = crimson::errorator< crimson::ct_error::input_output_error>; init_segment_ertr::future<> init_segment(Segment& segment); - void add_extent_to_write( - ool_record_t&, - LogicalCachedExtentRef& extent); - SegmentProvider& segment_provider; SegmentManager& segment_manager; SegmentRef current_segment; seastore_off_t allocated_to = 0; - crimson::condition_variable segment_rotation_guard; + std::optional> roll_promise; seastar::gate write_guard; - bool rolling_segment = false; }; public: SegmentedAllocator( -- 2.47.3