From: Yingxin Cheng Date: Mon, 14 Mar 2022 08:05:33 +0000 (+0800) Subject: crimson/os/seastore/EPM: integrate Writer with RecordSubmitter X-Git-Tag: v18.0.0~1193^2~7 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=417adf42f5b1111044ffc9893faa51bc0b3f9605;p=ceph-ci.git crimson/os/seastore/EPM: integrate Writer with RecordSubmitter Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/os/seastore/extent_placement_manager.cc b/src/crimson/os/seastore/extent_placement_manager.cc index e838fdd08ce..fcb9cb39478 100644 --- a/src/crimson/os/seastore/extent_placement_manager.cc +++ b/src/crimson/os/seastore/extent_placement_manager.cc @@ -3,7 +3,9 @@ #include "crimson/os/seastore/extent_placement_manager.h" -SET_SUBSYS(seastore_tm); +#include "crimson/common/config_proxy.h" + +SET_SUBSYS(seastore_journal); namespace crimson::os::seastore { @@ -22,35 +24,43 @@ SegmentedAllocator::SegmentedAllocator( ); } +SegmentedAllocator::Writer::Writer( + SegmentProvider& sp, + SegmentManager& sm) + : segment_allocator("OOL", segment_type_t::OOL, sp, sm), + record_submitter(crimson::common::get_conf( + "seastore_journal_iodepth_limit"), + crimson::common::get_conf( + "seastore_journal_batch_capacity"), + crimson::common::get_conf( + "seastore_journal_batch_flush_size"), + crimson::common::get_conf( + "seastore_journal_batch_preferred_fullness"), + segment_allocator) +{ +} + SegmentedAllocator::Writer::open_ertr::future<> SegmentedAllocator::Writer::open() { return segment_allocator.open().discard_result(); } -SegmentedAllocator::Writer::write_iertr::future<> -SegmentedAllocator::Writer::_write( +SegmentedAllocator::Writer::write_ertr::future<> +SegmentedAllocator::Writer::write_record( Transaction& t, record_t&& record, std::list&& extents) { - LOG_PREFIX(SegmentedAllocator::Writer::_write); + LOG_PREFIX(SegmentedAllocator::Writer::write_record); assert(extents.size()); assert(extents.size() == record.extents.size()); assert(!record.deltas.size()); - auto 
record_group = record_group_t( - std::move(record), segment_allocator.get_block_size()); - auto record_size = record_group.size; - ceph::bufferlist bl = encode_records( - record_group, - JOURNAL_SEQ_NULL, - segment_allocator.get_nonce()); // 0 - assert(bl.length() == record_size.get_encoded_length()); - - DEBUGT("writing {} bytes to segment {}", - t, bl.length(), segment_allocator.get_segment_id()); // account transactional ool writes before write() + // TODO: drop the incorrect size and fix the metrics + auto record_size = record_group_size_t( + record.size, segment_allocator.get_block_size()); auto& stats = t.get_ool_write_stats(); stats.extents.num += extents.size(); stats.extents.bytes += record_size.dlength; @@ -59,22 +69,23 @@ SegmentedAllocator::Writer::_write( stats.data_bytes += record_size.dlength; stats.num_records += 1; - return trans_intr::make_interruptible( - segment_allocator.write(bl) - ).si_then([FNAME, record_size, &t, - extents=std::move(extents)](write_result_t wr) mutable { - assert(wr.start_seq.segment_seq == OOL_SEG_SEQ); - paddr_t extent_addr = wr.start_seq.offset; - extent_addr = extent_addr.as_seg_paddr().add_offset( - record_size.get_mdlength()); + return record_submitter.submit(std::move(record) + ).safe_then([this, FNAME, &t, extents=std::move(extents) + ](record_locator_t ret) mutable { + assert(ret.write_result.start_seq.segment_seq == OOL_SEG_SEQ); + DEBUGT("{} finish with {} and {} extents", + t, segment_allocator.get_name(), + ret, extents.size()); + paddr_t extent_addr = ret.record_block_base; for (auto& extent : extents) { - TRACET("ool extent written at {} -- {}", t, *extent, extent_addr); + TRACET("{} ool extent written at {} -- {}", + t, segment_allocator.get_name(), + extent_addr, *extent); extent->hint = placement_hint_t::NUM_HINTS; // invalidate hint t.mark_delayed_extent_ool(extent, extent_addr); extent_addr = extent_addr.as_seg_paddr().add_offset( extent->get_length()); } - assert(extent_addr == wr.get_end_seq().offset); 
}); } @@ -85,15 +96,16 @@ SegmentedAllocator::Writer::do_write( { LOG_PREFIX(SegmentedAllocator::Writer::do_write); assert(!extents.empty()); - if (roll_promise.has_value()) { + if (!record_submitter.is_available()) { + DEBUGT("{} extents={} wait ...", + t, segment_allocator.get_name(), + extents.size()); return trans_intr::make_interruptible( - roll_promise->get_shared_future() - ).then_interruptible([this, &t, &extents] { + record_submitter.wait_available() + ).si_then([this, &t, &extents] { return do_write(t, extents); }); } - assert(segment_allocator.can_write()); - record_t record; std::list pending_extents; @@ -113,32 +125,33 @@ SegmentedAllocator::Writer::do_write( auto& extent = *it; record_size_t wouldbe_rsize = record.size; wouldbe_rsize.account_extent(extent->get_bptr().length()); - auto wouldbe_length = record_group_size_t( - wouldbe_rsize, segment_allocator.get_block_size() - ).get_encoded_length(); - if (segment_allocator.needs_roll(wouldbe_length)) { - // reached the segment end, write and roll - assert(!roll_promise.has_value()); - roll_promise = seastar::shared_promise<>(); + using action_t = journal::RecordSubmitter::action_t; + action_t action = record_submitter.check_action(wouldbe_rsize); + if (action == action_t::ROLL) { auto num_extents = pending_extents.size(); - DEBUGT("end of segment, writing {} extents", t, num_extents); - return (num_extents ? 
- _write(t, std::move(record), std::move(pending_extents)) : - write_iertr::now() - ).si_then([this] { - return segment_allocator.roll(); - }).finally([this] { - roll_promise->set_value(); - roll_promise.reset(); - }).si_then([this, &t, &extents] { - if (!extents.empty()) { - return do_write(t, extents); - } - return write_iertr::now(); + DEBUGT("{} extents={} submit {} extents and roll, unavailable ...", + t, segment_allocator.get_name(), + extents.size(), num_extents); + auto fut_write = write_ertr::now(); + if (num_extents > 0) { + assert(record_submitter.check_action(record.size) != + action_t::ROLL); + fut_write = write_record( + t, std::move(record), std::move(pending_extents)); + } + return trans_intr::make_interruptible( + record_submitter.roll_segment( + ).safe_then([fut_write=std::move(fut_write)]() mutable { + return std::move(fut_write); + }) + ).si_then([this, &t, &extents] { + return do_write(t, extents); }); } - DEBUGT("add extent to record -- {}", t, *extent); + TRACET("{} extents={} add extent to record -- {}", + t, segment_allocator.get_name(), + extents.size(), *extent); if (commit_type == record_commit_type_t::MODIFY) { extent->set_last_modified(commit_time); } else { @@ -156,10 +169,32 @@ SegmentedAllocator::Writer::do_write( extent->get_last_modified().time_since_epoch().count()}); pending_extents.push_back(extent); it = extents.erase(it); + + assert(record_submitter.check_action(record.size) == action); + if (action == action_t::SUBMIT_FULL) { + DEBUGT("{} extents={} submit {} extents ...", + t, segment_allocator.get_name(), + extents.size(), pending_extents.size()); + return trans_intr::make_interruptible( + write_record(t, std::move(record), std::move(pending_extents)) + ).si_then([this, &t, &extents] { + if (!extents.empty()) { + return do_write(t, extents); + } else { + return write_iertr::now(); + } + }); + } + // SUBMIT_NOT_FULL: evaluate the next extent } - DEBUGT("writing {} extents", t, pending_extents.size()); - return _write(t, 
std::move(record), std::move(pending_extents)); + auto num_extents = pending_extents.size(); + DEBUGT("{} submit the rest {} extents ...", + t, segment_allocator.get_name(), + num_extents); + assert(num_extents > 0); + return trans_intr::make_interruptible( + write_record(t, std::move(record), std::move(pending_extents))); } SegmentedAllocator::Writer::write_iertr::future<> diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h index f9444bf7eb3..263dc0cf421 100644 --- a/src/crimson/os/seastore/extent_placement_manager.h +++ b/src/crimson/os/seastore/extent_placement_manager.h @@ -4,7 +4,6 @@ #pragma once #include "seastar/core/gate.hh" -#include "seastar/core/shared_future.hh" #include "crimson/os/seastore/cached_extent.h" #include "crimson/os/seastore/journal/segment_allocator.h" @@ -27,7 +26,8 @@ public: using open_ertr = base_ertr; virtual open_ertr::future<> open() = 0; - using write_iertr = trans_iertr<base_ertr>; + using write_ertr = base_ertr; + using write_iertr = trans_iertr<write_ertr>; virtual write_iertr::future<> write( Transaction& t, std::list& extent) = 0; @@ -75,9 +75,7 @@ class SegmentProvider; class SegmentedAllocator : public ExtentAllocator { class Writer : public ExtentOolWriter { public: - Writer(SegmentProvider& sp, SegmentManager& sm) - : segment_allocator("OOL", segment_type_t::OOL, sp, sm) {} - + Writer(SegmentProvider& sp, SegmentManager& sm); Writer(Writer &&) = default; open_ertr::future<> open() final; @@ -99,13 +97,13 @@ class SegmentedAllocator : public ExtentAllocator { Transaction& t, std::list& extent); - write_iertr::future<> _write( + write_ertr::future<> write_record( Transaction& t, record_t&& record, std::list&& extents); journal::SegmentAllocator segment_allocator; - std::optional<seastar::shared_promise<>> roll_promise; + journal::RecordSubmitter record_submitter; seastar::gate write_guard; }; public: @@ -131,7 +129,7 @@ public: Transaction& t, std::list& extents) final { 
LOG_PREFIX(SegmentedAllocator::alloc_ool_extents_paddr); - SUBDEBUGT(seastore_tm, "start", t); + SUBDEBUGT(seastore_journal, "start", t); return seastar::do_with( std::map>(), [this, extents=std::move(extents), &t](auto& alloc_map) { @@ -164,7 +162,7 @@ public: void add_allocator(device_type_t type, ExtentAllocatorRef&& allocator) { allocators[type].emplace_back(std::move(allocator)); LOG_PREFIX(ExtentPlacementManager::add_allocator); - SUBDEBUG(seastore_tm, "allocators for {}: {}", + SUBDEBUG(seastore_journal, "allocators for {}: {}", type, allocators[type].size()); } @@ -172,7 +170,7 @@ public: using open_ertr = ExtentOolWriter::open_ertr; open_ertr::future<> open() { LOG_PREFIX(ExtentPlacementManager::open); - SUBINFO(seastore_tm, "started"); + SUBINFO(seastore_journal, "started"); return crimson::do_for_each(allocators, [](auto& allocators_item) { return crimson::do_for_each(allocators_item.second, [](auto& allocator) { return allocator->open(); @@ -228,7 +226,7 @@ public: Transaction& t, const std::list& delayed_extents) { LOG_PREFIX(ExtentPlacementManager::delayed_alloc_or_ool_write); - SUBDEBUGT(seastore_tm, "start with {} delayed extents", + SUBDEBUGT(seastore_journal, "start with {} delayed extents", t, delayed_extents.size()); return seastar::do_with( std::map>(), @@ -251,7 +249,7 @@ public: using close_ertr = ExtentOolWriter::stop_ertr; close_ertr::future<> close() { LOG_PREFIX(ExtentPlacementManager::close); - SUBINFO(seastore_tm, "started"); + SUBINFO(seastore_journal, "started"); return crimson::do_for_each(allocators, [](auto& allocators_item) { return crimson::do_for_each(allocators_item.second, [](auto& allocator) { return allocator->stop();