PushOp* push_op)
{
if (progress.omap_complete) {
- return seastar::make_ready_future<>();
+ co_return;
}
- return seastar::repeat([&new_progress, &max_len, push_op, &oid, this] {
- return shard_services.get_store().omap_get_values(
- coll, ghobject_t{oid}, nullopt_if_empty(new_progress.omap_recovered_to)
- ).safe_then([&new_progress, &max_len, push_op](const auto& ret) {
- const auto& [done, kvs] = ret;
- bool stop = done;
- // assuming "values.empty() only if done" holds here!
- for (const auto& [key, value] : kvs) {
- if (is_too_many_entries_per_chunk(push_op)) {
- stop = true;
- break;
- }
- if (const uint64_t entry_size = key.size() + value.length();
- entry_size > max_len) {
- stop = true;
- break;
- } else {
- max_len -= std::min(max_len, entry_size);
- }
- push_op->omap_entries.emplace(key, value);
- }
- if (!push_op->omap_entries.empty()) {
- // we iterate in order
- new_progress.omap_recovered_to = std::rbegin(push_op->omap_entries)->first;
- }
- if (done) {
+
+ ObjectStore::omap_iter_seek_t start_from;
+ start_from.seek_position = new_progress.omap_recovered_to.empty() ?
+ std::string{} : new_progress.omap_recovered_to;
+ start_from.seek_type = ObjectStore::omap_iter_seek_t::UPPER_BOUND;
+
+ std::function<ObjectStore::omap_iter_ret_t(std::string_view, std::string_view)> callback =
+ [&max_len, push_op] (std::string_view key, std::string_view value)
+ {
+ uint64_t entry_size = key.size() + value.length();
+ if (is_too_many_entries_per_chunk(push_op) || entry_size > max_len) {
+ return ObjectStore::omap_iter_ret_t::STOP;
+ }
+
+ max_len -= std::min(max_len, entry_size);
+ ceph::bufferlist bl;
+ bl.append(value);
+ push_op->omap_entries.emplace(key, bl);
+ return ObjectStore::omap_iter_ret_t::NEXT;
+ };
+
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().omap_iterate(
+ coll, ghobject_t{oid}, start_from, callback
+ ).safe_then([&new_progress](auto ret) {
+ if (ret == ObjectStore::omap_iter_ret_t::NEXT) {
new_progress.omap_complete = true;
+ } else {
+ new_progress.omap_complete = false;
}
- return seastar::make_ready_future<seastar::stop_iteration>(
- stop ? seastar::stop_iteration::yes : seastar::stop_iteration::no
- );
- }, crimson::os::FuturizedStore::Shard::read_errorator::assert_all(fmt::format(
- "{} ReplicatedRecoveryBackend::read_omap_for_push_op error with {}", pg, oid).c_str()));
- });
+ }).handle_error(
+ crimson::os::FuturizedStore::Shard::read_errorator::assert_all(fmt::format(
+ "{} ReplicatedRecoveryBackend::read_omap_for_push_op error with {}", pg, oid).c_str())
+ )
+ );
}
std::vector<pg_shard_t>