#include "crimson/osd/osd_connection_priv.h"
#include "messages/MOSDRepScrubMap.h"
#include "scrub_events.h"
+#include "crimson/os/futurized_store.h"
SET_SUBSYS(osd);
auto &entry = ret.objects[obj.hobj];
auto progress_ref = std::make_unique<obj_scrub_progress_t>();
auto &progress = *progress_ref;
- return interruptor::repeat(
+
+ co_await interruptor::repeat(
[FNAME, this, &progress, &obj, &entry, &pg]()
- -> interruptible_future<seastar::stop_iteration> {
- if (progress.offset) {
- DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
- pg, *this, obj, progress);
- const auto stride = local_conf().get_val<Option::size_t>(
- "osd_deep_scrub_stride");
- return pg.shard_services.get_store().read(
- pg.get_collection_ref(),
- obj,
- *(progress.offset),
- stride
- ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
- size_t offset = *progress.offset;
- DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
- pg, *this, obj, progress, offset);
- progress.data_hash << bl;
- if (bl.length() < stride) {
- progress.offset = std::nullopt;
- entry.digest = progress.data_hash.digest();
- entry.digest_present = true;
- } else {
- ceph_assert(stride == bl.length());
- *(progress.offset) += stride;
- }
- }).handle_error(
- ct_error::all_same_way([&progress, &entry](auto e) {
- entry.read_error = true;
- progress.offset = std::nullopt;
- return seastar::now();
- })
- ).then([] {
- return interruptor::make_interruptible(
- seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no));
- });
- } else if (!progress.header_done) {
- DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
- pg, *this, obj, progress);
- return pg.shard_services.get_store().omap_get_header(
- pg.get_collection_ref(),
- obj
- ).safe_then([&progress](auto bl) {
- progress.omap_hash << bl;
- }).handle_error(
- ct_error::enodata::handle([] { return seastar::now(); }),
- ct_error::all_same_way([&entry](auto e) {
- entry.read_error = true;
- return seastar::now();
- })
- ).then([&progress] {
- progress.header_done = true;
- return interruptor::make_interruptible(
- seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no));
- });
- } else if (!progress.keys_done) {
- DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
- pg, *this, obj, progress);
- return pg.shard_services.get_store().omap_get_values(
- pg.get_collection_ref(),
- obj,
- progress.next_key
- ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
- const auto &[done, omap] = result;
- DEBUGDPP("op: {}, obj: {}, progress: {} got {} keys",
- pg, *this, obj, progress, omap.size());
- for (const auto &p : omap) {
- bufferlist bl;
- encode(p.first, bl);
- encode(p.second, bl);
- progress.omap_hash << bl;
- entry.object_omap_keys++;
- entry.object_omap_bytes += p.second.length();
- }
- if (done) {
- DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
- pg, *this, obj, progress);
- progress.keys_done = true;
- entry.omap_digest = progress.omap_hash.digest();
- entry.omap_digest_present = true;
-
- if ((entry.object_omap_keys >
- local_conf().get_val<uint64_t>(
- "osd_deep_scrub_large_omap_object_key_threshold")) ||
- (entry.object_omap_bytes >
- local_conf().get_val<Option::size_t>(
- "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
- entry.large_omap_object_found = true;
- entry.large_omap_object_key_count = entry.object_omap_keys;
- ret.has_large_omap_object_errors = true;
- }
- } else {
- ceph_assert(!omap.empty()); // omap_get_values invariant
- DEBUGDPP("op: {}, obj: {}, progress: {} omap not done, next {}",
- pg, *this, obj, progress, omap.crbegin()->first);
- progress.next_key = omap.crbegin()->first;
- }
- }).handle_error(
- ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
- (auto e) {
- DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
- pg, *this, obj, progress, e);
- progress.keys_done = true;
- entry.read_error = true;
- return seastar::now();
- })
- ).then([] {
- return interruptor::make_interruptible(
- seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no));
- });
- } else {
- DEBUGDPP("op: {}, obj: {}, progress: {} done",
- pg, *this, obj, progress);
- return interruptor::make_interruptible(
- seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes));
- }
- }).finally([progress_ref=std::move(progress_ref)] {});
+ -> interruptible_future<seastar::stop_iteration>
+ {
+ auto store_read = [FNAME, this, &progress, &obj, &entry, &pg]()
+ -> interruptible_future<seastar::stop_iteration>
+ {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning data",
+ pg, *this, obj, progress);
+ const auto stride = local_conf().get_val<Option::size_t>(
+ "osd_deep_scrub_stride");
+ return pg.shard_services.get_store().read(
+ pg.get_collection_ref(),
+ obj,
+ *(progress.offset),
+ stride
+ ).safe_then([this, FNAME, stride, &obj, &progress, &entry, &pg](auto bl) {
+ size_t offset = *progress.offset;
+ DEBUGDPP("op: {}, obj: {}, progress: {} got offset {}",
+ pg, *this, obj, progress, offset);
+ progress.data_hash << bl;
+ if (bl.length() < stride) {
+ progress.offset = std::nullopt;
+ entry.digest = progress.data_hash.digest();
+ entry.digest_present = true;
+ } else {
+ ceph_assert(stride == bl.length());
+ *(progress.offset) += stride;
+ }
+ }).handle_error(
+ ct_error::all_same_way([&progress, &entry](auto e) {
+ entry.read_error = true;
+ progress.offset = std::nullopt;
+ return seastar::now();
+ })
+ ).then([] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ };
+
+ auto get_header = [FNAME, this, &progress, &obj, &entry, &pg]()
+ -> interruptible_future<seastar::stop_iteration>
+ {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap header",
+ pg, *this, obj, progress);
+ return pg.shard_services.get_store().omap_get_header(
+ pg.get_collection_ref(),
+ obj
+ ).safe_then([&progress](auto bl) {
+ progress.omap_hash << bl;
+ }).handle_error(
+ ct_error::enodata::handle([] { return seastar::now(); }),
+ ct_error::all_same_way([&entry](auto e) {
+ entry.read_error = true;
+ return seastar::now();
+ })
+ ).then([&progress] {
+ progress.header_done = true;
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ };
+
+ ObjectStore::omap_iter_seek_t start_from;
+ start_from.seek_position = progress.next_key.has_value() ?
+ progress.next_key.value() : std::string{};
+ start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+
+ std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
+ [&progress, &entry] (std::string_view key, std::string_view value)
+ {
+ bufferlist bl;
+ encode(key, bl);
+ encode(value, bl);
+ progress.omap_hash << bl;
+ entry.object_omap_keys++;
+ entry.object_omap_bytes += value.length();
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ };
+
+ auto get_keys = [FNAME, this, &progress, &obj, &entry, &pg, start_from, callback]()
+ -> interruptible_future<seastar::stop_iteration>
+ {
+ DEBUGDPP("op: {}, obj: {}, progress: {} scanning omap keys",
+ pg, *this, obj, progress);
+ return pg.shard_services.get_store().omap_iterate(
+ pg.get_collection_ref(),
+ obj,
+ start_from,
+ callback
+ ).safe_then([FNAME, this, &obj, &progress, &entry, &pg](auto result) {
+ assert(result == ObjectStore::omap_iter_ret_t::NEXT);
+ DEBUGDPP("op: {}, obj: {}, progress: {} omap done",
+ pg, *this, obj, progress);
+ progress.keys_done = true;
+ entry.omap_digest = progress.omap_hash.digest();
+ entry.omap_digest_present = true;
+
+ if ((entry.object_omap_keys >
+ local_conf().get_val<uint64_t>(
+ "osd_deep_scrub_large_omap_object_key_threshold")) ||
+ (entry.object_omap_bytes >
+ local_conf().get_val<Option::size_t>(
+ "osd_deep_scrub_large_omap_object_value_sum_threshold"))) {
+ entry.large_omap_object_found = true;
+ entry.large_omap_object_key_count = entry.object_omap_keys;
+ ret.has_large_omap_object_errors = true;
+ }
+ }).handle_error(
+ ct_error::all_same_way([FNAME, this, &obj, &progress, &entry, &pg]
+ (auto e)
+ {
+ DEBUGDPP("op: {}, obj: {}, progress: {} error reading omap {}",
+ pg, *this, obj, progress, e);
+ progress.keys_done = true;
+ entry.read_error = true;
+ return seastar::now();
+ })
+ ).then([] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ };
+
+ if (progress.offset) {
+ co_return co_await store_read();
+ } else if (!progress.header_done) {
+ co_return co_await get_header();
+ } else if (!progress.keys_done) {
+ co_return co_await get_keys();
+ } else {
+ DEBUGDPP("op: {}, obj: {}, progress: {} done",
+ pg, *this, obj, progress);
+ co_return seastar::stop_iteration::yes;
+ }
+ }).finally([progress_ref=std::move(progress_ref)] {});
}
template class ScrubAsyncOpT<ScrubScan>;