if (pull_info.recovery_info.version == eversion_t())
pull_info.recovery_info.version = push_op.version;
- auto prepare_waiter = interruptor::make_interruptible(
- seastar::make_ready_future<>());
if (pull_info.recovery_progress.first) {
- prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
+ auto fut = pg.obc_loader.with_obc<RWState::RWNONE>(
pull_info.recovery_info.soid,
[FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
pull_info.obc = obc;
}
return crimson::osd::PG::load_obc_ertr::now();
}, false).handle_error_interruptible(crimson::ct_error::assert_all{});
+ co_await std::move(fut);
};
- return prepare_waiter.then_interruptible(
- [FNAME, this, &pull_info, &push_op, t, response]() mutable {
- const bool first = pull_info.recovery_progress.first;
- pull_info.recovery_progress = push_op.after_progress;
- DEBUGDPP("new recovery_info {}, new progress {}",
- pg, pull_info.recovery_info, pull_info.recovery_progress);
- interval_set<uint64_t> data_zeros;
- {
- uint64_t offset = push_op.before_progress.data_recovered_to;
- uint64_t length = (push_op.after_progress.data_recovered_to -
- push_op.before_progress.data_recovered_to);
- if (length) {
- data_zeros.insert(offset, length);
- }
+
+ const bool first = pull_info.recovery_progress.first;
+ pull_info.recovery_progress = push_op.after_progress;
+ DEBUGDPP("new recovery_info {}, new progress {}",
+ pg, pull_info.recovery_info, pull_info.recovery_progress);
+ interval_set<uint64_t> data_zeros;
+ {
+ uint64_t offset = push_op.before_progress.data_recovered_to;
+ uint64_t length = (push_op.after_progress.data_recovered_to -
+ push_op.before_progress.data_recovered_to);
+ if (length) {
+ data_zeros.insert(offset, length);
}
- auto [usable_intervals, data] =
- trim_pushed_data(pull_info.recovery_info.copy_subset,
- push_op.data_included, push_op.data);
- bool complete = pull_info.is_complete();
- bool clear_omap = !push_op.before_progress.omap_complete;
- return submit_push_data(pull_info.recovery_info,
- first, complete, clear_omap,
+ }
+ auto [usable_intervals, data] =
+ trim_pushed_data(pull_info.recovery_info.copy_subset,
+ push_op.data_included, push_op.data);
+ bool complete = pull_info.is_complete();
+ bool clear_omap = !push_op.before_progress.omap_complete;
+ co_await submit_push_data(pull_info.recovery_info,
+ first, complete, clear_omap,
std::move(data_zeros), std::move(usable_intervals),
std::move(data), std::move(push_op.omap_header),
- push_op.attrset, std::move(push_op.omap_entries), t)
- .then_interruptible(
- [this, response, &pull_info, &push_op, complete,
- t, bytes_recovered=data.length()]()
- -> RecoveryBackend::interruptible_future<bool> {
- pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
- pull_info.stat.num_bytes_recovered += bytes_recovered;
+ push_op.attrset, std::move(push_op.omap_entries), t);
- if (complete) {
- pull_info.stat.num_objects_recovered++;
- return pg.get_recovery_handler()->on_local_recover(
- push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
- false, *t
- ).then_interruptible([] {
- return true;
- });
- } else {
- response->soid = push_op.soid;
- response->recovery_info = pull_info.recovery_info;
- response->recovery_progress = pull_info.recovery_progress;
- return seastar::make_ready_future<bool>(false);
- }
- });
- });
+ const auto bytes_recovered = data.length();
+ pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
+ pull_info.stat.num_bytes_recovered += bytes_recovered;
+
+ if (complete) {
+ pull_info.stat.num_objects_recovered++;
+ co_await pg.get_recovery_handler()->on_local_recover(
+ push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
+ false, *t
+ );
+ co_return true;
+ } else {
+ response->soid = push_op.soid;
+ response->recovery_info = pull_info.recovery_info;
+ response->recovery_progress = pull_info.recovery_progress;
+ co_return false;
+ }
}
void ReplicatedRecoveryBackend::recalc_subsets(