}
}
-void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
+void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
{
- auto [peer_info, added] = peers.try_emplace(peer);
- auto& info = peer_info->second;
- info.epoch = epoch;
- if (added) {
- logger().info("add_peer({})", peer);
- auto osdmap = service.get_osdmap_service().get_map();
- // TODO: use addrs
- peer_info->second.con_front = front_msgr->connect(
- osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
- peer_info->second.con_back = back_msgr->connect(
- osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
- }
+ auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
+ auto& peer = iter->second;
+ peer.set_epoch(epoch);
}
Heartbeat::osds_t Heartbeat::remove_down_peers()
{
osds_t osds;
- for (auto& peer : peers) {
- auto osd = peer.first;
+ for (auto& [osd, peer] : peers) {
auto osdmap = service.get_osdmap_service().get_map();
if (!osdmap->is_up(osd)) {
remove_peer(osd);
- } else if (peer.epoch < osdmap->get_epoch()) {
+ } else if (peer.get_epoch() < osdmap->get_epoch()) {
osds.push_back(osd);
}
}
void Heartbeat::remove_peer(osd_id_t peer)
{
- logger().info("remove_peer({})", peer);
auto found = peers.find(peer);
assert(found != peers.end());
- found->second.con_front->mark_down();
- found->second.con_back->mark_down();
peers.erase(peer);
}
void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
{
- auto found = std::find_if(peers.begin(), peers.end(),
- [conn](const peers_map_t::value_type& peer) {
- return (peer.second.con_front == conn ||
- peer.second.con_back == conn);
- });
- if (found == peers.end()) {
- return;
+ // TODO: we should already have enough information to know which peer the
+ // conn belongs, so no need to do linear search here.
+ for (auto& [osd, peer] : peers) {
+ peer.handle_reset(conn);
}
- const auto peer = found->first;
- const auto epoch = found->second.epoch;
- remove_peer(peer);
- add_peer(peer, epoch);
}
seastar::future<> Heartbeat::handle_osd_ping(crimson::net::Connection* conn,
return seastar::now();
}
auto& peer = found->second;
- auto ping = peer.ping_history.find(m->ping_stamp);
- if (ping == peer.ping_history.end()) {
- // old replies, deprecated by newly sent pings.
- return seastar::now();
- }
- const auto now = clock::now();
- auto& unacked = ping->second.unacknowledged;
- if (conn == peer.con_back.get()) {
- peer.last_rx_back = now;
- unacked--;
- } else if (conn == peer.con_front.get()) {
- peer.last_rx_front = now;
- unacked--;
- }
- if (unacked == 0) {
- peer.ping_history.erase(peer.ping_history.begin(), ++ping);
- }
- if (peer.is_healthy(now)) {
- // cancel false reports
- if (auto pending = failure_pending.find(from);
- pending != failure_pending.end()) {
- return send_still_alive(from, pending->second.addrs);
- }
- }
- return seastar::now();
+ return peer.handle_reply(conn, m);
}
seastar::future<> Heartbeat::handle_you_died()
{
failure_queue_t failure_queue;
const auto now = clock::now();
- for (const auto& [osd, peer_info]: peers) {
- if (clock::is_zero(peer_info.first_tx)) {
- continue;
- }
-
- if (peer_info.is_unhealthy(now)) {
- auto oldest_deadline = peer_info.ping_history.begin()->second.deadline;
- auto failed_since = std::min(peer_info.last_rx_back,
- peer_info.last_rx_front);
- if (clock::is_zero(failed_since)) {
- logger().error("heartbeat_check: no reply from osd.{} "
- "ever on either front or back, first ping sent {} "
- "(oldest deadline {})",
- osd, peer_info.first_tx, oldest_deadline);
- failed_since = peer_info.first_tx;
- } else {
- logger().error("heartbeat_check: no reply from osd.{} "
- "since back {} front {} (oldest deadline {})",
- osd, peer_info.last_rx_back, peer_info.last_rx_front,
- oldest_deadline);
- }
+ for (const auto& [osd, peer] : peers) {
+ auto failed_since = peer.failed_since(now);
+ if (!clock::is_zero(failed_since)) {
failure_queue.emplace(osd, failed_since);
}
}
{
const auto mnow = service.get_mnow();
const auto now = clock::now();
- const auto deadline =
- now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- const utime_t sent_stamp{now};
std::vector<seastar::future<>> futures;
- for (auto& item : peers) {
- auto& info = item.second;
- info.last_tx = now;
- if (clock::is_zero(info.first_tx)) {
- info.first_tx = now;
- }
- [[maybe_unused]] auto [reply, added] =
- info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back};
- for (auto& con : conns) {
- if (con) {
- auto min_message = static_cast<uint32_t>(
- local_conf()->osd_heartbeat_min_size);
- auto ping = make_message<MOSDPing>(
- monc.get_fsid(),
- service.get_osdmap_service().get_map()->get_epoch(),
- MOSDPing::PING,
- sent_stamp,
- mnow,
- mnow,
- service.get_osdmap_service().get_up_epoch(),
- min_message);
- reply->second.unacknowledged++;
- futures.push_back(con->send(std::move(ping)));
- }
- }
+ for (auto& [osd, peer] : peers) {
+ peer.send_heartbeat(now, mnow, futures);
}
return seastar::when_all_succeed(futures.begin(), futures.end());
}
});
}
-bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+void Heartbeat::print(std::ostream& out) const
+{
+ out << "heartbeat";
+}
+
+void Heartbeat::Peer::connect()
+{
+ logger().info("peer osd.{} added", peer);
+ auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ // TODO: use addrs
+ con_front = heartbeat.front_msgr->connect(
+ osdmap->get_hb_front_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+ con_back = heartbeat.back_msgr->connect(
+ osdmap->get_hb_back_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
+}
+
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+ : heartbeat(heartbeat), peer(peer)
+{ connect(); }
+
+void Heartbeat::Peer::disconnect()
+{
+ logger().info("peer osd.{} removed", peer);
+ con_front->mark_down();
+ con_back->mark_down();
+}
+
+Heartbeat::Peer::~Peer()
+{ disconnect(); }
+
+bool Heartbeat::Peer::is_unhealthy(clock::time_point now) const
{
if (ping_history.empty()) {
// we haven't sent a ping yet or we have got all replies,
}
}
-bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+bool Heartbeat::Peer::is_healthy(clock::time_point now) const
{
if (con_front && clock::is_zero(last_rx_front)) {
return false;
return !is_unhealthy(now);
}
-void Heartbeat::print(std::ostream& out) const
+Heartbeat::clock::time_point
+Heartbeat::Peer::failed_since(clock::time_point now) const
{
- out << "heartbeat";
+ if (clock::is_zero(first_tx)) {
+ return clock::zero();
+ }
+ if (!is_unhealthy(now)) {
+ return clock::zero();
+ }
+
+ auto oldest_deadline = ping_history.begin()->second.deadline;
+ auto failed_since = std::min(last_rx_back, last_rx_front);
+ if (clock::is_zero(failed_since)) {
+ logger().error("failed_since: no reply from osd.{} "
+ "ever on either front or back, first ping sent {} "
+ "(oldest deadline {})",
+ peer, first_tx, oldest_deadline);
+ failed_since = first_tx;
+ } else {
+ logger().error("failed_since: no reply from osd.{} "
+ "since back {} front {} (oldest deadline {})",
+ peer, last_rx_back, last_rx_front, oldest_deadline);
+ }
+ return failed_since;
+}
+
+void Heartbeat::Peer::send_heartbeat(
+ clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>& futures)
+{
+ if (clock::is_zero(first_tx)) {
+ first_tx = now;
+ }
+ last_tx = now;
+
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ [[maybe_unused]] auto [reply, added] =
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ for (auto& con : {con_front, con_back}) {
+ if (con) {
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto ping = make_message<MOSDPing>(
+ heartbeat.monc.get_fsid(),
+ heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ mnow,
+ mnow,
+ heartbeat.service.get_osdmap_service().get_up_epoch(),
+ min_message);
+ reply->second.unacknowledged++;
+ futures.push_back(con->send(std::move(ping)));
+ }
+ }
+}
+
+seastar::future<> Heartbeat::Peer::handle_reply(
+ crimson::net::Connection* conn, Ref<MOSDPing> m)
+{
+ auto ping = ping_history.find(m->ping_stamp);
+ if (ping == ping_history.end()) {
+ // old replies, deprecated by newly sent pings.
+ return seastar::now();
+ }
+ const auto now = clock::now();
+ auto& unacked = ping->second.unacknowledged;
+ if (conn == con_back.get()) {
+ last_rx_back = now;
+ unacked--;
+ } else if (conn == con_front.get()) {
+ last_rx_front = now;
+ unacked--;
+ }
+ if (unacked == 0) {
+ ping_history.erase(ping_history.begin(), ++ping);
+ }
+ if (is_healthy(now)) {
+ // cancel false reports
+ if (auto pending = heartbeat.failure_pending.find(peer);
+ pending != heartbeat.failure_pending.end()) {
+ return heartbeat.send_still_alive(peer, pending->second.addrs);
+ }
+ }
+ return seastar::now();
+}
+
+void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn)
+{
+ if (con_front != conn && con_back != conn) {
+ return;
+ }
+ disconnect();
+ first_tx = {};
+ last_tx = {};
+ last_rx_front = {};
+ last_rx_back = {};
+ ping_history = {};
+ connect();
}
// use real_clock so it can be converted to utime_t
using clock = ceph::coarse_real_clock;
- struct reply_t {
- clock::time_point deadline;
- // one sent over front conn, another sent over back conn
- uint8_t unacknowledged = 0;
- };
- struct PeerInfo {
- /// peer connection (front)
- crimson::net::ConnectionRef con_front;
- /// peer connection (back)
- crimson::net::ConnectionRef con_back;
- /// time we sent our first ping request
- clock::time_point first_tx;
- /// last time we sent a ping request
- clock::time_point last_tx;
- /// last time we got a ping reply on the front side
- clock::time_point last_rx_front;
- /// last time we got a ping reply on the back side
- clock::time_point last_rx_back;
- /// most recent epoch we wanted this peer
- epoch_t epoch;
- /// history of inflight pings, arranging by timestamp we sent
- std::map<utime_t, reply_t> ping_history;
-
- bool is_unhealthy(clock::time_point now) const;
- bool is_healthy(clock::time_point now) const;
- };
- using peers_map_t = std::map<osd_id_t, PeerInfo>;
+ class Peer;
+ using peers_map_t = std::map<osd_id_t, Peer>;
peers_map_t peers;
+
// osds which are considered failed
// osd_id => when was the last time that both front and back pings were acked
// or sent.
hb.print(out);
return out;
}
+
+class Heartbeat::Peer {
+ public:
+ Peer(Heartbeat&, osd_id_t);
+ ~Peer();
+ Peer(Peer&&) = delete;
+ Peer(const Peer&) = delete;
+ Peer& operator=(const Peer&) = delete;
+
+ void set_epoch(epoch_t epoch_) { epoch = epoch_; }
+ epoch_t get_epoch() const { return epoch; }
+
+ // if failure, return time_point since last active
+ // else, return clock::zero()
+ clock::time_point failed_since(clock::time_point now) const;
+ void send_heartbeat(clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>&);
+ seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
+ void handle_reset(crimson::net::ConnectionRef);
+
+ private:
+ bool is_unhealthy(clock::time_point now) const;
+ bool is_healthy(clock::time_point now) const;
+
+ void connect();
+ void disconnect();
+
+ private:
+ Heartbeat& heartbeat;
+ const osd_id_t peer;
+
+ /// peer connection (front)
+ crimson::net::ConnectionRef con_front;
+ /// peer connection (back)
+ crimson::net::ConnectionRef con_back;
+ /// time we sent our first ping request
+ clock::time_point first_tx;
+ /// last time we sent a ping request
+ clock::time_point last_tx;
+ /// last time we got a ping reply on the front side
+ clock::time_point last_rx_front;
+ /// last time we got a ping reply on the back side
+ clock::time_point last_rx_back;
+ /// most recent epoch we wanted this peer
+ epoch_t epoch;
+
+ struct reply_t {
+ clock::time_point deadline;
+ // one sent over front conn, another sent over back conn
+ uint8_t unacknowledged = 0;
+ };
+ /// history of inflight pings, arranging by timestamp we sent
+ std::map<utime_t, reply_t> ping_history;
+};