From: Kefu Chai Date: Mon, 14 Dec 2020 16:38:05 +0000 (+0800) Subject: crimson/osd: let prep_push() return a map of PushOp X-Git-Tag: v17.0.0~295^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7b2ab64ed09a57ca54319e69eeb22be480cdc554;p=ceph.git crimson/osd: let prep_push() return a map of PushOp ReplicatedRecoveryBackend::prep_push() is responsible for building PushOps, so it's more natural to let it return a map of PushOp. Signed-off-by: Kefu Chai --- diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 66cc0513ccc2f..3e81ea76fff76 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -27,10 +27,9 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object( assert(is_recovering(soid)); // start tracking the recovery of soid return maybe_pull_missing_obj(soid, need).then([this, soid, need] { - return seastar::do_with(std::map(), - get_shards_to_push(soid), - [this, soid, need](auto& pops, auto& shards) { - return maybe_push_shards(soid, need, pops, shards); + return seastar::do_with(get_shards_to_push(soid), + [this, soid, need](auto& shards) { + return maybe_push_shards(soid, need, shards); }); }); } @@ -39,50 +38,53 @@ seastar::future<> ReplicatedRecoveryBackend::maybe_push_shards( const hobject_t& soid, eversion_t need, - std::map& pops, std::vector& shards) { - auto push_func = [this, soid, need, &pops, &shards] { - auto fut = seastar::now(); - if (!shards.empty()) - fut = prep_push(soid, need, &pops, shards); - return fut.then([this, &pops, &shards, soid] { - return seastar::parallel_for_each(shards, - [this, &pops, soid](auto shard) { - auto msg = make_message(); - msg->from = pg.get_pg_whoami(); - msg->pgid = pg.get_pgid(); - msg->map_epoch = pg.get_osdmap_epoch(); - msg->min_epoch = pg.get_last_peering_reset(); - msg->set_priority(pg.get_recovery_op_priority()); - msg->pushes.push_back(pops[shard]); - return shard_services.send_to_osd(shard.osd, std::move(msg), - pg.get_osdmap_epoch()).then( - [this, soid, shard] { - return recovering.at(soid).wait_for_pushes(shard); - }); - }); - }).then([this, soid] { - auto& recovery = recovering.at(soid); + auto push_func = [this, soid, need, &shards] { + auto prepare_pops = seastar::now(); + if (!shards.empty()) { + prepare_pops = + prep_push(soid, need, shards).then([this, &shards, soid](auto pops) { + return seastar::parallel_for_each(shards, + [this, &pops, soid](auto shard) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->pushes.push_back(pops[shard]); + msg->set_priority(pg.get_recovery_op_priority()); + return shard_services.send_to_osd(shard.osd, + std::move(msg), + pg.get_osdmap_epoch()).then( + [this, soid, shard] { + return recovering.at(soid).wait_for_pushes(shard); + }); + }); + }); + } + return prepare_pops.then([this, soid] { + auto &recovery = recovering.at(soid); auto push_info = recovery.pushing.begin(); object_stat_sum_t stat = {}; if (push_info != recovery.pushing.end()) { - stat = push_info->second.stat; + stat = push_info->second.stat; } else { - // no push happened, take pull_info's stat - assert(recovery.pi); - stat = recovery.pi->stat; + // no push happened, take pull_info's stat + assert(recovery.pi); + stat = recovery.pi->stat; } pg.get_recovery_handler()->on_global_recover(soid, stat, false); return seastar::make_ready_future<>(); }).handle_exception([this, soid](auto e) { - auto& recovery = recovering.at(soid); - if (recovery.obc) - recovery.obc->drop_recovery_read(); + auto &recovery = recovering.at(soid); + if (recovery.obc) { + recovery.obc->drop_recovery_read(); + } recovering.erase(soid); return seastar::make_exception_future<>(e); }); - }; + }; // push_func auto& recovery_waiter = recovering.at(soid); if (recovery_waiter.obc) { @@ -280,19 +282,21 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete( }); } -seastar::future<> ReplicatedRecoveryBackend::prep_push( +seastar::future> +ReplicatedRecoveryBackend::prep_push( const hobject_t& soid, eversion_t need, - std::map* pops, const std::vector& shards) { logger().debug("{}: {}, {}", __func__, soid, need); - return seastar::do_with(std::map>(), - [this, soid, pops, &shards](auto& data_subsets) { + return seastar::do_with(std::map(), + std::map>(), + [this, soid, &shards](auto& pops, + auto& data_subsets) { return seastar::parallel_for_each(shards, [this, soid, pops, &data_subsets](auto pg_shard) mutable { - pops->emplace(pg_shard, PushOp()); + pops.emplace(pg_shard, PushOp()); auto& recovery_waiter = recovering.at(soid); auto& obc = recovery_waiter.obc; auto& data_subset = data_subsets[pg_shard]; @@ -328,13 +332,15 @@ seastar::future<> ReplicatedRecoveryBackend::prep_push( HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); return build_push_op(pi.recovery_info, pi.recovery_progress, - &pi.stat, &(*pops)[pg_shard]).then( + &pi.stat, &pops[pg_shard]).then( [this, soid, pg_shard](auto new_progress) { auto& recovery_waiter = recovering.at(soid); auto& pi = recovery_waiter.pushing[pg_shard]; pi.recovery_progress = new_progress; return seastar::make_ready_future<>(); }); + }).then([&pops]() mutable { + return seastar::make_ready_future>(std::move(pops)); }); }); } diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index 4646d78bb5f40..5017558012df1 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -44,10 +44,9 @@ protected: Ref m); seastar::future<> handle_recovery_delete_reply( Ref m); - seastar::future<> prep_push( + seastar::future> prep_push( const hobject_t& soid, eversion_t need, - std::map* pops, const std::vector& shards); void prepare_pull( PullOp& po, @@ -121,7 +120,6 @@ private: seastar::future<> maybe_push_shards( const hobject_t& soid, eversion_t need, - std::map& pops, std::vector& shards); /// read the remaining extents of object to be recovered and fill push_op