return build_push_op(push_info.recovery_info,
push_info.recovery_progress,
&push_info.stat).then_interruptible(
- [this, soid, pg_shard](auto pop) {
+ [this, soid, pg_shard](auto push_op) {
auto& recovery_waiter = get_recovering(soid);
auto& push_info = recovery_waiter.pushing[pg_shard];
- push_info.recovery_progress = pop.after_progress;
- return pop;
+ push_info.recovery_progress = push_op.after_progress;
+ return push_op;
});
}
recovery_info.version,
PushOp(),
[this, &recovery_info, &progress, stat]
- (auto& new_progress, auto& available, auto& v, auto& pop) {
+ (auto& new_progress, auto& available, auto& v, auto& push_op) {
return read_metadata_for_push_op(recovery_info.soid,
progress, new_progress,
- v, &pop).then_interruptible([&](eversion_t local_ver) mutable {
+ v, &push_op
+ ).then_interruptible([&](eversion_t local_ver) mutable {
// If requestor didn't know the version, use ours
if (v == eversion_t()) {
v = local_ver;
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
- available, &pop);
- }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable {
+ available, &push_op);
+ }).then_interruptible([this, &recovery_info, &progress,
+ &available, &push_op]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
- available, &pop);
- }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+ available, &push_op);
+ }).then_interruptible([&recovery_info, &v, &progress,
+ &new_progress, stat, &push_op]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
new_progress.omap_complete = false;
}
if (stat) {
- stat->num_keys_recovered += pop.omap_entries.size();
- stat->num_bytes_recovered += pop.data.length();
+ stat->num_keys_recovered += push_op.omap_entries.size();
+ stat->num_bytes_recovered += push_op.data.length();
}
- pop.version = v;
- pop.soid = recovery_info.soid;
- pop.recovery_info = recovery_info;
- pop.after_progress = new_progress;
- pop.before_progress = progress;
- logger().debug("build_push_op: pop version: {}, pop data length: {}",
- pop.version, pop.data.length());
- return seastar::make_ready_future<PushOp>(std::move(pop));
+ push_op.version = v;
+ push_op.soid = recovery_info.soid;
+ push_op.recovery_info = recovery_info;
+ push_op.after_progress = new_progress;
+ push_op.before_progress = progress;
+ logger().debug("build_push_op: push_op version:"
+ " {}, push_op data length: {}",
+ push_op.version, push_op.data.length());
+ return seastar::make_ready_future<PushOp>(std::move(push_op));
});
});
}
assert(recovery_info.clone_subset.empty());
}
return build_push_op(recovery_info, progress, 0);
- }).then_interruptible([this, from](auto pop) {
+ }).then_interruptible([this, from](auto push_op) {
auto msg = crimson::make_message<MOSDPGPush>();
msg->from = pg.get_pg_whoami();
msg->pgid = pg.get_pgid();
msg->map_epoch = pg.get_osdmap_epoch();
msg->min_epoch = pg.get_last_peering_reset();
msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(std::move(pop));
+ msg->pushes.push_back(std::move(push_op));
return shard_services.send_to_osd(from.osd, std::move(msg),
pg.get_osdmap_epoch());
});
RecoveryBackend::interruptible_future<bool>
ReplicatedRecoveryBackend::_handle_pull_response(
pg_shard_t from,
- PushOp& pop,
+ PushOp& push_op,
PullOp* response,
ceph::os::Transaction* t)
{
logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
- pop.recovery_info, pop.after_progress, pop.data.length(), pop.data_included);
+ push_op.recovery_info, push_op.after_progress,
+ push_op.data.length(), push_op.data_included);
- const hobject_t &hoid = pop.soid;
+ const hobject_t &hoid = push_op.soid;
auto& recovery_waiter = get_recovering(hoid);
auto& pull_info = *recovery_waiter.pull_info;
if (pull_info.recovery_info.size == (uint64_t(-1))) {
- pull_info.recovery_info.size = pop.recovery_info.size;
+ pull_info.recovery_info.size = push_op.recovery_info.size;
pull_info.recovery_info.copy_subset.intersection_of(
- pop.recovery_info.copy_subset);
+ push_op.recovery_info.copy_subset);
}
// If primary doesn't have object info and didn't know version
if (pull_info.recovery_info.version == eversion_t())
- pull_info.recovery_info.version = pop.version;
+ pull_info.recovery_info.version = push_op.version;
auto prepare_waiter = interruptor::make_interruptible(
seastar::make_ready_future<>());
if (pull_info.recovery_progress.first) {
prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
pull_info.recovery_info.soid,
- [&pull_info, &recovery_waiter, &pop](auto obc) {
+ [&pull_info, &recovery_waiter, &push_op](auto obc) {
pull_info.obc = obc;
recovery_waiter.obc = obc;
- obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid);
+ obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR), push_op.soid);
pull_info.recovery_info.oi = obc->obs.oi;
return crimson::osd::PG::load_obc_ertr::now();
}).handle_error_interruptible(crimson::ct_error::assert_all{});
};
return prepare_waiter.then_interruptible(
- [this, &pull_info, &pop, t, response]() mutable {
+ [this, &pull_info, &push_op, t, response]() mutable {
const bool first = pull_info.recovery_progress.first;
- pull_info.recovery_progress = pop.after_progress;
+ pull_info.recovery_progress = push_op.after_progress;
logger().debug("new recovery_info {}, new progress {}",
pull_info.recovery_info, pull_info.recovery_progress);
interval_set<uint64_t> data_zeros;
{
- uint64_t offset = pop.before_progress.data_recovered_to;
- uint64_t length = (pop.after_progress.data_recovered_to -
- pop.before_progress.data_recovered_to);
+ uint64_t offset = push_op.before_progress.data_recovered_to;
+ uint64_t length = (push_op.after_progress.data_recovered_to -
+ push_op.before_progress.data_recovered_to);
if (length) {
data_zeros.insert(offset, length);
}
}
auto [usable_intervals, data] =
trim_pushed_data(pull_info.recovery_info.copy_subset,
- pop.data_included, pop.data);
+ push_op.data_included, push_op.data);
bool complete = pull_info.is_complete();
- bool clear_omap = !pop.before_progress.omap_complete;
+ bool clear_omap = !push_op.before_progress.omap_complete;
return submit_push_data(pull_info.recovery_info,
first, complete, clear_omap,
std::move(data_zeros), std::move(usable_intervals),
- std::move(data), std::move(pop.omap_header),
- pop.attrset, std::move(pop.omap_entries), t)
+ std::move(data), std::move(push_op.omap_header),
+ push_op.attrset, std::move(push_op.omap_entries), t)
.then_interruptible(
- [this, response, &pull_info, &pop, complete,
+ [this, response, &pull_info, &push_op, complete,
t, bytes_recovered=data.length()] {
- pull_info.stat.num_keys_recovered += pop.omap_entries.size();
+ pull_info.stat.num_keys_recovered += push_op.omap_entries.size();
pull_info.stat.num_bytes_recovered += bytes_recovered;
if (complete) {
pull_info.stat.num_objects_recovered++;
pg.get_recovery_handler()->on_local_recover(
- pop.soid, get_recovering(pop.soid).pull_info->recovery_info,
+ push_op.soid, get_recovering(push_op.soid).pull_info->recovery_info,
false, *t);
return true;
} else {
- response->soid = pop.soid;
+ response->soid = push_op.soid;
response->recovery_info = pull_info.recovery_info;
response->recovery_progress = pull_info.recovery_progress;
return false;
logger().debug("{}: discarding {}", __func__, *m);
return seastar::now();
}
- const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
- if (pop.version == eversion_t()) {
+ const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
+ if (push_op.version == eversion_t()) {
// replica doesn't have it!
- pg.get_recovery_handler()->on_failed_recover({ m->from }, pop.soid,
- get_recovering(pop.soid).pull_info->recovery_info.version);
+ pg.get_recovery_handler()->on_failed_recover({ m->from }, push_op.soid,
+ get_recovering(push_op.soid).pull_info->recovery_info.version);
return seastar::make_exception_future<>(
std::runtime_error(fmt::format(
"Error on pushing side {} when pulling obj {}",
- m->from, pop.soid)));
+ m->from, push_op.soid)));
}
logger().debug("{}: {}", __func__, *m);
return seastar::do_with(ceph::os::Transaction(), m.get(),
[this, &response](auto& t, auto& m) {
pg_shard_t from = m->from;
- PushOp& pop = m->pushes[0]; // only one push per message for now
- return _handle_pull_response(from, pop, &response, &t).then_interruptible(
+ PushOp& push_op = m->pushes[0]; // only one push per message for now
+ return _handle_pull_response(from, push_op, &response, &t
+ ).then_interruptible(
[this, &t](bool complete) {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
});
}).then_interruptible([this, m, &response](bool complete) {
if (complete) {
- auto& pop = m->pushes[0];
- get_recovering(pop.soid).set_pulled();
+ auto& push_op = m->pushes[0];
+ get_recovering(push_op.soid).set_pulled();
return seastar::make_ready_future<>();
} else {
auto reply = crimson::make_message<MOSDPGPull>();
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::_handle_push(
pg_shard_t from,
- PushOp &pop,
+ PushOp &push_op,
PushReplyOp *response,
ceph::os::Transaction *t)
{
logger().debug("{}", __func__);
- bool first = pop.before_progress.first;
+ bool first = push_op.before_progress.first;
interval_set<uint64_t> data_zeros;
{
- uint64_t offset = pop.before_progress.data_recovered_to;
- uint64_t length = (pop.after_progress.data_recovered_to -
- pop.before_progress.data_recovered_to);
+ uint64_t offset = push_op.before_progress.data_recovered_to;
+ uint64_t length = (push_op.after_progress.data_recovered_to -
+ push_op.before_progress.data_recovered_to);
if (length) {
data_zeros.insert(offset, length);
}
}
- bool complete = (pop.after_progress.data_complete &&
- pop.after_progress.omap_complete);
- bool clear_omap = !pop.before_progress.omap_complete;
- response->soid = pop.recovery_info.soid;
+ bool complete = (push_op.after_progress.data_complete &&
+ push_op.after_progress.omap_complete);
+ bool clear_omap = !push_op.before_progress.omap_complete;
+ response->soid = push_op.recovery_info.soid;
- return submit_push_data(pop.recovery_info, first, complete, clear_omap,
- std::move(data_zeros), std::move(pop.data_included),
- std::move(pop.data), std::move(pop.omap_header),
- pop.attrset, std::move(pop.omap_entries), t)
+ return submit_push_data(push_op.recovery_info, first, complete, clear_omap,
+ std::move(data_zeros),
+ std::move(push_op.data_included),
+ std::move(push_op.data),
+ std::move(push_op.omap_header),
+ push_op.attrset,
+ std::move(push_op.omap_entries), t)
.then_interruptible(
- [this, complete, &pop, t] {
+ [this, complete, &push_op, t] {
if (complete) {
pg.get_recovery_handler()->on_local_recover(
- pop.recovery_info.soid, pop.recovery_info,
+ push_op.recovery_info.soid, push_op.recovery_info,
false, *t);
}
});
logger().debug("{}: {}", __func__, *m);
return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
- PushOp& pop = m->pushes[0]; // TODO: only one push per message for now
+ PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now
return seastar::do_with(ceph::os::Transaction(),
- [this, m, &pop, &response](auto& t) {
- return _handle_push(m->from, pop, &response, &t).then_interruptible(
+ [this, m, &push_op, &response](auto& t) {
+ return _handle_push(m->from, push_op, &response, &t).then_interruptible(
[this, &t] {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
if (!push_info.recovery_progress.data_complete && !error) {
return build_push_op(push_info.recovery_info, push_info.recovery_progress,
&push_info.stat
- ).then_interruptible([&push_info] (auto pop) {
- push_info.recovery_progress = pop.after_progress;
- return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
+ ).then_interruptible([&push_info] (auto push_op) {
+ push_info.recovery_progress = push_op.after_progress;
+ return seastar::make_ready_future<std::optional<PushOp>>(
+ std::move(push_op));
}).handle_exception_interruptible(
[recovering_iter, &push_info, peer] (auto e) {
push_info.recovery_progress.error = true;