}
}
-bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const
+bool Heartbeat::Connection::matches(crimson::net::Connection* _conn) const
{
- if (conn && conn.get() == _conn) {
- return true;
- } else {
- return false;
- }
+ return (conn && conn.get() == _conn);
}
void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn)
{
if (!conn) {
- if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) {
+ if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) {
logger().info("Heartbeat::Connection::accepted(): "
"{} racing resolved", *this);
conn = accepted_conn;
conn = nullptr;
if (is_connected) {
is_connected = false;
- connector.decrease_connected();
+ listener.decrease_connected();
}
if (!racing_detected || is_winner_side) {
connect();
void Heartbeat::Connection::validate()
{
assert(is_connected);
- auto peer_addr = connector.get_peer_addr(type);
+ auto peer_addr = listener.get_peer_addr(type);
if (conn->get_peer_addr() != peer_addr) {
logger().info("Heartbeat::Connection::validate(): "
"{} has new address {} over {}, reset",
{
assert(!is_connected);
is_connected = true;
- connector.increase_connected();
+ listener.increase_connected();
}
void Heartbeat::Connection::connect()
{
assert(!conn);
- auto addr = connector.get_peer_addr(type);
+ auto addr = listener.get_peer_addr(type);
conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
if (conn->is_connected()) {
set_connected();
void Heartbeat::Session::set_inactive_history(clock::time_point now)
{
- assert(!started);
+ assert(!connected);
if (ping_history.empty()) {
const utime_t sent_stamp{now};
const auto deadline =
now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- [[maybe_unused]] auto [reply, added] =
- ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ ping_history.emplace(sent_stamp, reply_t{deadline, 0});
} else { // the entry is already added
assert(ping_history.size() == 1);
}
}
Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
- : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer},
+ : ConnectionListener(2), heartbeat{heartbeat}, peer{peer}, session{peer},
con_front(peer, heartbeat.whoami > peer, Connection::type_t::front,
*heartbeat.front_msgr, *this),
con_back(peer, heartbeat.whoami > peer, Connection::type_t::back,
return seastar::now();
}
type_t type;
- if (con_front.match(conn)) {
+ if (con_front.matches(conn)) {
type = type_t::front;
- } else if (con_back.match(conn)) {
+ } else if (con_back.matches(conn)) {
type = type_t::back;
} else {
return seastar::now();
}
const auto now = clock::now();
- if (session.handle_reply(m->ping_stamp, type, now)) {
+ if (session.on_pong(m->ping_stamp, type, now)) {
if (session.do_health_screen(now) == Session::health_state::HEALTHY) {
return heartbeat.failing_peers.cancel_one(peer);
}
}
}
-void Heartbeat::Peer::all_connected()
+void Heartbeat::Peer::on_connected()
{
- logger().info("Heartbeat::Peer: osd.{} started (send={})",
+ logger().info("Heartbeat::Peer: osd.{} connected (send={})",
peer, pending_send);
- session.start();
+ session.on_connected();
if (pending_send) {
pending_send = false;
do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
}
}
-void Heartbeat::Peer::connection_lost()
+void Heartbeat::Peer::on_disconnected()
{
- logger().info("Heartbeat::Peer: osd.{} reset", peer);
- session.lost();
+ logger().info("Heartbeat::Peer: osd.{} disconnected", peer);
+ session.on_disconnected();
}
void Heartbeat::Peer::do_send_heartbeat(
const utime_t sent_stamp{now};
const auto deadline =
now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- session.emplace_history(sent_stamp, deadline);
+ session.on_ping(sent_stamp, deadline);
for_each_conn([&, this] (auto& conn) {
auto min_message = static_cast<uint32_t>(
local_conf()->osd_heartbeat_min_size);
// use real_clock so it can be converted to utime_t
using clock = ceph::coarse_real_clock;
- class Connector;
+ class ConnectionListener;
class Connection;
class Session;
class Peer;
return out;
}
-class Heartbeat::Connector {
+/*
+ * Event driven interface for Heartbeat::Peer to be notified when both hb_front
+ * and hb_back are connected, or connection is lost.
+ */
+class Heartbeat::ConnectionListener {
public:
- Connector(size_t connections) : connections{connections} {}
+ ConnectionListener(size_t connections) : connections{connections} {}
void increase_connected() {
assert(connected < connections);
++connected;
if (connected == connections) {
- all_connected();
+ on_connected();
}
}
void decrease_connected() {
assert(connected > 0);
if (connected == connections) {
- connection_lost();
+ on_disconnected();
}
--connected;
}
virtual entity_addr_t get_peer_addr(type_t) = 0;
protected:
- virtual void all_connected() = 0;
- virtual void connection_lost() = 0;
+ virtual void on_connected() = 0;
+ virtual void on_disconnected() = 0;
private:
const size_t connections;
class Heartbeat::Connection {
public:
- using type_t = Connector::type_t;
+ using type_t = ConnectionListener::type_t;
Connection(osd_id_t peer, bool is_winner_side, type_t type,
- crimson::net::Messenger& msgr, Connector& connector)
- : peer{peer}, is_winner_side{is_winner_side}, type{type},
- msgr{msgr}, connector{connector} {
+ crimson::net::Messenger& msgr,
+ ConnectionListener& listener)
+ : peer{peer}, type{type},
+ msgr{msgr}, listener{listener},
+ is_winner_side{is_winner_side} {
connect();
}
~Connection();
- bool match(crimson::net::Connection* _conn) const;
- bool match(crimson::net::ConnectionRef conn) const {
- return match(conn.get());
+ bool matches(crimson::net::Connection* _conn) const;
+ bool matches(crimson::net::ConnectionRef conn) const {
+ return matches(conn.get());
}
void connected() {
set_connected();
void connect();
const osd_id_t peer;
- const bool is_winner_side;
const type_t type;
crimson::net::Messenger& msgr;
- Connector& connector;
+ ConnectionListener& listener;
+
+/*
+ * Resolve the following racing when both me and peer are trying to connect
+ * each other symmetrically, under SocketPolicy::lossy_client:
+ *
+ * OSD.A OSD.B
+ * - -
+ * |-[1]----> <----[2]-|
+ * \ /
+ * \ /
+ * delay.. X delay..
+ * / \
+ * |-[1]x> / \ <x[2]-|
+ * |<-[2]--- ---[1]->|
+ * |(reset#1) (reset#2)|
+ * |(reconnectB) (reconnectA)|
+ * |-[2]---> <---[1]-|
+ * delay.. delay..
+ * (remote close populated)
+ * |-[2]x> <x[1]-|
+ * |(reset#2) (reset#1)|
+ * | ... ... |
+ * (dead loop!)
+ *
+ * Our solution is to remember if such racing was happened recently, and
+ * establish connection asymmetrically only from the winner side whose osd-id
+ * is larger.
+ */
+ const bool is_winner_side;
+ bool racing_detected = false;
crimson::net::ConnectionRef conn;
bool is_connected = false;
- bool racing_detected = false;
friend std::ostream& operator<<(std::ostream& os, const Connection c) {
if (c.type == type_t::front) {
}
};
+/*
+ * Track the ping history and ping reply (the pong) from the same session, clean up
+ * history once hb_front or hb_back loses connection and restart the session once
+ * both connections are connected again.
+ *
+ * We cannot simply remove the entire Heartbeat::Peer once hb_front or hb_back
+ * loses connection, because we would end up with the following deadloop:
+ *
+ * OSD.A OSD.B
+ * - -
+ * hb_front reset <--(network)--- hb_front close
+ * | ^
+ * | |
+ * remove Peer B (dead loop!) remove Peer A
+ * | |
+ * V |
+ * hb_back close ----(network)---> hb_back reset
+ */
class Heartbeat::Session {
public:
Session(osd_id_t peer) : peer{peer} {}
void set_epoch(epoch_t epoch_) { epoch = epoch_; }
epoch_t get_epoch() const { return epoch; }
- bool is_started() const { return started; }
+ bool is_started() const { return connected; }
bool pinged() const {
if (clock::is_zero(first_tx)) {
// i can never receive a pong without sending any ping message first.
last_tx = now;
}
- void start() {
- assert(!started);
- started = true;
+ void on_connected() {
+ assert(!connected);
+ connected = true;
ping_history.clear();
}
- void emplace_history(const utime_t& sent_stamp,
- const clock::time_point& deadline) {
- assert(started);
+ void on_ping(const utime_t& sent_stamp,
+ const clock::time_point& deadline) {
+ assert(connected);
[[maybe_unused]] auto [reply, added] =
ping_history.emplace(sent_stamp, reply_t{deadline, 2});
}
- bool handle_reply(const utime_t& ping_stamp,
- Connection::type_t type,
- clock::time_point now) {
- assert(started);
+ bool on_pong(const utime_t& ping_stamp,
+ Connection::type_t type,
+ clock::time_point now) {
+ assert(connected);
auto ping = ping_history.find(ping_stamp);
if (ping == ping_history.end()) {
// old replies, deprecated by newly sent pings.
return true;
}
- void lost() {
- assert(started);
- started = false;
+ void on_disconnected() {
+ assert(connected);
+ connected = false;
if (!ping_history.empty()) {
// we lost our ping_history of the last session, but still need to keep
// the oldest deadline for unhealthy check.
private:
const osd_id_t peer;
- bool started = false;
+ bool connected = false;
// time we sent our first ping request
clock::time_point first_tx;
// last time we sent a ping request
std::map<utime_t, reply_t> ping_history;
};
-class Heartbeat::Peer final : private Heartbeat::Connector {
+class Heartbeat::Peer final : private Heartbeat::ConnectionListener {
public:
Peer(Heartbeat&, osd_id_t);
~Peer();
seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) {
for_each_conn([&] (auto& _conn) {
- if (_conn.match(conn)) {
+ if (_conn.matches(conn)) {
if (is_replace) {
_conn.replaced();
} else {
}
void handle_connect(crimson::net::ConnectionRef conn) {
for_each_conn([&] (auto& _conn) {
- if (_conn.match(conn)) {
+ if (_conn.matches(conn)) {
_conn.connected();
}
});
private:
entity_addr_t get_peer_addr(type_t type) override;
- void all_connected() override;
- void connection_lost() override;
+ void on_connected() override;
+ void on_disconnected() override;
void do_send_heartbeat(
clock::time_point, ceph::signedspan, std::vector<seastar::future<>>*);
Heartbeat& heartbeat;
const osd_id_t peer;
Session session;
- // if need to send heartbeat when session started
+ // if need to send heartbeat when session connected
bool pending_send = false;
Connection con_front;
Connection con_back;