assert(is_recovering(soid));
// start tracking the recovery of soid
return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
- return seastar::do_with(std::map<pg_shard_t, PushOp>(),
- get_shards_to_push(soid),
- [this, soid, need](auto& pops, auto& shards) {
- return maybe_push_shards(soid, need, pops, shards);
+ return seastar::do_with(get_shards_to_push(soid),
+ [this, soid, need](auto& shards) {
+ return maybe_push_shards(soid, need, shards);
});
});
}
ReplicatedRecoveryBackend::maybe_push_shards(
const hobject_t& soid,
eversion_t need,
- std::map<pg_shard_t, PushOp>& pops,
std::vector<pg_shard_t>& shards)
{
- auto push_func = [this, soid, need, &pops, &shards] {
- auto fut = seastar::now();
- if (!shards.empty())
- fut = prep_push(soid, need, &pops, shards);
- return fut.then([this, &pops, &shards, soid] {
- return seastar::parallel_for_each(shards,
- [this, &pops, soid](auto shard) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(pops[shard]);
- return shard_services.send_to_osd(shard.osd, std::move(msg),
- pg.get_osdmap_epoch()).then(
- [this, soid, shard] {
- return recovering.at(soid).wait_for_pushes(shard);
- });
- });
- }).then([this, soid] {
- auto& recovery = recovering.at(soid);
+ auto push_func = [this, soid, need, &shards] {
+ auto prepare_pops = seastar::now();
+ if (!shards.empty()) {
+ prepare_pops =
+ prep_push(soid, need, shards).then([this, &shards, soid](auto pops) {
+ return seastar::parallel_for_each(shards,
+ [this, &pops, soid](auto shard) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->pushes.push_back(pops[shard]);
+ msg->set_priority(pg.get_recovery_op_priority());
+ return shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()).then(
+ [this, soid, shard] {
+ return recovering.at(soid).wait_for_pushes(shard);
+ });
+ });
+ });
+ }
+ return prepare_pops.then([this, soid] {
+ auto &recovery = recovering.at(soid);
auto push_info = recovery.pushing.begin();
object_stat_sum_t stat = {};
if (push_info != recovery.pushing.end()) {
- stat = push_info->second.stat;
+ stat = push_info->second.stat;
} else {
- // no push happened, take pull_info's stat
- assert(recovery.pi);
- stat = recovery.pi->stat;
+ // no push happened, take pull_info's stat
+ assert(recovery.pi);
+ stat = recovery.pi->stat;
}
pg.get_recovery_handler()->on_global_recover(soid, stat, false);
return seastar::make_ready_future<>();
}).handle_exception([this, soid](auto e) {
- auto& recovery = recovering.at(soid);
- if (recovery.obc)
- recovery.obc->drop_recovery_read();
+ auto &recovery = recovering.at(soid);
+ if (recovery.obc) {
+ recovery.obc->drop_recovery_read();
+ }
recovering.erase(soid);
return seastar::make_exception_future<>(e);
});
- };
+ }; // push_func
auto& recovery_waiter = recovering.at(soid);
if (recovery_waiter.obc) {
});
}
-seastar::future<> ReplicatedRecoveryBackend::prep_push(
+seastar::future<std::map<pg_shard_t, PushOp>>
+ReplicatedRecoveryBackend::prep_push(
const hobject_t& soid,
eversion_t need,
- std::map<pg_shard_t, PushOp>* pops,
const std::vector<pg_shard_t>& shards)
{
logger().debug("{}: {}, {}", __func__, soid, need);
- return seastar::do_with(std::map<pg_shard_t, interval_set<uint64_t>>(),
- [this, soid, pops, &shards](auto& data_subsets) {
+ return seastar::do_with(std::map<pg_shard_t, PushOp>(),
+ std::map<pg_shard_t, interval_set<uint64_t>>(),
+ [this, soid, &shards](auto& pops,
+ auto& data_subsets) {
return seastar::parallel_for_each(shards,
[this, soid, pops, &data_subsets](auto pg_shard) mutable {
- pops->emplace(pg_shard, PushOp());
+ pops.emplace(pg_shard, PushOp());
auto& recovery_waiter = recovering.at(soid);
auto& obc = recovery_waiter.obc;
auto& data_subset = data_subsets[pg_shard];
HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat, &(*pops)[pg_shard]).then(
+ &pi.stat, &pops[pg_shard]).then(
[this, soid, pg_shard](auto new_progress) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
pi.recovery_progress = new_progress;
return seastar::make_ready_future<>();
});
+ }).then([&pops]() mutable {
+ return seastar::make_ready_future<std::map<pg_shard_t, PushOp>>(std::move(pops));
});
});
}