seastar::future<> Heartbeat::send_heartbeats()
{
- using peers_item_t = typename peers_map_t::value_type;
- return seastar::parallel_for_each(peers,
- [this](peers_item_t& item) {
- const auto mnow = service.get_mnow();
- const auto now = clock::now();
- const auto deadline =
- now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
- auto& info = item.second;
- info.last_tx = now;
- if (clock::is_zero(info.first_tx)) {
- info.first_tx = now;
+ const auto mnow = service.get_mnow();
+ const auto now = clock::now();
+ const auto deadline =
+ now + std::chrono::seconds(local_conf()->osd_heartbeat_grace);
+ const utime_t sent_stamp{now};
+
+ std::vector<seastar::future<>> futures;
+ for (auto& item : peers) {
+ auto& info = item.second;
+ info.last_tx = now;
+ if (clock::is_zero(info.first_tx)) {
+ info.first_tx = now;
+ }
+ [[maybe_unused]] auto [reply, added] =
+ info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
+ crimson::net::ConnectionRef conns[] = {info.con_front, info.con_back};
+ for (auto& con : conns) {
+ if (con) {
+ auto min_message = static_cast<uint32_t>(
+ local_conf()->osd_heartbeat_min_size);
+ auto ping = make_message<MOSDPing>(
+ monc.get_fsid(),
+ service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ mnow,
+ mnow,
+ service.get_osdmap_service().get_up_epoch(),
+ min_message);
+ reply->second.unacknowledged++;
+ futures.push_back(con->send(std::move(ping)));
}
- const utime_t sent_stamp{now};
- [[maybe_unused]] auto [reply, added] =
- info.ping_history.emplace(sent_stamp, reply_t{deadline, 0});
- std::vector<crimson::net::ConnectionRef> conns{info.con_front,
- info.con_back};
- return seastar::parallel_for_each(std::move(conns),
- [sent_stamp, mnow, &reply=reply->second, this] (auto con) {
- if (con) {
- auto min_message = static_cast<uint32_t>(
- local_conf()->osd_heartbeat_min_size);
- auto ping = make_message<MOSDPing>(
- monc.get_fsid(),
- service.get_osdmap_service().get_map()->get_epoch(),
- MOSDPing::PING,
- sent_stamp,
- mnow,
- mnow,
- service.get_osdmap_service().get_up_epoch(),
- min_message);
- return con->send(ping).then([&reply] {
- reply.unacknowledged++;
- return seastar::now();
- });
- } else {
- return seastar::now();
- }
- });
- });
+ }
+ }
+ return seastar::when_all_succeed(futures.begin(), futures.end());
}
seastar::future<> Heartbeat::send_failures(failure_queue_t&& failure_queue)