}
}
+seastar::future<Heartbeat::osds_t> Heartbeat::remove_down_peers()
+{
+  osds_t osds;
+  for (auto& peer : peers) {
+    osds.push_back(peer.first);
+  }
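+  // map each stored peer against the latest osdmap: peers that are no longer
+  // up are removed right away, while peers whose recorded epoch is stale are
+  // reduced into the returned list of removal candidates (-1 means "keep")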
+  return seastar::map_reduce(std::move(osds),
+    [this](auto& osd) {
+      auto osdmap = service.get_map();
+      if (!osdmap->is_up(osd)) {
+        return remove_peer(osd).then([] {
+          return seastar::make_ready_future<osd_id_t>(-1);
+        });
+      } else if (peers[osd].epoch < osdmap->get_epoch()) {
+        return seastar::make_ready_future<osd_id_t>(osd);
+      } else {
+        return seastar::make_ready_future<osd_id_t>(-1);
+      }
+    }, osds_t{},
+    [this](osds_t&& extras, osd_id_t extra) {
+      if (extra >= 0) {
+        extras.push_back(extra);
+      }
+      return extras;
+    });
+}
+
+void Heartbeat::add_reporter_peers(int whoami)
+{
+  auto osdmap = service.get_map();
+  // include next and previous up osds to ensure we have a fully-connected set
+  set<int> want;
+  if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) {
+    want.insert(next);
+  }
+  if (auto prev = osdmap->get_previous_up_osd_before(whoami); prev >= 0) {
+    want.insert(prev);
+  }
+  // make sure we have at least **min_down** osds coming from different
+  // subtree level (e.g., hosts) for fast failure detection.
+  auto min_down = local_conf().get_val<uint64_t>("mon_osd_min_down_reporters");
+  auto subtree = local_conf().get_val<string>("mon_osd_reporter_subtree_level");
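+  // note: `want` is passed as both the skip set and the output set, so the
+  // random picks should not duplicate the next/prev osds selected above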
+  osdmap->get_random_up_osds_by_subtree(
+    whoami, subtree, min_down, want, &want);
+  for (auto osd : want) {
+    add_peer(osd, osdmap->get_epoch());
+  }
+}
+
+seastar::future<> Heartbeat::update_peers(int whoami)
+{
+  const auto min_peers = static_cast<size_t>(
+    local_conf().get_val<int64_t>("osd_heartbeat_min_peers"));
+  return remove_down_peers().then([=](osds_t&& extra) {
+    add_reporter_peers(whoami);
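+    // remove_down_peers() reported the peers whose map epoch is stale; move
+    // them into do_with() below so they stay alive while the trimming loop
+    // runs (this continuation's `extra` argument dies as soon as it returns)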
+    // too many?
+    return seastar::do_with(std::move(extra), size_t{0},
+      [min_peers, this](osds_t& extras, size_t& i) {
+        return seastar::do_until(
+          [min_peers, &extras, &i, this] {
+            // stop once we are down to the minimum, or out of candidates
+            return peers.size() <= min_peers || i == extras.size(); },
+          [&extras, &i, this] {
+            return remove_peer(extras[i++]); }
+        );
+      });
+  }).then([=] {
+    // or too few?
+    auto osdmap = service.get_map();
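+    // walk the up osds after whoami (wrapping around) until we reach the
+    // minimum number of peers or come back to ourselves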
+    for (auto next = osdmap->get_next_up_osd_after(whoami);
+         peers.size() < min_peers && next >= 0 && next != whoami;
+         next = osdmap->get_next_up_osd_after(next)) {
+      add_peer(next, osdmap->get_epoch());
+    }
+    return seastar::now();
+  });
+}
+
seastar::future<> Heartbeat::remove_peer(osd_id_t peer)
{
auto found = peers.find(peer);
seastar::future<> stop();
void add_peer(osd_id_t peer, epoch_t epoch);
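+  /// keep the heartbeat peers in sync with the newest osdmap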
+  seastar::future<> update_peers(int whoami);
seastar::future<> remove_peer(osd_id_t peer);
seastar::future<> send_heartbeats();
seastar::future<> send_still_alive(osd_id_t, const entity_addrvec_t&);
+  using osds_t = std::vector<osd_id_t>;
+  /// remove down OSDs
+  /// @return peers not needed in this epoch
+  seastar::future<osds_t> remove_down_peers();
+  /// add enough reporters for fast failure detection
+  void add_reporter_peers(int whoami);
+
private:
std::unique_ptr<ceph::net::Messenger> front_msgr;
std::unique_ptr<ceph::net::Messenger> back_msgr;