auto push_func = [this, soid, need, &shards] {
auto prepare_pops = seastar::now();
if (!shards.empty()) {
- prepare_pops =
- prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
- return seastar::parallel_for_each(shards,
- [this, &pops, soid](auto shard) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->pushes.push_back(pops[shard]);
- msg->set_priority(pg.get_recovery_op_priority());
- return shard_services.send_to_osd(shard.osd,
- std::move(msg),
- pg.get_osdmap_epoch()).then(
- [this, soid, shard] {
- return recovering.at(soid).wait_for_pushes(shard);
- });
+ prepare_pops = seastar::parallel_for_each(shards, [this, need, soid](auto shard) {
+ return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->pushes.push_back(std::move(push));
+ msg->set_priority(pg.get_recovery_op_priority());
+ return shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()).then(
+ [this, soid, shard] {
+ return recovering.at(soid).wait_for_pushes(shard);
});
});
+ });
}
return prepare_pops.then([this, soid] {
auto &recovery = recovering.at(soid);
});
}
-seastar::future<std::map<pg_shard_t, PushOp>>
+seastar::future<PushOp>
ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
- const std::vector<pg_shard_t>& shards)
+ pg_shard_t pg_shard)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- auto pops = seastar::make_lw_shared<std::map<pg_shard_t, PushOp>>();
- return seastar::parallel_for_each(shards,
- [this, soid, pops](auto pg_shard) mutable {
- auto& recovery_waiter = recovering.at(soid);
- auto& obc = recovery_waiter.obc;
- interval_set<uint64_t> data_subset;
- if (obc->obs.oi.size) {
- data_subset.insert(0, obc->obs.oi.size);
- }
- const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
- if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
- const auto it = missing.get_items().find(soid);
- assert(it != missing.get_items().end());
- data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
- logger().debug("prep_push: {} data_subset {}", soid, data_subset);
- }
+ auto& recovery_waiter = recovering.at(soid);
+ auto& obc = recovery_waiter.obc;
+ interval_set<uint64_t> data_subset;
+ if (obc->obs.oi.size) {
+ data_subset.insert(0, obc->obs.oi.size);
+ }
+ const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
+ if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
+ const auto it = missing.get_items().find(soid);
+ assert(it != missing.get_items().end());
+ data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+ logger().debug("prep_push: {} data_subset {}", soid, data_subset);
+ }
- logger().debug("prep_push: {} to {}", soid, pg_shard);
- auto& pi = recovery_waiter.pushing[pg_shard];
- pg.begin_peer_recover(pg_shard, soid);
- const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
- const auto missing_iter = pmissing_iter->second.get_items().find(soid);
- assert(missing_iter != pmissing_iter->second.get_items().end());
+ logger().debug("prep_push: {} to {}", soid, pg_shard);
+ auto& pi = recovery_waiter.pushing[pg_shard];
+ pg.begin_peer_recover(pg_shard, soid);
+ const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
+ const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+ assert(missing_iter != pmissing_iter->second.get_items().end());
- pi.obc = obc;
- pi.recovery_info.size = obc->obs.oi.size;
- pi.recovery_info.copy_subset = data_subset;
- pi.recovery_info.soid = soid;
- pi.recovery_info.oi = obc->obs.oi;
- pi.recovery_info.version = obc->obs.oi.version;
- pi.recovery_info.object_exist =
- missing_iter->second.clean_regions.object_is_exist();
- pi.recovery_progress.omap_complete =
- (!missing_iter->second.clean_regions.omap_is_dirty() &&
- HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
+ pi.obc = obc;
+ pi.recovery_info.size = obc->obs.oi.size;
+ pi.recovery_info.copy_subset = data_subset;
+ pi.recovery_info.soid = soid;
+ pi.recovery_info.oi = obc->obs.oi;
+ pi.recovery_info.version = obc->obs.oi.version;
+ pi.recovery_info.object_exist =
+ missing_iter->second.clean_regions.object_is_exist();
+ pi.recovery_progress.omap_complete =
+ (!missing_iter->second.clean_regions.omap_is_dirty() &&
+ HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
- return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat).then(
- [this, soid, pg_shard, pops](auto pop) {
- auto& recovery_waiter = recovering.at(soid);
- auto& pi = recovery_waiter.pushing[pg_shard];
- pi.recovery_progress = pop.after_progress;
- pops->emplace(pg_shard, std::move(pop));
- });
- }).then([pops]() mutable {
- return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(*pops));
+ return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
+ [this, soid, pg_shard](auto pop) {
+ auto& recovery_waiter = recovering.at(soid);
+ auto& pi = recovery_waiter.pushing[pg_shard];
+ pi.recovery_progress = pop.after_progress;
+ return pop;
});
}