]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: committed_osd_maps into coroutines
authorMatan Breizman <mbreizma@redhat.com>
Wed, 28 May 2025 10:48:38 +0000 (10:48 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 27 Jul 2025 10:53:05 +0000 (10:53 +0000)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/osd.cc

index b67070ec620a693eb660b085ad9bac3b3ea222f9..8a3ec2640c09cfec9a64a71be5984cc9bbd36ebc 100644 (file)
@@ -11,6 +11,7 @@
 #include <fmt/os.h>
 #include <fmt/ostream.h>
 #include <seastar/core/timer.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
 
 #include "common/pick_address.h"
 #include "include/util.h"
@@ -1236,118 +1237,86 @@ seastar::future<> OSD::committed_osd_maps(
   INFO("osd.{} ({}, {})", whoami, first, last);
   // advance through the new maps
   auto old_map = osdmap;
-  return seastar::do_for_each(boost::make_counting_iterator(first),
-                              boost::make_counting_iterator(last + 1),
-                              [this, old_map, FNAME](epoch_t cur) {
-    return pg_shard_manager.get_local_map(
-      cur
-    ).then([this, old_map, FNAME](OSDMapService::local_cached_map_t&& o) {
-      osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
-      std::set<int> old_osds;
-      old_map->get_all_osds(old_osds);
-      return seastar::parallel_for_each(
-       old_osds,
-       [this, FNAME, old_map](auto &osd_id) {
-       DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
-         osd_id, osd_id != whoami,
-         old_map->is_up(osd_id), osdmap->is_down(osd_id));
-       if (osd_id != whoami &&
-           old_map->is_up(osd_id) &&
-           osdmap->is_down(osd_id)) {
-         DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
-         return cluster_msgr->mark_down(
-           osdmap->get_cluster_addrs(osd_id).front());
-       }
-       return seastar::now();
-      }).then([this, o=std::move(o)]() mutable {
-       return pg_shard_manager.update_map(std::move(o));
-      });
-    }).then([this] {
-      if (get_shard_services().get_up_epoch() == 0 &&
-         osdmap->is_up(whoami) &&
-         osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
-       return pg_shard_manager.set_up_epoch(
-         osdmap->get_epoch()
-       ).then([this] {
-         if (!boot_epoch) {
-           boot_epoch = osdmap->get_epoch();
-         }
-       });
-      } else {
-       return seastar::now();
+  for (epoch_t cur = first; cur <= last; cur++) {
+    OSDMapService::local_cached_map_t&& o = co_await pg_shard_manager.get_local_map(cur);
+    osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+    std::set<int> old_osds;
+    old_map->get_all_osds(old_osds);
+    co_await seastar::coroutine::parallel_for_each(old_osds,
+        [this, FNAME, old_map](auto &osd_id) -> seastar::future<> {
+      DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
+        osd_id, osd_id != whoami,
+        old_map->is_up(osd_id), osdmap->is_down(osd_id));
+      if (osd_id != whoami &&
+          old_map->is_up(osd_id) &&
+          osdmap->is_down(osd_id)) {
+        DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
+        co_await cluster_msgr->mark_down(osdmap->get_cluster_addrs(osd_id).front());
       }
     });
-  }).then([FNAME, m, this] {
-    auto fut = seastar::now();
-    if (osdmap->is_up(whoami)) {
-      const auto up_from = osdmap->get_up_from(whoami);
-      INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
-          whoami, osdmap->get_epoch(), up_from, bind_epoch,
-          pg_shard_manager.get_osd_state_string());
-      if (bind_epoch < up_from &&
-          osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
-          pg_shard_manager.is_booting()) {
-        INFO("osd.{}: activating...", whoami);
-        fut = pg_shard_manager.set_active().then([this] {
-          beacon_timer.arm_periodic(
-            std::chrono::seconds(local_conf()->osd_beacon_report_interval));
-         // timer continuation rearms when complete
-          tick_timer.arm(
-            std::chrono::seconds(TICK_INTERVAL));
-        });
-      }
-    } else {
-      if (pg_shard_manager.is_prestop()) {
-       got_stop_ack();
-       return seastar::now();
+
+    co_await pg_shard_manager.update_map(std::move(o));
+    if (get_shard_services().get_up_epoch() == 0 &&
+        osdmap->is_up(whoami) &&
+        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+      co_await pg_shard_manager.set_up_epoch(osdmap->get_epoch());
+      if (!boot_epoch) {
+        boot_epoch = osdmap->get_epoch();
       }
     }
-    return fut.then([this] {
-      return update_heartbeat_peers();
-    }).then([FNAME, this] {
-      return check_osdmap_features().then([FNAME, this] {
-        // yay!
-        INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
-            " to {} epoch to pgs", whoami, osdmap->get_epoch());
-        return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
-      });
-    });
-  }).then([FNAME, m, this] {
-    if (pg_shard_manager.is_active()) {
-      INFO("osd.{}: now active", whoami);
-      if (!osdmap->exists(whoami) ||
-         osdmap->is_stop(whoami)) {
-        return shutdown();
-      }
-      if (should_restart()) {
-        return restart();
-      } else if (!pg_shard_manager.is_stopping()) {
-        /* 
-         * TODO: Missing start_waiting_for_healthy() counterpart.
-         * Only subscribe to the next map until implemented.
-         * See https://tracker.ceph.com/issues/66832 
-        */
-       return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
-      } else {
-        return seastar::now();
-      }
-    } else if (pg_shard_manager.is_preboot()) {
-      INFO("osd.{}: now preboot", whoami);
+  }
 
-      if (m->get_source().is_mon()) {
-        return _preboot(
-          m->cluster_osdmap_trim_lower_bound, m->newest_map);
-      } else {
-        INFO("osd.{}: start_boot", whoami);
-        return start_boot();
-      }
+  if (osdmap->is_up(whoami)) {
+    const auto up_from = osdmap->get_up_from(whoami);
+    INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
+         whoami, osdmap->get_epoch(), up_from, bind_epoch,
+         pg_shard_manager.get_osd_state_string());
+    if (bind_epoch < up_from &&
+        osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+        pg_shard_manager.is_booting()) {
+      INFO("osd.{}: activating...", whoami);
+      co_await pg_shard_manager.set_active();
+      beacon_timer.arm_periodic(
+        std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+      // timer continuation rearms when complete
+      tick_timer.arm(std::chrono::seconds(TICK_INTERVAL));
+    }
+    co_await update_heartbeat_peers();
+    co_await check_osdmap_features();
+    // yay!
+    INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+         " to {} epoch to pgs", whoami, osdmap->get_epoch());
+    co_await pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+  } else {
+    if (pg_shard_manager.is_prestop()) {
+      got_stop_ack();
+    }
+  }
+
+  if (pg_shard_manager.is_active()) {
+    INFO("osd.{}: now active", whoami);
+    if (!osdmap->exists(whoami) || osdmap->is_stop(whoami)) {
+      co_await shutdown();
+    } else if (should_restart()) {
+      co_await restart();
+    } else if (!pg_shard_manager.is_stopping()) {
+      /*
+       * TODO: Missing start_waiting_for_healthy() counterpart.
+       * Only subscribe to the next map until implemented.
+       * See https://tracker.ceph.com/issues/66832
+      */
+      co_await get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
+    }
+  } else if (pg_shard_manager.is_preboot()) {
+    INFO("osd.{}: now preboot", whoami);
+    if (m->get_source().is_mon()) {
+      co_await _preboot(m->cluster_osdmap_trim_lower_bound, m->newest_map);
     } else {
-      INFO("osd.{}: now {}", whoami,
-          pg_shard_manager.get_osd_state_string());
-      // XXX
-      return seastar::now();
+      INFO("osd.{}: start_boot", whoami);
+      co_await start_boot();
     }
-  });
+  }
+  INFO("osd.{}: now {}", whoami, pg_shard_manager.get_osd_state_string());
 }
 
 seastar::future<> OSD::handle_osd_op(