#include <fmt/os.h>
#include <fmt/ostream.h>
#include <seastar/core/timer.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
#include "common/pick_address.h"
#include "include/util.h"
INFO("osd.{} ({}, {})", whoami, first, last);
// advance through the new maps
auto old_map = osdmap;
- return seastar::do_for_each(boost::make_counting_iterator(first),
- boost::make_counting_iterator(last + 1),
- [this, old_map, FNAME](epoch_t cur) {
- return pg_shard_manager.get_local_map(
- cur
- ).then([this, old_map, FNAME](OSDMapService::local_cached_map_t&& o) {
- osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
- std::set<int> old_osds;
- old_map->get_all_osds(old_osds);
- return seastar::parallel_for_each(
- old_osds,
- [this, FNAME, old_map](auto &osd_id) {
- DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
- osd_id, osd_id != whoami,
- old_map->is_up(osd_id), osdmap->is_down(osd_id));
- if (osd_id != whoami &&
- old_map->is_up(osd_id) &&
- osdmap->is_down(osd_id)) {
- DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
- return cluster_msgr->mark_down(
- osdmap->get_cluster_addrs(osd_id).front());
- }
- return seastar::now();
- }).then([this, o=std::move(o)]() mutable {
- return pg_shard_manager.update_map(std::move(o));
- });
- }).then([this] {
- if (get_shard_services().get_up_epoch() == 0 &&
- osdmap->is_up(whoami) &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
- return pg_shard_manager.set_up_epoch(
- osdmap->get_epoch()
- ).then([this] {
- if (!boot_epoch) {
- boot_epoch = osdmap->get_epoch();
- }
- });
- } else {
- return seastar::now();
+ for (epoch_t cur = first; cur <= last; cur++) {
+ OSDMapService::local_cached_map_t&& o = co_await pg_shard_manager.get_local_map(cur);
+ osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o));
+ std::set<int> old_osds;
+ old_map->get_all_osds(old_osds);
+ co_await seastar::coroutine::parallel_for_each(old_osds,
+ [this, FNAME, old_map](auto &osd_id) -> seastar::future<> {
+ DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}",
+ osd_id, osd_id != whoami,
+ old_map->is_up(osd_id), osdmap->is_down(osd_id));
+ if (osd_id != whoami &&
+ old_map->is_up(osd_id) &&
+ osdmap->is_down(osd_id)) {
+ DEBUG("osd.{}: mark osd.{} down", whoami, osd_id);
+ co_await cluster_msgr->mark_down(osdmap->get_cluster_addrs(osd_id).front());
}
});
- }).then([FNAME, m, this] {
- auto fut = seastar::now();
- if (osdmap->is_up(whoami)) {
- const auto up_from = osdmap->get_up_from(whoami);
- INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
- whoami, osdmap->get_epoch(), up_from, bind_epoch,
- pg_shard_manager.get_osd_state_string());
- if (bind_epoch < up_from &&
- osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
- pg_shard_manager.is_booting()) {
- INFO("osd.{}: activating...", whoami);
- fut = pg_shard_manager.set_active().then([this] {
- beacon_timer.arm_periodic(
- std::chrono::seconds(local_conf()->osd_beacon_report_interval));
- // timer continuation rearms when complete
- tick_timer.arm(
- std::chrono::seconds(TICK_INTERVAL));
- });
- }
- } else {
- if (pg_shard_manager.is_prestop()) {
- got_stop_ack();
- return seastar::now();
+
+ co_await pg_shard_manager.update_map(std::move(o));
+ if (get_shard_services().get_up_epoch() == 0 &&
+ osdmap->is_up(whoami) &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) {
+ co_await pg_shard_manager.set_up_epoch(osdmap->get_epoch());
+ if (!boot_epoch) {
+ boot_epoch = osdmap->get_epoch();
}
}
- return fut.then([this] {
- return update_heartbeat_peers();
- }).then([FNAME, this] {
- return check_osdmap_features().then([FNAME, this] {
- // yay!
- INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
- " to {} epoch to pgs", whoami, osdmap->get_epoch());
- return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
- });
- });
- }).then([FNAME, m, this] {
- if (pg_shard_manager.is_active()) {
- INFO("osd.{}: now active", whoami);
- if (!osdmap->exists(whoami) ||
- osdmap->is_stop(whoami)) {
- return shutdown();
- }
- if (should_restart()) {
- return restart();
- } else if (!pg_shard_manager.is_stopping()) {
- /*
- * TODO: Missing start_waiting_for_healthy() counterpart.
- * Only subscribe to the next map until implemented.
- * See https://tracker.ceph.com/issues/66832
- */
- return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
- } else {
- return seastar::now();
- }
- } else if (pg_shard_manager.is_preboot()) {
- INFO("osd.{}: now preboot", whoami);
+ }
- if (m->get_source().is_mon()) {
- return _preboot(
- m->cluster_osdmap_trim_lower_bound, m->newest_map);
- } else {
- INFO("osd.{}: start_boot", whoami);
- return start_boot();
- }
+ if (osdmap->is_up(whoami)) {
+ const auto up_from = osdmap->get_up_from(whoami);
+ INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}",
+ whoami, osdmap->get_epoch(), up_from, bind_epoch,
+ pg_shard_manager.get_osd_state_string());
+ if (bind_epoch < up_from &&
+ osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() &&
+ pg_shard_manager.is_booting()) {
+ INFO("osd.{}: activating...", whoami);
+ co_await pg_shard_manager.set_active();
+ beacon_timer.arm_periodic(
+ std::chrono::seconds(local_conf()->osd_beacon_report_interval));
+ // timer continuation rearms when complete
+ tick_timer.arm(std::chrono::seconds(TICK_INTERVAL));
+ }
+ co_await update_heartbeat_peers();
+ co_await check_osdmap_features();
+ // yay!
+ INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up"
+ " to {} epoch to pgs", whoami, osdmap->get_epoch());
+ co_await pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch());
+ } else {
+ if (pg_shard_manager.is_prestop()) {
+ got_stop_ack();
+ }
+ }
+
+ if (pg_shard_manager.is_active()) {
+ INFO("osd.{}: now active", whoami);
+ if (!osdmap->exists(whoami) || osdmap->is_stop(whoami)) {
+ co_await shutdown();
+ } else if (should_restart()) {
+ co_await restart();
+ } else if (!pg_shard_manager.is_stopping()) {
+ /*
+ * TODO: Missing start_waiting_for_healthy() counterpart.
+ * Only subscribe to the next map until implemented.
+ * See https://tracker.ceph.com/issues/66832
+ */
+ co_await get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false);
+ }
+ } else if (pg_shard_manager.is_preboot()) {
+ INFO("osd.{}: now preboot", whoami);
+ if (m->get_source().is_mon()) {
+ co_await _preboot(m->cluster_osdmap_trim_lower_bound, m->newest_map);
} else {
- INFO("osd.{}: now {}", whoami,
- pg_shard_manager.get_osd_state_string());
- // XXX
- return seastar::now();
+ INFO("osd.{}: start_boot", whoami);
+ co_await start_boot();
}
- });
+ }
+ INFO("osd.{}: now {}", whoami, pg_shard_manager.get_osd_state_string());
}
seastar::future<> OSD::handle_osd_op(