From 2f88d7fbd4e092c0b24033bf4f699d33715143d6 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Tue, 15 Dec 2020 01:32:08 +0800 Subject: [PATCH] crimson/osd: let build_push_op() return PushOp for better readability Signed-off-by: Kefu Chai --- .../osd/replicated_recovery_backend.cc | 188 +++++++++--------- src/crimson/osd/replicated_recovery_backend.h | 10 +- 2 files changed, 94 insertions(+), 104 deletions(-) diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 3e81ea76fff..9a1fdc16fcb 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -332,11 +332,12 @@ ReplicatedRecoveryBackend::prep_push( HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS); return build_push_op(pi.recovery_info, pi.recovery_progress, - &pi.stat, &pops[pg_shard]).then( - [this, soid, pg_shard](auto new_progress) { + &pi.stat).then( + [this, soid, pg_shard, &pops](auto pop) { auto& recovery_waiter = recovering.at(soid); auto& pi = recovery_waiter.pushing[pg_shard]; - pi.recovery_progress = new_progress; + pi.recovery_progress = pop.after_progress; + pops[pg_shard] = std::move(pop); return seastar::make_ready_future<>(); }); }).then([&pops]() mutable { @@ -378,12 +379,11 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi, pi.recovery_progress = po.recovery_progress; } -seastar::future ReplicatedRecoveryBackend::build_push_op( +seastar::future ReplicatedRecoveryBackend::build_push_op( const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress& progress, - object_stat_sum_t* stat, - PushOp* pop - ) { + object_stat_sum_t* stat) +{ logger().debug("{} {} @{}", __func__, recovery_info.soid, recovery_info.version); return seastar::do_with(ObjectRecoveryProgress(progress), @@ -391,9 +391,10 @@ seastar::future ReplicatedRecoveryBackend::build_push_op uint64_t(crimson::common::local_conf() ->osd_recovery_max_chunk), eversion_t(), - [this, &recovery_info, &progress, stat, pop] - (auto& new_progress, auto& oi, auto& available, auto& v) { - return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] { + PushOp(), + [this, &recovery_info, &progress, stat] + (auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) { + return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] { v = recovery_info.version; if (progress.first) { return backend->omap_get_header(coll, ghobject_t(recovery_info.soid)) @@ -419,20 +420,20 @@ seastar::future ReplicatedRecoveryBackend::build_push_op ); } return seastar::make_ready_future<>(); - }().then([this, &recovery_info, &progress, &new_progress, &available, pop] { + }().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable { return read_omap_for_push_op(recovery_info.soid, progress, new_progress, - available, pop); - }).then([this, &recovery_info, &progress, &available, pop] { + available, &pop); + }).then([this, &recovery_info, &progress, &available, &pop]() mutable { logger().debug("build_push_op: available: {}, copy_subset: {}", available, recovery_info.copy_subset); return read_object_for_push_op(recovery_info.soid, recovery_info.copy_subset, progress.data_recovered_to, - available, pop); - }).then([&recovery_info, &v, &progress, &new_progress, stat, pop] - (uint64_t recovered_to) { + available, &pop); + }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop] + (uint64_t recovered_to) mutable { new_progress.data_recovered_to = recovered_to; if (new_progress.is_complete(recovery_info)) { new_progress.data_complete = true; @@ -444,18 +445,17 @@ seastar::future ReplicatedRecoveryBackend::build_push_op new_progress.omap_complete = false; } if (stat) { - stat->num_keys_recovered += pop->omap_entries.size(); - stat->num_bytes_recovered += pop->data.length(); + stat->num_keys_recovered += pop.omap_entries.size(); + stat->num_bytes_recovered += pop.data.length(); } - pop->version = v; - pop->soid = recovery_info.soid; - pop->recovery_info = recovery_info; - pop->after_progress = new_progress; - pop->before_progress = progress; + pop.version = v; + pop.soid = recovery_info.soid; + pop.recovery_info = recovery_info; + pop.after_progress = new_progress; + pop.before_progress = progress; logger().debug("build_push_op: pop version: {}, pop data length: {}", - pop->version, pop->data.length()); - return seastar::make_ready_future - (std::move(new_progress)); + pop.version, pop.data.length()); + return seastar::make_ready_future(std::move(pop)); }); }); } @@ -586,42 +586,34 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref m) [this, m, from = m->from](auto& pulls) { return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) { const hobject_t& soid = pull_op.soid; - return seastar::do_with(PushOp(), - [this, &soid, &pull_op, from](auto& pop) { - logger().debug("handle_pull: {}", soid); - return backend->stat(coll, ghobject_t(soid)).then( - [this, &pull_op, &pop](auto st) { - ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; - ObjectRecoveryProgress &progress = pull_op.recovery_progress; - if (progress.first && recovery_info.size == ((uint64_t) -1)) { - // Adjust size and copy_subset - recovery_info.size = st.st_size; - if (st.st_size) { - interval_set object_range; - object_range.insert(0, st.st_size); - recovery_info.copy_subset.intersection_of(object_range); - } else { - recovery_info.copy_subset.clear(); - } - assert(recovery_info.clone_subset.empty()); - } - return build_push_op(recovery_info, progress, 0, &pop); - }).handle_exception([soid, &pop](auto e) { - pop.recovery_info.version = eversion_t(); - pop.version = eversion_t(); - pop.soid = soid; - return seastar::make_ready_future(); - }).then([this, &pop, from](auto new_progress) { - auto msg = make_message(); - msg->from = pg.get_pg_whoami(); - msg->pgid = pg.get_pgid(); - msg->map_epoch = pg.get_osdmap_epoch(); - msg->min_epoch = pg.get_last_peering_reset(); - msg->set_priority(pg.get_recovery_op_priority()); - msg->pushes.push_back(pop); - return shard_services.send_to_osd(from.osd, std::move(msg), - pg.get_osdmap_epoch()); - }); + logger().debug("handle_pull: {}", soid); + return backend->stat(coll, ghobject_t(soid)).then( + [this, &pull_op](auto st) { + ObjectRecoveryInfo &recovery_info = pull_op.recovery_info; + ObjectRecoveryProgress &progress = pull_op.recovery_progress; + if (progress.first && recovery_info.size == ((uint64_t) -1)) { + // Adjust size and copy_subset + recovery_info.size = st.st_size; + if (st.st_size) { + interval_set object_range; + object_range.insert(0, st.st_size); + recovery_info.copy_subset.intersection_of(object_range); + } else { + recovery_info.copy_subset.clear(); + } + assert(recovery_info.clone_subset.empty()); + } + return build_push_op(recovery_info, progress, 0); + }).then([this, from](auto pop) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(std::move(pop)); + return shard_services.send_to_osd(from.osd, std::move(msg), + pg.get_osdmap_epoch()); }); }); }); @@ -827,10 +819,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push( }); } -seastar::future ReplicatedRecoveryBackend::_handle_push_reply( +seastar::future> +ReplicatedRecoveryBackend::_handle_push_reply( pg_shard_t peer, - const PushReplyOp &op, - PushOp *reply) + const PushReplyOp &op) { const hobject_t& soid = op.soid; logger().debug("{}, soid {}, from {}", __func__, soid, peer); @@ -838,27 +830,26 @@ seastar::future ReplicatedRecoveryBackend::_handle_push_reply( if (recovering_iter == recovering.end() || !recovering_iter->second.pushing.count(peer)) { logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer); - return seastar::make_ready_future(true); + return seastar::make_ready_future>(); } else { auto& pi = recovering_iter->second.pushing[peer]; - return [this, &pi, &soid, reply, peer, recovering_iter] { - bool error = pi.recovery_progress.error; - if (!pi.recovery_progress.data_complete && !error) { - return build_push_op(pi.recovery_info, pi.recovery_progress, - &pi.stat, reply).then([&pi] (auto new_progress) { - pi.recovery_progress = new_progress; - return seastar::make_ready_future(false); - }); - } - if (!error) - pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); - recovering_iter->second.set_pushed(peer); - return seastar::make_ready_future(true); - }().handle_exception([recovering_iter, &pi, peer] (auto e) { - pi.recovery_progress.error = true; - recovering_iter->second.set_push_failed(peer, e); - return seastar::make_ready_future(true); - }); + bool error = pi.recovery_progress.error; + if (!pi.recovery_progress.data_complete && !error) { + return build_push_op(pi.recovery_info, pi.recovery_progress, + &pi.stat).then([&pi] (auto pop) { + pi.recovery_progress = pop.after_progress; + return seastar::make_ready_future>(std::move(pop)); + }).handle_exception([recovering_iter, &pi, peer] (auto e) { + pi.recovery_progress.error = true; + recovering_iter->second.set_push_failed(peer, e); + return seastar::make_ready_future>(); + }); + } + if (!error) { + pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info); + } + recovering_iter->second.set_pushed(peer); + return seastar::make_ready_future>(); } } @@ -869,21 +860,22 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push_reply( auto from = m->from; auto& push_reply = m->replies[0]; //TODO: only one reply per message - return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) { - return _handle_push_reply(from, push_reply, &pop).then( - [this, &pop, from](bool finished) { - if (!finished) { - auto msg = make_message(); - msg->from = pg.get_pg_whoami(); - msg->pgid = pg.get_pgid(); - msg->map_epoch = pg.get_osdmap_epoch(); - msg->min_epoch = pg.get_last_peering_reset(); - msg->set_priority(pg.get_recovery_op_priority()); - msg->pushes.push_back(pop); - return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch()); - } + return _handle_push_reply(from, push_reply).then( + [this, from](std::optional push_op) { + if (push_op) { + auto msg = make_message(); + msg->from = pg.get_pg_whoami(); + msg->pgid = pg.get_pgid(); + msg->map_epoch = pg.get_osdmap_epoch(); + msg->min_epoch = pg.get_last_peering_reset(); + msg->set_priority(pg.get_recovery_op_priority()); + msg->pushes.push_back(std::move(*push_op)); + return shard_services.send_to_osd(from.osd, + std::move(msg), + pg.get_osdmap_epoch()); + } else { return seastar::make_ready_future<>(); - }); + } }); } diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index 5017558012d..8298db642d1 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -55,11 +55,10 @@ protected: eversion_t need); std::vector get_shards_to_push( const hobject_t& soid) const; - seastar::future build_push_op( + seastar::future build_push_op( const ObjectRecoveryInfo& recovery_info, const ObjectRecoveryProgress& progress, - object_stat_sum_t* stat, - PushOp* pop); + object_stat_sum_t* stat); seastar::future _handle_pull_response( pg_shard_t from, PushOp& pop, @@ -91,10 +90,9 @@ protected: const PushOp &pop, PushReplyOp *response, ceph::os::Transaction *t); - seastar::future _handle_push_reply( + seastar::future> _handle_push_reply( pg_shard_t peer, - const PushReplyOp &op, - PushOp *reply); + const PushReplyOp &op); seastar::future<> on_local_recover_persist( const hobject_t& soid, const ObjectRecoveryInfo& _recovery_info, -- 2.39.5