HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat, &pops[pg_shard]).then(
- [this, soid, pg_shard](auto new_progress) {
+ &pi.stat).then(
+ [this, soid, pg_shard, &pops](auto pop) {
auto& recovery_waiter = recovering.at(soid);
auto& pi = recovery_waiter.pushing[pg_shard];
- pi.recovery_progress = new_progress;
+ pi.recovery_progress = pop.after_progress;
+ pops[pg_shard] = std::move(pop);
return seastar::make_ready_future<>();
});
}).then([&pops]() mutable {
pi.recovery_progress = po.recovery_progress;
}
-seastar::future<ObjectRecoveryProgress> ReplicatedRecoveryBackend::build_push_op(
+seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress& progress,
- object_stat_sum_t* stat,
- PushOp* pop
- ) {
+ object_stat_sum_t* stat)
+{
logger().debug("{} {} @{}",
__func__, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
eversion_t(),
- [this, &recovery_info, &progress, stat, pop]
- (auto& new_progress, auto& oi, auto& available, auto& v) {
- return [this, &recovery_info, &progress, &new_progress, &oi, pop, &v] {
+ PushOp(),
+ [this, &recovery_info, &progress, stat]
+ (auto& new_progress, auto& oi, auto& available, auto& v, auto& pop) {
+ return [this, &recovery_info, &progress, &new_progress, &oi, &v, pop=&pop] {
v = recovery_info.version;
if (progress.first) {
return backend->omap_get_header(coll, ghobject_t(recovery_info.soid))
);
}
return seastar::make_ready_future<>();
- }().then([this, &recovery_info, &progress, &new_progress, &available, pop] {
+ }().then([this, &recovery_info, &progress, &new_progress, &available, &pop]() mutable {
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
- available, pop);
- }).then([this, &recovery_info, &progress, &available, pop] {
+ available, &pop);
+ }).then([this, &recovery_info, &progress, &available, &pop]() mutable {
logger().debug("build_push_op: available: {}, copy_subset: {}",
available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
- available, pop);
- }).then([&recovery_info, &v, &progress, &new_progress, stat, pop]
- (uint64_t recovered_to) {
+ available, &pop);
+ }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+ (uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
if (new_progress.is_complete(recovery_info)) {
new_progress.data_complete = true;
new_progress.omap_complete = false;
}
if (stat) {
- stat->num_keys_recovered += pop->omap_entries.size();
- stat->num_bytes_recovered += pop->data.length();
+ stat->num_keys_recovered += pop.omap_entries.size();
+ stat->num_bytes_recovered += pop.data.length();
}
- pop->version = v;
- pop->soid = recovery_info.soid;
- pop->recovery_info = recovery_info;
- pop->after_progress = new_progress;
- pop->before_progress = progress;
+ pop.version = v;
+ pop.soid = recovery_info.soid;
+ pop.recovery_info = recovery_info;
+ pop.after_progress = new_progress;
+ pop.before_progress = progress;
logger().debug("build_push_op: pop version: {}, pop data length: {}",
- pop->version, pop->data.length());
- return seastar::make_ready_future<ObjectRecoveryProgress>
- (std::move(new_progress));
+ pop.version, pop.data.length());
+ return seastar::make_ready_future<PushOp>(std::move(pop));
});
});
}
[this, m, from = m->from](auto& pulls) {
return seastar::parallel_for_each(pulls, [this, m, from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
- return seastar::do_with(PushOp(),
- [this, &soid, &pull_op, from](auto& pop) {
- logger().debug("handle_pull: {}", soid);
- return backend->stat(coll, ghobject_t(soid)).then(
- [this, &pull_op, &pop](auto st) {
- ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
- ObjectRecoveryProgress &progress = pull_op.recovery_progress;
- if (progress.first && recovery_info.size == ((uint64_t) -1)) {
- // Adjust size and copy_subset
- recovery_info.size = st.st_size;
- if (st.st_size) {
- interval_set<uint64_t> object_range;
- object_range.insert(0, st.st_size);
- recovery_info.copy_subset.intersection_of(object_range);
- } else {
- recovery_info.copy_subset.clear();
- }
- assert(recovery_info.clone_subset.empty());
- }
- return build_push_op(recovery_info, progress, 0, &pop);
- }).handle_exception([soid, &pop](auto e) {
- pop.recovery_info.version = eversion_t();
- pop.version = eversion_t();
- pop.soid = soid;
- return seastar::make_ready_future<ObjectRecoveryProgress>();
- }).then([this, &pop, from](auto new_progress) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(pop);
- return shard_services.send_to_osd(from.osd, std::move(msg),
- pg.get_osdmap_epoch());
- });
+ logger().debug("handle_pull: {}", soid);
+ return backend->stat(coll, ghobject_t(soid)).then(
+ [this, &pull_op](auto st) {
+ ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+ ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+ if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+ // Adjust size and copy_subset
+ recovery_info.size = st.st_size;
+ if (st.st_size) {
+ interval_set<uint64_t> object_range;
+ object_range.insert(0, st.st_size);
+ recovery_info.copy_subset.intersection_of(object_range);
+ } else {
+ recovery_info.copy_subset.clear();
+ }
+ assert(recovery_info.clone_subset.empty());
+ }
+ return build_push_op(recovery_info, progress, 0);
+ }).then([this, from](auto pop) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pushes.push_back(std::move(pop));
+ return shard_services.send_to_osd(from.osd, std::move(msg),
+ pg.get_osdmap_epoch());
});
});
});
});
}
-seastar::future<bool> ReplicatedRecoveryBackend::_handle_push_reply(
+seastar::future<std::optional<PushOp>>
+ReplicatedRecoveryBackend::_handle_push_reply(
pg_shard_t peer,
- const PushReplyOp &op,
- PushOp *reply)
+ const PushReplyOp &op)
{
const hobject_t& soid = op.soid;
logger().debug("{}, soid {}, from {}", __func__, soid, peer);
if (recovering_iter == recovering.end()
|| !recovering_iter->second.pushing.count(peer)) {
logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
- return seastar::make_ready_future<bool>(true);
+ return seastar::make_ready_future<std::optional<PushOp>>();
} else {
auto& pi = recovering_iter->second.pushing[peer];
- return [this, &pi, &soid, reply, peer, recovering_iter] {
- bool error = pi.recovery_progress.error;
- if (!pi.recovery_progress.data_complete && !error) {
- return build_push_op(pi.recovery_info, pi.recovery_progress,
- &pi.stat, reply).then([&pi] (auto new_progress) {
- pi.recovery_progress = new_progress;
- return seastar::make_ready_future<bool>(false);
- });
- }
- if (!error)
- pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
- recovering_iter->second.set_pushed(peer);
- return seastar::make_ready_future<bool>(true);
- }().handle_exception([recovering_iter, &pi, peer] (auto e) {
- pi.recovery_progress.error = true;
- recovering_iter->second.set_push_failed(peer, e);
- return seastar::make_ready_future<bool>(true);
- });
+ bool error = pi.recovery_progress.error;
+ if (!pi.recovery_progress.data_complete && !error) {
+ return build_push_op(pi.recovery_info, pi.recovery_progress,
+ &pi.stat).then([&pi] (auto pop) {
+ pi.recovery_progress = pop.after_progress;
+ return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
+ }).handle_exception([recovering_iter, &pi, peer] (auto e) {
+ pi.recovery_progress.error = true;
+ recovering_iter->second.set_push_failed(peer, e);
+ return seastar::make_ready_future<std::optional<PushOp>>();
+ });
+ }
+ if (!error) {
+ pg.get_recovery_handler()->on_peer_recover(peer, soid, pi.recovery_info);
+ }
+ recovering_iter->second.set_pushed(peer);
+ return seastar::make_ready_future<std::optional<PushOp>>();
}
}
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
- return seastar::do_with(PushOp(), [this, &push_reply, from](auto& pop) {
- return _handle_push_reply(from, push_reply, &pop).then(
- [this, &pop, from](bool finished) {
- if (!finished) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->set_priority(pg.get_recovery_op_priority());
- msg->pushes.push_back(pop);
- return shard_services.send_to_osd(from.osd, std::move(msg), pg.get_osdmap_epoch());
- }
+ return _handle_push_reply(from, push_reply).then(
+ [this, from](std::optional<PushOp> push_op) {
+ if (push_op) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->set_priority(pg.get_recovery_op_priority());
+ msg->pushes.push_back(std::move(*push_op));
+ return shard_services.send_to_osd(from.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch());
+ } else {
return seastar::make_ready_future<>();
- });
+ }
});
}