const hobject_t& soid,
eversion_t need)
{
+ logger().debug("{}: {}, {}", __func__, soid, need);
pg_missing_tracker_t local_missing = pg.get_local_missing();
if (!local_missing.is_missing(soid)) {
+ // object is not missing, don't pull
return seastar::make_ready_future<>();
}
return pg.obc_loader.with_obc<RWState::RWREAD>(soid.get_head(),
const auto missing_iter = local_missing.get_items().find(soid);
auto m = pg.get_missing_loc_shards();
pg_shard_t fromshard = *(m[soid].begin());
+ const auto& last_backfill =
+ pg.get_peering_state().get_peer_info(fromshard).last_backfill;
- //TODO: skipped snap objects case for now
- pull_op.recovery_info.copy_subset.insert(0, (uint64_t) -1);
- pull_op.recovery_info.copy_subset.intersection_of(
- missing_iter->second.clean_regions.get_dirty_regions());
- if (soid.is_snap()) {
- assert(head_obc->ssc);
- pull_op.recovery_info.ss = head_obc->ssc->snapset;
- }
- pull_op.recovery_info.size = ((uint64_t) -1);
- pull_op.recovery_info.object_exist =
- missing_iter->second.clean_regions.object_is_exist();
- pull_op.recovery_info.soid = soid;
+ pull_op.recovery_info =
+ set_recovery_info(soid, head_obc->ssc, last_backfill);
pull_op.soid = soid;
pull_op.recovery_progress.data_complete = false;
pull_op.recovery_progress.omap_complete =
pull_info.from = fromshard;
pull_info.soid = soid;
+ pull_info.head_ctx = head_obc;
pull_info.recovery_info = pull_op.recovery_info;
pull_info.recovery_progress = pull_op.recovery_progress;
}
+ObjectRecoveryInfo ReplicatedRecoveryBackend::set_recovery_info(
+ const hobject_t& soid,
+ const crimson::osd::SnapSetContextRef ssc,
+ const hobject_t& last_backfill)
+{
+ pg_missing_tracker_t local_missing = pg.get_local_missing();
+ const auto missing_iter = local_missing.get_items().find(soid);
+ ObjectRecoveryInfo recovery_info;
+ if (soid.is_snap()) {
+ assert(!local_missing.is_missing(soid.get_head()));
+ assert(ssc);
+ recovery_info.ss = ssc->snapset;
+ auto subsets = crimson::osd::calc_clone_subsets(
+ ssc->snapset, soid, local_missing, last_backfill);
+ crimson::osd::set_subsets(subsets, recovery_info);
+ logger().debug("{}: pulling {}", __func__, recovery_info);
+ ceph_assert(ssc->snapset.clone_size.count(soid.snap));
+ recovery_info.size = ssc->snapset.clone_size[soid.snap];
+ } else {
+ // pulling head or unversioned object.
+ // always pull the whole thing.
+ recovery_info.copy_subset.insert(0, (uint64_t) -1);
+ recovery_info.copy_subset.intersection_of(
+ missing_iter->second.clean_regions.get_dirty_regions());
+ recovery_info.size = ((uint64_t) -1);
+ }
+ recovery_info.object_exist =
+ missing_iter->second.clean_regions.object_is_exist();
+ recovery_info.soid = soid;
+ return recovery_info;
+}
+
RecoveryBackend::interruptible_future<PushOp>
ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,