bool clear_omap = !push_op.before_progress.omap_complete;
response.soid = push_op.recovery_info.soid;
+ if (first) {
+ auto [oi, ssc] = get_md_from_push_op(push_op);
+ replica_push_targets[push_op.recovery_info.soid] =
+ pg.obc_loader.create_cached_obc_from_push_data(
+ oi,
+ ssc);
+ }
+
co_await submit_push_data(
push_op.recovery_info, first, complete, clear_omap,
std::move(data_zeros),
push_op.attrset,
std::move(push_op.omap_entries), &t);
+ epoch_t epoch_frozen = pg.get_osdmap_epoch();
+ DEBUGDPP("submitting transaction", pg);
+
if (complete) {
+ auto ptiter = replica_push_targets.find(push_op.recovery_info.soid);
+ ceph_assert(ptiter != replica_push_targets.end());
+ auto manager = pg.obc_loader.get_obc_manager(ptiter->second);
+ manager.lock_excl_sync(); /* cannot already be locked */
+
co_await pg.get_recovery_handler()->on_local_recover(
push_op.recovery_info.soid, push_op.recovery_info,
false, t);
- }
- epoch_t epoch_frozen = pg.get_osdmap_epoch();
- DEBUGDPP("submitting transaction", pg);
-
- co_await interruptor::make_interruptible(
- shard_services.get_store().do_transaction(coll, std::move(t)));
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t)));
+ replica_push_targets.erase(ptiter);
- if (complete) {
- //TODO: this should be grouped with pg.on_local_recover somehow.
pg.get_recovery_handler()->_committed_pushed_object(
epoch_frozen, pg.get_info().last_complete);
+ } else {
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(coll, std::move(t)));
}
auto reply = crimson::make_message<MOSDPGPushReply>();