From: Matan Breizman Date: Sun, 25 Dec 2022 12:34:51 +0000 (+0000) Subject: crimoson/osd/replicated_recovery_backend: Rename pop to push_op X-Git-Tag: v18.1.0~540^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=91309fb5df75f8962912a92be72ccd06c02b5ede;p=ceph.git crimoson/osd/replicated_recovery_backend: Rename pop to push_op Signed-off-by: Matan Breizman --- diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 114e1efd7dab2..b2e3dfbd63826 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -332,11 +332,11 @@ ReplicatedRecoveryBackend::prep_push( return build_push_op(push_info.recovery_info, push_info.recovery_progress, &push_info.stat).then_interruptible( - [this, soid, pg_shard](auto pop) { + [this, soid, pg_shard](auto push_op) { auto& recovery_waiter = get_recovering(soid); auto& push_info = recovery_waiter.pushing[pg_shard]; - push_info.recovery_progress = pop.after_progress; - return pop; + push_info.recovery_progress = push_op.after_progress; + return push_op; }); } @@ -386,10 +386,11 @@ ReplicatedRecoveryBackend::build_push_op( recovery_info.version, PushOp(), [this, &recovery_info, &progress, stat] - (auto& new_progress, auto& available, auto& v, auto& pop) { + (auto& new_progress, auto& available, auto& v, auto& push_op) { return read_metadata_for_push_op(recovery_info.soid, progress, new_progress, - v, &pop).then_interruptible([&](eversion_t local_ver) mutable { + v, &push_op + ).then_interruptible([&](eversion_t local_ver) mutable { // If requestor didn't know the version, use ours if (v == eversion_t()) { v = local_ver; @@ -401,15 +402,17 @@ ReplicatedRecoveryBackend::build_push_op( return read_omap_for_push_op(recovery_info.soid, progress, new_progress, - available, &pop); - }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable { + available, &push_op); + }).then_interruptible([this, &recovery_info, &progress, + &available, &push_op]() mutable { logger().debug("build_push_op: available: {}, copy_subset: {}", available, recovery_info.copy_subset); return read_object_for_push_op(recovery_info.soid, recovery_info.copy_subset, progress.data_recovered_to, - available, &pop); - }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop] + available, &push_op); + }).then_interruptible([&recovery_info, &v, &progress, + &new_progress, stat, &push_op] (uint64_t recovered_to) mutable { new_progress.data_recovered_to = recovered_to; if (new_progress.is_complete(recovery_info)) { @@ -422,17 +425,18 @@ ReplicatedRecoveryBackend::build_push_op( new_progress.omap_complete = false; } if (stat) { - stat->num_keys_recovered += pop.omap_entries.size(); - stat->num_bytes_recovered += pop.data.length(); + stat->num_keys_recovered += push_op.omap_entries.size(); + stat->num_bytes_recovered += push_op.data.length(); } - pop.version = v; - pop.soid = recovery_info.soid; - pop.recovery_info = recovery_info; - pop.after_progress = new_progress; - pop.before_progress = progress; - logger().debug("build_push_op: pop version: {}, pop data length: {}", - pop.version, pop.data.length()); - return seastar::make_ready_future(std::move(pop)); + push_op.version = v; + push_op.soid = recovery_info.soid; + push_op.recovery_info = recovery_info; + push_op.after_progress = new_progress; + push_op.before_progress = progress; + logger().debug("build_push_op: push_op version:" + " {}, push_op data length: {}", + push_op.version, push_op.data.length()); + return seastar::make_ready_future(std::move(push_op)); }); }); } @@ -641,14 +645,14 @@ ReplicatedRecoveryBackend::handle_pull(Ref m) assert(recovery_info.clone_subset.empty()); } return build_push_op(recovery_info, progress, 0); - }).then_interruptible([this, from](auto pop) { + }).then_interruptible([this, from](auto push_op) { auto msg = crimson::make_message(); msg->from = pg.get_pg_whoami(); msg->pgid = pg.get_pgid(); msg->map_epoch = pg.get_osdmap_epoch(); msg->min_epoch = pg.get_last_peering_reset(); msg->set_priority(pg.get_recovery_op_priority()); - msg->pushes.push_back(std::move(pop)); + msg->pushes.push_back(std::move(push_op)); return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch()); }); @@ -659,78 +663,79 @@ ReplicatedRecoveryBackend::handle_pull(Ref m) RecoveryBackend::interruptible_future ReplicatedRecoveryBackend::_handle_pull_response( pg_shard_t from, - PushOp& pop, + PushOp& push_op, PullOp* response, ceph::os::Transaction* t) { logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}", - pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included); + push_op.recovery_info, push_op.after_progress, + push_op.data.length(), push_op.data_included); - const hobject_t &hoid = pop.soid; + const hobject_t &hoid = push_op.soid; auto& recovery_waiter = get_recovering(hoid); auto& pull_info = *recovery_waiter.pull_info; if (pull_info.recovery_info.size == (uint64_t(-1))) { - pull_info.recovery_info.size = pop.recovery_info.size; + pull_info.recovery_info.size = push_op.recovery_info.size; pull_info.recovery_info.copy_subset.intersection_of( - pop.recovery_info.copy_subset); + push_op.recovery_info.copy_subset); } // If primary doesn't have object info and didn't know version if (pull_info.recovery_info.version == eversion_t()) - pull_info.recovery_info.version = pop.version; + pull_info.recovery_info.version = push_op.version; auto prepare_waiter = interruptor::make_interruptible( seastar::make_ready_future<>()); if (pull_info.recovery_progress.first) { prepare_waiter = pg.obc_loader.with_obc( pull_info.recovery_info.soid, - [&pull_info, &recovery_waiter, &pop](auto obc) { + [&pull_info, &recovery_waiter, &push_op](auto obc) { pull_info.obc = obc; recovery_waiter.obc = obc; - obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid); + obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), push_op.soid); pull_info.recovery_info.oi = obc->obs.oi; return crimson::osd::PG::load_obc_ertr::now(); }).handle_error_interruptible(crimson::ct_error::assert_all{}); }; return prepare_waiter.then_interruptible( - [this, &pull_info, &pop, t, response]() mutable { + [this, &pull_info, &push_op, t, response]() mutable { const bool first = pull_info.recovery_progress.first; - pull_info.recovery_progress = pop.after_progress; + pull_info.recovery_progress = push_op.after_progress; logger().debug("new recovery_info {}, new progress {}", pull_info.recovery_info, pull_info.recovery_progress); interval_set data_zeros; { - uint64_t offset = pop.before_progress.data_recovered_to; - uint64_t length = (pop.after_progress.data_recovered_to - - pop.before_progress.data_recovered_to); + uint64_t offset = push_op.before_progress.data_recovered_to; + uint64_t length = (push_op.after_progress.data_recovered_to - + push_op.before_progress.data_recovered_to); if (length) { data_zeros.insert(offset, length); } } auto [usable_intervals, data] = trim_pushed_data(pull_info.recovery_info.copy_subset, - pop.data_included, pop.data); + push_op.data_included, push_op.data); bool complete = pull_info.is_complete(); - bool clear_omap = !pop.before_progress.omap_complete; + bool clear_omap = !push_op.before_progress.omap_complete; return submit_push_data(pull_info.recovery_info, first, complete, clear_omap, std::move(data_zeros), std::move(usable_intervals), - std::move(data), std::move(pop.omap_header), - pop.attrset, std::move(pop.omap_entries), t) + std::move(data), std::move(push_op.omap_header), + push_op.attrset, std::move(push_op.omap_entries), t) .then_interruptible( - [this, response, &pull_info, &pop, complete, + [this, response, &pull_info, &push_op, complete, t, bytes_recovered=data.length()] { - pull_info.stat.num_keys_recovered += pop.omap_entries.size(); + pull_info.stat.num_keys_recovered += push_op.omap_entries.size(); pull_info.stat.num_bytes_recovered += bytes_recovered; if (complete) { pull_info.stat.num_objects_recovered++; pg.get_recovery_handler()->on_local_recover( - pop.soid, get_recovering(pop.soid).pull_info->recovery_info, + push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info, false, *t); return true; } else { - response->soid = pop.soid; + response->soid = push_op.soid; response->recovery_info = pull_info.recovery_info; response->recovery_progress = pull_info.recovery_progress; return false; @@ -747,15 +752,15 @@ ReplicatedRecoveryBackend::handle_pull_response( logger().debug("{}: discarding {}", __func__, *m); return seastar::now(); } - const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now. - if (pop.version == eversion_t()) { + const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now. + if (push_op.version == eversion_t()) { // replica doesn't have it! - pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid, - get_recovering(pop.soid).pull_info->recovery_info.version); + pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid, + get_recovering(push_op.soid).pull_info->recovery_info.version); return seastar::make_exception_future<>( std::runtime_error(fmt::format( "Error on pushing side {} when pulling obj {}", - m->from, pop.soid))); + m->from, push_op.soid))); } logger().debug("{}: {}", __func__, *m); @@ -763,8 +768,9 @@ ReplicatedRecoveryBackend::handle_pull_response( return seastar::do_with(ceph::os::Transaction(), m.get(), [this, &response](auto& t, auto& m) { pg_shard_t from = m->from; - PushOp& pop = m->pushes[0]; // only one push per message for now - return _handle_pull_response(from, pop, &response, &t).then_interruptible( + PushOp& push_op = m->pushes[0]; // only one push per message for now + return _handle_pull_response(from, push_op, &response, &t + ).then_interruptible( [this, &t](bool complete) { epoch_t epoch_frozen = pg.get_osdmap_epoch(); logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction..."); @@ -777,8 +783,8 @@ ReplicatedRecoveryBackend::handle_pull_response( }); }).then_interruptible([this, m, &response](bool complete) { if (complete) { - auto& pop = m->pushes[0]; - get_recovering(pop.soid).set_pulled(); + auto& push_op = m->pushes[0]; + get_recovering(push_op.soid).set_pulled(); return seastar::make_ready_future<>(); } else { auto reply = crimson::make_message(); @@ -797,36 +803,39 @@ ReplicatedRecoveryBackend::handle_pull_response( RecoveryBackend::interruptible_future<> ReplicatedRecoveryBackend::_handle_push( pg_shard_t from, - PushOp &pop, + PushOp &push_op, PushReplyOp *response, ceph::os::Transaction *t) { logger().debug("{}", __func__); - bool first = pop.before_progress.first; + bool first = push_op.before_progress.first; interval_set data_zeros; { - uint64_t offset = pop.before_progress.data_recovered_to; - uint64_t length = (pop.after_progress.data_recovered_to - - pop.before_progress.data_recovered_to); + uint64_t offset = push_op.before_progress.data_recovered_to; + uint64_t length = (push_op.after_progress.data_recovered_to - + push_op.before_progress.data_recovered_to); if (length) { data_zeros.insert(offset, length); } } - bool complete = (pop.after_progress.data_complete && - pop.after_progress.omap_complete); - bool clear_omap = !pop.before_progress.omap_complete; - response->soid = pop.recovery_info.soid; + bool complete = (push_op.after_progress.data_complete && + push_op.after_progress.omap_complete); + bool clear_omap = !push_op.before_progress.omap_complete; + response->soid = push_op.recovery_info.soid; - return submit_push_data(pop.recovery_info, first, complete, clear_omap, - std::move(data_zeros), std::move(pop.data_included), - std::move(pop.data), std::move(pop.omap_header), - pop.attrset, std::move(pop.omap_entries), t) + return submit_push_data(push_op.recovery_info, first, complete, clear_omap, + std::move(data_zeros), + std::move(push_op.data_included), + std::move(push_op.data), + std::move(push_op.omap_header), + push_op.attrset, + std::move(push_op.omap_entries), t) .then_interruptible( - [this, complete, &pop, t] { + [this, complete, &push_op, t] { if (complete) { pg.get_recovery_handler()->on_local_recover( - pop.recovery_info.soid, pop.recovery_info, + push_op.recovery_info.soid, push_op.recovery_info, false, *t); } }); @@ -846,10 +855,10 @@ ReplicatedRecoveryBackend::handle_push( logger().debug("{}: {}", __func__, *m); return seastar::do_with(PushReplyOp(), [this, m](auto& response) { - PushOp& pop = m->pushes[0]; // TODO: only one push per message for now + PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now return seastar::do_with(ceph::os::Transaction(), - [this, m, &pop, &response](auto& t) { - return _handle_push(m->from, pop, &response, &t).then_interruptible( + [this, m, &push_op, &response](auto& t) { + return _handle_push(m->from, push_op, &response, &t).then_interruptible( [this, &t] { epoch_t epoch_frozen = pg.get_osdmap_epoch(); logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction..."); @@ -893,9 +902,10 @@ ReplicatedRecoveryBackend::_handle_push_reply( if (!push_info.recovery_progress.data_complete && !error) { return build_push_op(push_info.recovery_info, push_info.recovery_progress, &push_info.stat - ).then_interruptible([&push_info] (auto pop) { - push_info.recovery_progress = pop.after_progress; - return seastar::make_ready_future>(std::move(pop)); + ).then_interruptible([&push_info] (auto push_op) { + push_info.recovery_progress = push_op.after_progress; + return seastar::make_ready_future>( + std::move(push_op)); }).handle_exception_interruptible( [recovering_iter, &push_info, peer] (auto e) { push_info.recovery_progress.error = true; diff --git a/src/crimson/osd/replicated_recovery_backend.h b/src/crimson/osd/replicated_recovery_backend.h index e58e887d48ac0..1ada19b18a079 100644 --- a/src/crimson/osd/replicated_recovery_backend.h +++ b/src/crimson/osd/replicated_recovery_backend.h @@ -66,7 +66,7 @@ protected: /// recovery @c pop.soid interruptible_future _handle_pull_response( pg_shard_t from, - PushOp& pop, + PushOp& push_op, PullOp* response, ceph::os::Transaction* t); std::pair, ceph::bufferlist> trim_pushed_data( @@ -90,7 +90,7 @@ protected: ObjectStore::Transaction *t); interruptible_future<> _handle_push( pg_shard_t from, - PushOp& pop, + PushOp& push_op, PushReplyOp *response, ceph::os::Transaction *t); interruptible_future> _handle_push_reply(