From: Yingxin Cheng Date: Tue, 31 Mar 2020 01:24:06 +0000 (+0800) Subject: crimson/osd: support 1 lossy connection between a heartbeat peer X-Git-Tag: v16.1.0~1895^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cf386ef665d34f54e2acac44da0b87921d27152a;p=ceph.git crimson/osd: support 1 lossy connection between a heartbeat peer 1. fix dead loop due to racing of one heartbeat connection: The Peer (renamed from PeerInfo) attempts to identify if the racing is happening, this is because the heartbeat peer is not necessarily to be symmetric. If racing is actually happening, Peer then decides to wait for another side if it loses, or connect proactively if it wins (whoami > peer). 2. fix dead loop between hb_front and hb_back connections of the same peer: For a reset event from either con_front or con_back, the heartbeat class should not simply remove the related PeerInfo which contains both of them. Instead, Peer is improved (complicated) to be a session-alike class which remembers/tracks individual front/back connectivity. And it only starts to track from a clean ping_history (same with the original logic) when both are connected, and stop tracking as soon as either of them is reset. 3. keep the compatibility with classic heartbeat: The most important thing is to keep healthy/unhealthy check identical to the original logic, because it has been working for a long time. The less important thing is that the original messenger policy and Heartbeat::handle_ping() are untouched, so supposingly our heartbeat component can still talk with classic OSD. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index 8d4e71de1aee..b5a40fb89778 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -25,11 +25,13 @@ namespace { } } -Heartbeat::Heartbeat(const crimson::osd::ShardServices& service, +Heartbeat::Heartbeat(osd_id_t whoami, + const crimson::osd::ShardServices& service, crimson::mon::Client& monc, crimson::net::MessengerRef front_msgr, crimson::net::MessengerRef back_msgr) - : service{service}, + : whoami{whoami}, + service{service}, monc{monc}, front_msgr{front_msgr}, back_msgr{back_msgr}, @@ -116,6 +118,7 @@ void Heartbeat::set_require_authorizer(bool require_authorizer) void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch) { + assert(whoami != _peer); auto [iter, added] = peers.try_emplace(_peer, *this, _peer); auto& peer = iter->second; peer.set_epoch(epoch); @@ -210,7 +213,33 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac } if (auto found = peers.find(peer); found != peers.end()) { - found->second.handle_reset(conn); + found->second.handle_reset(conn, is_replace); + } +} + +void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn) +{ + auto peer = conn->get_peer_id(); + if (conn->get_peer_type() != entity_name_t::TYPE_OSD || + peer == entity_name_t::NEW) { + return; + } + if (auto found = peers.find(peer); + found != peers.end()) { + found->second.handle_connect(conn); + } +} + +void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn) +{ + auto peer = conn->get_peer_id(); + if (conn->get_peer_type() != entity_name_t::TYPE_OSD || + peer == entity_name_t::NEW) { + return; + } + if (auto found = peers.find(peer); + found != peers.end()) { + found->second.handle_accept(conn); } } @@ -322,32 +351,24 @@ void Heartbeat::print(std::ostream& out) const out << "heartbeat"; } -void Heartbeat::Peer::connect() -{ - logger().info("peer osd.{} added", peer); - auto osdmap = heartbeat.service.get_osdmap_service().get_map(); - // TODO: use addrs - con_front = heartbeat.front_msgr->connect( - osdmap->get_hb_front_addrs(peer).front(), - entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); - con_back = heartbeat.back_msgr->connect( - osdmap->get_hb_back_addrs(peer).front(), - entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); -} - Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer) : heartbeat(heartbeat), peer(peer) -{ connect(); } - -void Heartbeat::Peer::disconnect() { - logger().info("peer osd.{} removed", peer); - con_front->mark_down(); - con_back->mark_down(); + logger().info("Heartbeat::Peer: osd.{} added", peer); + connect_front(); + connect_back(); } Heartbeat::Peer::~Peer() -{ disconnect(); } +{ + logger().info("Heartbeat::Peer: osd.{} removed", peer); + if (con_front) { + con_front->mark_down(); + } + if (con_back) { + con_back->mark_down(); + } +} bool Heartbeat::Peer::pinged() const { @@ -386,13 +407,13 @@ Heartbeat::Peer::failed_since(clock::time_point now) const auto oldest_deadline = ping_history.begin()->second.deadline; auto failed_since = std::min(last_rx_back, last_rx_front); if (clock::is_zero(failed_since)) { - logger().error("failed_since: no reply from osd.{} " + logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} " "ever on either front or back, first ping sent {} " "(oldest deadline {})", peer, first_tx, oldest_deadline); failed_since = first_tx; } else { - logger().error("failed_since: no reply from osd.{} " + logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} " "since back {} front {} (oldest deadline {})", peer, last_rx_back, last_rx_front, oldest_deadline); } @@ -402,16 +423,12 @@ Heartbeat::Peer::failed_since(clock::time_point now) const } } -void Heartbeat::Peer::send_heartbeat( +void Heartbeat::Peer::do_send_heartbeat( clock::time_point now, ceph::signedspan mnow, - std::vector>& futures) + std::vector>* futures) { - if (!pinged()) { - first_tx = now; - } - last_tx = now; - + assert(session_started); const utime_t sent_stamp{now}; const auto deadline = now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); @@ -430,13 +447,95 @@ void Heartbeat::Peer::send_heartbeat( heartbeat.service.get_osdmap_service().get_up_epoch(), min_message); reply->second.unacknowledged++; - futures.push_back(con->send(std::move(ping))); + if (futures) { + futures->push_back(con->send(std::move(ping))); + } + } +} + +void Heartbeat::Peer::send_heartbeat( + clock::time_point now, + ceph::signedspan mnow, + std::vector>& futures) +{ + if (!pinged()) { + first_tx = now; + } + last_tx = now; + + if (session_started) { + do_send_heartbeat(now, mnow, &futures); + + // validate connection addresses + const auto osdmap = heartbeat.service.get_osdmap_service().get_map(); + const auto front_addr = osdmap->get_hb_front_addrs(peer).front(); + if (con_front->get_peer_addr() != front_addr) { + logger().info("Heartbeat::Peer::send_heartbeat(): " + "peer osd.{} con_front has new address {} over {}, reset", + peer, front_addr, con_front->get_peer_addr()); + con_front->mark_down(); + has_racing = false; + handle_reset(con_front, false); + } + const auto back_addr = osdmap->get_hb_back_addrs(peer).front(); + if (con_back->get_peer_addr() != back_addr) { + logger().info("Heartbeat::Peer::send_heartbeat(): " + "peer osd.{} con_back has new address {} over {}, reset", + peer, back_addr, con_back->get_peer_addr()); + con_back->mark_down(); + has_racing = false; + handle_reset(con_back, false); + } + } else { + // we should send MOSDPing but still cannot at this moment + if (pending_send) { + // we have already pending for a entire heartbeat interval + logger().warn("Heartbeat::Peer::send_heartbeat(): " + "heartbeat to {} is still pending...", peer); + has_racing = false; + // retry con_front if still pending + if (!front_ready) { + if (con_front) { + con_front->mark_down(); + handle_reset(con_front, false); + } else { + connect_front(); + } + } + // retry con_back if still pending + if (!back_ready) { + if (con_back) { + con_back->mark_down(); + handle_reset(con_back, false); + } else { + connect_back(); + } + } + } else { + logger().info("Heartbeat::Peer::send_heartbeat(): " + "heartbeat to {} is pending send...", peer); + // maintain an entry in ping_history for unhealthy check + if (ping_history.empty()) { + const utime_t sent_stamp{now}; + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + [[maybe_unused]] auto [reply, added] = + ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + } else { // the entry is already added + assert(ping_history.size() == 1); + } + pending_send = true; + } } } seastar::future<> Heartbeat::Peer::handle_reply( crimson::net::Connection* conn, Ref m) { + if (!session_started) { + // we haven't sent any ping yet + return seastar::now(); + } auto ping = ping_history.find(m->ping_stamp); if (ping == ping_history.end()) { // old replies, deprecated by newly sent pings. @@ -444,6 +543,7 @@ seastar::future<> Heartbeat::Peer::handle_reply( } const auto now = clock::now(); auto& unacked = ping->second.unacknowledged; + assert(unacked); if (conn == con_back.get()) { last_rx_back = now; unacked--; @@ -460,18 +560,187 @@ seastar::future<> Heartbeat::Peer::handle_reply( return seastar::now(); } -void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn) +void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +{ + if (con_front == conn) { + con_front = nullptr; + if (is_replace) { + assert(!front_ready); + assert(!session_started); + // set the racing connection, will be handled by handle_accept() + con_front = heartbeat.front_msgr->connect( + conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); + has_racing = true; + logger().warn("Heartbeat::Peer::handle_reset(): " + "con_front racing with osd.{}, updated by {}", + peer, con_front); + } else { + if (front_ready) { + front_ready = false; + } + if (session_started) { + reset_session(); + } + assert(heartbeat.whoami != peer); + if (heartbeat.whoami > peer || !has_racing) { + connect_front(); + } else { // whoami < peer && has_racing + logger().info("Heartbeat::Peer::handle_reset(): " + "con_front racing detected and lose, " + "waiting for osd.{} connect me", peer); + } + } + } else if (con_back == conn) { + con_back = nullptr; + if (is_replace) { + assert(!back_ready); + assert(!session_started); + // set the racing connection, will be handled by handle_accept() + con_back = heartbeat.back_msgr->connect( + conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); + has_racing = true; + logger().warn("Heartbeat::Peer::handle_reset(): " + "con_back racing with osd.{}, updated by {}", + peer, con_back); + } else { + if (back_ready) { + back_ready = false; + } + if (session_started) { + reset_session(); + } + if (heartbeat.whoami == peer) { + logger().error("Heartbeat::Peer::handle_reset(): " + "peer is myself ({})", peer); + } else if (heartbeat.whoami > peer || !has_racing) { + connect_back(); + } else { // whoami < peer && has_racing + logger().info("Heartbeat::Peer::handle_reset(): " + "con_back racing detected and lose, " + "waiting for osd.{} connect me", peer); + } + } + } else { + // ignore the unrelated conn + } +} + +void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn) { - if (con_front != conn && con_back != conn) { - return; + if (con_front == conn) { + assert(!front_ready); + assert(!session_started); + notify_front_ready(); + } else if (con_back == conn) { + assert(!back_ready); + assert(!session_started); + notify_back_ready(); + } else { + // ignore the unrelated connection + } +} + +void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn) +{ + handle_connect(conn); + + const auto peer_addr = conn->get_peer_addr(); + const auto osdmap = heartbeat.service.get_osdmap_service().get_map(); + const auto front_addr = osdmap->get_hb_front_addrs(peer).front(); + if (!con_front && front_addr == peer_addr) { + logger().info("Heartbeat::Peer::handle_accept(): " + "con_front racing resolved for osd.{}", peer); + con_front = conn; + notify_front_ready(); + } + const auto back_addr = osdmap->get_hb_back_addrs(peer).front(); + if (!con_back && back_addr == peer_addr) { + logger().info("Heartbeat::Peer::handle_accept(): " + "con_back racing resolved for osd.{}", peer); + con_back = conn; + notify_back_ready(); + } +} + +void Heartbeat::Peer::start_session() +{ + logger().info("Heartbeat::Peer: osd.{} started (send={})", + peer, pending_send); + assert(!session_started); + session_started = true; + ping_history.clear(); + if (pending_send) { + pending_send = false; + do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr); + } +} + +void Heartbeat::Peer::reset_session() +{ + logger().info("Heartbeat::Peer: osd.{} reset", peer); + assert(session_started); + if (!ping_history.empty()) { + // we lost our ping_history of the last session, but still need to keep + // the oldest deadline for unhealthy check. + auto oldest = ping_history.begin(); + auto sent_stamp = oldest->first; + auto deadline = oldest->second.deadline; + ping_history.clear(); + ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + } + session_started = false; +} + +void Heartbeat::Peer::notify_front_ready() +{ + assert(con_front); + assert(!front_ready); + assert(!session_started); + front_ready = true; + if (front_ready && back_ready) { + start_session(); + } +} + +void Heartbeat::Peer::connect_front() +{ + assert(!con_front); + assert(!front_ready); + assert(!session_started); + auto osdmap = heartbeat.service.get_osdmap_service().get_map(); + // TODO: use addrs + con_front = heartbeat.front_msgr->connect( + osdmap->get_hb_front_addrs(peer).front(), + entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); + if (con_front->is_connected()) { + notify_front_ready(); + } +} + +void Heartbeat::Peer::notify_back_ready() +{ + assert(con_back); + assert(!back_ready); + assert(!session_started); + back_ready = true; + if (front_ready && back_ready) { + start_session(); + } +} + +void Heartbeat::Peer::connect_back() +{ + assert(!con_back); + assert(!back_ready); + assert(!session_started); + auto osdmap = heartbeat.service.get_osdmap_service().get_map(); + // TODO: use addrs + con_back = heartbeat.back_msgr->connect( + osdmap->get_hb_back_addrs(peer).front(), + entity_name_t(CEPH_ENTITY_TYPE_OSD, peer)); + if (con_back->is_connected()) { + notify_back_ready(); } - disconnect(); - first_tx = {}; - last_tx = {}; - last_rx_front = {}; - last_rx_back = {}; - ping_history = {}; - connect(); } bool Heartbeat::FailingPeers::add_pending( diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index f60c0d5396ca..dd1ab949a7d9 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -26,7 +26,8 @@ class Heartbeat : public crimson::net::Dispatcher { public: using osd_id_t = int; - Heartbeat(const crimson::osd::ShardServices& service, + Heartbeat(osd_id_t whoami, + const crimson::osd::ShardServices& service, crimson::mon::Client& monc, crimson::net::MessengerRef front_msgr, crimson::net::MessengerRef back_msgr); @@ -48,6 +49,8 @@ public: seastar::future<> ms_dispatch(crimson::net::Connection* conn, MessageRef m) override; void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + void ms_handle_connect(crimson::net::ConnectionRef conn) override; + void ms_handle_accept(crimson::net::ConnectionRef conn) override; void print(std::ostream&) const; private: @@ -70,6 +73,7 @@ private: const entity_addrvec_t& addrs, ChainedDispatchersRef); private: + const osd_id_t whoami; const crimson::osd::ShardServices& service; crimson::mon::Client& monc; crimson::net::MessengerRef front_msgr; @@ -141,7 +145,9 @@ class Heartbeat::Peer { ceph::signedspan mnow, std::vector>&); seastar::future<> handle_reply(crimson::net::Connection*, Ref); - void handle_reset(crimson::net::ConnectionRef); + void handle_reset(crimson::net::ConnectionRef, bool is_replace); + void handle_connect(crimson::net::ConnectionRef); + void handle_accept(crimson::net::ConnectionRef); private: bool pinged() const; @@ -151,33 +157,56 @@ class Heartbeat::Peer { HEALTHY, }; health_state do_health_screen(clock::time_point now) const; - void connect(); - void disconnect(); + + // a session starts when con_front and con_back are both connected + void start_session(); + // a session resets when either con_front or con_back is reset + void reset_session(); + // notify when con_front becomes ready, possibly start session + void notify_front_ready(); + void connect_front(); + // notify when con_back becomes ready, possibly start session + void notify_back_ready(); + void connect_back(); + + void do_send_heartbeat(clock::time_point now, + ceph::signedspan mnow, + std::vector>*); private: Heartbeat& heartbeat; const osd_id_t peer; - /// peer connection (front) - crimson::net::ConnectionRef con_front; - /// peer connection (back) - crimson::net::ConnectionRef con_back; - /// time we sent our first ping request + // time we sent our first ping request clock::time_point first_tx; - /// last time we sent a ping request + // last time we sent a ping request clock::time_point last_tx; - /// last time we got a ping reply on the front side + // last time we got a ping reply on the front side clock::time_point last_rx_front; - /// last time we got a ping reply on the back side + // last time we got a ping reply on the back side clock::time_point last_rx_back; - /// most recent epoch we wanted this peer + // most recent epoch we wanted this peer epoch_t epoch; + // if racing happened + bool has_racing = false; + // peer connection (front) + crimson::net::ConnectionRef con_front; + bool front_ready = false; + // peer connection (back) + crimson::net::ConnectionRef con_back; + bool back_ready = false; + + // start to send pings and track ping_history + bool session_started = false; + // if need to send heartbeat when session started + bool pending_send = false; + struct reply_t { clock::time_point deadline; // one sent over front conn, another sent over back conn uint8_t unacknowledged = 0; }; - /// history of inflight pings, arranging by timestamp we sent + // history of inflight pings, arranging by timestamp we sent std::map ping_history; }; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 5a082d6c756e..590d8974d15c 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -82,7 +82,7 @@ OSD::OSD(int id, uint32_t nonce, local_conf().get_val("osd_data"), local_conf().get_config_values())}, shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store}, - heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}}, + heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}}, // do this in background heartbeat_timer{[this] { update_heartbeat_peers(); }}, asok{seastar::make_lw_shared()},