}
}
if (!failure_queue.empty()) {
- // send_failures can run in background, because messages
- // are sent in order, if later checks find out the previous
- // "failed" peers to be healthy, that "still alive" messages
- // would be sent after the previous "osd failure" messages
- // which is totally safe.
+ // send_failures can run in background, because
+ // 1. After the execution of send_failures, no msg is actually
+ // sent, which means the sending operation is not done,
+ // which further seems to involve problems risks that when
+ // osd shuts down, the left part of the sending operation
+ // may reference OSD and Heartbeat instances that are already
+ // deleted. However, remaining work of that sending operation
+ // involves no reference back to OSD or Heartbeat instances,
+ // which means it wouldn't involve the above risks.
+ // 2. messages are sent in order, if later checks find out
+ // the previous "failed" peers to be healthy, that "still
+ // alive" messages would be sent after the previous "osd
+ // failure" messages which is totally safe.
(void)send_failures(std::move(failure_queue));
}
}
seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)
{
- using failure_item_t = typename failure_queue_t::value_type;
- return seastar::parallel_for_each(failure_queue,
- [this](failure_item_t& failure_item) {
- auto [osd, failed_since] = failure_item;
- if (failure_pending.count(osd)) {
- return seastar::now();
- }
- auto failed_for = chrono::duration_cast<chrono::seconds>(
- clock::now() - failed_since).count();
- auto osdmap = service.get_osdmap_service().get_map();
- auto failure_report =
- make_message<MOSDFailure>(monc.get_fsid(),
- osd,
- osdmap->get_addrs(osd),
- static_cast<int>(failed_for),
- osdmap->get_epoch());
- failure_pending.emplace(osd, failure_info_t{failed_since,
- osdmap->get_addrs(osd)});
- return monc.send_message(failure_report);
- });
+ std::vector<seastar::future<>> futures;
+ const auto now = clock::now();
+ for (auto [osd, failed_since] : failure_queue) {
+ if (failure_pending.count(osd)) {
+ continue;
+ }
+ auto failed_for = chrono::duration_cast<chrono::seconds>(
+ now - failed_since).count();
+ auto osdmap = service.get_osdmap_service().get_map();
+ auto failure_report =
+ make_message<MOSDFailure>(monc.get_fsid(),
+ osd,
+ osdmap->get_addrs(osd),
+ static_cast<int>(failed_for),
+ osdmap->get_epoch());
+ failure_pending.emplace(osd, failure_info_t{failed_since,
+ osdmap->get_addrs(osd)});
+ futures.push_back(monc.send_message(failure_report));
+ }
+
+ return seastar::when_all_succeed(futures.begin(), futures.end());
}
seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,