From 69971fc44a07d6efda03b5d3c02ea5bf1b590ef6 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 10 Oct 2025 16:02:51 +0000 Subject: [PATCH] crimson/.../transaction_manager: convert get_extents_if_live to coroutine Signed-off-by: Samuel Just --- .../os/seastore/transaction_manager.cc | 160 +++++++----------- 1 file changed, 63 insertions(+), 97 deletions(-) diff --git a/src/crimson/os/seastore/transaction_manager.cc b/src/crimson/os/seastore/transaction_manager.cc index 4864413ebe8..7987d6b902c 100644 --- a/src/crimson/os/seastore/transaction_manager.cc +++ b/src/crimson/os/seastore/transaction_manager.cc @@ -795,106 +795,72 @@ TransactionManager::get_extents_if_live( // as parallel transactions may split the extent at the same time. ceph_assert(paddr.is_absolute_segmented()); - return cache->get_extent_if_cached(t, paddr, len, type - ).si_then([this, FNAME, type, paddr, laddr, len, &t](auto extent) - -> get_extents_if_live_ret { - if (extent) { - DEBUGT("{} {}~0x{:x} {} is cached and alive -- {}", - t, type, laddr, len, paddr, *extent); - assert(extent->get_length() == len); - std::list res; - res.emplace_back(std::move(extent)); - return get_extents_if_live_ret( - interruptible::ready_future_marker{}, - res); - } + auto extent = co_await cache->get_extent_if_cached(t, paddr, len, type); + std::list res; + if (extent) { + DEBUGT("{} {}~0x{:x} {} is cached and alive -- {}", + t, type, laddr, len, paddr, *extent); + assert(extent->get_length() == len); + std::list res; + res.emplace_back(std::move(extent)); + } else if (is_logical_type(type)) { + auto pin_list = co_await lba_manager->get_mappings( + t, + laddr, + len + ); + auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id(); + for (auto &pin : pin_list) { + DEBUGT("got pin, try read in parallel ... -- {}", t, pin); + auto pin_paddr = pin.get_val(); + if (!pin_paddr.is_absolute_segmented()) { + continue; + } + auto &pin_seg_paddr = pin_paddr.as_seg_paddr(); + auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id(); + // auto pin_len = pin->get_length(); + if (pin_paddr_seg_id != paddr_seg_id) { + continue; + } - if (is_logical_type(type)) { - return lba_manager->get_mappings( - t, - laddr, - len - ).si_then([this, FNAME, type, paddr, laddr, len, &t](lba_mapping_list_t pin_list) { - return seastar::do_with( - std::list(), - std::move(pin_list), - [this, FNAME, type, paddr, laddr, len, &t] - (std::list &extent_list, auto& pin_list) - { - auto paddr_seg_id = paddr.as_seg_paddr().get_segment_id(); - return trans_intr::parallel_for_each( - pin_list, - [this, FNAME, type, paddr_seg_id, &extent_list, &t]( - LBAMapping& pin) -> Cache::get_extent_iertr::future<> - { - DEBUGT("got pin, try read in parallel ... -- {}", t, pin); - auto pin_paddr = pin.get_val(); - if (!pin_paddr.is_absolute_segmented()) { - return seastar::now(); - } - auto &pin_seg_paddr = pin_paddr.as_seg_paddr(); - auto pin_paddr_seg_id = pin_seg_paddr.get_segment_id(); - // auto pin_len = pin->get_length(); - if (pin_paddr_seg_id != paddr_seg_id) { - return seastar::now(); - } - - // pin may be out of the range paddr~len, consider the following scene: - // 1. Trans.A writes the final record of Segment S, in which it overwrite - // another extent E in the same segment S; - // 2. Before Trans.A "complete_commit", Trans.B tries to rewrite new - // records and roll the segments, which closes Segment S; - // 3. Before Trans.A "complete_commit", a new cleaner Transaction C tries - // to clean the segment; - // - // In this scenario, C might see a part of extent E's laddr space mapped - // to another location within the same segment S. - // - // FIXME: this assert should be re-enabled once we have space reclaiming - // recognize committed segments: https://tracker.ceph.com/issues/66941 - // ceph_assert(pin_seg_paddr >= paddr && - // pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len)); - return read_pin_by_type(t, std::move(pin), type - ).si_then([&extent_list](auto ret) { - extent_list.emplace_back(std::move(ret)); - return seastar::now(); - }); - }).si_then([&extent_list, &t, FNAME, type, laddr, len, paddr] { - DEBUGT("{} {}~0x{:x} {} is alive as {} extents", - t, type, laddr, len, paddr, extent_list.size()); - return get_extents_if_live_ret( - interruptible::ready_future_marker{}, - std::move(extent_list)); - }); - }); - }).handle_error_interruptible(crimson::ct_error::enoent::handle([] { - return get_extents_if_live_ret( - interruptible::ready_future_marker{}, - std::list()); - }), crimson::ct_error::pass_further_all{}); + // pin may be out of the range paddr~len, consider the following scene: + // 1. Trans.A writes the final record of Segment S, in which it overwrite + // another extent E in the same segment S; + // 2. Before Trans.A "complete_commit", Trans.B tries to rewrite new + // records and roll the segments, which closes Segment S; + // 3. Before Trans.A "complete_commit", a new cleaner Transaction C tries + // to clean the segment; + // + // In this scenario, C might see a part of extent E's laddr space mapped + // to another location within the same segment S. + // + // FIXME: this assert should be re-enabled once we have space reclaiming + // recognize committed segments: https://tracker.ceph.com/issues/66941 + // ceph_assert(pin_seg_paddr >= paddr && + // pin_seg_paddr.add_offset(pin_len) <= paddr.add_offset(len)); + auto ret = co_await read_pin_by_type(t, std::move(pin), type); + res.emplace_back(std::move(ret)); + } + DEBUGT("{} {}~0x{:x} {} is alive as {} extents", + t, type, laddr, len, paddr, res.size()); + } else { + auto ext = co_await lba_manager->get_physical_extent_if_live( + t, + type, + paddr, + laddr, + len + ); + if (ext) { + DEBUGT("{} {}~0x{:x} {} is absent and alive as physical extent -- {}", + t, type, laddr, len, paddr, *ext); + res.emplace_back(std::move(ext)); } else { - return lba_manager->get_physical_extent_if_live( - t, - type, - paddr, - laddr, - len - ).si_then([=, &t](auto ret) { - std::list res; - if (ret) { - DEBUGT("{} {}~0x{:x} {} is absent and alive as physical extent -- {}", - t, type, laddr, len, paddr, *ret); - res.emplace_back(std::move(ret)); - } else { - DEBUGT("{} {}~0x{:x} {} is not alive as physical extent", - t, type, laddr, len, paddr); - } - return get_extents_if_live_ret( - interruptible::ready_future_marker{}, - std::move(res)); - }); + DEBUGT("{} {}~0x{:x} {} is not alive as physical extent", + t, type, laddr, len, paddr); } - }); + } + co_return res; } TransactionManager::~TransactionManager() {} -- 2.47.3