From 7ebf3590de92cc64e6e25322934cc0cf71ebedc0 Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Tue, 15 Feb 2022 09:28:04 +0800 Subject: [PATCH] crimson/os/seastore/epm: simplify gating writes for Writer Dropped open_segment_wrapper_t. Signed-off-by: Yingxin Cheng --- .../os/seastore/extent_placement_manager.cc | 125 ++++++++---------- .../os/seastore/extent_placement_manager.h | 27 ++-- 2 files changed, 66 insertions(+), 86 deletions(-) diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index 796c63ce932..3d92d4b329e 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -37,25 +37,23 @@ SegmentedAllocator::Writer::_write( Transaction& t, ool_record_t& record) { + LOG_PREFIX(SegmentedAllocator::Writer::_write); auto record_size = record.get_encoded_record_length(); allocated_to += record_size.get_encoded_length(); segment_provider.update_segment_avail_bytes( paddr_t::make_seg_paddr( - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to)); bufferlist bl = record.encode( - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), 0); - seastar::promise<> pr; - current_segment->inflight_writes.emplace_back(pr.get_future()); - LOG_PREFIX(SegmentedAllocator::Writer::_write); DEBUGT( "written {} extents, {} bytes to segment {} at {}", t, record.get_num_extents(), bl.length(), - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), record.get_base()); // account transactional ool writes before write() @@ -68,18 +66,10 @@ SegmentedAllocator::Writer::_write( stats.num_records += 1; return trans_intr::make_interruptible( - current_segment->segment->write(record.get_base(), bl).safe_then( - [this, FNAME, pr=std::move(pr), &t, - it=(--current_segment->inflight_writes.end()), - cs=current_segment]() mutable { - if (cs->outdated) { - DEBUGT("segment rolled", t); - pr.set_value(); - } else{ - DEBUGT("segment not rolled", t); - current_segment->inflight_writes.erase(it); - } - return seastar::now(); + current_segment->write(record.get_base(), bl + ).safe_then([FNAME, &t, &record, cs=current_segment] { + DEBUGT("written {} {}", + t, cs->get_segment_id(), record.get_base()); }) ).si_then([FNAME, &record, &t] { for (auto& ool_extent : record.get_extents()) { @@ -140,7 +130,7 @@ SegmentedAllocator::Writer::write( "end of segment, writing {} extents to segment {} at {}", t, num_extents, - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to); return (num_extents ? _write(t, record) : @@ -160,7 +150,7 @@ SegmentedAllocator::Writer::write( "writing {} extents to segment {} at {}", t, record.get_num_extents(), - current_segment->segment->get_segment_id(), + current_segment->get_segment_id(), allocated_to); return _write(t, record); } @@ -173,22 +163,26 @@ SegmentedAllocator::Writer::write( }); }; - if (rolling_segment) { - return segment_rotation_guard.wait([this] { - return !rolling_segment; - }, std::move(write_func)); + return seastar::with_gate(write_guard, + [this, write_func=std::move(write_func)]() mutable + { + if (rolling_segment) { + return segment_rotation_guard.wait([this] { + return !rolling_segment; + }, std::move(write_func)); - } else if (!current_segment) { - return trans_intr::make_interruptible(roll_segment(true)).si_then( - [write_func=std::move(write_func)] { - return write_func(); - }); - } - return write_func(); + } else if (!current_segment) { + return trans_intr::make_interruptible(roll_segment(true)).si_then( + [write_func=std::move(write_func)] { + return write_func(); + }); + } + return write_func(); + }); } bool SegmentedAllocator::Writer::_needs_roll(seastore_off_t length) const { - return allocated_to + length > current_segment->segment->get_write_capacity(); + return allocated_to + length > current_segment->get_write_capacity(); } SegmentedAllocator::Writer::init_segment_ertr::future<> @@ -225,44 +219,39 @@ SegmentedAllocator::Writer::roll_segment(bool set_rolling) { rolling_segment = true; } assert(rolling_segment); - if (current_segment) { - (void) seastar::with_gate(writer_guard, [this] { - auto fut = seastar::now(); - if (!current_segment->inflight_writes.empty()) { - fut = seastar::when_all_succeed( - current_segment->inflight_writes.begin(), - current_segment->inflight_writes.end()); - } - current_segment->outdated = true; - return fut.then( - [cs=std::move(current_segment), this, it=(--open_segments.end())] { - return cs->segment->close().safe_then([this, cs, it] { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - assert((*it).get() == cs.get()); - segment_provider.close_segment(cs->segment->get_segment_id()); - open_segments.erase(it); - DEBUG("closed segment: {}", cs->segment->get_segment_id()); + return [this, FNAME] { + if (current_segment) { + auto seg_to_close = std::move(current_segment); + if (write_guard.is_closed()) { + DEBUG("write_guard is closed, should be stopping"); + return seg_to_close->close( + ).safe_then([seg_to_close=std::move(seg_to_close)] {}); + } else { + DEBUG("rolling OOL segment, close {} ...", seg_to_close->get_segment_id()); + (void) seastar::with_gate(write_guard, + [this, seg_to_close=std::move(seg_to_close)]() mutable + { + return seg_to_close->close( + ).safe_then([this, seg_to_close=std::move(seg_to_close)] { + segment_provider.close_segment(seg_to_close->get_segment_id()); + }); }); - }); - }).handle_exception_type([](seastar::gate_closed_exception e) { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - DEBUG(" writer_guard closed, should be stopping"); - return seastar::now(); - }); - } - - auto new_segment_id = segment_provider.get_segment( - segment_manager.get_device_id(), OOL_SEG_SEQ); - return segment_manager.open(new_segment_id - ).safe_then([this](auto segref) { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); + return Segment::close_ertr::now(); + } + } else { + DEBUG("rolling OOL segment, no current ..."); + return Segment::close_ertr::now(); + } + }().safe_then([this] { + auto new_segment_id = segment_provider.get_segment( + segment_manager.get_device_id(), OOL_SEG_SEQ); + return segment_manager.open(new_segment_id); + }).safe_then([this, FNAME](auto segref) { DEBUG("opened new segment: {}", segref->get_segment_id()); - return init_segment(*segref).safe_then([segref=std::move(segref), this] { - LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); - assert(!current_segment.get()); - current_segment.reset(new open_segment_wrapper_t()); - current_segment->segment = segref; - open_segments.emplace_back(current_segment); + return init_segment(*segref + ).safe_then([segref=std::move(segref), this, FNAME] { + assert(!current_segment); + current_segment = segref; rolling_segment = false; segment_rotation_guard.broadcast(); DEBUG("inited new segment: {}", segref->get_segment_id()); diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index 485ed4a069b..5d97534f858 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -151,17 +151,6 @@ public: }; using ExtentAllocatorRef = std::unique_ptr; -struct open_segment_wrapper_t : public boost::intrusive_ref_counter< - open_segment_wrapper_t, - boost::thread_unsafe_counter> { - SegmentRef segment; - std::list> inflight_writes; - bool outdated = false; -}; - -using open_segment_wrapper_ref = - boost::intrusive_ptr; - class SegmentProvider; /** @@ -191,11 +180,14 @@ class SegmentedAllocator : public ExtentAllocator { write_iertr::future<> write( Transaction& t, std::list& extent) final; + stop_ertr::future<> stop() final { - return writer_guard.close().then([this] { - return crimson::do_for_each(open_segments, [](auto& seg_wrapper) { - return seg_wrapper->segment->close(); - }); + return write_guard.close().then([this] { + if (current_segment) { + return current_segment->close(); + } else { + return Segment::close_ertr::now(); + } }); } private: @@ -219,11 +211,10 @@ class SegmentedAllocator : public ExtentAllocator { SegmentProvider& segment_provider; SegmentManager& segment_manager; - open_segment_wrapper_ref current_segment; - std::list open_segments; + SegmentRef current_segment; seastore_off_t allocated_to = 0; crimson::condition_variable segment_rotation_guard; - seastar::gate writer_guard; + seastar::gate write_guard; bool rolling_segment = false; }; public: -- 2.39.5