From: Yingxin Cheng Date: Sat, 14 Mar 2020 10:34:19 +0000 (+0800) Subject: crimson/osd: make send_heartbeat() atomic X-Git-Tag: v16.0.0~8^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=373e16499efbcb9e0ede0fee0b009e399784eee5;p=ceph.git crimson/osd: make send_heartbeat() atomic The item in Heartbeat:peers could be removed/re-added during the asynchronous operation. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index fdbcef94d51f..6f60654854ca 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -326,46 +326,41 @@ void Heartbeat::heartbeat_check() seastar::future<> Heartbeat::send_heartbeats() { - using peers_item_t = typename peers_map_t::value_type; - return seastar::parallel_for_each(peers, - [this](peers_item_t& item) { - const auto mnow = service.get_mnow(); - const auto now = clock::now(); - const auto deadline = - now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); - auto& info = item.second; - info.last_tx = now; - if (clock::is_zero(info.first_tx)) { - info.first_tx = now; + const auto mnow = service.get_mnow(); + const auto now = clock::now(); + const auto deadline = + now + std::chrono::seconds(local_conf()->osd_heartbeat_grace); + const utime_t sent_stamp{now}; + + std::vector> futures; + for (auto& item : peers) { + auto& info = item.second; + info.last_tx = now; + if (clock::is_zero(info.first_tx)) { + info.first_tx = now; + } + [[maybe_unused]] auto [reply, added] = + info.ping_history.emplace(sent_stamp, reply_t{deadline, 0}); + crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back}; + for (auto& con : conns) { + if (con) { + auto min_message = static_cast( + local_conf()->osd_heartbeat_min_size); + auto ping = make_message( + monc.get_fsid(), + service.get_osdmap_service().get_map()->get_epoch(), + MOSDPing::PING, + sent_stamp, + mnow, + mnow, + service.get_osdmap_service().get_up_epoch(), + min_message); + reply->second.unacknowledged++; + futures.push_back(con->send(std::move(ping))); } - const utime_t sent_stamp{now}; - [[maybe_unused]] auto [reply, added] = - info.ping_history.emplace(sent_stamp, reply_t{deadline, 0}); - std::vector conns{info.con_front, - info.con_back}; - return seastar::parallel_for_each(std::move(conns), - [sent_stamp, mnow, &reply=reply->second, this] (auto con) { - if (con) { - auto min_message = static_cast( - local_conf()->osd_heartbeat_min_size); - auto ping = make_message( - monc.get_fsid(), - service.get_osdmap_service().get_map()->get_epoch(), - MOSDPing::PING, - sent_stamp, - mnow, - mnow, - service.get_osdmap_service().get_up_epoch(), - min_message); - return con->send(ping).then([&reply] { - reply.unacknowledged++; - return seastar::now(); - }); - } else { - return seastar::now(); - } - }); - }); + } + } + return seastar::when_all_succeed(futures.begin(), futures.end()); } seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)