log_entries, t, peering_state.get_pg_trim_to(),
peering_state.get_pg_committed_to());
- return seastar::do_with(log_entries, set<pg_shard_t>{},
- [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable {
- return interruptor::do_for_each(get_acting_recovery_backfill(),
- [this, log_entries, waiting_on, rep_tid]
- (auto& i) mutable {
- pg_shard_t peer(i);
- if (peer == pg_whoami) {
- return seastar::now();
- }
- ceph_assert(peering_state.get_peer_missing().count(peer));
- ceph_assert(peering_state.has_peer_info(peer));
- auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
- log_entries,
- spg_t(peering_state.get_info().pgid.pgid, i.shard),
- pg_whoami.shard,
- get_osdmap_epoch(),
- get_last_peering_reset(),
- rep_tid,
- peering_state.get_pg_trim_to(),
- peering_state.get_pg_committed_to());
- waiting_on.insert(peer);
- logger().debug("submit_error_log: sending log"
- "missing_request (rep_tid: {} entries: {})"
- " to osd {}", rep_tid, log_entries, peer.osd);
- return shard_services.send_to_osd(peer.osd,
- std::move(log_m),
- get_osdmap_epoch());
- }).then_interruptible([this, waiting_on, t=std::move(t), rep_tid] () mutable {
- waiting_on.insert(pg_whoami);
- logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
- log_entry_update_waiting_on.insert(
- std::make_pair(rep_tid,
- log_update_t{std::move(waiting_on)}));
- return shard_services.get_store().do_transaction(
- get_collection_ref(), std::move(t)
- ).then([this] {
- peering_state.update_trim_to();
- return seastar::make_ready_future<eversion_t>(projected_last_update);
- });
- });
- });
+
+ set<pg_shard_t> waiting_on;
+ for (const auto &peer: get_acting_recovery_backfill()) {
+ if (peer == pg_whoami) {
+ continue;
+ }
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
+ log_entries,
+ spg_t(peering_state.get_info().pgid.pgid, peer.shard),
+ pg_whoami.shard,
+ get_osdmap_epoch(),
+ get_last_peering_reset(),
+ rep_tid,
+ peering_state.get_pg_trim_to(),
+ peering_state.get_pg_committed_to());
+ waiting_on.insert(peer);
+
+ logger().debug("submit_error_log: sending log"
+ "missing_request (rep_tid: {} entries: {})"
+ " to osd {}", rep_tid, log_entries, peer.osd);
+ co_await interruptor::make_interruptible(
+ shard_services.send_to_osd(
+ peer.osd,
+ std::move(log_m),
+ get_osdmap_epoch()));
+ }
+ waiting_on.insert(pg_whoami);
+ logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid,
+ log_update_t{std::move(waiting_on)}));
+ co_await interruptor::make_interruptible(
+ shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t)
+ ));
+
+ peering_state.update_trim_to();
+ co_return projected_last_update;
}
PG::run_executer_fut PG::run_executer(