crimson/osd: support 1 lossy connection between a pair of heartbeat peers
author Yingxin Cheng <yingxin.cheng@intel.com>
Tue, 31 Mar 2020 01:24:06 +0000 (09:24 +0800)
committer Kefu Chai <kchai@redhat.com>
Mon, 15 Jun 2020 14:06:42 +0000 (22:06 +0800)
1. fix the infinite reconnect loop caused by racing on one heartbeat
connection:

The Peer class (renamed from PeerInfo) attempts to detect whether a
connection race is happening, because heartbeat peering is not
necessarily symmetric. If a race is indeed happening, Peer either waits
for the other side to connect if it loses, or connects proactively if
it wins (whoami > peer), as sketched below.
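A minimal sketch of the tie-break rule (illustrative names, not part of
the patch):

    enum class RaceAction { Reconnect, WaitForPeer };

    // whoami == peer is excluded by an assert in the real code
    RaceAction resolve_race(int whoami, int peer) {
      return whoami > peer ? RaceAction::Reconnect
                           : RaceAction::WaitForPeer;
    }

So when osd.3 races with osd.1, osd.3 reconnects proactively while
osd.1 waits to be connected.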

2. fix the infinite reconnect loop between the hb_front and hb_back
connections of the same peer:

On a reset event from either con_front or con_back, the heartbeat class
should not simply remove the related PeerInfo, which contains both of
them. Instead, Peer is reworked (at the cost of some complexity) into a
session-like class that remembers and tracks front/back connectivity
individually. It only starts tracking from a clean ping_history (same
as the original logic) once both connections are ready, and stops
tracking as soon as either of them is reset, as in the sketch below.
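In simplified form (a toy model, not the actual Peer class), the
session tracking behaves as follows:

    struct PeerSession {
      bool front_ready = false;
      bool back_ready = false;
      bool session_started = false;

      // a connection became ready (cf. handle_connect()/handle_accept())
      void notify_ready(bool is_front) {
        (is_front ? front_ready : back_ready) = true;
        if (front_ready && back_ready && !session_started) {
          session_started = true;  // start from a clean ping_history
        }
      }

      // a connection was reset (cf. handle_reset())
      void notify_reset(bool is_front) {
        (is_front ? front_ready : back_ready) = false;
        session_started = false;   // stop tracking immediately
      }
    };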

3. keep compatibility with the classic heartbeat:

Most importantly, the healthy/unhealthy check is kept identical to the
original logic, because that logic has been working reliably for a long
time. Less importantly, the original messenger policy and
Heartbeat::handle_ping() are left untouched, so our heartbeat component
should presumably still be able to talk to a classic OSD.
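For reference, the unchanged check essentially asks whether the oldest
in-flight ping has passed its grace deadline; roughly (types simplified
here, the real ping_history is keyed by the utime_t send stamp):

    #include <chrono>
    #include <map>

    using time_point = std::chrono::steady_clock::time_point;

    struct reply_t {
      time_point deadline;
      unsigned unacknowledged = 0;  // counts front and back replies
    };

    bool is_unhealthy(const std::map<time_point, reply_t>& ping_history,
                      time_point now) {
      return !ping_history.empty() &&
             ping_history.begin()->second.deadline < now;
    }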

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/heartbeat.cc
src/crimson/osd/heartbeat.h
src/crimson/osd/osd.cc

diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc
index 8d4e71de1aee4d73bf8df423f70211ad8f54be0b..b5a40fb8977824cf306231ee632b05485506a306 100644
@@ -25,11 +25,13 @@ namespace {
   }
 }
 
-Heartbeat::Heartbeat(const crimson::osd::ShardServices& service,
+Heartbeat::Heartbeat(osd_id_t whoami,
+                     const crimson::osd::ShardServices& service,
                      crimson::mon::Client& monc,
                      crimson::net::MessengerRef front_msgr,
                      crimson::net::MessengerRef back_msgr)
-  : service{service},
+  : whoami{whoami},
+    service{service},
     monc{monc},
     front_msgr{front_msgr},
     back_msgr{back_msgr},
@@ -116,6 +118,7 @@ void Heartbeat::set_require_authorizer(bool require_authorizer)
 
 void Heartbeat::add_peer(osd_id_t _peer, epoch_t epoch)
 {
+  assert(whoami != _peer);
   auto [iter, added] = peers.try_emplace(_peer, *this, _peer);
   auto& peer = iter->second;
   peer.set_epoch(epoch);
@@ -210,7 +213,33 @@ void Heartbeat::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replac
   }
   if (auto found = peers.find(peer);
       found != peers.end()) {
-    found->second.handle_reset(conn);
+    found->second.handle_reset(conn, is_replace);
+  }
+}
+
+void Heartbeat::ms_handle_connect(crimson::net::ConnectionRef conn)
+{
+  auto peer = conn->get_peer_id();
+  if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+      peer == entity_name_t::NEW) {
+    return;
+  }
+  if (auto found = peers.find(peer);
+      found != peers.end()) {
+    found->second.handle_connect(conn);
+  }
+}
+
+void Heartbeat::ms_handle_accept(crimson::net::ConnectionRef conn)
+{
+  auto peer = conn->get_peer_id();
+  if (conn->get_peer_type() != entity_name_t::TYPE_OSD ||
+      peer == entity_name_t::NEW) {
+    return;
+  }
+  if (auto found = peers.find(peer);
+      found != peers.end()) {
+    found->second.handle_accept(conn);
   }
 }
 
@@ -322,32 +351,24 @@ void Heartbeat::print(std::ostream& out) const
   out << "heartbeat";
 }
 
-void Heartbeat::Peer::connect()
-{
-  logger().info("peer osd.{} added", peer);
-  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
-  // TODO: use addrs
-  con_front = heartbeat.front_msgr->connect(
-      osdmap->get_hb_front_addrs(peer).front(),
-      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-  con_back = heartbeat.back_msgr->connect(
-      osdmap->get_hb_back_addrs(peer).front(),
-      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
-}
-
 Heartbeat::Peer::Peer(Heartbeat& heartbeat, osd_id_t peer)
   : heartbeat(heartbeat), peer(peer)
-{ connect(); }
-
-void Heartbeat::Peer::disconnect()
 {
-  logger().info("peer osd.{} removed", peer);
-  con_front->mark_down();
-  con_back->mark_down();
+  logger().info("Heartbeat::Peer: osd.{} added", peer);
+  connect_front();
+  connect_back();
 }
 
 Heartbeat::Peer::~Peer()
-{ disconnect(); }
+{
+  logger().info("Heartbeat::Peer: osd.{} removed", peer);
+  if (con_front) {
+    con_front->mark_down();
+  }
+  if (con_back) {
+    con_back->mark_down();
+  }
+}
 
 bool Heartbeat::Peer::pinged() const
 {
@@ -386,13 +407,13 @@ Heartbeat::Peer::failed_since(clock::time_point now) const
     auto oldest_deadline = ping_history.begin()->second.deadline;
     auto failed_since = std::min(last_rx_back, last_rx_front);
     if (clock::is_zero(failed_since)) {
-      logger().error("failed_since: no reply from osd.{} "
+      logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
                      "ever on either front or back, first ping sent {} "
                      "(oldest deadline {})",
                      peer, first_tx, oldest_deadline);
       failed_since = first_tx;
     } else {
-      logger().error("failed_since: no reply from osd.{} "
+      logger().error("Heartbeat::Peer::failed_since(): no reply from osd.{} "
                      "since back {} front {} (oldest deadline {})",
                      peer, last_rx_back, last_rx_front, oldest_deadline);
     }
@@ -402,16 +423,12 @@ Heartbeat::Peer::failed_since(clock::time_point now) const
   }
 }
 
-void Heartbeat::Peer::send_heartbeat(
+void Heartbeat::Peer::do_send_heartbeat(
     clock::time_point now,
     ceph::signedspan mnow,
-    std::vector<seastar::future<>>& futures)
+    std::vector<seastar::future<>>* futures)
 {
-  if (!pinged()) {
-    first_tx = now;
-  }
-  last_tx = now;
-
+  assert(session_started);
   const utime_t sent_stamp{now};
   const auto deadline =
     now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
@@ -430,13 +447,95 @@ void Heartbeat::Peer::send_heartbeat(
       heartbeat.service.get_osdmap_service().get_up_epoch(),
       min_message);
     reply->second.unacknowledged++;
-    futures.push_back(con->send(std::move(ping)));
+    if (futures) {
+      futures->push_back(con->send(std::move(ping)));
+    }
+  }
+}
+
+void Heartbeat::Peer::send_heartbeat(
+    clock::time_point now,
+    ceph::signedspan mnow,
+    std::vector<seastar::future<>>& futures)
+{
+  if (!pinged()) {
+    first_tx = now;
+  }
+  last_tx = now;
+
+  if (session_started) {
+    do_send_heartbeat(now, mnow, &futures);
+
+    // validate connection addresses
+    const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+    const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
+    if (con_front->get_peer_addr() != front_addr) {
+      logger().info("Heartbeat::Peer::send_heartbeat(): "
+                    "peer osd.{} con_front has new address {} over {}, reset",
+                    peer, front_addr, con_front->get_peer_addr());
+      con_front->mark_down();
+      has_racing = false;
+      handle_reset(con_front, false);
+    }
+    const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
+    if (con_back->get_peer_addr() != back_addr) {
+      logger().info("Heartbeat::Peer::send_heartbeat(): "
+                    "peer osd.{} con_back has new address {} over {}, reset",
+                    peer, back_addr, con_back->get_peer_addr());
+      con_back->mark_down();
+      has_racing = false;
+      handle_reset(con_back, false);
+    }
+  } else {
+    // we should send an MOSDPing, but cannot yet at this moment
+    if (pending_send) {
+      // we have already been pending for an entire heartbeat interval
+      logger().warn("Heartbeat::Peer::send_heartbeat(): "
+                    "heartbeat to {} is still pending...", peer);
+      has_racing = false;
+      // retry con_front if still pending
+      if (!front_ready) {
+        if (con_front) {
+          con_front->mark_down();
+          handle_reset(con_front, false);
+        } else {
+          connect_front();
+        }
+      }
+      // retry con_back if still pending
+      if (!back_ready) {
+        if (con_back) {
+          con_back->mark_down();
+          handle_reset(con_back, false);
+        } else {
+          connect_back();
+        }
+      }
+    } else {
+      logger().info("Heartbeat::Peer::send_heartbeat(): "
+                    "heartbeat to {} is pending send...", peer);
+      // maintain an entry in ping_history for unhealthy check
+      if (ping_history.empty()) {
+        const utime_t sent_stamp{now};
+        const auto deadline =
+          now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+        [[maybe_unused]] auto [reply, added] =
+          ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+      } else { // the entry is already added
+        assert(ping_history.size() == 1);
+      }
+      pending_send = true;
+    }
   }
 }
 
 seastar::future<> Heartbeat::Peer::handle_reply(
     crimson::net::Connection* conn, Ref<MOSDPing> m)
 {
+  if (!session_started) {
+    // we haven't sent any ping yet
+    return seastar::now();
+  }
   auto ping = ping_history.find(m->ping_stamp);
   if (ping == ping_history.end()) {
     // old replies, deprecated by newly sent pings.
@@ -444,6 +543,7 @@ seastar::future<> Heartbeat::Peer::handle_reply(
   }
   const auto now = clock::now();
   auto& unacked = ping->second.unacknowledged;
+  assert(unacked);
   if (conn == con_back.get()) {
     last_rx_back = now;
     unacked--;
@@ -460,18 +560,187 @@ seastar::future<> Heartbeat::Peer::handle_reply(
   return seastar::now();
 }
 
-void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn)
+void Heartbeat::Peer::handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+{
+  if (con_front == conn) {
+    con_front = nullptr;
+    if (is_replace) {
+      assert(!front_ready);
+      assert(!session_started);
+      // set the racing connection, will be handled by handle_accept()
+      con_front = heartbeat.front_msgr->connect(
+          conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+      has_racing = true;
+      logger().warn("Heartbeat::Peer::handle_reset(): "
+                    "con_front racing with osd.{}, updated by {}",
+                    peer, con_front);
+    } else {
+      if (front_ready) {
+        front_ready = false;
+      }
+      if (session_started) {
+        reset_session();
+      }
+      assert(heartbeat.whoami != peer);
+      if (heartbeat.whoami > peer || !has_racing) {
+        connect_front();
+      } else { // whoami < peer && has_racing
+        logger().info("Heartbeat::Peer::handle_reset(): "
+                      "con_front racing detected and lose, "
+                      "waiting for osd.{} connect me", peer);
+      }
+    }
+  } else if (con_back == conn) {
+    con_back = nullptr;
+    if (is_replace) {
+      assert(!back_ready);
+      assert(!session_started);
+      // set the racing connection, will be handled by handle_accept()
+      con_back = heartbeat.back_msgr->connect(
+          conn->get_peer_addr(), entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+      has_racing = true;
+      logger().warn("Heartbeat::Peer::handle_reset(): "
+                    "con_back racing with osd.{}, updated by {}",
+                    peer, con_back);
+    } else {
+      if (back_ready) {
+        back_ready = false;
+      }
+      if (session_started) {
+        reset_session();
+      }
+      if (heartbeat.whoami == peer) {
+        logger().error("Heartbeat::Peer::handle_reset(): "
+                       "peer is myself ({})", peer);
+      } else if (heartbeat.whoami > peer || !has_racing) {
+        connect_back();
+      } else { // whoami < peer && has_racing
+        logger().info("Heartbeat::Peer::handle_reset(): "
+                      "con_back racing detected and lose, "
+                      "waiting for osd.{} connect me", peer);
+      }
+    }
+  } else {
+    // ignore the unrelated conn
+  }
+}
+
+void Heartbeat::Peer::handle_connect(crimson::net::ConnectionRef conn)
 {
-  if (con_front != conn && con_back != conn) {
-    return;
+  if (con_front == conn) {
+    assert(!front_ready);
+    assert(!session_started);
+    notify_front_ready();
+  } else if (con_back == conn) {
+    assert(!back_ready);
+    assert(!session_started);
+    notify_back_ready();
+  } else {
+    // ignore the unrelated connection
+  }
+}
+
+void Heartbeat::Peer::handle_accept(crimson::net::ConnectionRef conn)
+{
+  handle_connect(conn);
+
+  const auto peer_addr = conn->get_peer_addr();
+  const auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  const auto front_addr = osdmap->get_hb_front_addrs(peer).front();
+  if (!con_front && front_addr == peer_addr) {
+    logger().info("Heartbeat::Peer::handle_accept(): "
+                  "con_front racing resolved for osd.{}", peer);
+    con_front = conn;
+    notify_front_ready();
+  }
+  const auto back_addr = osdmap->get_hb_back_addrs(peer).front();
+  if (!con_back && back_addr == peer_addr) {
+    logger().info("Heartbeat::Peer::handle_accept(): "
+                  "con_back racing resolved for osd.{}", peer);
+    con_back = conn;
+    notify_back_ready();
+  }
+}
+
+void Heartbeat::Peer::start_session()
+{
+  logger().info("Heartbeat::Peer: osd.{} started (send={})",
+                peer, pending_send);
+  assert(!session_started);
+  session_started = true;
+  ping_history.clear();
+  if (pending_send) {
+    pending_send = false;
+    do_send_heartbeat(clock::now(), heartbeat.service.get_mnow(), nullptr);
+  }
+}
+
+void Heartbeat::Peer::reset_session()
+{
+  logger().info("Heartbeat::Peer: osd.{} reset", peer);
+  assert(session_started);
+  if (!ping_history.empty()) {
+    // we lost our ping_history of the last session, but still need to keep
+    // the oldest deadline for unhealthy check.
+    auto oldest = ping_history.begin();
+    auto sent_stamp = oldest->first;
+    auto deadline = oldest->second.deadline;
+    ping_history.clear();
+    ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+  }
+  session_started = false;
+}
+
+void Heartbeat::Peer::notify_front_ready()
+{
+  assert(con_front);
+  assert(!front_ready);
+  assert(!session_started);
+  front_ready = true;
+  if (front_ready && back_ready) {
+    start_session();
+  }
+}
+
+void Heartbeat::Peer::connect_front()
+{
+  assert(!con_front);
+  assert(!front_ready);
+  assert(!session_started);
+  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  // TODO: use addrs
+  con_front = heartbeat.front_msgr->connect(
+      osdmap->get_hb_front_addrs(peer).front(),
+      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+  if (con_front->is_connected()) {
+    notify_front_ready();
+  }
+}
+
+void Heartbeat::Peer::notify_back_ready()
+{
+  assert(con_back);
+  assert(!back_ready);
+  assert(!session_started);
+  back_ready = true;
+  if (front_ready && back_ready) {
+    start_session();
+  }
+}
+
+void Heartbeat::Peer::connect_back()
+{
+  assert(!con_back);
+  assert(!back_ready);
+  assert(!session_started);
+  auto osdmap = heartbeat.service.get_osdmap_service().get_map();
+  // TODO: use addrs
+  con_back = heartbeat.back_msgr->connect(
+      osdmap->get_hb_back_addrs(peer).front(),
+      entity_name_t(CEPH_ENTITY_TYPE_OSD, peer));
+  if (con_back->is_connected()) {
+    notify_back_ready();
   }
-  disconnect();
-  first_tx = {};
-  last_tx = {};
-  last_rx_front = {};
-  last_rx_back = {};
-  ping_history = {};
-  connect();
 }
 
 bool Heartbeat::FailingPeers::add_pending(
diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h
index f60c0d5396cae3900021c7e1bfde6f93379ff2b4..dd1ab949a7d9922769b563e8d3b76aa7a3aa5fc9 100644
@@ -26,7 +26,8 @@ class Heartbeat : public crimson::net::Dispatcher {
 public:
   using osd_id_t = int;
 
-  Heartbeat(const crimson::osd::ShardServices& service,
+  Heartbeat(osd_id_t whoami,
+            const crimson::osd::ShardServices& service,
            crimson::mon::Client& monc,
            crimson::net::MessengerRef front_msgr,
            crimson::net::MessengerRef back_msgr);
@@ -48,6 +49,8 @@ public:
   seastar::future<> ms_dispatch(crimson::net::Connection* conn,
                                MessageRef m) override;
   void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+  void ms_handle_connect(crimson::net::ConnectionRef conn) override;
+  void ms_handle_accept(crimson::net::ConnectionRef conn) override;
 
   void print(std::ostream&) const;
 private:
@@ -70,6 +73,7 @@ private:
                                    const entity_addrvec_t& addrs,
                                    ChainedDispatchersRef);
 private:
+  const osd_id_t whoami;
   const crimson::osd::ShardServices& service;
   crimson::mon::Client& monc;
   crimson::net::MessengerRef front_msgr;
@@ -141,7 +145,9 @@ class Heartbeat::Peer {
                       ceph::signedspan mnow,
                       std::vector<seastar::future<>>&);
   seastar::future<> handle_reply(crimson::net::Connection*, Ref<MOSDPing>);
-  void handle_reset(crimson::net::ConnectionRef);
+  void handle_reset(crimson::net::ConnectionRef, bool is_replace);
+  void handle_connect(crimson::net::ConnectionRef);
+  void handle_accept(crimson::net::ConnectionRef);
 
  private:
   bool pinged() const;
@@ -151,33 +157,56 @@ class Heartbeat::Peer {
     HEALTHY,
   };
   health_state do_health_screen(clock::time_point now) const;
-  void connect();
-  void disconnect();
+
+  // a session starts when con_front and con_back are both connected
+  void start_session();
+  // a session resets when either con_front or con_back is reset
+  void reset_session();
+  // notify when con_front becomes ready, possibly start session
+  void notify_front_ready();
+  void connect_front();
+  // notify when con_back becomes ready, possibly start session
+  void notify_back_ready();
+  void connect_back();
+
+  void do_send_heartbeat(clock::time_point now,
+                         ceph::signedspan mnow,
+                         std::vector<seastar::future<>>*);
 
  private:
   Heartbeat& heartbeat;
   const osd_id_t peer;
 
-  /// peer connection (front)
-  crimson::net::ConnectionRef con_front;
-  /// peer connection (back)
-  crimson::net::ConnectionRef con_back;
-  /// time we sent our first ping request
+  // time we sent our first ping request
   clock::time_point first_tx;
-  /// last time we sent a ping request
+  // last time we sent a ping request
   clock::time_point last_tx;
-  /// last time we got a ping reply on the front side
+  // last time we got a ping reply on the front side
   clock::time_point last_rx_front;
-  /// last time we got a ping reply on the back side
+  // last time we got a ping reply on the back side
   clock::time_point last_rx_back;
-  /// most recent epoch we wanted this peer
+  // most recent epoch we wanted this peer
   epoch_t epoch;
 
+  // whether a connection race has happened
+  bool has_racing = false;
+  // peer connection (front)
+  crimson::net::ConnectionRef con_front;
+  bool front_ready = false;
+  // peer connection (back)
+  crimson::net::ConnectionRef con_back;
+  bool back_ready = false;
+
+  // whether we have started sending pings and tracking ping_history
+  bool session_started = false;
+  // whether a heartbeat needs to be sent once the session starts
+  bool pending_send = false;
+
   struct reply_t {
     clock::time_point deadline;
     // one sent over front conn, another sent over back conn
     uint8_t unacknowledged = 0;
   };
-  /// history of inflight pings, arranging by timestamp we sent
+  // history of inflight pings, arranging by timestamp we sent
   std::map<utime_t, reply_t> ping_history;
 };
diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc
index 5a082d6c756e675851d8bdca9ed057aafcfab4c2..590d8974d15c3121d22c01d92a30a7f741e36551 100644
@@ -82,7 +82,7 @@ OSD::OSD(int id, uint32_t nonce,
       local_conf().get_val<std::string>("osd_data"),
       local_conf().get_config_values())},
     shard_services{*this, *cluster_msgr, *public_msgr, *monc, *mgrc, *store},
-    heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
+    heartbeat{new Heartbeat{whoami, shard_services, *monc, hb_front_msgr, hb_back_msgr}},
     // do this in background
     heartbeat_timer{[this] { update_heartbeat_peers(); }},
     asok{seastar::make_lw_shared<crimson::admin::AdminSocket>()},