auto& recovery_waiter = recovering[soid];
return seastar::do_with(std::map<pg_shard_t, PushOp>(), get_shards_to_push(soid),
[this, soid, need](auto& pops, auto& shards) {
- return maybe_pull_missing_obj(soid, need).then(
- [this, &pops, &shards, soid, need, &recovery_waiter](bool pulled) mutable {
- return [this, &recovery_waiter, soid, pulled] {
- if (!recovery_waiter.obc) {
- return pg.get_or_load_head_obc(soid).safe_then(
- [&recovery_waiter, pulled](auto p) {
- auto& [obc, existed] = p;
- logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
- recovery_waiter.obc = obc;
- if (!existed) {
- // obc is loaded with excl lock
- recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
- }
- bool got = recovery_waiter.obc->get_recovery_read().get0();
- ceph_assert_always(pulled ? got : 1);
- if (!got) {
- return recovery_waiter.obc->get_recovery_read(true)
- .then([](bool) { return seastar::now(); });
- }
- return seastar::make_ready_future<>();
- }, crimson::osd::PG::load_obc_ertr::all_same_way(
- [this, &recovery_waiter, soid](const std::error_code& e) {
- auto [obc, existed] =
- shard_services.obc_registry.get_cached_obc(soid);
- logger().debug("recover_object: load failure of obc: {}",
- obc->obs.oi.soid);
- recovery_waiter.obc = obc;
- // obc is loaded with excl lock
- recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
- ceph_assert_always(recovery_waiter.obc->get_recovery_read().get0());
- return seastar::make_ready_future<>();
- })
- );
- } else {
- logger().debug("recover_object: already has obc!");
- }
- return seastar::now();
- }().then([this, soid, need, &pops, &shards] {
- return prep_push(soid, need, &pops, shards);
- });
+ return maybe_pull_missing_obj(soid, need).then([this, soid](bool pulled) {
+ return load_obc_for_recovery(soid, pulled);
+ }).then([this, soid, need, &pops, &shards] {
+ return prep_push(soid, need, &pops, shards);
}).handle_exception([this, soid](auto e) {
auto& recovery_waiter = recovering[soid];
if (recovery_waiter.obc)
});
}
+seastar::future<> ReplicatedRecoveryBackend::load_obc_for_recovery(
+ const hobject_t& soid,
+ bool pulled)
+{
+ auto& recovery_waiter = recovering[soid];
+ if (recovery_waiter.obc) {
+ return seastar::now();
+ }
+ return pg.get_or_load_head_obc(soid).safe_then(
+ [&recovery_waiter, pulled](auto p) {
+ auto& [obc, existed] = p;
+ logger().debug("load_obc_for_recovery: loaded obc: {}", obc->obs.oi.soid);
+ recovery_waiter.obc = obc;
+ if (!existed) {
+ // obc is loaded with excl lock
+ recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
+ }
+ bool got = recovery_waiter.obc->get_recovery_read().get0();
+ ceph_assert_always(pulled ? got : 1);
+ if (got) {
+ return seastar::make_ready_future<>();
+ }
+ return recovery_waiter.obc->get_recovery_read(true).then([](bool) {
+ return seastar::now();
+ });
+ }, crimson::osd::PG::load_obc_ertr::all_same_way(
+ [this, &recovery_waiter, soid](const std::error_code& e) {
+ auto [obc, existed] =
+ shard_services.obc_registry.get_cached_obc(soid);
+ logger().debug("load_obc_for_recovery: load failure of obc: {}",
+ obc->obs.oi.soid);
+ recovery_waiter.obc = obc;
+ // obc is loaded with excl lock
+ recovery_waiter.obc->put_lock_type(RWState::RWEXCL);
+ ceph_assert_always(recovery_waiter.obc->get_recovery_read().get0());
+ return seastar::make_ready_future<>();
+ })
+ );
+}
+
seastar::future<> ReplicatedRecoveryBackend::push_delete(
const hobject_t& soid,
eversion_t need)