log_entries, t, peering_state.get_pg_trim_to(),
peering_state.get_pg_committed_to());
-
set<pg_shard_t> waiting_on;
+
+ waiting_on.insert(pg_whoami);
+
+ // preapre log_entry_update_waiting_on prior to sending requests
+ for (const auto &peer: get_acting_recovery_backfill()) {
+ if (peer == pg_whoami) {
+ continue;
+ }
+ ceph_assert(peering_state.get_peer_missing().count(peer));
+ ceph_assert(peering_state.has_peer_info(peer));
+ waiting_on.insert(peer);
+ }
+
+ DEBUGDPP("inserting rep_tid {} waiting on {}", *this, rep_tid, waiting_on);
+ log_entry_update_waiting_on.insert(
+ std::make_pair(rep_tid,
+ log_update_t{std::move(waiting_on)}));
+
+ // Send missing_requests to peers
for (const auto &peer: get_acting_recovery_backfill()) {
if (peer == pg_whoami) {
continue;
rep_tid,
peering_state.get_pg_trim_to(),
peering_state.get_pg_committed_to());
- waiting_on.insert(peer);
DEBUGDPP("sending log missing_request (rep_tid: {} entries: {}) to osd {}",
*this, rep_tid, log_entries, peer.osd);
std::move(log_m),
get_osdmap_epoch()));
}
- waiting_on.insert(pg_whoami);
- DEBUGDPP("inserting rep_tid {}", *this, rep_tid);
- log_entry_update_waiting_on.insert(
- std::make_pair(rep_tid,
- log_update_t{std::move(waiting_on)}));
+
co_await interruptor::make_interruptible(
shard_services.get_store().do_transaction(
get_collection_ref(), std::move(t)
log_entry_update_waiting_on.erase(it);
}
} else {
- logger().error("{} : {} got reply {} on unknown tid {}",
- __func__, peering_state.get_info().pgid, *m, m->get_tid());
+ ceph_abort_msg(fmt::format(
+ "{} : {} got reply {} on unknown tid {}",
+ __func__, peering_state.get_info().pgid, *m, m->get_tid()));
}
return seastar::now();
}