LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response);
if (pg.can_discard_replica_op(*m)) {
DEBUGDPP("discarding {}", pg, *m);
- return seastar::now();
+ co_return;
}
- const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
+ PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
if (push_op.version == eversion_t()) {
// replica doesn't have it!
pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid,
get_recovering(push_op.soid).pull_info->recovery_info.version);
- return seastar::make_exception_future<>(
- std::runtime_error(fmt::format(
- "Error on pushing side {} when pulling obj {}",
- m->from, push_op.soid)));
+ throw std::runtime_error(
+ fmt::format(
+ "Error on pushing side {} when pulling obj {}",
+ m->from, push_op.soid));
}
DEBUGDPP("{}", pg, *m);
- return seastar::do_with(PullOp(), [this, m](auto& response) {
- return seastar::do_with(ceph::os::Transaction(), m.get(),
- [FNAME, this, &response](auto& t, auto& m) {
- pg_shard_t from = m->from;
- PushOp& push_op = m->pushes[0]; // only one push per message for now
- return _handle_pull_response(from, push_op, &response, &t
- ).then_interruptible(
- [FNAME, this, &t](bool complete) {
- epoch_t epoch_frozen = pg.get_osdmap_epoch();
- DEBUGDPP("submitting transaction", pg);
- return shard_services.get_store().do_transaction(coll, std::move(t))
- .then([this, epoch_frozen, complete,
- last_complete = pg.get_info().last_complete] {
- pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
- return seastar::make_ready_future<bool>(complete);
- });
- });
- }).then_interruptible([this, m, &response](bool complete) {
- if (complete) {
- auto& push_op = m->pushes[0];
- get_recovering(push_op.soid).set_pulled();
- return seastar::make_ready_future<>();
- } else {
- auto reply = crimson::make_message<MOSDPGPull>();
- reply->from = pg.get_pg_whoami();
- reply->set_priority(m->get_priority());
- reply->pgid = pg.get_info().pgid;
- reply->map_epoch = m->map_epoch;
- reply->min_epoch = m->min_epoch;
- reply->set_pulls({std::move(response)});
- return shard_services.send_to_osd(m->from.osd, std::move(reply), pg.get_osdmap_epoch());
- }
- });
- });
+ PullOp response;
+ ceph::os::Transaction t;
+
+ pg_shard_t from = m->from;
+
+ const bool complete = co_await _handle_pull_response(
+ from, push_op, &response, &t);
+
+ epoch_t epoch_frozen = pg.get_osdmap_epoch();
+ DEBUGDPP("submitting transaction", pg);
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t)));
+ pg.get_recovery_handler()->_committed_pushed_object(
+ epoch_frozen, pg.get_info().last_complete);
+
+ if (complete) {
+ get_recovering(push_op.soid).set_pulled();
+ } else {
+ auto reply = crimson::make_message<MOSDPGPull>();
+ reply->from = pg.get_pg_whoami();
+ reply->set_priority(m->get_priority());
+ reply->pgid = pg.get_info().pgid;
+ reply->map_epoch = m->map_epoch;
+ reply->min_epoch = m->min_epoch;
+ reply->set_pulls({std::move(response)});
+ co_await interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ m->from.osd, std::move(reply), pg.get_osdmap_epoch()));
+ }
}
RecoveryBackend::interruptible_future<>