}
return on_stop();
}
+
+ template <typename Func>
+ void for_each_recovery_waiter(Func &&f) {
+ for (auto &[soid, recovery_waiter] : recovering) {
+ std::forward<Func>(f)(soid, recovery_waiter);
+ }
+ }
protected:
crimson::osd::PG& pg;
crimson::osd::ShardServices& shard_services;
pulled.reset();
}
}
+ void repeat_pull() {
+ ceph_assert(pulled);
+ pulled->set_exception(crimson::ct_error::eagain::exception_ptr());
+ }
+ bool is_pulling() const {
+ return (bool)pulled;
+ }
void set_push_failed(pg_shard_t shard, std::exception_ptr e) {
auto it = pushes.find(shard);
if (it != pushes.end()) {
// object is not missing, don't pull
return seastar::make_ready_future<>();
}
- return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
- [this, soid, need](auto head, auto) {
- PullOp pull_op;
- auto& recovery_waiter = get_recovering(soid);
- recovery_waiter.pull_info =
- std::make_optional<RecoveryBackend::pull_info_t>();
- auto& pull_info = *recovery_waiter.pull_info;
- prepare_pull(head, pull_op, pull_info, soid, need);
- auto msg = crimson::make_message<MOSDPGPull>();
- msg->from = pg.get_pg_whoami();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_pulls({std::move(pull_op)});
- return shard_services.send_to_osd(
- pull_info.from.osd,
- std::move(msg),
- pg.get_osdmap_epoch());
- }).si_then([this, soid] {
- auto& recovery_waiter = get_recovering(soid);
- return recovery_waiter.wait_for_pull();
+ return interruptor::repeat_eagain([this, soid, need] {
+ using prepare_pull_iertr =
+ crimson::osd::ObjectContextLoader::load_obc_iertr::extend<
+ crimson::ct_error::eagain>;
+ return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
+ [this, soid, need](auto head, auto) {
+ PullOp pull_op;
+ auto& recovery_waiter = get_recovering(soid);
+ recovery_waiter.pull_info =
+ std::make_optional<RecoveryBackend::pull_info_t>();
+ auto& pull_info = *recovery_waiter.pull_info;
+ prepare_pull(head, pull_op, pull_info, soid, need);
+ auto msg = crimson::make_message<MOSDPGPull>();
+ msg->from = pg.get_pg_whoami();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_pulls({std::move(pull_op)});
+ return shard_services.send_to_osd(
+ pull_info.from.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch());
+ }).si_then([this, soid]() -> prepare_pull_iertr::future<> {
+ auto& recovery_waiter = get_recovering(soid);
+ return recovery_waiter.wait_for_pull();
+ });
}).handle_error_interruptible(
crimson::ct_error::assert_all("unexpected error")
);