return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
logger().debug("recover_object: loading obc: {}", soid);
return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
- [this, soid, need](auto, auto obc) {
+ [this, soid, need](auto head, auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
auto& recovery_waiter = get_recovering(soid);
recovery_waiter.obc = obc;
recovery_waiter.obc->wait_recovery_read();
- return maybe_push_shards(soid, need);
+ return maybe_push_shards(head, soid, need);
}).handle_error_interruptible(
crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
// TODO: may need eio handling?
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::maybe_push_shards(
+ const crimson::osd::ObjectContextRef &head_obc,
const hobject_t& soid,
eversion_t need)
{
return seastar::do_with(
get_shards_to_push(soid),
- [this, need, soid](auto &shards) {
+ [this, need, soid, head_obc](auto &shards) {
return interruptor::parallel_for_each(
shards,
- [this, need, soid](auto shard) {
- return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
+ [this, need, soid, head_obc](auto shard) {
+ return prep_push(head_obc, soid, need, shard
+ ).then_interruptible([this, soid, shard](auto push) {
auto msg = crimson::make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
if (!local_missing.is_missing(soid)) {
return seastar::make_ready_future<>();
}
- PullOp pull_op;
- auto& recovery_waiter = get_recovering(soid);
- recovery_waiter.pull_info =
- std::make_optional<RecoveryBackend::pull_info_t>();
- auto& pull_info = *recovery_waiter.pull_info;
- prepare_pull(pull_op, pull_info, soid, need);
- auto msg = crimson::make_message<MOSDPGPull>();
- msg->from = pg.get_pg_whoami();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_pulls({std::move(pull_op)});
- return interruptor::make_interruptible(
- shard_services.send_to_osd(
+ return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
+ [this, soid, need](auto head, auto) {
+ PullOp pull_op;
+ auto& recovery_waiter = get_recovering(soid);
+ recovery_waiter.pull_info =
+ std::make_optional<RecoveryBackend::pull_info_t>();
+ auto& pull_info = *recovery_waiter.pull_info;
+ prepare_pull(head, pull_op, pull_info, soid, need);
+ auto msg = crimson::make_message<MOSDPGPull>();
+ msg->from = pg.get_pg_whoami();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_pulls({std::move(pull_op)});
+ return shard_services.send_to_osd(
pull_info.from.osd,
std::move(msg),
- pg.get_osdmap_epoch()
- )).then_interruptible([&recovery_waiter] {
+ pg.get_osdmap_epoch());
+ }).si_then([this, soid] {
+ auto& recovery_waiter = get_recovering(soid);
return recovery_waiter.wait_for_pull();
- });
+ }).handle_error_interruptible(
+ crimson::ct_error::assert_all("unexpected error")
+ );
}
RecoveryBackend::interruptible_future<>
RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::prep_push(
+ const crimson::osd::ObjectContextRef &head_obc,
const hobject_t& soid,
eversion_t need,
pg_shard_t pg_shard)
push_info.recovery_info.copy_subset = data_subset;
push_info.recovery_info.soid = soid;
push_info.recovery_info.oi = obc->obs.oi;
+ assert(head_obc->ssc);
+ push_info.recovery_info.ss = head_obc->ssc->snapset;
push_info.recovery_info.version = obc->obs.oi.version;
push_info.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();
});
}
-void ReplicatedRecoveryBackend::prepare_pull(PullOp& pull_op,
+void ReplicatedRecoveryBackend::prepare_pull(
+ const crimson::osd::ObjectContextRef &head_obc,
+ PullOp& pull_op,
pull_info_t& pull_info,
const hobject_t& soid,
eversion_t need) {
pull_op.recovery_info.copy_subset.insert(0, (uint64_t) -1);
pull_op.recovery_info.copy_subset.intersection_of(
missing_iter->second.clean_regions.get_dirty_regions());
+ if (soid.is_snap()) {
+ assert(head_obc->ssc);
+ pull_op.recovery_info.ss = head_obc->ssc->snapset;
+ }
pull_op.recovery_info.size = ((uint64_t) -1);
pull_op.recovery_info.object_exist =
missing_iter->second.clean_regions.object_is_exist();