From 092a9399214f72b9abb7545bfd5ac315f95837cb Mon Sep 17 00:00:00 2001
From: Xuehan Xu
Date: Mon, 18 Aug 2025 14:16:21 +0800
Subject: [PATCH] crimson/os/seastore/cache: add "read_extents_maybe_partial"

Signed-off-by: Xuehan Xu
---
 src/crimson/os/seastore/cache.h               | 201 +++++++++++++++++-
 .../os/seastore/extent_placement_manager.h    |   8 +
 2 files changed, 207 insertions(+), 2 deletions(-)

diff --git a/src/crimson/os/seastore/cache.h b/src/crimson/os/seastore/cache.h
index d4acc3da87d..386050c2044 100644
--- a/src/crimson/os/seastore/cache.h
+++ b/src/crimson/os/seastore/cache.h
@@ -418,9 +418,9 @@ public:
                 PLACEMENT_HINT_NULL,
                 NULL_GENERATION,
                 TRANS_ID_NULL);
-      SUBDEBUG(seastore_cache,
+      SUBDEBUGT(seastore_cache,
           "{} {}~0x{:x} is absent, add extent and reading range 0x{:x}~0x{:x} ... -- {}",
-          T::TYPE, offset, length, partial_off, partial_len, *ret);
+          t, T::TYPE, offset, length, partial_off, partial_len, *ret);
       add_extent(ret);
       extent_init_func(*ret);
       cache_access_stats_t& access_stats = get_by_ext(
@@ -1608,6 +1608,73 @@ public:
     booting = false;
     extents_index.clear();
   }
+
+  /// A request to load the range offset~length of an extent,
+  /// consumed by read_extents_maybe_partial().
+  template <typename T>
+  struct read_extent_t {
+    TCachedExtentRef<T> extent;
+    const extent_len_t offset = 0;
+    const extent_len_t length = 0;
+  };
+
+  /**
+   * read_extents_maybe_partial
+   *
+   * Make sure the requested range of every extent in exts is loaded:
+   * - a range that is already loaded only waits for the extent's io;
+   * - an extent with a pending io falls back to
+   *   read_extent_maybe_partial();
+   * - the remaining extents are loaded together by
+   *   read_absent_extents_maybe_partial().
+   *
+   * Offsets and lengths are asserted to be block-size aligned, and the
+   * extents are asserted to have absolute paddrs.
+   */
+  template <typename T>
+  get_extent_iertr::future<> read_extents_maybe_partial(
+    Transaction &t,
+    std::vector<read_extent_t<T>> &&exts)
+  {
+    LOG_PREFIX(Cache::read_extents_maybe_partial);
+    auto extents = std::move(exts);
+    std::vector<get_extent_iertr::future<>> read_extent_futs;
+    std::vector<read_extent_t<T>> absent_extents;
+    for (auto &ext : extents) {
+      auto &extent = ext.extent;
+      SUBDEBUGT(seastore_cache, "reading extent {} 0x{:x}~0x{:x} ...",
+                t, *extent, ext.offset, ext.length);
+      assert(is_aligned(ext.offset, get_block_size()));
+      assert(is_aligned(ext.length, get_block_size()));
+      assert(extent->get_paddr().is_absolute());
+      if (extent->is_range_loaded(ext.offset, ext.length)) {
+        // the range of the extent has already been loaded, just do wait_io
+        SUBDEBUG(seastore_cache, "extent loaded");
+        read_extent_futs.emplace_back(
+          trans_intr::make_interruptible(
+            extent->wait_io()
+          ).then_interruptible([] { return get_extent_iertr::now(); }));
+        continue;
+      }
+      if (extent->is_pending_io()) {
+        // the extent is pending on an outstanding io,
+        // fallback to single extent loading
+        read_extent_futs.emplace_back(
+          read_extent_maybe_partial(t, extent, ext.offset, ext.length
+          ).discard_result());
+        continue;
+      }
+      assert(extent->state == CachedExtent::extent_state_t::EXIST_CLEAN ||
+             extent->state == CachedExtent::extent_state_t::CLEAN);
+      absent_extents.emplace_back(ext);
+    }
+    // NOTE(review): if this read fails, the futures collected above are
+    // dropped without being waited for -- confirm that this is acceptable.
+    co_await read_absent_extents_maybe_partial(t, std::move(absent_extents));
+    co_await trans_intr::parallel_for_each(
+      read_extent_futs, [](auto &fut) { return std::move(fut); });
+  }
+
 private:
   void touch_extent_fully(
     CachedExtent &ext,
@@ -1995,6 +2062,136 @@ private:
     });
   }
 
