From: Kefu Chai <kchai@redhat.com>
Date: Mon, 14 Jan 2019 08:59:09 +0000 (+0800)
Subject: crimson/osd: add heartbeat support
X-Git-Tag: v14.1.0~225^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e9506e79ea1eefac8706dd2abe32711f3964a58b;p=ceph.git

crimson/osd: add heartbeat support

Add a Heartbeat service to the crimson OSD: it binds dedicated
hb_front/hb_back messengers, pings peer OSDs periodically, and reports
failed and still-alive peers to the monitor.

Signed-off-by: Kefu Chai <kchai@redhat.com>
---

diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt
index 624414701538..e86a11b5e605 100644
--- a/src/crimson/osd/CMakeLists.txt
+++ b/src/crimson/osd/CMakeLists.txt
@@ -1,5 +1,6 @@
 add_executable(crimson-osd
   chained_dispatchers.cc
+  heartbeat.cc
   main.cc
   osd.cc
   osd_meta.cc
diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
new file mode 100644
index 000000000000..c19ef809dff8
--- /dev/null
+++ b/src/crimson/osd/heartbeat.cc
@@ -0,0 +1,300 @@
+#include "heartbeat.h"
+
+#include <boost/range/join.hpp>
+
+#include "messages/MOSDPing.h"
+#include "messages/MOSDFailure.h"
+
+#include "crimson/common/config_proxy.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/SocketMessenger.h"
+#include "crimson/osd/osdmap_service.h"
+#include "crimson/mon/MonClient.h"
+
+#include "osd/OSDMap.h"
+
+using ceph::common::local_conf;
+
+namespace {
+  seastar::logger& logger() {
+    return ceph::get_logger(ceph_subsys_osd);
+  }
+
+  template<typename Message, typename... Args>
+  Ref<Message> make_message(Args&&... args)
+  {
+    return {new Message{std::forward<Args>(args)...}, false};
+  }
+}
+
+Heartbeat::Heartbeat(int whoami,
+                     uint32_t nonce,
+                     const OSDMapService& service,
+                     ceph::mon::Client& monc)
+  : front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
+                                              "hb_front", nonce}},
+    back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
+                                             "hb_back", nonce}},
+    service{service},
+    monc{monc},
+    timer{[this] { send_heartbeats(); }}
+{}
+
+seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
+                                   entity_addrvec_t back_addrs)
+{
+  logger().info("heartbeat: start");
+  // I only care about the address, so any unused port would work
+  for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
+    addr.set_port(0);
+  }
+  front_msgr->try_bind(front_addrs,
+                       local_conf()->ms_bind_port_min,
+                       local_conf()->ms_bind_port_max);
+  back_msgr->try_bind(back_addrs,
+                      local_conf()->ms_bind_port_min,
+                      local_conf()->ms_bind_port_max);
+  return seastar::when_all_succeed(front_msgr->start(this),
+                                   back_msgr->start(this)).then([this] {
+    timer.arm_periodic(
+      std::chrono::seconds(local_conf()->osd_heartbeat_interval));
+  });
+}
+
+seastar::future<> Heartbeat::stop()
+{
+  return seastar::when_all_succeed(front_msgr->shutdown(),
+                                   back_msgr->shutdown());
+}
+
+const entity_addrvec_t& Heartbeat::get_front_addrs() const
+{
+  return front_msgr->get_myaddrs();
+}
+
+const entity_addrvec_t& Heartbeat::get_back_addrs() const
+{
+  return back_msgr->get_myaddrs();
+}
+
+void Heartbeat::add_peer(osd_id_t peer)
+{
+  auto found = peers.find(peer);
+  if (found == peers.end()) {
+    logger().info("add_peer({})", peer);
+    PeerInfo info;
+    auto osdmap = service.get_map();
+    // TODO: msgr v2
+    info.con_front =
+      front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
+                          CEPH_ENTITY_TYPE_OSD);
+    info.con_back =
+      back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
+                         CEPH_ENTITY_TYPE_OSD);
+    peers.emplace(peer, std::move(info));
+  }
+}
+
+seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
+{
+  auto found = peers.find(peer);
+  assert(found != peers.end());
+  logger().info("remove_peer({})", peer);
+  return seastar::when_all_succeed(found->second.con_front->close(),
+                                   found->second.con_back->close()).then(
+    [this, peer] {
+      peers.erase(peer);
+      return seastar::now();
+    });
+}
+
+seastar::future<> Heartbeat::ms_dispatch(ceph::net::ConnectionRef conn,
+                                         MessageRef m)
+{
+  logger().info("heartbeat: ms_dispatch {}", *m);
+  switch (m->get_type()) {
+  case MSG_OSD_PING:
+    return handle_osd_ping(conn, boost::static_pointer_cast<MOSDPing>(m));
+  default:
+    return seastar::now();
+  }
+}
+
+seastar::future<> Heartbeat::handle_osd_ping(ceph::net::ConnectionRef conn,
+                                             Ref<MOSDPing> m)
+{
+  switch (m->op) {
+  case MOSDPing::PING:
+    return handle_ping(conn, m);
+  case MOSDPing::PING_REPLY:
+    return handle_reply(conn, m);
+  case MOSDPing::YOU_DIED:
+    return handle_you_died();
+  default:
+    return seastar::now();
+  }
+}
+
+seastar::future<> Heartbeat::handle_ping(ceph::net::ConnectionRef conn,
+                                         Ref<MOSDPing> m)
+{
+  auto min_message = static_cast<uint32_t>(
+    local_conf()->osd_heartbeat_min_size);
+  auto reply =
+    make_message<MOSDPing>(m->fsid,
+                           service.get_map()->get_epoch(),
+                           MOSDPing::PING_REPLY,
+                           m->stamp,
+                           min_message);
+  return conn->send(reply);
+}
+
+seastar::future<> Heartbeat::handle_reply(ceph::net::ConnectionRef conn,
+                                          Ref<MOSDPing> m)
+{
+  const osd_id_t from = m->get_source().num();
+  auto found = peers.find(from);
+  if (found == peers.end()) {
+    // stale reply
+    return seastar::now();
+  }
+  auto& peer = found->second;
+  auto ping = peer.ping_history.find(m->stamp);
+  if (ping == peer.ping_history.end()) {
+    // old reply, deprecated by newly sent pings
+    return seastar::now();
+  }
+  const auto now = clock::now();
+  auto& unacked = ping->second.unacknowledged;
+  if (conn == peer.con_back) {
+    peer.last_rx_back = now;
+    unacked--;
+  } else if (conn == peer.con_front) {
+    peer.last_rx_front = now;
+    unacked--;
+  }
+  if (unacked == 0) {
+    peer.ping_history.erase(peer.ping_history.begin(), ++ping);
+  }
+  if (peer.is_healthy(now)) {
+    // cancel false reports
+    failure_queue.erase(from);
+    if (auto pending = failure_pending.find(from);
+        pending != failure_pending.end()) {
+      return send_still_alive(from, pending->second.addrs);
+    }
+  }
+  return seastar::now();
+}
+
+seastar::future<> Heartbeat::handle_you_died()
+{
+  // TODO: ask for a newer osdmap
+  return seastar::now();
+}
+
+seastar::future<> Heartbeat::send_heartbeats()
+{
+  using peers_item_t = typename peers_map_t::value_type;
+  return seastar::parallel_for_each(peers,
+    [this](peers_item_t& item) {
+      const auto now = clock::now();
+      const auto deadline =
+        now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+      auto& [peer, info] = item;
+      info.last_tx = now;
+      if (clock::is_zero(info.first_tx)) {
+        info.first_tx = now;
+      }
+      const utime_t sent_stamp{now};
+      auto [reply, added] = info.ping_history.emplace(sent_stamp,
+                                                      reply_t{deadline, 0});
+      std::vector<ceph::net::ConnectionRef> conns{info.con_front,
+                                                  info.con_back};
+      return seastar::parallel_for_each(std::move(conns),
+        [=] (auto con) {
+          if (con) {
+            auto min_message = static_cast<uint32_t>(
+              local_conf()->osd_heartbeat_min_size);
+            auto ping = make_message<MOSDPing>(monc.get_fsid(),
+                                               service.get_map()->get_epoch(),
+                                               MOSDPing::PING,
+                                               sent_stamp,
+                                               min_message);
+            return con->send(ping).then([&reply] {
+              reply->second.unacknowledged++;
+              return seastar::now();
+            });
+          } else {
+            return seastar::now();
+          }
+        });
+    });
+}
+
+seastar::future<> Heartbeat::send_failures()
+{
+  using failure_item_t = typename failure_queue_t::value_type;
+  return seastar::parallel_for_each(failure_queue,
+    [this](failure_item_t& failure_item) {
+      auto [osd, failed_since] = failure_item;
+      if (failure_pending.count(osd)) {
+        return seastar::now();
+      }
+      auto failed_for = std::chrono::duration_cast<std::chrono::seconds>(
+        clock::now() - failed_since).count();
+      auto osdmap = service.get_map();
+      auto failure_report =
+        make_message<MOSDFailure>(monc.get_fsid(),
+                                  osd,
+                                  osdmap->get_addrs(osd),
+                                  static_cast<int>(failed_for),
+                                  osdmap->get_epoch());
+      failure_pending.emplace(osd, failure_info_t{failed_since,
+                                                  osdmap->get_addrs(osd)});
+      return monc.send_message(failure_report);
+    }).then([this] {
+      failure_queue.clear();
+      return seastar::now();
+    });
+}
+
+seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
+                                              const entity_addrvec_t& addrs)
+{
+  auto still_alive = make_message<MOSDFailure>(monc.get_fsid(),
+                                               osd,
+                                               addrs,
+                                               0,
+                                               service.get_map()->get_epoch(),
+                                               MOSDFailure::FLAG_ALIVE);
+  return monc.send_message(still_alive).then([=] {
+    failure_pending.erase(osd);
+    return seastar::now();
+  });
+}
+
+bool Heartbeat::PeerInfo::is_unhealthy(clock::time_point now) const
+{
+  if (ping_history.empty()) {
+    // we haven't sent a ping yet, or we have got all the replies;
+    // either way we are safe and healthy for now
+    return false;
+  } else {
+    auto oldest_ping = ping_history.begin();
+    return now > oldest_ping->second.deadline;
+  }
+}
+
+bool Heartbeat::PeerInfo::is_healthy(clock::time_point now) const
+{
+  if (con_front && clock::is_zero(last_rx_front)) {
+    return false;
+  }
+  if (con_back && clock::is_zero(last_rx_back)) {
+    return false;
+  }
+  // do not declare ourselves healthy until we have received the first
+  // reply on both the front and back connections
+  return !is_unhealthy(now);
+}
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
new file mode 100644
index 000000000000..f59bbabe56f8
--- /dev/null
+++ b/src/crimson/osd/heartbeat.h
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <cstdint>
+#include <seastar/core/future.hh>
+#include "common/ceph_time.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+class MOSDPing;
+class OSDMapService;
+
+namespace ceph::mon {
+  class Client;
+}
+
+template<typename T> using Ref = boost::intrusive_ptr<T>;
+
+class Heartbeat : public ceph::net::Dispatcher {
+public:
+  using osd_id_t = int;
+
+  Heartbeat(int whoami,
+            uint32_t nonce,
+            const OSDMapService& service,
+            ceph::mon::Client& monc);
+
+  seastar::future<> start(entity_addrvec_t front,
+                          entity_addrvec_t back);
+  seastar::future<> stop();
+
+  void add_peer(osd_id_t peer);
+  seastar::future<> remove_peer(osd_id_t peer);
+
+  seastar::future<> send_heartbeats();
+  seastar::future<> send_failures();
+
+  const entity_addrvec_t& get_front_addrs() const;
+  const entity_addrvec_t& get_back_addrs() const;
+
+  // Dispatcher methods
+  seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
+                                MessageRef m) override;
+
+private:
+  seastar::future<> handle_osd_ping(ceph::net::ConnectionRef conn,
+                                    Ref<MOSDPing> m);
+  seastar::future<> handle_ping(ceph::net::ConnectionRef conn,
+                                Ref<MOSDPing> m);
+  seastar::future<> handle_reply(ceph::net::ConnectionRef conn,
+                                 Ref<MOSDPing> m);
+  seastar::future<> handle_you_died();
+
+  seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
+
+private:
+  std::unique_ptr<ceph::net::Messenger> front_msgr;
+  std::unique_ptr<ceph::net::Messenger> back_msgr;
+  const OSDMapService& service;
+  ceph::mon::Client& monc;
+
+  seastar::timer<seastar::lowres_clock> timer;
+  // use real_clock so it can be converted to utime_t
+  using clock = ceph::coarse_real_clock;
+
+  struct reply_t {
+    clock::time_point deadline;
+    // one sent over the front conn, another sent over the back conn
+    uint8_t unacknowledged = 0;
+  };
+  struct PeerInfo {
+    /// peer connection (front)
+    ceph::net::ConnectionRef con_front;
+    /// peer connection (back)
+    ceph::net::ConnectionRef con_back;
+    /// time we sent our first ping request
+    clock::time_point first_tx;
+    /// last time we sent a ping request
+    clock::time_point last_tx;
+    /// last time we got a ping reply on the front side
+    clock::time_point last_rx_front;
+    /// last time we got a ping reply on the back side
+    clock::time_point last_rx_back;
+    /// history of inflight pings, arranged by the timestamp we sent
+    std::map<utime_t, reply_t> ping_history;
+
+    bool is_unhealthy(clock::time_point now) const;
+    bool is_healthy(clock::time_point now) const;
+  };
+  using peers_map_t = std::map<osd_id_t, PeerInfo>;
+  peers_map_t peers;
+
+  // osds which are considered failed
+  // osd_id => the last time both its front and back pings were acked;
+  // used for calculating how long the OSD has been unresponsive
+  using failure_queue_t = std::map<osd_id_t, clock::time_point>;
+  failure_queue_t failure_queue;
+  struct failure_info_t {
+    clock::time_point failed_since;
+    entity_addrvec_t addrs;
+  };
+  // osds we've reported to the monitor as failed, but which are not
+  // marked down yet
+  std::map<osd_id_t, failure_info_t> failure_pending;
+};
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 2650e2cfc39c..7b465c336828 100644
--- a/src/crimson/osd/osd.cc
+++ b/src/crimson/osd/osd.cc
@@ -12,6 +12,7 @@
 #include "crimson/os/cyan_object.h"
 #include "crimson/os/cyan_store.h"
 #include "crimson/os/Transaction.h"
