timer{[this] {
heartbeat_check();
(void)send_heartbeats();
- }}
+ }},
+ failing_peers{*this}
{}
seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
std::vector<seastar::future<>> futures;
const auto now = clock::now();
for (auto [osd, failed_since] : failure_queue) {
- if (failure_pending.count(osd)) {
- continue;
- }
- auto failed_for = chrono::duration_cast<chrono::seconds>(
- now - failed_since).count();
- auto osdmap = service.get_osdmap_service().get_map();
- auto failure_report =
- make_message<MOSDFailure>(monc.get_fsid(),
- osd,
- osdmap->get_addrs(osd),
- static_cast<int>(failed_for),
- osdmap->get_epoch());
- failure_pending.emplace(osd, failure_info_t{failed_since,
- osdmap->get_addrs(osd)});
- futures.push_back(monc.send_message(failure_report));
+ failing_peers.add_pending(osd, failed_since, now, futures);
}
return seastar::when_all_succeed(futures.begin(), futures.end());
}
-seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
- const entity_addrvec_t& addrs)
-{
- auto still_alive = make_message<MOSDFailure>(
- monc.get_fsid(),
- osd,
- addrs,
- 0,
- service.get_osdmap_service().get_map()->get_epoch(),
- MOSDFailure::FLAG_ALIVE);
- return monc.send_message(still_alive).then([=] {
- failure_pending.erase(osd);
- return seastar::now();
- });
-}
-
void Heartbeat::print(std::ostream& out) const
{
out << "heartbeat";
ping_history.erase(ping_history.begin(), ++ping);
}
if (do_health_screen(now) == health_state::HEALTHY) {
- // cancel false reports
- if (auto pending = heartbeat.failure_pending.find(peer);
- pending != heartbeat.failure_pending.end()) {
- return heartbeat.send_still_alive(peer, pending->second.addrs);
- }
+ return heartbeat.failing_peers.cancel_one(peer);
}
return seastar::now();
}
ping_history = {};
connect();
}
+
+bool Heartbeat::FailingPeers::add_pending(
+ osd_id_t peer,
+ clock::time_point failed_since,
+ clock::time_point now,
+ std::vector<seastar::future<>>& futures)
+{
+ if (failure_pending.count(peer)) {
+ return false;
+ }
+ auto failed_for = chrono::duration_cast<chrono::seconds>(
+ now - failed_since).count();
+ auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ auto failure_report =
+ make_message<MOSDFailure>(heartbeat.monc.get_fsid(),
+ peer,
+ osdmap->get_addrs(peer),
+ static_cast<int>(failed_for),
+ osdmap->get_epoch());
+ failure_pending.emplace(peer, failure_info_t{failed_since,
+ osdmap->get_addrs(peer)});
+ futures.push_back(heartbeat.monc.send_message(failure_report));
+ return true;
+}
+
+seastar::future<> Heartbeat::FailingPeers::cancel_one(osd_id_t peer)
+{
+ if (auto pending = failure_pending.find(peer);
+ pending != failure_pending.end()) {
+ auto fut = send_still_alive(peer, pending->second.addrs);
+ failure_pending.erase(peer);
+ return fut;
+ }
+ return seastar::now();
+}
+
+seastar::future<>
+Heartbeat::FailingPeers::send_still_alive(
+ osd_id_t osd, const entity_addrvec_t& addrs)
+{
+ auto still_alive = make_message<MOSDFailure>(
+ heartbeat.monc.get_fsid(),
+ osd,
+ addrs,
+ 0,
+ heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDFailure::FLAG_ALIVE);
+ return heartbeat.monc.send_message(still_alive);
+}
Ref<MOSDPing> m);
seastar::future<> handle_you_died();
- seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
-
using osds_t = std::vector<osd_id_t>;
/// remove down OSDs
/// @return peers not needed in this epoch
seastar::future<> send_heartbeats();
void heartbeat_check();
- struct failure_info_t {
- clock::time_point failed_since;
- entity_addrvec_t addrs;
- };
// osds we've reported to monior as failed ones, but they are not marked down
// yet
- std::map<osd_id_t, failure_info_t> failure_pending;
crimson::common::Gated gate;
+
+ class FailingPeers {
+ public:
+ FailingPeers(Heartbeat& heartbeat) : heartbeat(heartbeat) {}
+ bool add_pending(osd_id_t peer,
+ clock::time_point failed_since,
+ clock::time_point now,
+ std::vector<seastar::future<>>& futures);
+ seastar::future<> cancel_one(osd_id_t peer);
+
+ private:
+ seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
+
+ Heartbeat& heartbeat;
+
+ struct failure_info_t {
+ clock::time_point failed_since;
+ entity_addrvec_t addrs;
+ };
+ std::map<osd_id_t, failure_info_t> failure_pending;
+ } failing_peers;
};
inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) {