]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: make send_heartbeat() atomic
authorYingxin Cheng <yingxin.cheng@intel.com>
Sat, 14 Mar 2020 10:34:19 +0000 (18:34 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Mon, 23 Mar 2020 05:00:15 +0000 (13:00 +0800)
The item in Heartbeat:peers could be removed/re-added during the
asynchronous operation.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
src/crimson/osd/heartbeat.cc

index fdbcef94d51f1eb4a0e9ab1507f5c13e5deec535..6f60654854caeb25e8555ff1f0902baabfb9659c 100644 (file)
@@ -326,46 +326,41 @@ void Heartbeat::heartbeat_check()
 
 seastar::future<> Heartbeat::send_heartbeats()
 {
-  using peers_item_t = typename peers_map_t::value_type;
-  return seastar::parallel_for_each(peers,
-    [this](peers_item_t& item) {
-      const auto mnow = service.get_mnow();
-      const auto now = clock::now();
-      const auto deadline =
-        now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
-      auto& info = item.second;
-      info.last_tx = now;
-      if (clock::is_zero(info.first_tx)) {
-        info.first_tx = now;
+  const auto mnow = service.get_mnow();
+  const auto now = clock::now();
+  const auto deadline =
+    now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+  const utime_t sent_stamp{now};
+
+  std::vector<seastar::future<>> futures;
+  for (auto& item : peers) {
+    auto& info = item.second;
+    info.last_tx = now;
+    if (clock::is_zero(info.first_tx)) {
+      info.first_tx = now;
+    }
+    [[maybe_unused]] auto [reply, added] =
+      info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+    crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back};
+    for (auto& con : conns) {
+      if (con) {
+        auto min_message = static_cast<uint32_t>(
+          local_conf()->osd_heartbeat_min_size);
+        auto ping = make_message<MOSDPing>(
+          monc.get_fsid(),
+          service.get_osdmap_service().get_map()->get_epoch(),
+          MOSDPing::PING,
+          sent_stamp,
+          mnow,
+          mnow,
+          service.get_osdmap_service().get_up_epoch(),
+          min_message);
+        reply->second.unacknowledged++;
+        futures.push_back(con->send(std::move(ping)));
       }
-      const utime_t sent_stamp{now};
-      [[maybe_unused]] auto [reply, added] =
-        info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
-      std::vector<crimson::net::ConnectionRef> conns{info.con_front,
-                                                     info.con_back};
-      return seastar::parallel_for_each(std::move(conns),
-        [sent_stamp, mnow, &reply=reply->second, this] (auto con) {
-          if (con) {
-            auto min_message = static_cast<uint32_t>(
-              local_conf()->osd_heartbeat_min_size);
-            auto ping = make_message<MOSDPing>(
-             monc.get_fsid(),
-             service.get_osdmap_service().get_map()->get_epoch(),
-             MOSDPing::PING,
-             sent_stamp,
-             mnow,
-             mnow,
-             service.get_osdmap_service().get_up_epoch(),
-             min_message);
-            return con->send(ping).then([&reply] {
-              reply.unacknowledged++;
-              return seastar::now();
-            });
-          } else {
-            return seastar::now();
-          }
-        });
-    });
+    }
+  }
+  return seastar::when_all_succeed(futures.begin(), futures.end());
 }
 
 seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)