From f109ce9b85cac5625a5f15861cdbdeef7f56ef69 Mon Sep 17 00:00:00 2001 From: chunmei liu Date: Mon, 24 Mar 2025 00:30:25 -0700 Subject: [PATCH] crimson/osd/../scrub_events: replace omap_get_values() by omap_iterate() Signed-off-by: chunmei liu --- .../osd/osd_operations/scrub_events.cc | 258 ++++++++++-------- 1 file changed, 138 insertions(+), 120 deletions(-) diff --git a/src/crimson/osd/osd_operations/scrub_events.cc b/src/crimson/osd/osd_operations/scrub_events.cc index df404014db0c9..c2fd916ff714f 100644 --- a/src/crimson/osd/osd_operations/scrub_events.cc +++ b/src/crimson/osd/osd_operations/scrub_events.cc @@ -7,6 +7,7 @@ #include "crimson/osd/osd_connection_priv.h" #include "messages/MOSDRepScrubMap.h" #include "scrub_events.h" +#include "crimson/os/futurized_store.h" SET_SUBSYS(osd); @@ -231,127 +232,144 @@ ScrubScan::ifut<> ScrubScan::deep_scan_object( auto &entry = ret.objects[obj.hobj]; auto progress_ref = std::make_unique(); auto &progress = *progress_ref; - return interruptor::repeat( + + co_await interruptor::repeat( [FNAME, this, &progress, &obj, &entry, &pg]() - -> interruptible_future { - if (progress.offset) { - DEBUGDPP("op: {}, obj: {}, progress: {} scanning data", - pg, *this, obj, progress); - const auto stride = local_conf().get_val( - "osd_deep_scrub_stride"); - return pg.shard_services.get_store().read( - pg.get_collection_ref(), - obj, - *(progress.offset), - stride - ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) { - size_t offset = *progress.offset; - DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}", - pg, *this, obj, progress, offset); - progress.data_hash << bl; - if (bl.length() < stride) { - progress.offset = std::nullopt; - entry.digest = progress.data_hash.digest(); - entry.digest_present = true; - } else { - ceph_assert(stride == bl.length()); - *(progress.offset) += stride; - } - }).handle_error( - ct_error::all_same_way([&progress, &entry](auto e) { - entry.read_error = true; - progress.offset = std::nullopt; - return seastar::now(); - }) - ).then([] { - return interruptor::make_interruptible( - seastar::make_ready_future( - seastar::stop_iteration::no)); - }); - } else if (!progress.header_done) { - DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header", - pg, *this, obj, progress); - return pg.shard_services.get_store().omap_get_header( - pg.get_collection_ref(), - obj - ).safe_then([&progress](auto bl) { - progress.omap_hash << bl; - }).handle_error( - ct_error::enodata::handle([] { return seastar::now(); }), - ct_error::all_same_way([&entry](auto e) { - entry.read_error = true; - return seastar::now(); - }) - ).then([&progress] { - progress.header_done = true; - return interruptor::make_interruptible( - seastar::make_ready_future( - seastar::stop_iteration::no)); - }); - } else if (!progress.keys_done) { - DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys", - pg, *this, obj, progress); - return pg.shard_services.get_store().omap_get_values( - pg.get_collection_ref(), - obj, - progress.next_key - ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) { - const auto &[done, omap] = result; - DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys", - pg, *this, obj, progress, omap.size()); - for (const auto &p : omap) { - bufferlist bl; - encode(p.first, bl); - encode(p.second, bl); - progress.omap_hash << bl; - entry.object_omap_keys++; - entry.object_omap_bytes += p.second.length(); - } - if (done) { - DEBUGDPP("op: {}, obj: {}, progress: {} omap done", - pg, *this, obj, progress); - progress.keys_done = true; - entry.omap_digest = progress.omap_hash.digest(); - entry.omap_digest_present = true; - - if ((entry.object_omap_keys > - local_conf().get_val( - "osd_deep_scrub_large_omap_object_key_threshold")) || - (entry.object_omap_bytes > - local_conf().get_val( - "osd_deep_scrub_large_omap_object_value_sum_threshold"))) { - entry.large_omap_object_found = true; - entry.large_omap_object_key_count = entry.object_omap_keys; - ret.has_large_omap_object_errors = true; - } - } else { - ceph_assert(!omap.empty()); // omap_get_values invariant - DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}", - pg, *this, obj, progress, omap.crbegin()->first); - progress.next_key = omap.crbegin()->first; - } - }).handle_error( - ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg] - (auto e) { - DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}", - pg, *this, obj, progress, e); - progress.keys_done = true; - entry.read_error = true; - return seastar::now(); - }) - ).then([] { - return interruptor::make_interruptible( - seastar::make_ready_future( - seastar::stop_iteration::no)); - }); - } else { - DEBUGDPP("op: {}, obj: {}, progress: {} done", - pg, *this, obj, progress); - return interruptor::make_interruptible( - seastar::make_ready_future( - seastar::stop_iteration::yes)); - } - }).finally([progress_ref=std::move(progress_ref)] {}); + -> interruptible_future + { + auto store_read = [FNAME, this, &progress, &obj, &entry, &pg]() + -> interruptible_future + { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning data", + pg, *this, obj, progress); + const auto stride = local_conf().get_val( + "osd_deep_scrub_stride"); + return pg.shard_services.get_store().read( + pg.get_collection_ref(), + obj, + *(progress.offset), + stride + ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) { + size_t offset = *progress.offset; + DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}", + pg, *this, obj, progress, offset); + progress.data_hash << bl; + if (bl.length() < stride) { + progress.offset = std::nullopt; + entry.digest = progress.data_hash.digest(); + entry.digest_present = true; + } else { + ceph_assert(stride == bl.length()); + *(progress.offset) += stride; + } + }).handle_error( + ct_error::all_same_way([&progress, &entry](auto e) { + entry.read_error = true; + progress.offset = std::nullopt; + return seastar::now(); + }) + ).then([] { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + }; + + auto get_header = [FNAME, this, &progress, &obj, &entry, &pg]() + -> interruptible_future + { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header", + pg, *this, obj, progress); + return pg.shard_services.get_store().omap_get_header( + pg.get_collection_ref(), + obj + ).safe_then([&progress](auto bl) { + progress.omap_hash << bl; + }).handle_error( + ct_error::enodata::handle([] { return seastar::now(); }), + ct_error::all_same_way([&entry](auto e) { + entry.read_error = true; + return seastar::now(); + }) + ).then([&progress] { + progress.header_done = true; + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + }; + + ObjectStore::omap_iter_seek_t start_from; + start_from.seek_position = progress.next_key.has_value() ? + progress.next_key.value() : std::string{}; + start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND; + + std::function callback = + [&progress, &entry] (std::string_view key, std::string_view value) + { + bufferlist bl; + encode(key, bl); + encode(value, bl); + progress.omap_hash << bl; + entry.object_omap_keys++; + entry.object_omap_bytes += value.length(); + return ObjectStore::omap_iter_ret_t::NEXT; + }; + + auto get_keys = [FNAME, this, &progress, &obj, &entry, &pg, start_from, callback]() + -> interruptible_future + { + DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys", + pg, *this, obj, progress); + return pg.shard_services.get_store().omap_iterate( + pg.get_collection_ref(), + obj, + start_from, + callback + ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) { + assert(result == ObjectStore::omap_iter_ret_t::NEXT); + DEBUGDPP("op: {}, obj: {}, progress: {} omap done", + pg, *this, obj, progress); + progress.keys_done = true; + entry.omap_digest = progress.omap_hash.digest(); + entry.omap_digest_present = true; + + if ((entry.object_omap_keys > + local_conf().get_val( + "osd_deep_scrub_large_omap_object_key_threshold")) || + (entry.object_omap_bytes > + local_conf().get_val( + "osd_deep_scrub_large_omap_object_value_sum_threshold"))) { + entry.large_omap_object_found = true; + entry.large_omap_object_key_count = entry.object_omap_keys; + ret.has_large_omap_object_errors = true; + } + }).handle_error( + ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg] + (auto e) + { + DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}", + pg, *this, obj, progress, e); + progress.keys_done = true; + entry.read_error = true; + return seastar::now(); + }) + ).then([] { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + }; + + if (progress.offset) { + co_return co_await store_read(); + } else if (!progress.header_done) { + co_return co_await get_header(); + } else if (!progress.keys_done) { + co_return co_await get_keys(); + } else { + DEBUGDPP("op: {}, obj: {}, progress: {} done", + pg, *this, obj, progress); + co_return seastar::stop_iteration::yes; + } + }).finally([progress_ref=std::move(progress_ref)] {}); } template class ScrubAsyncOpT; -- 2.39.5