+#include "crimson/osd/heartbeat.h"
 #include "crimson/osd/osd_meta.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_meta.h"
@@ -26,6 +27,7 @@ namespace {
   {
     return {new Message{std::forward<Args>(args)...}, false};
   }
+  static constexpr int TICK_INTERVAL = 1;
 }
 
 using ceph::common::local_conf;
@@ -38,6 +40,8 @@ OSD::OSD(int id, uint32_t nonce)
     public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
                                                "client", nonce}},
-    monc{*public_msgr}
+    monc{*public_msgr},
+    heartbeat{new Heartbeat{whoami, nonce, *this, monc}},
+    heartbeat_timer{[this] { update_heartbeat_peers(); }}
 {
   for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) {
     if (local_conf()->ms_crc_data) {
@@ -155,6 +159,9 @@ seastar::future<> OSD::start()
     monc.sub_want("mgrmap", 0, 0);
     monc.sub_want("osdmap", 0, 0);
     return monc.renew_subs();
+  }).then([this] {
+    return heartbeat->start(public_msgr->get_myaddrs(),
+                            cluster_msgr->get_myaddrs());
   }).then([this] {
     return start_boot();
   });
@@ -205,15 +212,14 @@ seastar::future<> OSD::_send_boot()
 {
   state.set_booting();
 
-  entity_addrvec_t hb_back_addrs;
-  entity_addrvec_t hb_front_addrs;
-
+  logger().info("hb_back_msgr: {}", heartbeat->get_back_addrs());
+  logger().info("hb_front_msgr: {}", heartbeat->get_front_addrs());
   logger().info("cluster_msgr: {}", cluster_msgr->get_myaddr());
   auto m = make_message<MOSDBoot>(superblock,
                                   osdmap->get_epoch(),
                                   osdmap->get_epoch(),
-                                  hb_back_addrs,
-                                  hb_front_addrs,
+                                  heartbeat->get_back_addrs(),
+                                  heartbeat->get_front_addrs(),
                                   cluster_msgr->get_myaddrs(),
                                   CEPH_FEATURES_ALL);
   return monc.send_message(m);
@@ -224,6 +230,8 @@ seastar::future<> OSD::stop()
   // see also OSD::shutdown()
   state.set_stopping();
   return gate.close().then([this] {
+    return heartbeat->stop();
+  }).then([this] {
     return monc.stop();
   }).then([this] {
     return public_msgr->shutdown();
@@ -496,6 +504,8 @@ seastar::future<> OSD::committed_osd_maps(version_t first,
       state.set_active();
       beacon_timer.arm_periodic(
         std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+      heartbeat_timer.arm_periodic(
+        std::chrono::seconds(TICK_INTERVAL));
     }
   }
 
@@ -575,3 +585,22 @@ seastar::future<> OSD::send_beacon()
                                  min_last_epoch_clean);
   return monc.send_message(m);
 }
+
+void OSD::update_heartbeat_peers()
+{
+  if (!state.is_active()) {
+    return;
+  }
+  for (auto& pg : pgs) {
+    vector<int> up, acting;
+    osdmap->pg_to_up_acting_osds(pg.first.pgid,
+                                 &up, nullptr,
+                                 &acting, nullptr);
+    for (auto osd : boost::join(up, acting)) {
+      if (osd != CRUSH_ITEM_NONE) {
+        heartbeat->add_peer(osd);
+      }
+    }
+  }
+  // TODO: remove down OSDs
+}
diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h
index 51af15256d99..fc66666fba00 100644
--- a/src/crimson/osd/osd.h
+++ b/src/crimson/osd/osd.h
@@ -21,6 +21,7 @@ class MOSDMap;
 class OSDMap;
 class OSDMeta;
 class PG;
+class Heartbeat;
 
 namespace ceph::net {
   class Messenger;
@@ -46,6 +47,9 @@ class OSD : public ceph::net::Dispatcher,
   ChainedDispatchers dispatchers;
   ceph::mon::Client monc;
 
+  std::unique_ptr<Heartbeat> heartbeat;
+  seastar::timer<seastar::lowres_clock> heartbeat_timer;
+
   // TODO: use LRU cache
   std::map<epoch_t, seastar::lw_shared_ptr<OSDMap>> osdmaps;
   std::map<epoch_t, bufferlist> map_bl_cache;
@@ -115,4 +119,5 @@ private:
   seastar::future<> shutdown();
 
   seastar::future<> send_beacon();
+  void update_heartbeat_peers();
 };