{
logger().debug("{}: {}, {}", __func__, soid, need);
- return seastar::do_with(std::map<pg_shard_t, PushOp>(),
- std::map<pg_shard_t, interval_set<uint64_t>>(),
- [this, soid, &shards](auto& pops,
- auto& data_subsets) {
- return seastar::parallel_for_each(shards,
- [this, soid, pops, &data_subsets](auto pg_shard) mutable {
- pops.emplace(pg_shard, PushOp());
- auto& recovery_waiter = recovering.at(soid);
- auto& obc = recovery_waiter.obc;
- auto& data_subset = data_subsets[pg_shard];
-
- if (obc->obs.oi.size) {
- data_subset.insert(0, obc->obs.oi.size);
- }
- const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
- if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
- const auto it = missing.get_items().find(soid);
- assert(it != missing.get_items().end());
- data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
- logger().debug("calc_head_subsets {} data_subset {}", soid, data_subset);
- }
+ auto pops = seastar::make_lw_shared<std::map<pg_shard_t, PushOp>>();
+ return seastar::parallel_for_each(shards,
+ [this, soid, pops](auto pg_shard) mutable {
+ auto& recovery_waiter = recovering.at(soid);
+ auto& obc = recovery_waiter.obc;
+ interval_set<uint64_t> data_subset;
+ if (obc->obs.oi.size) {
+ data_subset.insert(0, obc->obs.oi.size);
+ }
+ const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
+ if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
+ const auto it = missing.get_items().find(soid);
+ assert(it != missing.get_items().end());
+ data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+ logger().debug("prep_push: {} data_subset {}", soid, data_subset);
+ }
- logger().debug("prep_push: {} to {}", soid, pg_shard);
- auto& pi = recovery_waiter.pushing[pg_shard];
- pg.begin_peer_recover(pg_shard, soid);
- const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
- const auto missing_iter = pmissing_iter->second.get_items().find(soid);
- assert(missing_iter != pmissing_iter->second.get_items().end());
+ logger().debug("prep_push: {} to {}", soid, pg_shard);
+ auto& pi = recovery_waiter.pushing[pg_shard];
+ pg.begin_peer_recover(pg_shard, soid);
+ const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
+ const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+ assert(missing_iter != pmissing_iter->second.get_items().end());
- pi.obc = obc;
- pi.recovery_info.size = obc->obs.oi.size;
- pi.recovery_info.copy_subset = data_subset;
- pi.recovery_info.soid = soid;
- pi.recovery_info.oi = obc->obs.oi;
- pi.recovery_info.version = obc->obs.oi.version;
- pi.recovery_info.object_exist =
- missing_iter->second.clean_regions.object_is_exist();
- pi.recovery_progress.omap_complete =
- !missing_iter->second.clean_regions.omap_is_dirty() &&
- HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
+ pi.obc = obc;
+ pi.recovery_info.size = obc->obs.oi.size;
+ pi.recovery_info.copy_subset = data_subset;
+ pi.recovery_info.soid = soid;
+ pi.recovery_info.oi = obc->obs.oi;
+ pi.recovery_info.version = obc->obs.oi.version;
+ pi.recovery_info.object_exist =
+ missing_iter->second.clean_regions.object_is_exist();
+ pi.recovery_progress.omap_complete =
+ (!missing_iter->second.clean_regions.omap_is_dirty() &&
+ HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
- return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat).then(
- [this, soid, pg_shard, &pops](auto pop) {
- auto& recovery_waiter = recovering.at(soid);
- auto& pi = recovery_waiter.pushing[pg_shard];
- pi.recovery_progress = pop.after_progress;
- pops[pg_shard] = std::move(pop);
- return seastar::make_ready_future<>();
- });
- }).then([&pops]() mutable {
- return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(pops));
+ return build_push_op(pi.recovery_info, pi.recovery_progress,
+ &pi.stat).then(
+ [this, soid, pg_shard, pops](auto pop) {
+ auto& recovery_waiter = recovering.at(soid);
+ auto& pi = recovery_waiter.pushing[pg_shard];
+ pi.recovery_progress = pop.after_progress;
+ pops->emplace(pg_shard, std::move(pop));
});
+ }).then([pops]() mutable {
+ return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(*pops));
});
}