}));
}
+static std::optional<std::string> nullopt_if_empty(const std::string& s)
+{
+ return s.empty() ? std::nullopt : std::make_optional(s);
+}
+
+static bool is_too_many_entries_per_chunk(const PushOp* push_op)
+{
+ const uint64_t entries_per_chunk =
+ crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk;
+ if (!entries_per_chunk) {
+ // the limit is disabled
+ return false;
+ }
+ return push_op->omap_entries.size() >= entries_per_chunk;
+}
+
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::read_omap_for_push_op(
const hobject_t& oid,
if (progress.omap_complete) {
return seastar::make_ready_future<>();
}
- return shard_services.get_store().get_omap_iterator(coll, ghobject_t{oid})
- .then([&progress, &new_progress, max_len, push_op](auto omap_iter) {
- return omap_iter->lower_bound(progress.omap_recovered_to).then(
- [omap_iter, &new_progress, max_len, push_op] {
- return seastar::do_until([omap_iter, &new_progress, max_len, push_op] {
- if (!omap_iter->valid()) {
- new_progress.omap_complete = true;
- return true;
- }
- if (push_op->omap_entries.empty()) {
- return false;
+ return seastar::repeat([&new_progress, max_len, push_op, &oid, this] {
+ return shard_services.get_store().omap_get_values(
+ coll, ghobject_t{oid}, nullopt_if_empty(new_progress.omap_recovered_to)
+ ).safe_then([&new_progress, max_len, push_op](const auto& ret) {
+ const auto& [done, kvs] = ret;
+ bool stop = done;
+ // assuming "values.empty() only if done" holds here!
+ for (const auto& [key, value] : kvs) {
+ if (is_too_many_entries_per_chunk(push_op)) {
+ stop = true;
+ break;
}
- if (const uint64_t entries_per_chunk =
- crimson::common::local_conf()->osd_recovery_max_omap_entries_per_chunk;
- entries_per_chunk > 0 &&
- push_op->omap_entries.size() >= entries_per_chunk) {
- new_progress.omap_recovered_to = omap_iter->key();
- return true;
- }
- if (omap_iter->key().size() + omap_iter->value().length() > *max_len) {
- new_progress.omap_recovered_to = omap_iter->key();
- return true;
- }
- return false;
- },
- [omap_iter, max_len, push_op] {
- push_op->omap_entries.emplace(omap_iter->key(), omap_iter->value());
- if (const uint64_t entry_size =
- omap_iter->key().size() + omap_iter->value().length();
+ if (const uint64_t entry_size = key.size() + value.length();
entry_size > *max_len) {
- *max_len -= entry_size;
+ stop = true;
+ break;
} else {
- *max_len = 0;
+ *max_len -= std::min(*max_len, entry_size);
}
- return omap_iter->next();
- });
- });
+ push_op->omap_entries.emplace(key, value);
+ }
+ if (!push_op->omap_entries.empty()) {
+ // we iterate in order
+ new_progress.omap_recovered_to = std::rbegin(push_op->omap_entries)->first;
+ }
+ if (done) {
+ new_progress.omap_complete = true;
+ }
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ stop ? seastar::stop_iteration::yes : seastar::stop_iteration::no
+ );
+ }, crimson::os::FuturizedStore::read_errorator::assert_all{});
});
}