+  /**
+   * read_absent_extents_maybe_partial
+   *
+   * Load the requested ranges of extents that have no io pending.
+   * Ranges that are successive in the paddr space are read with a single
+   * readv; once all the reads are done, the extents are finalized and
+   * their ios are completed.
+   */
+  template <typename T>
+  get_extent_ertr::future<> read_absent_extents_maybe_partial(
+    Transaction &t,
+    std::vector<read_extent_t<T>> &&exts)
+  {
+    LOG_PREFIX(Cache::read_absent_extents_maybe_partial);
+    auto extents = std::move(exts);
+#ifndef NDEBUG
+    for (auto &ext : extents) {
+      assert(!ext.extent->is_pending_io());
+    }
+#endif
+    struct range_to_read_t {
+      paddr_t addr = P_ADDR_NULL;
+      load_range_t range;
+    };
+    struct extent_read_t {
+      TCachedExtentRef<T> extent;
+      // whether the extent was fully loaded once its ranges were registered
+      bool fully_loaded = false;
+    };
+    std::vector<range_to_read_t> ranges_to_read;
+    std::vector<extent_read_t> extents_read;
+    const auto t_src = t.get_src();
+    // get all the ranges of extents that are to be loaded
+    for (auto &ext : extents) {
+      auto &extent = ext.extent;
+      SUBDEBUGT(seastore_cache, "reading extent {} 0x{:x}~0x{:x} ...",
+                t, *extent, ext.offset, ext.length);
+      assert(extent->state == CachedExtent::extent_state_t::EXIST_CLEAN ||
+             extent->state == CachedExtent::extent_state_t::CLEAN);
+      if (extents_read.empty() ||
+          extents_read.back().extent != extent) {
+        extents_read.emplace_back(extent, false);
+      }
+      if (!extent->is_pending_io()) {
+        extent->set_io_wait(extent->state, false);
+      }
+      auto old_length = extent->get_loaded_length();
+      load_ranges_t to_read = extent->load_ranges(ext.offset, ext.length);
+      auto new_length = extent->get_loaded_length();
+      assert(new_length > old_length);
+      pinboard->increase_cached_size(*extent, new_length - old_length, &t_src);
+      for (auto &range : to_read.ranges) {
+        auto range_paddr = extent->get_paddr() + range.offset;
+        ranges_to_read.emplace_back(range_to_read_t{range_paddr, range});
+      }
+      extents_read.back().fully_loaded = extent->is_fully_loaded();
+    }
+    paddr_t off = P_ADDR_NULL;
+    extent_len_t len = 0;
+    std::vector<ExtentPlacementManager::read_ertr::future<>> futs;
+    std::vector<ceph::bufferptr> batch;
+    // load ranges that are successive in the paddr space with
+    // a single readv request
+    for (auto &range : ranges_to_read) {
+      if (off == P_ADDR_NULL) {
+        off = range.addr;
+        len += range.range.ptr.length();
+        batch.emplace_back(std::move(range.range.ptr));
+      } else if (off + len == range.addr) {
+        len += range.range.ptr.length();
+        batch.emplace_back(std::move(range.range.ptr));
+      } else {
+        futs.emplace_back(epm.readv(off, std::move(batch)));
+        batch.clear(); // start a new batch, don't rely on the moved-from state
+        len = range.range.ptr.length();
+        off = range.addr;
+        batch.emplace_back(std::move(range.range.ptr));
+      }
+    }
+    if (!batch.empty()) {
+      futs.emplace_back(epm.readv(off, std::move(batch)));
+      len = 0;
+      off = P_ADDR_NULL;
+    }
+
+    // TODO: when_all_succeed should be utilized here, however, it doesn't
+    //       actually work with interruptible errorated futures for now.
+    co_await ExtentPlacementManager::read_ertr::parallel_for_each(
+      futs, [](auto &fut) { return std::move(fut);
+    }).handle_error(
+      get_extent_ertr::pass_further{},
+      crimson::ct_error::assert_all{
+        "Cache::read_extent: invalid error"
+      }
+    );
+    for (auto &ext : extents_read) {
+      auto &extent = ext.extent;
+      ceph_assert(extent->state == CachedExtent::extent_state_t::EXIST_CLEAN
+        || extent->state == CachedExtent::extent_state_t::CLEAN
+        || !extent->is_valid());
+      if (!extent->is_valid()) {
+        // the extent is no longer valid once the read is done,
+        // skip the crc calculation and on_clean_read()
+        SUBDEBUGT(seastore_cache,
+          "read extent done (invalidated) -- {}", t, *extent);
+      } else if (ext.fully_loaded) {
+        // the ranges registered above made the extent fully loaded
+        assert(extent->is_fully_loaded());
+        // crc will be checked against LBA leaf entry for logical extents,
+        // or check against in-extent crc for physical extents.
+        if (epm.get_checksum_needed(extent->get_paddr())) {
+          extent->last_committed_crc = extent->calc_crc32c();
+        } else {
+          extent->last_committed_crc = CRC_NULL;
+        }
+        // on_clean_read() may change the content,
+        // call after calc_crc32c()
+        extent->on_clean_read();
+        SUBDEBUGT(seastore_cache, "read extent done -- {}", t, *extent);
+      } else {
+        extent->last_committed_crc = CRC_NULL;
+        SUBDEBUGT(seastore_cache,
+          "read extent done (partial) -- {}", t, *extent);
+      }
+      if (extent->is_pending_io()) {
+        extent->complete_io();
+      }
+    }
+  }
+
   // Extents in cache may contain placeholders
   CachedExtentRef query_cache(paddr_t offset) {
     if (auto iter = extents_index.find_offset(offset);
diff --git a/src/crimson/os/seastore/extent_placement_manager.h b/src/crimson/os/seastore/extent_placement_manager.h
index a5a6846e856..f6984a7719d 100644
--- a/src/crimson/os/seastore/extent_placement_manager.h
+++ b/src/crimson/os/seastore/extent_placement_manager.h
@@ -531,6 +531,14 @@ public:
     return devices_by_id[addr.get_device_id()]->read(addr, len, out);
   }
 
+  /// Forward a readv starting at addr to the device selected by
+  /// addr.get_device_id(); that device is asserted to be present.
+  read_ertr::future<> readv(
+    paddr_t addr,
+    std::vector<ceph::bufferptr> ptrs) {
+    assert(devices_by_id[addr.get_device_id()] != nullptr);
+    return devices_by_id[addr.get_device_id()]->readv(addr, std::move(ptrs));
+  }
+
   void mark_space_used(paddr_t addr, extent_len_t len) {
     background_process.mark_space_used(addr, len);
   }
-- 
2.47.3