out << "heartbeat";
}
-Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
- : heartbeat(heartbeat), peer(peer)
+Heartbeat::Connection::~Connection()
{
- logger().info("Heartbeat::Peer: osd.{} added", peer);
- connect_front();
- connect_back();
+ if (conn) {
+ conn->mark_down();
+ }
}
-Heartbeat::Peer::~Peer()
+bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const
{
- logger().info("Heartbeat::Peer: osd.{} removed", peer);
- if (con_front) {
- con_front->mark_down();
+ if (conn && conn.get() == _conn) {
+ return true;
+ } else {
+ return false;
}
- if (con_back) {
- con_back->mark_down();
+}
+
+void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
+{
+ if (!conn) {
+ if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) {
+ logger().info("Heartbeat::Connection::accepted(): "
+ "{} racing resolved", *this);
+ conn = accepted_conn;
+ set_connected();
+ }
+ } else if (conn == accepted_conn) {
+ set_connected();
}
}
-bool Heartbeat::Peer::pinged() const
+void Heartbeat::Connection::replaced()
{
- if (clock::is_zero(first_tx)) {
- // i can never receive a pong without sending any ping message first.
- assert(clock::is_zero(last_rx_front) &&
- clock::is_zero(last_rx_back));
- return false;
+ assert(!is_connected);
+ auto replaced_conn = conn;
+ // set the racing connection, will be handled by handle_accept()
+ conn = msgr.connect(replaced_conn->get_peer_addr(),
+ replaced_conn->get_peer_name());
+ racing_detected = true;
+ logger().warn("Heartbeat::Connection::replaced(): {} racing", *this);
+ assert(conn != replaced_conn);
+ assert(!conn->is_connected());
+}
+
+void Heartbeat::Connection::reset()
+{
+ conn = nullptr;
+ if (is_connected) {
+ is_connected = false;
+ connector.decrease_connected();
+ }
+ if (!racing_detected || is_winner_side) {
+ connect();
} else {
- return true;
+ logger().info("Heartbeat::Connection::reset(): "
+ "{} racing detected and lose, "
+ "waiting for peer connect me", *this);
}
}
-Heartbeat::Peer::health_state
-Heartbeat::Peer::do_health_screen(clock::time_point now) const
+seastar::future<> Heartbeat::Connection::send(MessageRef msg)
{
- if (!pinged()) {
- // we are not healty nor unhealty because we haven't sent anything yet
- return health_state::UNKNOWN;
- } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
- return health_state::UNHEALTHY;
- } else if (!clock::is_zero(last_rx_front) &&
- !clock::is_zero(last_rx_back)) {
- // only declare to be healthy until we have received the first
- // replies from both front/back connections
- return health_state::HEALTHY;
- } else {
- return health_state::UNKNOWN;
+ assert(is_connected);
+ return conn->send(msg);
+}
+
+void Heartbeat::Connection::validate()
+{
+  // Compare the address the established connection is bound to with the
+  // address the connector currently reports for this connection type.
+  assert(is_connected);
+  const auto expected_addr = connector.get_peer_addr(type);
+  const bool addr_changed = (conn->get_peer_addr() != expected_addr);
+  if (addr_changed) {
+    logger().info("Heartbeat::Connection::validate(): "
+                  "{} has new address {} over {}, reset",
+                  *this, expected_addr, conn->get_peer_addr());
+    // drop the stale connection, forget the racing state and start over
+    conn->mark_down();
+    racing_detected = false;
+    reset();
+  }
+}
+
+void Heartbeat::Connection::retry()
+{
+  // A retry always clears the racing state, even if nothing else is done.
+  racing_detected = false;
+  if (is_connected) {
+    // already established, nothing to retry
+    return;
+  }
+  if (!conn) {
+    // no connection object at all: just connect
+    connect();
+    return;
+  }
+  // a connection exists but is still pending: tear it down and reset
+  conn->mark_down();
+  reset();
+}
+
+void Heartbeat::Connection::set_connected()
+{
+ assert(!is_connected);
+ is_connected = true;
+ connector.increase_connected();
+}
+
+void Heartbeat::Connection::connect()
+{
+ assert(!conn);
+ auto addr = connector.get_peer_addr(type);
+ conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+ if (conn->is_connected()) {
+ set_connected();
}
}
Heartbeat::clock::time_point
-Heartbeat::Peer::failed_since(clock::time_point now) const
+Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const
{
if (do_health_screen(now) == health_state::UNHEALTHY) {
auto oldest_deadline = ping_history.begin()->second.deadline;
auto failed_since = std::min(last_rx_back, last_rx_front);
if (clock::is_zero(failed_since)) {
- logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
+ logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
"ever on either front or back, first ping sent {} "
"(oldest deadline {})",
peer, first_tx, oldest_deadline);
failed_since = first_tx;
} else {
- logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
+ logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} "
"since back {} front {} (oldest deadline {})",
peer, last_rx_back, last_rx_front, oldest_deadline);
}
}
}
-void Heartbeat::Peer::do_send_heartbeat(
- clock::time_point now,
- ceph::signedspan mnow,
- std::vector<seastar::future<>>* futures)
+void Heartbeat::Session::set_inactive_history(clock::time_point now)
{
- assert(session_started);
- const utime_t sent_stamp{now};
- const auto deadline =
- now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- [[maybe_unused]] auto [reply, added] =
- ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- for (auto& con : {con_front, con_back}) {
- auto min_message = static_cast<uint32_t>(
- local_conf()->osd_heartbeat_min_size);
- auto ping = make_message<MOSDPing>(
- heartbeat.monc.get_fsid(),
- heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
- MOSDPing::PING,
- sent_stamp,
- mnow,
- mnow,
- heartbeat.service.get_osdmap_service().get_up_epoch(),
- min_message);
- reply->second.unacknowledged++;
- if (futures) {
- futures->push_back(con->send(std::move(ping)));
- }
+ assert(!started);
+ if (ping_history.empty()) {
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ [[maybe_unused]] auto [reply, added] =
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ } else { // the entry is already added
+ assert(ping_history.size() == 1);
}
}
+Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
+ : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+ con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
+ *heartbeat.front_msgr, *this),
+ con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
+ *heartbeat.back_msgr, *this)
+{
+ logger().info("Heartbeat::Peer: osd.{} added", peer);
+}
+
+Heartbeat::Peer::~Peer()
+{
+ logger().info("Heartbeat::Peer: osd.{} removed", peer);
+}
+
void Heartbeat::Peer::send_heartbeat(
- clock::time_point now,
- ceph::signedspan mnow,
+ clock::time_point now, ceph::signedspan mnow,
std::vector<seastar::future<>>& futures)
{
- if (!pinged()) {
- first_tx = now;
- }
- last_tx = now;
-
- if (session_started) {
+ session.set_tx(now);
+ if (session.is_started()) {
do_send_heartbeat(now, mnow, &futures);
-
- // validate connection addresses
- const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
- const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
- if (con_front->get_peer_addr() != front_addr) {
- logger().info("Heartbeat::Peer::send_heartbeat(): "
- "peer osd.{} con_front has new address {} over {}, reset",
- peer, front_addr, con_front->get_peer_addr());
- con_front->mark_down();
- has_racing = false;
- handle_reset(con_front, false);
- }
- const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
- if (con_back->get_peer_addr() != back_addr) {
- logger().info("Heartbeat::Peer::send_heartbeat(): "
- "peer osd.{} con_back has new address {} over {}, reset",
- peer, back_addr, con_back->get_peer_addr());
- con_back->mark_down();
- has_racing = false;
- handle_reset(con_back, false);
- }
+ for_each_conn([] (auto& conn) {
+ conn.validate();
+ });
} else {
// we should send MOSDPing but still cannot at this moment
if (pending_send) {
// we have already pending for a entire heartbeat interval
logger().warn("Heartbeat::Peer::send_heartbeat(): "
- "heartbeat to {} is still pending...", peer);
- has_racing = false;
- // retry con_front if still pending
- if (!front_ready) {
- if (con_front) {
- con_front->mark_down();
- handle_reset(con_front, false);
- } else {
- connect_front();
- }
- }
- // retry con_back if still pending
- if (!back_ready) {
- if (con_back) {
- con_back->mark_down();
- handle_reset(con_back, false);
- } else {
- connect_back();
- }
- }
+ "heartbeat to osd.{} is still pending...", peer);
+ for_each_conn([] (auto& conn) {
+ conn.retry();
+ });
} else {
logger().info("Heartbeat::Peer::send_heartbeat(): "
- "heartbeat to {} is pending send...", peer);
- // maintain an entry in ping_history for unhealthy check
- if (ping_history.empty()) {
- const utime_t sent_stamp{now};
- const auto deadline =
- now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- [[maybe_unused]] auto [reply, added] =
- ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- } else { // the entry is already added
- assert(ping_history.size() == 1);
- }
+ "heartbeat to osd.{} is pending send...", peer);
+ session.set_inactive_history(now);
pending_send = true;
}
}
seastar::future<> Heartbeat::Peer::handle_reply(
crimson::net::Connection* conn, Ref<MOSDPing> m)
{
- if (!session_started) {
+ if (!session.is_started()) {
// we haven't sent any ping yet
return seastar::now();
}
- auto ping = ping_history.find(m->ping_stamp);
- if (ping == ping_history.end()) {
- // old replies, deprecated by newly sent pings.
+ type_t type;
+ if (con_front.match(conn)) {
+ type = type_t::front;
+ } else if (con_back.match(conn)) {
+ type = type_t::back;
+ } else {
return seastar::now();
}
const auto now = clock::now();
- auto& unacked = ping->second.unacknowledged;
- assert(unacked);
- if (conn == con_back.get()) {
- last_rx_back = now;
- unacked--;
- } else if (conn == con_front.get()) {
- last_rx_front = now;
- unacked--;
- }
- if (unacked == 0) {
- ping_history.erase(ping_history.begin(), ++ping);
- }
- if (do_health_screen(now) == health_state::HEALTHY) {
- return heartbeat.failing_peers.cancel_one(peer);
- }
- return seastar::now();
-}
-
-void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
-{
- if (con_front == conn) {
- con_front = nullptr;
- if (is_replace) {
- assert(!front_ready);
- assert(!session_started);
- // set the racing connection, will be handled by handle_accept()
- con_front = heartbeat.front_msgr->connect(
- conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
- has_racing = true;
- logger().warn("Heartbeat::Peer::handle_reset(): "
- "con_front racing with osd.{}, updated by {}",
- peer, con_front);
- } else {
- if (front_ready) {
- front_ready = false;
- }
- if (session_started) {
- reset_session();
- }
- assert(heartbeat.whoami != peer);
- if (heartbeat.whoami > peer || !has_racing) {
- connect_front();
- } else { // whoami < peer && has_racing
- logger().info("Heartbeat::Peer::handle_reset(): "
- "con_front racing detected and lose, "
- "waiting for osd.{} connect me", peer);
- }
+ if (session.handle_reply(m->ping_stamp, type, now)) {
+ if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
+ return heartbeat.failing_peers.cancel_one(peer);
}
- } else if (con_back == conn) {
- con_back = nullptr;
- if (is_replace) {
- assert(!back_ready);
- assert(!session_started);
- // set the racing connection, will be handled by handle_accept()
- con_back = heartbeat.back_msgr->connect(
- conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
- has_racing = true;
- logger().warn("Heartbeat::Peer::handle_reset(): "
- "con_back racing with osd.{}, updated by {}",
- peer, con_back);
- } else {
- if (back_ready) {
- back_ready = false;
- }
- if (session_started) {
- reset_session();
- }
- if (heartbeat.whoami == peer) {
- logger().error("Heartbeat::Peer::handle_reset(): "
- "peer is myself ({})", peer);
- } else if (heartbeat.whoami > peer || !has_racing) {
- connect_back();
- } else { // whoami < peer && has_racing
- logger().info("Heartbeat::Peer::handle_reset(): "
- "con_back racing detected and lose, "
- "waiting for osd.{} connect me", peer);
- }
- }
- } else {
- // ignore the unrelated conn
- }
-}
-
-void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
-{
- if (con_front == conn) {
- assert(!front_ready);
- assert(!session_started);
- notify_front_ready();
- } else if (con_back == conn) {
- assert(!back_ready);
- assert(!session_started);
- notify_back_ready();
- } else {
- // ignore the unrelated connection
}
+ return seastar::now();
}
-void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn)
+entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type)
{
- handle_connect(conn);
-
- const auto peer_addr = conn->get_peer_addr();
const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
- const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
- if (!con_front && front_addr == peer_addr) {
- logger().info("Heartbeat::Peer::handle_accept(): "
- "con_front racing resolved for osd.{}", peer);
- con_front = conn;
- notify_front_ready();
- }
- const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
- if (!con_back && back_addr == peer_addr) {
- logger().info("Heartbeat::Peer::handle_accept(): "
- "con_back racing resolved for osd.{}", peer);
- con_back = conn;
- notify_back_ready();
+ if (type == type_t::front) {
+ return osdmap->get_hb_front_addrs(peer).front();
+ } else {
+ return osdmap->get_hb_back_addrs(peer).front();
}
}
-void Heartbeat::Peer::start_session()
+void Heartbeat::Peer::all_connected()
{
logger().info("Heartbeat::Peer: osd.{} started (send={})",
peer, pending_send);
- assert(!session_started);
- session_started = true;
- ping_history.clear();
+ session.start();
if (pending_send) {
pending_send = false;
do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
}
}
-void Heartbeat::Peer::reset_session()
+void Heartbeat::Peer::connection_lost()
{
logger().info("Heartbeat::Peer: osd.{} reset", peer);
- assert(session_started);
- if (!ping_history.empty()) {
- // we lost our ping_history of the last session, but still need to keep
- // the oldest deadline for unhealthy check.
- auto oldest = ping_history.begin();
- auto sent_stamp = oldest->first;
- auto deadline = oldest->second.deadline;
- ping_history.clear();
- ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- }
- session_started = false;
-}
-
-void Heartbeat::Peer::notify_front_ready()
-{
- assert(con_front);
- assert(!front_ready);
- assert(!session_started);
- front_ready = true;
- if (front_ready && back_ready) {
- start_session();
- }
-}
-
-void Heartbeat::Peer::connect_front()
-{
- assert(!con_front);
- assert(!front_ready);
- assert(!session_started);
- auto osdmap = heartbeat.service.get_osdmap_service().get_map();
- // TODO: use addrs
- con_front = heartbeat.front_msgr->connect(
- osdmap->get_hb_front_addrs(peer).front(),
- entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
- if (con_front->is_connected()) {
- notify_front_ready();
- }
-}
-
-void Heartbeat::Peer::notify_back_ready()
-{
- assert(con_back);
- assert(!back_ready);
- assert(!session_started);
- back_ready = true;
- if (front_ready && back_ready) {
- start_session();
- }
+ session.lost();
}
-void Heartbeat::Peer::connect_back()
+void Heartbeat::Peer::do_send_heartbeat(
+ Heartbeat::clock::time_point now,
+ ceph::signedspan mnow,
+ std::vector<seastar::future<>>* futures)
{
- assert(!con_back);
- assert(!back_ready);
- assert(!session_started);
- auto osdmap = heartbeat.service.get_osdmap_service().get_map();
- // TODO: use addrs
- con_back = heartbeat.back_msgr->connect(
- osdmap->get_hb_back_addrs(peer).front(),
- entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
- if (con_back->is_connected()) {
- notify_back_ready();
- }
+ const utime_t sent_stamp{now};
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ session.emplace_history(sent_stamp, deadline);
+ for_each_conn([&, this] (auto& conn) {
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto ping = make_message<MOSDPing>(
+ heartbeat.monc.get_fsid(),
+ heartbeat.service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ mnow,
+ mnow,
+ heartbeat.service.get_osdmap_service().get_up_epoch(),
+ min_message);
+ if (futures) {
+ futures->push_back(conn.send(std::move(ping)));
+ }
+ });
}
bool Heartbeat::FailingPeers::add_pending(
// use real_clock so it can be converted to utime_t
using clock = ceph::coarse_real_clock;
+ class Connector;
+ class Connection;
+ class Session;
class Peer;
using peers_map_t = std::map<osd_id_t, Peer>;
peers_map_t peers;
return out;
}
-class Heartbeat::Peer {
+class Heartbeat::Connector {
public:
- Peer(Heartbeat&, osd_id_t);
- ~Peer();
- Peer(Peer&&) = delete;
- Peer(const Peer&) = delete;
- Peer& operator=(const Peer&) = delete;
+ Connector(size_t connections) : connections{connections} {}
+
+ void increase_connected() {
+ assert(connected < connections);
+ ++connected;
+ if (connected == connections) {
+ all_connected();
+ }
+ }
+ void decrease_connected() {
+ assert(connected > 0);
+ if (connected == connections) {
+ connection_lost();
+ }
+ --connected;
+ }
+ enum class type_t { front, back };
+ virtual entity_addr_t get_peer_addr(type_t) = 0;
+
+ protected:
+ virtual void all_connected() = 0;
+ virtual void connection_lost() = 0;
- void set_epoch(epoch_t epoch_) { epoch = epoch_; }
- epoch_t get_epoch() const { return epoch; }
+ private:
+ const size_t connections;
+ size_t connected = 0;
+};
- // if failure, return time_point since last active
- // else, return clock::zero()
- clock::time_point failed_since(clock::time_point now) const;
- void send_heartbeat(clock::time_point now,
- ceph::signedspan mnow,
- std::vector<seastar::future<>>&);
- seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
- void handle_reset(crimson::net::ConnectionRef, bool is_replace);
- void handle_connect(crimson::net::ConnectionRef);
- void handle_accept(crimson::net::ConnectionRef);
+class Heartbeat::Connection {
+ public:
+ using type_t = Connector::type_t;
+ Connection(osd_id_t peer, bool is_winner_side, type_t type,
+ crimson::net::Messenger& msgr, Connector& connector)
+ : peer{peer}, is_winner_side{is_winner_side}, type{type},
+ msgr{msgr}, connector{connector} {
+ connect();
+ }
+ ~Connection();
+
+ bool match(crimson::net::Connection* _conn) const;
+ bool match(crimson::net::ConnectionRef conn) const {
+ return match(conn.get());
+ }
+ void connected() {
+ set_connected();
+ }
+ void accepted(crimson::net::ConnectionRef);
+ void replaced();
+ void reset();
+ seastar::future<> send(MessageRef msg);
+ void validate();
+ // retry connection if still pending
+ void retry();
private:
- bool pinged() const;
+ void set_connected();
+ void connect();
+
+ const osd_id_t peer;
+ const bool is_winner_side;
+ const type_t type;
+ crimson::net::Messenger& msgr;
+ Connector& connector;
+
+ crimson::net::ConnectionRef conn;
+ bool is_connected = false;
+ bool racing_detected = false;
+
+ // Print a short tag naming the connection type (front/back) and peer osd.
+ // The Connection is taken by const reference on purpose: a by-value
+ // parameter would copy the object, and the copy's destructor would call
+ // mark_down() on the shared conn every time the connection is logged.
+ friend std::ostream& operator<<(std::ostream& os, const Connection& c) {
+   if (c.type == type_t::front) {
+     return os << "con_front(osd." << c.peer << ")";
+   } else {
+     return os << "con_back(osd." << c.peer << ")";
+   }
+ }
+};
+
+class Heartbeat::Session {
+ public:
+ Session(osd_id_t peer) : peer{peer} {}
+
+ void set_epoch(epoch_t epoch_) { epoch = epoch_; }
+ epoch_t get_epoch() const { return epoch; }
+ bool is_started() const { return started; }
+ // True once a first ping has been sent, i.e. first_tx is non-zero.
+ bool pinged() const {
+   const bool never_sent = clock::is_zero(first_tx);
+   // i can never receive a pong without sending any ping message first.
+   assert(!never_sent ||
+          (clock::is_zero(last_rx_front) &&
+           clock::is_zero(last_rx_back)));
+   return !never_sent;
+ }
+
enum class health_state {
UNKNOWN,
UNHEALTHY,
HEALTHY,
};
- health_state do_health_screen(clock::time_point now) const;
-
- // a session starts when con_front and con_back are both connected
- void start_session();
- // a session resets when either con_front or con_back is reset
- void reset_session();
- // notify when con_front becomes ready, possibly start session
- void notify_front_ready();
- void connect_front();
- // notify when con_back becomes ready, possibly start session
- void notify_back_ready();
- void connect_back();
-
- void do_send_heartbeat(clock::time_point now,
- ceph::signedspan mnow,
- std::vector<seastar::future<>>*);
+ // Classify the peer at time `now` as UNKNOWN, UNHEALTHY or HEALTHY.
+ health_state do_health_screen(clock::time_point now) const {
+   if (!pinged()) {
+     // neither healthy nor unhealthy: nothing has been sent yet
+     return health_state::UNKNOWN;
+   }
+   if (!ping_history.empty() && ping_history.begin()->second.deadline < now) {
+     // the oldest tracked ping has passed its deadline
+     return health_state::UNHEALTHY;
+   }
+   // only declare healthy once the first replies have been received
+   // from both the front and the back connection
+   const bool both_replied = !clock::is_zero(last_rx_front) &&
+                             !clock::is_zero(last_rx_back);
+   return both_replied ? health_state::HEALTHY : health_state::UNKNOWN;
+ }
+
+ clock::time_point failed_since(clock::time_point now) const;
+
+ void set_tx(clock::time_point now) {
+ if (!pinged()) {
+ first_tx = now;
+ }
+ last_tx = now;
+ }
+
+ void start() {
+ assert(!started);
+ started = true;
+ ping_history.clear();
+ }
+
+ // Record a ping sent at `sent_stamp` that must be answered by `deadline`.
+ // The entry starts with 2 unacknowledged replies: one for the ping sent
+ // over the front connection, one for the ping sent over the back.
+ void emplace_history(const utime_t& sent_stamp,
+                      const clock::time_point& deadline) {
+   assert(started);
+   ping_history.emplace(sent_stamp, reply_t{deadline, 2});
+ }
+
+ // Account for a reply to the ping stamped `ping_stamp` that arrived over
+ // the connection of the given type. Returns false when the stamp is not
+ // in ping_history, true when the reply was accounted for.
+ bool handle_reply(const utime_t& ping_stamp,
+                   Connection::type_t type,
+                   clock::time_point now) {
+   assert(started);
+   auto entry = ping_history.find(ping_stamp);
+   if (entry == ping_history.end()) {
+     // old replies, deprecated by newly sent pings.
+     return false;
+   }
+   auto& pending = entry->second.unacknowledged;
+   assert(pending);
+   // remember when each side last answered
+   if (type == Connection::type_t::front) {
+     last_rx_front = now;
+   } else {
+     last_rx_back = now;
+   }
+   pending--;
+   if (pending == 0) {
+     // fully acknowledged: drop this ping and every older one
+     ping_history.erase(ping_history.begin(), ++entry);
+   }
+   return true;
+ }
+
+ // Stop the running session, collapsing ping_history to a single entry.
+ void lost() {
+   assert(started);
+   started = false;
+   if (ping_history.empty()) {
+     return;
+   }
+   // we lost our ping_history of the last session, but still need to keep
+   // the oldest deadline for unhealthy check.
+   const auto first_entry = ping_history.begin();
+   const auto oldest_stamp = first_entry->first;
+   const auto oldest_deadline = first_entry->second.deadline;
+   ping_history.clear();
+   ping_history.emplace(oldest_stamp, reply_t{oldest_deadline, 0});
+ }
+
+ // maintain an entry in ping_history for unhealthy check
+ void set_inactive_history(clock::time_point);
private:
- Heartbeat& heartbeat;
const osd_id_t peer;
-
+ bool started = false;
// time we sent our first ping request
clock::time_point first_tx;
// last time we sent a ping request
// most recent epoch we wanted this peer
epoch_t epoch;
- // if racing happened
- bool has_racing = false;
- // peer connection (front)
- crimson::net::ConnectionRef con_front;
- bool front_ready = false;
- // peer connection (back)
- crimson::net::ConnectionRef con_back;
- bool back_ready = false;
-
- // start to send pings and track ping_history
- bool session_started = false;
- // if need to send heartbeat when session started
- bool pending_send = false;
-
struct reply_t {
clock::time_point deadline;
// one sent over front conn, another sent over back conn
// history of inflight pings, arranging by timestamp we sent
std::map<utime_t, reply_t> ping_history;
};
+
+// Heartbeat state for one peer osd: owns the front and back Connection and
+// the ping Session, and implements the Connector callbacks.
+class Heartbeat::Peer final : private Heartbeat::Connector {
+ public:
+  Peer(Heartbeat&, osd_id_t);
+  ~Peer();
+  Peer(Peer&&) = delete;
+  Peer(const Peer&) = delete;
+  Peer& operator=(const Peer&) = delete;
+
+  // the epoch is stored in the session
+  void set_epoch(epoch_t epoch) { session.set_epoch(epoch); }
+  epoch_t get_epoch() const { return session.get_epoch(); }
+
+  // if failure, return time_point since last active
+  // else, return clock::zero()
+  clock::time_point failed_since(clock::time_point now) const {
+    return session.failed_since(now);
+  }
+  void send_heartbeat(
+      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>&);
+  seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
+
+  // Forward a reset of `conn` to whichever of our connections matches it:
+  // replaced() when is_replace is set, reset() otherwise.
+  void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
+    for_each_conn([&] (auto& hb_conn) {
+      if (!hb_conn.match(conn)) {
+        return;
+      }
+      if (is_replace) {
+        hb_conn.replaced();
+      } else {
+        hb_conn.reset();
+      }
+    });
+  }
+
+  // Mark the connection matching `conn` as connected.
+  void handle_connect(crimson::net::ConnectionRef conn) {
+    for_each_conn([&] (auto& hb_conn) {
+      if (hb_conn.match(conn)) {
+        hb_conn.connected();
+      }
+    });
+  }
+
+  // Offer the accepted `conn` to both connections; each decides for itself
+  // whether it is concerned.
+  void handle_accept(crimson::net::ConnectionRef conn) {
+    for_each_conn([&] (auto& hb_conn) {
+      hb_conn.accepted(conn);
+    });
+  }
+
+ private:
+  // Connector interface
+  entity_addr_t get_peer_addr(type_t type) override;
+  void all_connected() override;
+  void connection_lost() override;
+
+  void do_send_heartbeat(
+      clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
+
+  // Invoke f on con_front first, then on con_back.
+  template <typename Func>
+  void for_each_conn(Func&& f) {
+    f(con_front);
+    f(con_back);
+  }
+
+  Heartbeat& heartbeat;
+  const osd_id_t peer;
+  Session session;
+  // if need to send heartbeat when session started
+  bool pending_send = false;
+  // peer connections (front, then back); kept after session and
+  // pending_send in declaration order
+  Connection con_front;
+  Connection con_back;
+};