log_entries, t, peering_state.get_pg_trim_to(),
peering_state.get_min_last_complete_ondisk());
- set<pg_shard_t> waiting_on;
- for (auto &i : get_acting_recovery_backfill()) {
+ return seastar::do_with(log_entries, t, set<pg_shard_t>{},
+ [this, rep_tid](auto& log_entries, auto& t,auto& waiting_on) mutable {
+ return seastar::do_for_each(get_acting_recovery_backfill(),
+ [this, log_entries, t=std::move(t), waiting_on, rep_tid]
+ (auto& i) mutable {
pg_shard_t peer(i);
- if (peer == pg_whoami) continue;
+ if (peer == pg_whoami) {
+ return seastar::now();
+ }
ceph_assert(peering_state.get_peer_missing().count(peer));
ceph_assert(peering_state.has_peer_info(peer));
auto log_m = crimson::make_message<MOSDPGUpdateLogMissing>(
rep_tid,
peering_state.get_pg_trim_to(),
peering_state.get_min_last_complete_ondisk());
+ waiting_on.insert(peer);
logger().debug("submit_error_log: sending log"
"missing_request (rep_tid: {} entries: {})"
" to osd {}", rep_tid, log_entries, peer.osd);
- send_cluster_message(peer.osd, std::move(log_m), get_osdmap_epoch());
- waiting_on.insert(peer);
- }
- waiting_on.insert(pg_whoami);
- logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
- log_entry_update_waiting_on.insert(
- std::make_pair(rep_tid, log_update_t{std::move(waiting_on)}));
- return shard_services.get_store().do_transaction(
- get_collection_ref(), std::move(t))
- .then([this] {
+ return shard_services.send_to_osd(peer.osd,
+ std::move(log_m),
+ get_osdmap_epoch());
+ }).then([this, waiting_on, t=std::move(t), rep_tid] () mutable {
+ waiting_on.insert(pg_whoami);
+ logger().debug("submit_error_log: inserting rep_tid {}", rep_tid);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid,
+ log_update_t{std::move(waiting_on)}));
+ return shard_services.get_store().do_transaction(
+ get_collection_ref(), std::move(t)
+ ).then([this] {
peering_state.update_trim_to();
return seastar::now();
+ });
});
+ });
}
PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>