From 94f1110aebefe0672e31997cef3bcf0adbd6871a Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Sun, 28 Jun 2020 10:59:15 +0800 Subject: [PATCH] crimson/osd: improve readability of Heartbeat::Peer classes * Add rationales to introduce Heartbeat::Peer class series. * Find better names for internal interfaces. Signed-off-by: Yingxin Cheng --- src/crimson/osd/heartbeat.cc | 45 ++++++------ src/crimson/osd/heartbeat.h | 128 ++++++++++++++++++++++++----------- 2 files changed, 110 insertions(+), 63 deletions(-) diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index a5f06ba8a43..e3b8b3c6a2a 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -358,19 +358,15 @@ Heartbeat::Connection::~Connection() } } -bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const +bool Heartbeat::Connection::matches(crimson::net::Connection* _conn) const { - if (conn && conn.get() == _conn) { - return true; - } else { - return false; - } + return (conn && conn.get() == _conn); } void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn) { if (!conn) { - if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) { + if (accepted_conn->get_peer_addr() == listener.get_peer_addr(type)) { logger().info("Heartbeat::Connection::accepted(): " "{} racing resolved", *this); conn = accepted_conn; @@ -399,7 +395,7 @@ void Heartbeat::Connection::reset() conn = nullptr; if (is_connected) { is_connected = false; - connector.decrease_connected(); + listener.decrease_connected(); } if (!racing_detected || is_winner_side) { connect(); @@ -419,7 +415,7 @@ seastar::future<> Heartbeat::Connection::send(MessageRef msg) void Heartbeat::Connection::validate() { assert(is_connected); - auto peer_addr = connector.get_peer_addr(type); + auto peer_addr = listener.get_peer_addr(type); if (conn->get_peer_addr() != peer_addr) { logger().info("Heartbeat::Connection::validate(): " "{} has new address {} over {}, reset", @@ -447,13 +443,13 @@ void Heartbeat::Connection::set_connected() { assert(!is_connected); is_connected = true; - connector.increase_connected(); + listener.increase_connected(); } void Heartbeat::Connection::connect() { assert(!conn); - auto addr = connector.get_peer_addr(type); + auto addr = listener.get_peer_addr(type); conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); if (conn->is_connected()) { set_connected(); @@ -485,20 +481,19 @@ Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const void Heartbeat::Session::set_inactive_history(clock::time_point now) { - assert(!started); + assert(!connected); if (ping_history.empty()) { const utime_t sent_stamp{now}; const auto deadline = now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - [[maybe_unused]] auto [reply, added] = - ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + ping_history.emplace(sent_stamp, reply_t{deadline, 0}); } else { // the entry is already added assert(ping_history.size() == 1); } } Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer) - : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer}, + : ConnectionListener(2), heartbeat{heartbeat}, peer{peer}, session{peer}, con_front(peer, heartbeat.whoami > peer, Connection::type_t::front, *heartbeat.front_msgr, *this), con_back(peer, heartbeat.whoami > peer, Connection::type_t::back, @@ -548,15 +543,15 @@ seastar::future<> Heartbeat::Peer::handle_reply( return seastar::now(); } type_t type; - if (con_front.match(conn)) { + if (con_front.matches(conn)) { type = type_t::front; - } else if (con_back.match(conn)) { + } else if (con_back.matches(conn)) { type = type_t::back; } else { return seastar::now(); } const auto now = clock::now(); - if (session.handle_reply(m->ping_stamp, type, now)) { + if (session.on_pong(m->ping_stamp, type, now)) { if (session.do_health_screen(now) == Session::health_state::HEALTHY) { return heartbeat.failing_peers.cancel_one(peer); } @@ -574,21 +569,21 @@ entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type) } } -void Heartbeat::Peer::all_connected() +void Heartbeat::Peer::on_connected() { - logger().info("Heartbeat::Peer: osd.{} started (send={})", + logger().info("Heartbeat::Peer: osd.{} connected (send={})", peer, pending_send); - session.start(); + session.on_connected(); if (pending_send) { pending_send = false; do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr); } } -void Heartbeat::Peer::connection_lost() +void Heartbeat::Peer::on_disconnected() { - logger().info("Heartbeat::Peer: osd.{} reset", peer); - session.lost(); + logger().info("Heartbeat::Peer: osd.{} disconnected", peer); + session.on_disconnected(); } void Heartbeat::Peer::do_send_heartbeat( @@ -599,7 +594,7 @@ void Heartbeat::Peer::do_send_heartbeat( const utime_t sent_stamp{now}; const auto deadline = now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - session.emplace_history(sent_stamp, deadline); + session.on_ping(sent_stamp, deadline); for_each_conn([&, this] (auto& conn) { auto min_message = static_cast( local_conf()->osd_heartbeat_min_size); diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index f9f54228ebc..55f779a3a8b 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -83,7 +83,7 @@ private: // use real_clock so it can be converted to utime_t using clock = ceph::coarse_real_clock; - class Connector; + class ConnectionListener; class Connection; class Session; class Peer; @@ -130,21 +130,25 @@ inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) { return out; } -class Heartbeat::Connector { +/* + * Event driven interface for Heartbeat::Peer to be notified when both hb_front + * and hb_back are connected, or connection is lost. + */ +class Heartbeat::ConnectionListener { public: - Connector(size_t connections) : connections{connections} {} + ConnectionListener(size_t connections) : connections{connections} {} void increase_connected() { assert(connected < connections); ++connected; if (connected == connections) { - all_connected(); + on_connected(); } } void decrease_connected() { assert(connected > 0); if (connected == connections) { - connection_lost(); + on_disconnected(); } --connected; } @@ -152,8 +156,8 @@ class Heartbeat::Connector { virtual entity_addr_t get_peer_addr(type_t) = 0; protected: - virtual void all_connected() = 0; - virtual void connection_lost() = 0; + virtual void on_connected() = 0; + virtual void on_disconnected() = 0; private: const size_t connections; @@ -162,18 +166,20 @@ class Heartbeat::Connector { class Heartbeat::Connection { public: - using type_t = Connector::type_t; + using type_t = ConnectionListener::type_t; Connection(osd_id_t peer, bool is_winner_side, type_t type, - crimson::net::Messenger& msgr, Connector& connector) - : peer{peer}, is_winner_side{is_winner_side}, type{type}, - msgr{msgr}, connector{connector} { + crimson::net::Messenger& msgr, + ConnectionListener& listener) + : peer{peer}, type{type}, + msgr{msgr}, listener{listener}, + is_winner_side{is_winner_side} { connect(); } ~Connection(); - bool match(crimson::net::Connection* _conn) const; - bool match(crimson::net::ConnectionRef conn) const { - return match(conn.get()); + bool matches(crimson::net::Connection* _conn) const; + bool matches(crimson::net::ConnectionRef conn) const { + return matches(conn.get()); } void connected() { set_connected(); @@ -191,14 +197,42 @@ class Heartbeat::Connection { void connect(); const osd_id_t peer; - const bool is_winner_side; const type_t type; crimson::net::Messenger& msgr; - Connector& connector; + ConnectionListener& listener; + +/* + * Resolve the following racing when both me and peer are trying to connect + * each other symmetrically, under SocketPolicy::lossy_client: + * + * OSD.A OSD.B + * - - + * |-[1]----> <----[2]-| + * \ / + * \ / + * delay.. X delay.. + * / \ + * |-[1]x> / \ | + * |(reset#1) (reset#2)| + * |(reconnectB) (reconnectA)| + * |-[2]---> <---[1]-| + * delay.. delay.. + * (remote close populated) + * |-[2]x> hb_back reset + */ class Heartbeat::Session { public: Session(osd_id_t peer) : peer{peer} {} void set_epoch(epoch_t epoch_) { epoch = epoch_; } epoch_t get_epoch() const { return epoch; } - bool is_started() const { return started; } + bool is_started() const { return connected; } bool pinged() const { if (clock::is_zero(first_tx)) { // i can never receive a pong without sending any ping message first. @@ -257,23 +309,23 @@ class Heartbeat::Session { last_tx = now; } - void start() { - assert(!started); - started = true; + void on_connected() { + assert(!connected); + connected = true; ping_history.clear(); } - void emplace_history(const utime_t& sent_stamp, - const clock::time_point& deadline) { - assert(started); + void on_ping(const utime_t& sent_stamp, + const clock::time_point& deadline) { + assert(connected); [[maybe_unused]] auto [reply, added] = ping_history.emplace(sent_stamp, reply_t{deadline, 2}); } - bool handle_reply(const utime_t& ping_stamp, - Connection::type_t type, - clock::time_point now) { - assert(started); + bool on_pong(const utime_t& ping_stamp, + Connection::type_t type, + clock::time_point now) { + assert(connected); auto ping = ping_history.find(ping_stamp); if (ping == ping_history.end()) { // old replies, deprecated by newly sent pings. @@ -294,9 +346,9 @@ class Heartbeat::Session { return true; } - void lost() { - assert(started); - started = false; + void on_disconnected() { + assert(connected); + connected = false; if (!ping_history.empty()) { // we lost our ping_history of the last session, but still need to keep // the oldest deadline for unhealthy check. @@ -313,7 +365,7 @@ class Heartbeat::Session { private: const osd_id_t peer; - bool started = false; + bool connected = false; // time we sent our first ping request clock::time_point first_tx; // last time we sent a ping request @@ -334,7 +386,7 @@ class Heartbeat::Session { std::map ping_history; }; -class Heartbeat::Peer final : private Heartbeat::Connector { +class Heartbeat::Peer final : private Heartbeat::ConnectionListener { public: Peer(Heartbeat&, osd_id_t); ~Peer(); @@ -355,7 +407,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector { seastar::future<> handle_reply(crimson::net::Connection*, Ref); void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { for_each_conn([&] (auto& _conn) { - if (_conn.match(conn)) { + if (_conn.matches(conn)) { if (is_replace) { _conn.replaced(); } else { @@ -366,7 +418,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector { } void handle_connect(crimson::net::ConnectionRef conn) { for_each_conn([&] (auto& _conn) { - if (_conn.match(conn)) { + if (_conn.matches(conn)) { _conn.connected(); } }); @@ -379,8 +431,8 @@ class Heartbeat::Peer final : private Heartbeat::Connector { private: entity_addr_t get_peer_addr(type_t type) override; - void all_connected() override; - void connection_lost() override; + void on_connected() override; + void on_disconnected() override; void do_send_heartbeat( clock::time_point, ceph::signedspan, std::vector>*); @@ -393,7 +445,7 @@ class Heartbeat::Peer final : private Heartbeat::Connector { Heartbeat& heartbeat; const osd_id_t peer; Session session; - // if need to send heartbeat when session started + // if need to send heartbeat when session connected bool pending_send = false; Connection con_front; Connection con_back; -- 2.39.5