assert(is_recovering(soid));
// start tracking the recovery of soid
return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
- return maybe_push_shards(soid, need);
+ auto& recovery_waiter = recovering.at(soid);
+ if (recovery_waiter.obc) {
+ return maybe_push_shards(soid, need);
+ } else {
+ logger().debug("recover_object: loading obc: {}", soid);
+ return pg.with_head_obc<RWState::RWREAD>(soid,
+ [this, soid, need, &recovery_waiter](auto obc) {
+ logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
+ recovery_waiter.obc = obc;
+ recovery_waiter.obc->wait_recovery_read();
+ return maybe_push_shards(soid, need);
+ }).handle_error(
+ crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
+ // TODO: may need eio handling?
+ logger().error("recover_object saw error code {}, ignoring object {}",
+ code, soid);
+ }));
+ }
});
}
const hobject_t& soid,
eversion_t need)
{
- auto push_func = [this, soid, need] {
- auto prepare_pops = seastar::now();
- if (auto shards = get_shards_to_push(soid);
- !shards.empty()) {
- prepare_pops = seastar::parallel_for_each(std::move(shards),
- [this, need, soid](auto shard) {
- return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
- auto msg = make_message<MOSDPGPush>();
- msg->from = pg.get_pg_whoami();
- msg->pgid = pg.get_pgid();
- msg->map_epoch = pg.get_osdmap_epoch();
- msg->min_epoch = pg.get_last_peering_reset();
- msg->pushes.push_back(std::move(push));
- msg->set_priority(pg.get_recovery_op_priority());
- return shard_services.send_to_osd(shard.osd,
- std::move(msg),
- pg.get_osdmap_epoch()).then(
- [this, soid, shard] {
- return recovering.at(soid).wait_for_pushes(shard);
- });
- });
- });
- }
- return prepare_pops.then([this, soid] {
- auto &recovery = recovering.at(soid);
- auto push_info = recovery.pushing.begin();
- object_stat_sum_t stat = {};
- if (push_info != recovery.pushing.end()) {
- stat = push_info->second.stat;
- } else {
- // no push happened, take pull_info's stat
- assert(recovery.pi);
- stat = recovery.pi->stat;
- }
- pg.get_recovery_handler()->on_global_recover(soid, stat, false);
- return seastar::make_ready_future<>();
- }).handle_exception([this, soid](auto e) {
- auto &recovery = recovering.at(soid);
- if (recovery.obc) {
- recovery.obc->drop_recovery_read();
- }
- recovering.erase(soid);
- return seastar::make_exception_future<>(e);
+ auto prepare_pops = seastar::now();
+ if (auto shards = get_shards_to_push(soid); !shards.empty()) {
+ prepare_pops = seastar::parallel_for_each(std::move(shards),
+ [this, need, soid](auto shard) {
+ return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
+ auto msg = make_message<MOSDPGPush>();
+ msg->from = pg.get_pg_whoami();
+ msg->pgid = pg.get_pgid();
+ msg->map_epoch = pg.get_osdmap_epoch();
+ msg->min_epoch = pg.get_last_peering_reset();
+ msg->pushes.push_back(std::move(push));
+ msg->set_priority(pg.get_recovery_op_priority());
+ return shard_services.send_to_osd(shard.osd,
+ std::move(msg),
+ pg.get_osdmap_epoch()).then(
+ [this, soid, shard] {
+ return recovering.at(soid).wait_for_pushes(shard);
+ });
+ });
});
- }; // push_func
-
- auto& recovery_waiter = recovering.at(soid);
- if (recovery_waiter.obc) {
- return seastar::futurize_invoke(std::move(push_func));
}
- logger().debug("recover_object: loading obc: {}", soid);
- return pg.with_head_obc<RWState::RWREAD>(soid,
- [&recovery_waiter, push_func=std::move(push_func)](auto obc) {
- logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
- recovery_waiter.obc = obc;
- recovery_waiter.obc->wait_recovery_read();
- return seastar::futurize_invoke(std::move(push_func));
- }).handle_error(
- crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
- //TODO: may need eio handling?
- logger().error("recover_object saw error code {},"
- " ignoring object {}", code, soid);
- }));
+ return prepare_pops.then([this, soid] {
+ auto &recovery = recovering.at(soid);
+ auto push_info = recovery.pushing.begin();
+ object_stat_sum_t stat = {};
+ if (push_info != recovery.pushing.end()) {
+ stat = push_info->second.stat;
+ } else {
+ // no push happened, take pull_info's stat
+ assert(recovery.pi);
+ stat = recovery.pi->stat;
+ }
+ pg.get_recovery_handler()->on_global_recover(soid, stat, false);
+ return seastar::make_ready_future<>();
+ }).handle_exception([this, soid](auto e) {
+ auto &recovery = recovering.at(soid);
+ if (recovery.obc) {
+ recovery.obc->drop_recovery_read();
+ }
+ recovering.erase(soid);
+ return seastar::make_exception_future<>(e);
+ });
}
seastar::future<>