ceph::os::Transaction&& txn,
std::map<int, MOSDPGPushReply*> replies)
{
+ std::ignore = shard_services.get_store().do_transaction(
+ ::RecoveryBackend::coll, std::move(txn)
+ ).then([replies=std::move(replies), this]() mutable {
+ if (auto msgit = replies.find(get_parent()->whoami_shard().osd);
+ msgit != std::end(replies)) {
+ std::ignore = handle_push_reply(Ref<MOSDPGPushReply>{msgit->second});
+ replies.erase(msgit);
+ }
+ return seastar::do_for_each(replies, [this] (auto&& kv) {
+ auto [osd, msg] = kv;
+ return pg.get_shard_services().send_to_osd(
+ osd,
+ MessageURef{msg},
+ pg.get_osdmap_epoch());
+ });
+ });
}
RecoveryBackend::interruptible_future<>