seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
logger().debug("{}: {}", __func__, *m);
- vector<PullOp> pulls;
- m->take_pulls(&pulls);
- return seastar::do_with(std::move(pulls),
- [this, m, from = m->from](auto& pulls) {
- return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
- const hobject_t& soid = pull_op.soid;
- logger().debug("handle_pull: {}", soid);
- return backend->stat(coll, ghobject_t(soid)).then(
- [this, &pull_op](auto st) {
- ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
- ObjectRecoveryProgress &progress = pull_op.recovery_progress;
- if (progress.first && recovery_info.size == ((uint64_t) -1)) {
- // Adjust size and copy_subset
- recovery_info.size = st.st_size;
- if (st.st_size) {
- interval_set<uint64_t> object_range;
- object_range.insert(0, st.st_size);
- recovery_info.copy_subset.intersection_of(object_range);
- } else {
- recovery_info.copy_subset.clear();
- }
- assert(recovery_info.clone_subset.empty());
+ return seastar::parallel_for_each(m->take_pulls(),
+ [this, from=m->from](auto& pull_op) {
+ const hobject_t& soid = pull_op.soid;
+ logger().debug("handle_pull: {}", soid);
+ return backend->stat(coll, ghobject_t(soid)).then(
+ [this, &pull_op](auto st) {
+ ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+ ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ if (st.st_size) {
+ interval_set<uint64_t> object_range;
+ object_range.insert(0, st.st_size);
+ recovery_info.copy_subset.intersection_of(object_range);
+ } else {
+ recovery_info.copy_subset.clear();
}
- return build_push_op(recovery_info, progress, 0);
- }).then([this, from](auto pop) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(std::move(pop));
- return shard_services.send_to_osd(from.osd, std::move(msg),
- pg.get_osdmap_epoch());
- });
+ assert(recovery_info.clone_subset.empty());
+ }
+ return build_push_op(recovery_info, progress, 0);
+ }).then([this, from](auto pop) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pushes.push_back(std::move(pop));
+ return shard_services.send_to_osd(from.osd, std::move(msg),
+ pg.get_osdmap_epoch());
});
});
}