);
}
-void PGRecovery::on_local_recover(
+RecoveryBackend::interruptible_future<>
+PGRecovery::on_local_recover(
const hobject_t& soid,
const ObjectRecoveryInfo& recovery_info,
const bool is_delete,
ceph_abort("mark_unfound_lost (LOST_REVERT) is not implemented yet");
}
}
- pg->get_peering_state().recover_got(soid,
- recovery_info.version, is_delete, t);
-
- if (pg->is_primary()) {
- if (!is_delete) {
- auto& obc = pg->get_recovery_backend()->get_recovering(soid).obc; //TODO: move to pg backend?
- obc->obs.exists = true;
- obc->obs.oi = recovery_info.oi;
+
+ return RecoveryBackend::interruptor::async(
+ [soid, &recovery_info, is_delete, &t, this] {
+ if (soid.is_snap()) {
+ OSDriver::OSTransaction _t(pg->get_osdriver().get_transaction(&t));
+ int r = pg->get_snap_mapper().remove_oid(soid, &_t);
+ assert(r == 0 || r == -ENOENT);
+
+ if (!is_delete) {
+ set<snapid_t> snaps;
+ auto p = recovery_info.ss.clone_snaps.find(soid.snap);
+ assert(p != recovery_info.ss.clone_snaps.end());
+ snaps.insert(p->second.begin(), p->second.end());
+ pg->get_snap_mapper().add_oid(recovery_info.soid, snaps, &_t);
+ }
}
- if (!pg->is_unreadable_object(soid)) {
- pg->get_recovery_backend()->get_recovering(soid).set_readable();
+
+ pg->get_peering_state().recover_got(soid,
+ recovery_info.version, is_delete, t);
+
+ if (pg->is_primary()) {
+ if (!is_delete) {
+ auto& obc = pg->get_recovery_backend()->get_recovering(soid).obc; //TODO: move to pg backend?
+ obc->obs.exists = true;
+ obc->obs.oi = recovery_info.oi;
+ }
+ if (!pg->is_unreadable_object(soid)) {
+ pg->get_recovery_backend()->get_recovering(soid).set_readable();
+ }
+ pg->publish_stats_to_osd();
}
- pg->publish_stats_to_osd();
- }
+ });
}
void PGRecovery::on_global_recover (
epoch_t epoch_frozen)
{
logger().debug("{}", __func__);
- ceph::os::Transaction t;
- pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
- logger().debug("ReplicatedRecoveryBackend::on_local_recover_persist: do_transaction...");
- return interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)))
- .then_interruptible(
- [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
- pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
- return seastar::make_ready_future<>();
+ return seastar::do_with(
+ ceph::os::Transaction(),
+ [this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) {
+ return pg.get_recovery_handler()->on_local_recover(
+ soid, _recovery_info, is_delete, t
+ ).then_interruptible([this, &t] {
+ logger().debug("ReplicatedRecoveryBackend::{}: do_transaction...", __func__);
+ return shard_services.get_store().do_transaction(coll, std::move(t));
+ }).then_interruptible(
+ [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
+ pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
+ return seastar::make_ready_future<>();
+ });
});
}
}
return seastar::make_ready_future<>();
}).safe_then_interruptible([this, soid, epoch_to_freeze, need] {
- ObjectRecoveryInfo recovery_info;
- recovery_info.soid = soid;
- recovery_info.version = need;
- return on_local_recover_persist(soid, recovery_info,
- true, epoch_to_freeze);
- }, PGBackend::load_metadata_ertr::all_same_way(
- [this, soid, epoch_to_freeze, need] (auto e) {
- ObjectRecoveryInfo recovery_info;
+ return seastar::do_with(
+ ObjectRecoveryInfo(),
+ [soid, need, this, epoch_to_freeze](auto &recovery_info) {
recovery_info.soid = soid;
recovery_info.version = need;
return on_local_recover_persist(soid, recovery_info,
- true, epoch_to_freeze);
+ true, epoch_to_freeze);
+ });
+ }, PGBackend::load_metadata_ertr::all_same_way(
+ [this, soid, epoch_to_freeze, need] (auto e) {
+ return seastar::do_with(
+ ObjectRecoveryInfo(),
+ [soid, need, this, epoch_to_freeze](auto &recovery_info) {
+ recovery_info.soid = soid;
+ recovery_info.version = need;
+ return on_local_recover_persist(soid, recovery_info,
+ true, epoch_to_freeze);
+ });
})
);
}
push_op.attrset, std::move(push_op.omap_entries), t)
.then_interruptible(
[this, response, &pull_info, &push_op, complete,
- t, bytes_recovered=data.length()] {
+ t, bytes_recovered=data.length()]()
+ -> RecoveryBackend::interruptible_future<bool> {
pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
pull_info.stat.num_bytes_recovered += bytes_recovered;
if (complete) {
pull_info.stat.num_objects_recovered++;
- pg.get_recovery_handler()->on_local_recover(
+ return pg.get_recovery_handler()->on_local_recover(
push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
- false, *t);
- return true;
+ false, *t
+ ).then_interruptible([] {
+ return true;
+ });
} else {
response->soid = push_op.soid;
response->recovery_info = pull_info.recovery_info;
response->recovery_progress = pull_info.recovery_progress;
- return false;
+ return seastar::make_ready_future<bool>(false);
}
});
});
.then_interruptible(
[this, complete, &push_op, t] {
if (complete) {
- pg.get_recovery_handler()->on_local_recover(
+ return pg.get_recovery_handler()->on_local_recover(
push_op.recovery_info.soid, push_op.recovery_info,
false, *t);
}
+ return RecoveryBackend::interruptor::now();
});
}