From: Yingxin Cheng Date: Wed, 17 Jun 2020 03:39:03 +0000 (+0800) Subject: crimson/osd: refactor Heartbeat::Peer X-Git-Tag: v16.1.0~1895^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7a7feb7c62cffc93c392ea06a7be6b9c59651195;p=ceph.git crimson/osd: refactor Heartbeat::Peer * encapsulate con_front and con_back as Heartbeat::Connection; * encapsulate connectivity tracker as Heartbeat::Connector; * encapsulate the session-alike part of Heartbeat::Peer as Heartbeat::Session; Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index b5a40fb89778..a5f06ba8a430 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -351,69 +351,129 @@ void Heartbeat::print(std::ostream& out) const out << "heartbeat"; } -Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer) - : heartbeat(heartbeat), peer(peer) +Heartbeat::Connection::~Connection() { - logger().info("Heartbeat::Peer: osd.{} added", peer); - connect_front(); - connect_back(); + if (conn) { + conn->mark_down(); + } } -Heartbeat::Peer::~Peer() +bool Heartbeat::Connection::match(crimson::net::Connection* _conn) const { - logger().info("Heartbeat::Peer: osd.{} removed", peer); - if (con_front) { - con_front->mark_down(); + if (conn && conn.get() == _conn) { + return true; + } else { + return false; } - if (con_back) { - con_back->mark_down(); +} + +void Heartbeat::Connection::accepted(crimson::net::ConnectionRef accepted_conn) +{ + if (!conn) { + if (accepted_conn->get_peer_addr() == connector.get_peer_addr(type)) { + logger().info("Heartbeat::Connection::accepted(): " + "{} racing resolved", *this); + conn = accepted_conn; + set_connected(); + } + } else if (conn == accepted_conn) { + set_connected(); } } -bool Heartbeat::Peer::pinged() const +void Heartbeat::Connection::replaced() { - if (clock::is_zero(first_tx)) { - // i can never receive a pong without sending any ping message first. - assert(clock::is_zero(last_rx_front) && - clock::is_zero(last_rx_back)); - return false; + assert(!is_connected); + auto replaced_conn = conn; + // set the racing connection, will be handled by handle_accept() + conn = msgr.connect(replaced_conn->get_peer_addr(), + replaced_conn->get_peer_name()); + racing_detected = true; + logger().warn("Heartbeat::Connection::replaced(): {} racing", *this); + assert(conn != replaced_conn); + assert(!conn->is_connected()); +} + +void Heartbeat::Connection::reset() +{ + conn = nullptr; + if (is_connected) { + is_connected = false; + connector.decrease_connected(); + } + if (!racing_detected || is_winner_side) { + connect(); } else { - return true; + logger().info("Heartbeat::Connection::reset(): " + "{} racing detected and lose, " + "waiting for peer connect me", *this); } } -Heartbeat::Peer::health_state -Heartbeat::Peer::do_health_screen(clock::time_point now) const +seastar::future<> Heartbeat::Connection::send(MessageRef msg) { - if (!pinged()) { - // we are not healty nor unhealty because we haven't sent anything yet - return health_state::UNKNOWN; - } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) { - return health_state::UNHEALTHY; - } else if (!clock::is_zero(last_rx_front) && - !clock::is_zero(last_rx_back)) { - // only declare to be healthy until we have received the first - // replies from both front/back connections - return health_state::HEALTHY; - } else { - return health_state::UNKNOWN; + assert(is_connected); + return conn->send(msg); +} + +void Heartbeat::Connection::validate() +{ + assert(is_connected); + auto peer_addr = connector.get_peer_addr(type); + if (conn->get_peer_addr() != peer_addr) { + logger().info("Heartbeat::Connection::validate(): " + "{} has new address {} over {}, reset", + *this, peer_addr, conn->get_peer_addr()); + conn->mark_down(); + racing_detected = false; + reset(); + } +} + +void Heartbeat::Connection::retry() +{ + racing_detected = false; + if (!is_connected) { + if (conn) { + conn->mark_down(); + reset(); + } else { + connect(); + } + } +} + +void Heartbeat::Connection::set_connected() +{ + assert(!is_connected); + is_connected = true; + connector.increase_connected(); +} + +void Heartbeat::Connection::connect() +{ + assert(!conn); + auto addr = connector.get_peer_addr(type); + conn = msgr.connect(addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); + if (conn->is_connected()) { + set_connected(); } } Heartbeat::clock::time_point -Heartbeat::Peer::failed_since(clock::time_point now) const +Heartbeat::Session::failed_since(Heartbeat::clock::time_point now) const { if (do_health_screen(now) == health_state::UNHEALTHY) { auto oldest_deadline = ping_history.begin()->second.deadline; auto failed_since = std::min(last_rx_back, last_rx_front); if (clock::is_zero(failed_since)) { - logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} " + logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} " "ever on either front or back, first ping sent {} " "(oldest deadline {})", peer, first_tx, oldest_deadline); failed_since = first_tx; } else { - logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} " + logger().error("Heartbeat::Session::failed_since(): no reply from osd.{} " "since back {} front {} (oldest deadline {})", peer, last_rx_back, last_rx_front, oldest_deadline); } @@ -423,107 +483,58 @@ Heartbeat::Peer::failed_since(clock::time_point now) const } } -void Heartbeat::Peer::do_send_heartbeat( - clock::time_point now, - ceph::signedspan mnow, - std::vector>* futures) +void Heartbeat::Session::set_inactive_history(clock::time_point now) { - assert(session_started); - const utime_t sent_stamp{now}; - const auto deadline = - now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - [[maybe_unused]] auto [reply, added] = - ping_history.emplace(sent_stamp, reply_t{deadline, 0}); - for (auto& con : {con_front, con_back}) { - auto min_message = static_cast( - local_conf()->osd_heartbeat_min_size); - auto ping = make_message( - heartbeat.monc.get_fsid(), - heartbeat.service.get_osdmap_service().get_map()->get_epoch(), - MOSDPing::PING, - sent_stamp, - mnow, - mnow, - heartbeat.service.get_osdmap_service().get_up_epoch(), - min_message); - reply->second.unacknowledged++; - if (futures) { - futures->push_back(con->send(std::move(ping))); - } + assert(!started); + if (ping_history.empty()) { + const utime_t sent_stamp{now}; + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + [[maybe_unused]] auto [reply, added] = + ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + } else { // the entry is already added + assert(ping_history.size() == 1); } } +Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer) + : Connector(2), heartbeat{heartbeat}, peer{peer}, session{peer}, + con_front(peer, heartbeat.whoami > peer, Connection::type_t::front, + *heartbeat.front_msgr, *this), + con_back(peer, heartbeat.whoami > peer, Connection::type_t::back, + *heartbeat.back_msgr, *this) +{ + logger().info("Heartbeat::Peer: osd.{} added", peer); +} + +Heartbeat::Peer::~Peer() +{ + logger().info("Heartbeat::Peer: osd.{} removed", peer); +} + void Heartbeat::Peer::send_heartbeat( - clock::time_point now, - ceph::signedspan mnow, + clock::time_point now, ceph::signedspan mnow, std::vector>& futures) { - if (!pinged()) { - first_tx = now; - } - last_tx = now; - - if (session_started) { + session.set_tx(now); + if (session.is_started()) { do_send_heartbeat(now, mnow, &futures); - - // validate connection addresses - const auto osdmap = heartbeat.service.get_osdmap_service().get_map(); - const auto front_addr = osdmap->get_hb_front_addrs(peer).front(); - if (con_front->get_peer_addr() != front_addr) { - logger().info("Heartbeat::Peer::send_heartbeat(): " - "peer osd.{} con_front has new address {} over {}, reset", - peer, front_addr, con_front->get_peer_addr()); - con_front->mark_down(); - has_racing = false; - handle_reset(con_front, false); - } - const auto back_addr = osdmap->get_hb_back_addrs(peer).front(); - if (con_back->get_peer_addr() != back_addr) { - logger().info("Heartbeat::Peer::send_heartbeat(): " - "peer osd.{} con_back has new address {} over {}, reset", - peer, back_addr, con_back->get_peer_addr()); - con_back->mark_down(); - has_racing = false; - handle_reset(con_back, false); - } + for_each_conn([] (auto& conn) { + conn.validate(); + }); } else { // we should send MOSDPing but still cannot at this moment if (pending_send) { // we have already pending for a entire heartbeat interval logger().warn("Heartbeat::Peer::send_heartbeat(): " - "heartbeat to {} is still pending...", peer); - has_racing = false; - // retry con_front if still pending - if (!front_ready) { - if (con_front) { - con_front->mark_down(); - handle_reset(con_front, false); - } else { - connect_front(); - } - } - // retry con_back if still pending - if (!back_ready) { - if (con_back) { - con_back->mark_down(); - handle_reset(con_back, false); - } else { - connect_back(); - } - } + "heartbeat to osd.{} is still pending...", peer); + for_each_conn([] (auto& conn) { + conn.retry(); + }); } else { logger().info("Heartbeat::Peer::send_heartbeat(): " - "heartbeat to {} is pending send...", peer); - // maintain an entry in ping_history for unhealthy check - if (ping_history.empty()) { - const utime_t sent_stamp{now}; - const auto deadline = - now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - [[maybe_unused]] auto [reply, added] = - ping_history.emplace(sent_stamp, reply_t{deadline, 0}); - } else { // the entry is already added - assert(ping_history.size() == 1); - } + "heartbeat to osd.{} is pending send...", peer); + session.set_inactive_history(now); pending_send = true; } } @@ -532,215 +543,79 @@ void Heartbeat::Peer::send_heartbeat( seastar::future<> Heartbeat::Peer::handle_reply( crimson::net::Connection* conn, Ref m) { - if (!session_started) { + if (!session.is_started()) { // we haven't sent any ping yet return seastar::now(); } - auto ping = ping_history.find(m->ping_stamp); - if (ping == ping_history.end()) { - // old replies, deprecated by newly sent pings. + type_t type; + if (con_front.match(conn)) { + type = type_t::front; + } else if (con_back.match(conn)) { + type = type_t::back; + } else { return seastar::now(); } const auto now = clock::now(); - auto& unacked = ping->second.unacknowledged; - assert(unacked); - if (conn == con_back.get()) { - last_rx_back = now; - unacked--; - } else if (conn == con_front.get()) { - last_rx_front = now; - unacked--; - } - if (unacked == 0) { - ping_history.erase(ping_history.begin(), ++ping); - } - if (do_health_screen(now) == health_state::HEALTHY) { - return heartbeat.failing_peers.cancel_one(peer); - } - return seastar::now(); -} - -void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace) -{ - if (con_front == conn) { - con_front = nullptr; - if (is_replace) { - assert(!front_ready); - assert(!session_started); - // set the racing connection, will be handled by handle_accept() - con_front = heartbeat.front_msgr->connect( - conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); - has_racing = true; - logger().warn("Heartbeat::Peer::handle_reset(): " - "con_front racing with osd.{}, updated by {}", - peer, con_front); - } else { - if (front_ready) { - front_ready = false; - } - if (session_started) { - reset_session(); - } - assert(heartbeat.whoami != peer); - if (heartbeat.whoami > peer || !has_racing) { - connect_front(); - } else { // whoami < peer && has_racing - logger().info("Heartbeat::Peer::handle_reset(): " - "con_front racing detected and lose, " - "waiting for osd.{} connect me", peer); - } + if (session.handle_reply(m->ping_stamp, type, now)) { + if (session.do_health_screen(now) == Session::health_state::HEALTHY) { + return heartbeat.failing_peers.cancel_one(peer); } - } else if (con_back == conn) { - con_back = nullptr; - if (is_replace) { - assert(!back_ready); - assert(!session_started); - // set the racing connection, will be handled by handle_accept() - con_back = heartbeat.back_msgr->connect( - conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); - has_racing = true; - logger().warn("Heartbeat::Peer::handle_reset(): " - "con_back racing with osd.{}, updated by {}", - peer, con_back); - } else { - if (back_ready) { - back_ready = false; - } - if (session_started) { - reset_session(); - } - if (heartbeat.whoami == peer) { - logger().error("Heartbeat::Peer::handle_reset(): " - "peer is myself ({})", peer); - } else if (heartbeat.whoami > peer || !has_racing) { - connect_back(); - } else { // whoami < peer && has_racing - logger().info("Heartbeat::Peer::handle_reset(): " - "con_back racing detected and lose, " - "waiting for osd.{} connect me", peer); - } - } - } else { - // ignore the unrelated conn - } -} - -void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn) -{ - if (con_front == conn) { - assert(!front_ready); - assert(!session_started); - notify_front_ready(); - } else if (con_back == conn) { - assert(!back_ready); - assert(!session_started); - notify_back_ready(); - } else { - // ignore the unrelated connection } + return seastar::now(); } -void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn) +entity_addr_t Heartbeat::Peer::get_peer_addr(type_t type) { - handle_connect(conn); - - const auto peer_addr = conn->get_peer_addr(); const auto osdmap = heartbeat.service.get_osdmap_service().get_map(); - const auto front_addr = osdmap->get_hb_front_addrs(peer).front(); - if (!con_front && front_addr == peer_addr) { - logger().info("Heartbeat::Peer::handle_accept(): " - "con_front racing resolved for osd.{}", peer); - con_front = conn; - notify_front_ready(); - } - const auto back_addr = osdmap->get_hb_back_addrs(peer).front(); - if (!con_back && back_addr == peer_addr) { - logger().info("Heartbeat::Peer::handle_accept(): " - "con_back racing resolved for osd.{}", peer); - con_back = conn; - notify_back_ready(); + if (type == type_t::front) { + return osdmap->get_hb_front_addrs(peer).front(); + } else { + return osdmap->get_hb_back_addrs(peer).front(); } } -void Heartbeat::Peer::start_session() +void Heartbeat::Peer::all_connected() { logger().info("Heartbeat::Peer: osd.{} started (send={})", peer, pending_send); - assert(!session_started); - session_started = true; - ping_history.clear(); + session.start(); if (pending_send) { pending_send = false; do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr); } } -void Heartbeat::Peer::reset_session() +void Heartbeat::Peer::connection_lost() { logger().info("Heartbeat::Peer: osd.{} reset", peer); - assert(session_started); - if (!ping_history.empty()) { - // we lost our ping_history of the last session, but still need to keep - // the oldest deadline for unhealthy check. - auto oldest = ping_history.begin(); - auto sent_stamp = oldest->first; - auto deadline = oldest->second.deadline; - ping_history.clear(); - ping_history.emplace(sent_stamp, reply_t{deadline, 0}); - } - session_started = false; -} - -void Heartbeat::Peer::notify_front_ready() -{ - assert(con_front); - assert(!front_ready); - assert(!session_started); - front_ready = true; - if (front_ready && back_ready) { - start_session(); - } -} - -void Heartbeat::Peer::connect_front() -{ - assert(!con_front); - assert(!front_ready); - assert(!session_started); - auto osdmap = heartbeat.service.get_osdmap_service().get_map(); - // TODO: use addrs - con_front = heartbeat.front_msgr->connect( - osdmap->get_hb_front_addrs(peer).front(), - entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); - if (con_front->is_connected()) { - notify_front_ready(); - } -} - -void Heartbeat::Peer::notify_back_ready() -{ - assert(con_back); - assert(!back_ready); - assert(!session_started); - back_ready = true; - if (front_ready && back_ready) { - start_session(); - } + session.lost(); } -void Heartbeat::Peer::connect_back() +void Heartbeat::Peer::do_send_heartbeat( + Heartbeat::clock::time_point now, + ceph::signedspan mnow, + std::vector>* futures) { - assert(!con_back); - assert(!back_ready); - assert(!session_started); - auto osdmap = heartbeat.service.get_osdmap_service().get_map(); - // TODO: use addrs - con_back = heartbeat.back_msgr->connect( - osdmap->get_hb_back_addrs(peer).front(), - entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); - if (con_back->is_connected()) { - notify_back_ready(); - } + const utime_t sent_stamp{now}; + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + session.emplace_history(sent_stamp, deadline); + for_each_conn([&, this] (auto& conn) { + auto min_message = static_cast( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message( + heartbeat.monc.get_fsid(), + heartbeat.service.get_osdmap_service().get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + mnow, + mnow, + heartbeat.service.get_osdmap_service().get_up_epoch(), + min_message); + if (futures) { + futures->push_back(conn.send(std::move(ping))); + } + }); } bool Heartbeat::FailingPeers::add_pending( diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index dd1ab949a7d9..f9f54228ebcc 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -83,6 +83,9 @@ private: // use real_clock so it can be converted to utime_t using clock = ceph::coarse_real_clock; + class Connector; + class Connection; + class Session; class Peer; using peers_map_t = std::map; peers_map_t peers; @@ -127,56 +130,190 @@ inline std::ostream& operator<<(std::ostream& out, const Heartbeat& hb) { return out; } -class Heartbeat::Peer { +class Heartbeat::Connector { public: - Peer(Heartbeat&, osd_id_t); - ~Peer(); - Peer(Peer&&) = delete; - Peer(const Peer&) = delete; - Peer& operator=(const Peer&) = delete; + Connector(size_t connections) : connections{connections} {} + + void increase_connected() { + assert(connected < connections); + ++connected; + if (connected == connections) { + all_connected(); + } + } + void decrease_connected() { + assert(connected > 0); + if (connected == connections) { + connection_lost(); + } + --connected; + } + enum class type_t { front, back }; + virtual entity_addr_t get_peer_addr(type_t) = 0; + + protected: + virtual void all_connected() = 0; + virtual void connection_lost() = 0; - void set_epoch(epoch_t epoch_) { epoch = epoch_; } - epoch_t get_epoch() const { return epoch; } + private: + const size_t connections; + size_t connected = 0; +}; - // if failure, return time_point since last active - // else, return clock::zero() - clock::time_point failed_since(clock::time_point now) const; - void send_heartbeat(clock::time_point now, - ceph::signedspan mnow, - std::vector>&); - seastar::future<> handle_reply(crimson::net::Connection*, Ref); - void handle_reset(crimson::net::ConnectionRef, bool is_replace); - void handle_connect(crimson::net::ConnectionRef); - void handle_accept(crimson::net::ConnectionRef); +class Heartbeat::Connection { + public: + using type_t = Connector::type_t; + Connection(osd_id_t peer, bool is_winner_side, type_t type, + crimson::net::Messenger& msgr, Connector& connector) + : peer{peer}, is_winner_side{is_winner_side}, type{type}, + msgr{msgr}, connector{connector} { + connect(); + } + ~Connection(); + + bool match(crimson::net::Connection* _conn) const; + bool match(crimson::net::ConnectionRef conn) const { + return match(conn.get()); + } + void connected() { + set_connected(); + } + void accepted(crimson::net::ConnectionRef); + void replaced(); + void reset(); + seastar::future<> send(MessageRef msg); + void validate(); + // retry connection if still pending + void retry(); private: - bool pinged() const; + void set_connected(); + void connect(); + + const osd_id_t peer; + const bool is_winner_side; + const type_t type; + crimson::net::Messenger& msgr; + Connector& connector; + + crimson::net::ConnectionRef conn; + bool is_connected = false; + bool racing_detected = false; + + friend std::ostream& operator<<(std::ostream& os, const Connection c) { + if (c.type == type_t::front) { + return os << "con_front(osd." << c.peer << ")"; + } else { + return os << "con_back(osd." << c.peer << ")"; + } + } +}; + +class Heartbeat::Session { + public: + Session(osd_id_t peer) : peer{peer} {} + + void set_epoch(epoch_t epoch_) { epoch = epoch_; } + epoch_t get_epoch() const { return epoch; } + bool is_started() const { return started; } + bool pinged() const { + if (clock::is_zero(first_tx)) { + // i can never receive a pong without sending any ping message first. + assert(clock::is_zero(last_rx_front) && + clock::is_zero(last_rx_back)); + return false; + } else { + return true; + } + } + enum class health_state { UNKNOWN, UNHEALTHY, HEALTHY, }; - health_state do_health_screen(clock::time_point now) const; - - // a session starts when con_front and con_back are both connected - void start_session(); - // a session resets when either con_front or con_back is reset - void reset_session(); - // notify when con_front becomes ready, possibly start session - void notify_front_ready(); - void connect_front(); - // notify when con_back becomes ready, possibly start session - void notify_back_ready(); - void connect_back(); - - void do_send_heartbeat(clock::time_point now, - ceph::signedspan mnow, - std::vector>*); + health_state do_health_screen(clock::time_point now) const { + if (!pinged()) { + // we are not healty nor unhealty because we haven't sent anything yet + return health_state::UNKNOWN; + } else if (!ping_history.empty() && ping_history.begin()->second.deadline < now) { + return health_state::UNHEALTHY; + } else if (!clock::is_zero(last_rx_front) && + !clock::is_zero(last_rx_back)) { + // only declare to be healthy until we have received the first + // replies from both front/back connections + return health_state::HEALTHY; + } else { + return health_state::UNKNOWN; + } + } + + clock::time_point failed_since(clock::time_point now) const; + + void set_tx(clock::time_point now) { + if (!pinged()) { + first_tx = now; + } + last_tx = now; + } + + void start() { + assert(!started); + started = true; + ping_history.clear(); + } + + void emplace_history(const utime_t& sent_stamp, + const clock::time_point& deadline) { + assert(started); + [[maybe_unused]] auto [reply, added] = + ping_history.emplace(sent_stamp, reply_t{deadline, 2}); + } + + bool handle_reply(const utime_t& ping_stamp, + Connection::type_t type, + clock::time_point now) { + assert(started); + auto ping = ping_history.find(ping_stamp); + if (ping == ping_history.end()) { + // old replies, deprecated by newly sent pings. + return false; + } + auto& unacked = ping->second.unacknowledged; + assert(unacked); + if (type == Connection::type_t::front) { + last_rx_front = now; + unacked--; + } else { + last_rx_back = now; + unacked--; + } + if (unacked == 0) { + ping_history.erase(ping_history.begin(), ++ping); + } + return true; + } + + void lost() { + assert(started); + started = false; + if (!ping_history.empty()) { + // we lost our ping_history of the last session, but still need to keep + // the oldest deadline for unhealthy check. + auto oldest = ping_history.begin(); + auto sent_stamp = oldest->first; + auto deadline = oldest->second.deadline; + ping_history.clear(); + ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + } + } + + // maintain an entry in ping_history for unhealthy check + void set_inactive_history(clock::time_point); private: - Heartbeat& heartbeat; const osd_id_t peer; - + bool started = false; // time we sent our first ping request clock::time_point first_tx; // last time we sent a ping request @@ -188,20 +325,6 @@ class Heartbeat::Peer { // most recent epoch we wanted this peer epoch_t epoch; - // if racing happened - bool has_racing = false; - // peer connection (front) - crimson::net::ConnectionRef con_front; - bool front_ready = false; - // peer connection (back) - crimson::net::ConnectionRef con_back; - bool back_ready = false; - - // start to send pings and track ping_history - bool session_started = false; - // if need to send heartbeat when session started - bool pending_send = false; - struct reply_t { clock::time_point deadline; // one sent over front conn, another sent over back conn @@ -210,3 +333,68 @@ class Heartbeat::Peer { // history of inflight pings, arranging by timestamp we sent std::map ping_history; }; + +class Heartbeat::Peer final : private Heartbeat::Connector { + public: + Peer(Heartbeat&, osd_id_t); + ~Peer(); + Peer(Peer&&) = delete; + Peer(const Peer&) = delete; + Peer& operator=(const Peer&) = delete; + + void set_epoch(epoch_t epoch) { session.set_epoch(epoch); } + epoch_t get_epoch() const { return session.get_epoch(); } + + // if failure, return time_point since last active + // else, return clock::zero() + clock::time_point failed_since(clock::time_point now) const { + return session.failed_since(now); + } + void send_heartbeat( + clock::time_point, ceph::signedspan, std::vector>&); + seastar::future<> handle_reply(crimson::net::Connection*, Ref); + void handle_reset(crimson::net::ConnectionRef conn, bool is_replace) { + for_each_conn([&] (auto& _conn) { + if (_conn.match(conn)) { + if (is_replace) { + _conn.replaced(); + } else { + _conn.reset(); + } + } + }); + } + void handle_connect(crimson::net::ConnectionRef conn) { + for_each_conn([&] (auto& _conn) { + if (_conn.match(conn)) { + _conn.connected(); + } + }); + } + void handle_accept(crimson::net::ConnectionRef conn) { + for_each_conn([&] (auto& _conn) { + _conn.accepted(conn); + }); + } + + private: + entity_addr_t get_peer_addr(type_t type) override; + void all_connected() override; + void connection_lost() override; + void do_send_heartbeat( + clock::time_point, ceph::signedspan, std::vector>*); + + template + void for_each_conn(Func&& f) { + f(con_front); + f(con_back); + } + + Heartbeat& heartbeat; + const osd_id_t peer; + Session session; + // if need to send heartbeat when session started + bool pending_send = false; + Connection con_front; + Connection con_back; +};