// start tracking the recovery of soid
return seastar::do_with(std::map<pg_shard_t, PushOp>(), get_shards_to_push(soid),
[this, soid, need](auto& pops, auto& shards) {
- return maybe_pull_missing_obj(soid, need).then([this, soid](bool pulled) {
- return load_obc_for_recovery(soid, pulled);
- }).safe_then([this, soid, need, &pops, &shards] {
- if (!shards.empty()) {
- return prep_push(soid, need, &pops, shards);
- } else {
- return seastar::now();
- }
- }, crimson::ct_error::all_same_way([this, soid](const std::error_code& e) {
- auto recovery_waiter = recovering.find(soid);
- if (auto obc = recovery_waiter->second.obc; obc) {
- obc->drop_recovery_read();
- }
- recovering.erase(recovery_waiter);
- return seastar::make_exception_future<>(e);
- })).then([this, &pops, &shards, soid] {
+ return maybe_pull_missing_obj(soid, need).then(
+ [this, soid, need, &pops, &shards](bool pulled) {
+ return maybe_push_shards(soid, need, pops, shards);
+ });
+ });
+}
+
+seastar::future<>
+ReplicatedRecoveryBackend::maybe_push_shards(
+ const hobject_t& soid,
+ eversion_t need,
+ std::map<pg_shard_t, PushOp>& pops,
+ std::list<std::map<pg_shard_t, pg_missing_t>::const_iterator>& shards)
+{
+ auto push_func = [this, soid, need, &pops, &shards] {
+ auto fut = seastar::now();
+ if (!shards.empty())
+ fut = prep_push(soid, need, &pops, shards);
+ return fut.then([this, &pops, &shards, soid] {
return seastar::parallel_for_each(shards,
[this, &pops, soid](auto shard) {
auto msg = make_message<MOSDPGPush>();
msg->set_priority(pg.get_recovery_op_priority());
msg->pushes.push_back(pops[shard->first]);
return shard_services.send_to_osd(shard->first.osd, std::move(msg),
- pg.get_osdmap_epoch()).then(
+ pg.get_osdmap_epoch()).then(
[this, soid, shard] {
return recovering.at(soid).wait_for_pushes(shard->first);
});
recovering.erase(soid);
return seastar::make_exception_future<>(e);
});
- });
+ };
+
+ auto& recovery_waiter = recovering.at(soid);
+ if (recovery_waiter.obc) {
+ return seastar::futurize_invoke(std::move(push_func));
+ }
+ logger().debug("recover_object: loading obc: {}", soid);
+ return pg.with_head_obc<RWState::RWREAD>(soid,
+ [&recovery_waiter, push_func=std::move(push_func)](auto obc) {
+ logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
+ recovery_waiter.obc = obc;
+ recovery_waiter.obc->wait_recovery_read();
+ return seastar::futurize_invoke(std::move(push_func));
+ }).template safe_then(
+ [] { return seastar::now(); },
+ crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
+ //TODO: may need eio handling?
+ logger().error("recover_object saw error code {},"
+ " ignoring object {}", code, soid);
+ return seastar::now();
+ }));
}
seastar::future<bool>
});
}
-auto ReplicatedRecoveryBackend::load_obc_for_recovery(
- const hobject_t& soid,
- bool pulled) ->
- load_obc_ertr::future<>
-{
- auto& recovery_waiter = recovering.at(soid);
- if (recovery_waiter.obc) {
- return load_obc_ertr::now();
- }
- return pg.with_head_obc<RWState::RWREAD>(soid, [&recovery_waiter](auto obc) {
- logger().debug("load_obc_for_recovery: loaded obc: {}", obc->obs.oi.soid);
- recovery_waiter.obc = obc;
- recovery_waiter.obc->wait_recovery_read();
- return seastar::now();
- });
-}
-
seastar::future<> ReplicatedRecoveryBackend::push_delete(
const hobject_t& soid,
eversion_t need)