}
}
-Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
+Heartbeat::Heartbeat(osd_id_t whoami,
+ const crimson::osd::ShardServices& service,
crimson::mon::Client& monc,
crimson::net::MessengerRef front_msgr,
crimson::net::MessengerRef back_msgr)
- : service{service},
+ : whoami{whoami},
+ service{service},
monc{monc},
front_msgr{front_msgr},
back_msgr{back_msgr},
void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
{
+ assert(whoami != _peer);
auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
auto& peer = iter->second;
peer.set_epoch(epoch);
}
if (auto found = peers.find(peer);
found != peers.end()) {
- found->second.handle_reset(conn);
+ found->second.handle_reset(conn, is_replace);
+ }
+}
+
+void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+{
+ auto peer = conn->get_peer_id();
+ if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+ peer == entity_name_t::NEW) {
+ return;
+ }
+ if (auto found = peers.find(peer);
+ found != peers.end()) {
+ found->second.handle_connect(conn);
+ }
+}
+
+void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+{
+ auto peer = conn->get_peer_id();
+ if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+ peer == entity_name_t::NEW) {
+ return;
+ }
+ if (auto found = peers.find(peer);
+ found != peers.end()) {
+ found->second.handle_accept(conn);
}
}
out << "heartbeat";
}
-void Heartbeat::Peer::connect()
-{
- logger().info("peer osd.{} added", peer);
- auto osdmap = heartbeat.service.get_osdmap_service().get_map();
- // TODO: use addrs
- con_front = heartbeat.front_msgr->connect(
- osdmap->get_hb_front_addrs(peer).front(),
- entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
- con_back = heartbeat.back_msgr->connect(
- osdmap->get_hb_back_addrs(peer).front(),
- entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-}
-
Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
: heartbeat(heartbeat), peer(peer)
-{ connect(); }
-
-void Heartbeat::Peer::disconnect()
{
- logger().info("peer osd.{} removed", peer);
- con_front->mark_down();
- con_back->mark_down();
+ logger().info("Heartbeat::Peer: osd.{} added", peer);
+ connect_front();
+ connect_back();
}
Heartbeat::Peer::~Peer()
-{ disconnect(); }
+{
+ logger().info("Heartbeat::Peer: osd.{} removed", peer);
+ if (con_front) {
+ con_front->mark_down();
+ }
+ if (con_back) {
+ con_back->mark_down();
+ }
+}
bool Heartbeat::Peer::pinged() const
{
auto oldest_deadline = ping_history.begin()->second.deadline;
auto failed_since = std::min(last_rx_back, last_rx_front);
if (clock::is_zero(failed_since)) {
- logger().error("failed_since: no reply from osd.{} "
+ logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
"ever on either front or back, first ping sent {} "
"(oldest deadline {})",
peer, first_tx, oldest_deadline);
failed_since = first_tx;
} else {
- logger().error("failed_since: no reply from osd.{} "
+ logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
"since back {} front {} (oldest deadline {})",
peer, last_rx_back, last_rx_front, oldest_deadline);
}
}
}
-void Heartbeat::Peer::send_heartbeat(
+void Heartbeat::Peer::do_send_heartbeat(
clock::time_point now,
ceph::signedspan mnow,
- std::vector<seastar::future<>>& futures)
+ std::vector<seastar::future<>>* futures)
{
- if (!pinged()) {
- first_tx = now;
- }
- last_tx = now;
-
+ assert(session_started);
const utime_t sent_stamp{now};
const auto deadline =
now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
heartbeat.service.get_osdmap_service().get_up_epoch(),
min_message);
reply->second.unacknowledged++;
- futures.push_back(con->send(std::move(ping)));
+ if (futures) {
+ futures->push_back(con->send(std::move(ping)));
+ }
+ }
+}
+
+void Heartbeat::Peer::send_heartbeat(
+ clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>& futures)
+{
+ if (!pinged()) {
+ first_tx = now;
+ }
+ last_tx = now;
+
+ if (session_started) {
+ do_send_heartbeat(now, mnow, &futures);
+
+ // validate connection addresses
+ const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
+ if (con_front->get_peer_addr() != front_addr) {
+ logger().info("Heartbeat::Peer::send_heartbeat(): "
+ "peer osd.{} con_front has new address {} over {}, reset",
+ peer, front_addr, con_front->get_peer_addr());
+ con_front->mark_down();
+ has_racing = false;
+ handle_reset(con_front, false);
+ }
+ const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
+ if (con_back->get_peer_addr() != back_addr) {
+ logger().info("Heartbeat::Peer::send_heartbeat(): "
+ "peer osd.{} con_back has new address {} over {}, reset",
+ peer, back_addr, con_back->get_peer_addr());
+ con_back->mark_down();
+ has_racing = false;
+ handle_reset(con_back, false);
+ }
+ } else {
+ // we should send MOSDPing but still cannot at this moment
+ if (pending_send) {
+ // we have already pending for a entire heartbeat interval
+ logger().warn("Heartbeat::Peer::send_heartbeat(): "
+ "heartbeat to {} is still pending...", peer);
+ has_racing = false;
+ // retry con_front if still pending
+ if (!front_ready) {
+ if (con_front) {
+ con_front->mark_down();
+ handle_reset(con_front, false);
+ } else {
+ connect_front();
+ }
+ }
+ // retry con_back if still pending
+ if (!back_ready) {
+ if (con_back) {
+ con_back->mark_down();
+ handle_reset(con_back, false);
+ } else {
+ connect_back();
+ }
+ }
+ } else {
+ logger().info("Heartbeat::Peer::send_heartbeat(): "
+ "heartbeat to {} is pending send...", peer);
+ // maintain an entry in ping_history for unhealthy check
+ if (ping_history.empty()) {
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ [[maybe_unused]] auto [reply, added] =
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ } else { // the entry is already added
+ assert(ping_history.size() == 1);
+ }
+ pending_send = true;
+ }
}
}
seastar::future<> Heartbeat::Peer::handle_reply(
crimson::net::Connection* conn, Ref<MOSDPing> m)
{
+ if (!session_started) {
+ // we haven't sent any ping yet
+ return seastar::now();
+ }
auto ping = ping_history.find(m->ping_stamp);
if (ping == ping_history.end()) {
// old replies, deprecated by newly sent pings.
}
const auto now = clock::now();
auto& unacked = ping->second.unacknowledged;
+ assert(unacked);
if (conn == con_back.get()) {
last_rx_back = now;
unacked--;
return seastar::now();
}
-void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn)
+void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+{
+ if (con_front == conn) {
+ con_front = nullptr;
+ if (is_replace) {
+ assert(!front_ready);
+ assert(!session_started);
+ // set the racing connection, will be handled by handle_accept()
+ con_front = heartbeat.front_msgr->connect(
+ conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+ has_racing = true;
+ logger().warn("Heartbeat::Peer::handle_reset(): "
+ "con_front racing with osd.{}, updated by {}",
+ peer, con_front);
+ } else {
+ if (front_ready) {
+ front_ready = false;
+ }
+ if (session_started) {
+ reset_session();
+ }
+ assert(heartbeat.whoami != peer);
+ if (heartbeat.whoami > peer || !has_racing) {
+ connect_front();
+ } else { // whoami < peer && has_racing
+ logger().info("Heartbeat::Peer::handle_reset(): "
+ "con_front racing detected and lose, "
+ "waiting for osd.{} connect me", peer);
+ }
+ }
+ } else if (con_back == conn) {
+ con_back = nullptr;
+ if (is_replace) {
+ assert(!back_ready);
+ assert(!session_started);
+ // set the racing connection, will be handled by handle_accept()
+ con_back = heartbeat.back_msgr->connect(
+ conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+ has_racing = true;
+ logger().warn("Heartbeat::Peer::handle_reset(): "
+ "con_back racing with osd.{}, updated by {}",
+ peer, con_back);
+ } else {
+ if (back_ready) {
+ back_ready = false;
+ }
+ if (session_started) {
+ reset_session();
+ }
+ if (heartbeat.whoami == peer) {
+ logger().error("Heartbeat::Peer::handle_reset(): "
+ "peer is myself ({})", peer);
+ } else if (heartbeat.whoami > peer || !has_racing) {
+ connect_back();
+ } else { // whoami < peer && has_racing
+ logger().info("Heartbeat::Peer::handle_reset(): "
+ "con_back racing detected and lose, "
+ "waiting for osd.{} connect me", peer);
+ }
+ }
+ } else {
+ // ignore the unrelated conn
+ }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
{
- if (con_front != conn && con_back != conn) {
- return;
+ if (con_front == conn) {
+ assert(!front_ready);
+ assert(!session_started);
+ notify_front_ready();
+ } else if (con_back == conn) {
+ assert(!back_ready);
+ assert(!session_started);
+ notify_back_ready();
+ } else {
+ // ignore the unrelated connection
+ }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn)
+{
+ handle_connect(conn);
+
+ const auto peer_addr = conn->get_peer_addr();
+ const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
+ if (!con_front && front_addr == peer_addr) {
+ logger().info("Heartbeat::Peer::handle_accept(): "
+ "con_front racing resolved for osd.{}", peer);
+ con_front = conn;
+ notify_front_ready();
+ }
+ const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
+ if (!con_back && back_addr == peer_addr) {
+ logger().info("Heartbeat::Peer::handle_accept(): "
+ "con_back racing resolved for osd.{}", peer);
+ con_back = conn;
+ notify_back_ready();
+ }
+}
+
+void Heartbeat::Peer::start_session()
+{
+ logger().info("Heartbeat::Peer: osd.{} started (send={})",
+ peer, pending_send);
+ assert(!session_started);
+ session_started = true;
+ ping_history.clear();
+ if (pending_send) {
+ pending_send = false;
+ do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
+ }
+}
+
+void Heartbeat::Peer::reset_session()
+{
+ logger().info("Heartbeat::Peer: osd.{} reset", peer);
+ assert(session_started);
+ if (!ping_history.empty()) {
+ // we lost our ping_history of the last session, but still need to keep
+ // the oldest deadline for unhealthy check.
+ auto oldest = ping_history.begin();
+ auto sent_stamp = oldest->first;
+ auto deadline = oldest->second.deadline;
+ ping_history.clear();
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ }
+ session_started = false;
+}
+
+void Heartbeat::Peer::notify_front_ready()
+{
+ assert(con_front);
+ assert(!front_ready);
+ assert(!session_started);
+ front_ready = true;
+ if (front_ready && back_ready) {
+ start_session();
+ }
+}
+
+void Heartbeat::Peer::connect_front()
+{
+ assert(!con_front);
+ assert(!front_ready);
+ assert(!session_started);
+ auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ // TODO: use addrs
+ con_front = heartbeat.front_msgr->connect(
+ osdmap->get_hb_front_addrs(peer).front(),
+ entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+ if (con_front->is_connected()) {
+ notify_front_ready();
+ }
+}
+
+void Heartbeat::Peer::notify_back_ready()
+{
+ assert(con_back);
+ assert(!back_ready);
+ assert(!session_started);
+ back_ready = true;
+ if (front_ready && back_ready) {
+ start_session();
+ }
+}
+
+void Heartbeat::Peer::connect_back()
+{
+ assert(!con_back);
+ assert(!back_ready);
+ assert(!session_started);
+ auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+ // TODO: use addrs
+ con_back = heartbeat.back_msgr->connect(
+ osdmap->get_hb_back_addrs(peer).front(),
+ entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+ if (con_back->is_connected()) {
+ notify_back_ready();
}
- disconnect();
- first_tx = {};
- last_tx = {};
- last_rx_front = {};
- last_rx_back = {};
- ping_history = {};
- connect();
}
bool Heartbeat::FailingPeers::add_pending(
public:
using osd_id_t = int;
- Heartbeat(const crimson::osd::ShardServices& service,
+ Heartbeat(osd_id_t whoami,
+ const crimson::osd::ShardServices& service,
crimson::mon::Client& monc,
crimson::net::MessengerRef front_msgr,
crimson::net::MessengerRef back_msgr);
seastar::future<> ms_dispatch(crimson::net::Connection* conn,
MessageRef m) override;
void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+ void ms_handle_connect(crimson::net::ConnectionRef conn) override;
+ void ms_handle_accept(crimson::net::ConnectionRef conn) override;
void print(std::ostream&) const;
private:
const entity_addrvec_t& addrs,
ChainedDispatchersRef);
private:
+ const osd_id_t whoami;
const crimson::osd::ShardServices& service;
crimson::mon::Client& monc;
crimson::net::MessengerRef front_msgr;
ceph::signedspan mnow,
std::vector<seastar::future<>>&);
seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
- void handle_reset(crimson::net::ConnectionRef);
+ void handle_reset(crimson::net::ConnectionRef, bool is_replace);
+ void handle_connect(crimson::net::ConnectionRef);
+ void handle_accept(crimson::net::ConnectionRef);
private:
bool pinged() const;
HEALTHY,
};
health_state do_health_screen(clock::time_point now) const;
- void connect();
- void disconnect();
+
+ // a session starts when con_front and con_back are both connected
+ void start_session();
+ // a session resets when either con_front or con_back is reset
+ void reset_session();
+ // notify when con_front becomes ready, possibly start session
+ void notify_front_ready();
+ void connect_front();
+ // notify when con_back becomes ready, possibly start session
+ void notify_back_ready();
+ void connect_back();
+
+ void do_send_heartbeat(clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>*);
private:
Heartbeat& heartbeat;
const osd_id_t peer;
- /// peer connection (front)
- crimson::net::ConnectionRef con_front;
- /// peer connection (back)
- crimson::net::ConnectionRef con_back;
- /// time we sent our first ping request
+ // time we sent our first ping request
clock::time_point first_tx;
- /// last time we sent a ping request
+ // last time we sent a ping request
clock::time_point last_tx;
- /// last time we got a ping reply on the front side
+ // last time we got a ping reply on the front side
clock::time_point last_rx_front;
- /// last time we got a ping reply on the back side
+ // last time we got a ping reply on the back side
clock::time_point last_rx_back;
- /// most recent epoch we wanted this peer
+ // most recent epoch we wanted this peer
epoch_t epoch;
+ // if racing happened
+ bool has_racing = false;
+ // peer connection (front)
+ crimson::net::ConnectionRef con_front;
+ bool front_ready = false;
+ // peer connection (back)
+ crimson::net::ConnectionRef con_back;
+ bool back_ready = false;
+
+ // start to send pings and track ping_history
+ bool session_started = false;
+ // if need to send heartbeat when session started
+ bool pending_send = false;
+
struct reply_t {
clock::time_point deadline;
// one sent over front conn, another sent over back conn
uint8_t unacknowledged = 0;
};
- /// history of inflight pings, arranging by timestamp we sent
+ // history of inflight pings, arranging by timestamp we sent
std::map<utime_t, reply_t> ping_history;
};