From: Matan Breizman Date: Wed, 28 May 2025 10:48:38 +0000 (+0000) Subject: crimson/osd: committed_osd_maps into coroutines X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7a34ea4e2f96ba2c1f4213ff9cadc406fa5e1aed;p=ceph.git crimson/osd: committed_osd_maps into coroutines Signed-off-by: Matan Breizman --- diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index b67070ec620a6..8a3ec2640c09c 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include "common/pick_address.h" #include "include/util.h" @@ -1236,118 +1237,86 @@ seastar::future<> OSD::committed_osd_maps( INFO("osd.{} ({}, {})", whoami, first, last); // advance through the new maps auto old_map = osdmap; - return seastar::do_for_each(boost::make_counting_iterator(first), - boost::make_counting_iterator(last + 1), - [this, old_map, FNAME](epoch_t cur) { - return pg_shard_manager.get_local_map( - cur - ).then([this, old_map, FNAME](OSDMapService::local_cached_map_t&& o) { - osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o)); - std::set old_osds; - old_map->get_all_osds(old_osds); - return seastar::parallel_for_each( - old_osds, - [this, FNAME, old_map](auto &osd_id) { - DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}", - osd_id, osd_id != whoami, - old_map->is_up(osd_id), osdmap->is_down(osd_id)); - if (osd_id != whoami && - old_map->is_up(osd_id) && - osdmap->is_down(osd_id)) { - DEBUG("osd.{}: mark osd.{} down", whoami, osd_id); - return cluster_msgr->mark_down( - osdmap->get_cluster_addrs(osd_id).front()); - } - return seastar::now(); - }).then([this, o=std::move(o)]() mutable { - return pg_shard_manager.update_map(std::move(o)); - }); - }).then([this] { - if (get_shard_services().get_up_epoch() == 0 && - osdmap->is_up(whoami) && - osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { - return pg_shard_manager.set_up_epoch( - osdmap->get_epoch() - ).then([this] { - if (!boot_epoch) { - boot_epoch = osdmap->get_epoch(); - } - }); - } else { - return seastar::now(); + for (epoch_t cur = first; cur <= last; cur++) { + OSDMapService::local_cached_map_t&& o = co_await pg_shard_manager.get_local_map(cur); + osdmap = make_local_shared_foreign(OSDMapService::local_cached_map_t(o)); + std::set old_osds; + old_map->get_all_osds(old_osds); + co_await seastar::coroutine::parallel_for_each(old_osds, + [this, FNAME, old_map](auto &osd_id) -> seastar::future<> { + DEBUG("osd.{}: whoami ? {}, old up ? {} , now down ? {}", + osd_id, osd_id != whoami, + old_map->is_up(osd_id), osdmap->is_down(osd_id)); + if (osd_id != whoami && + old_map->is_up(osd_id) && + osdmap->is_down(osd_id)) { + DEBUG("osd.{}: mark osd.{} down", whoami, osd_id); + co_await cluster_msgr->mark_down(osdmap->get_cluster_addrs(osd_id).front()); } }); - }).then([FNAME, m, this] { - auto fut = seastar::now(); - if (osdmap->is_up(whoami)) { - const auto up_from = osdmap->get_up_from(whoami); - INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}", - whoami, osdmap->get_epoch(), up_from, bind_epoch, - pg_shard_manager.get_osd_state_string()); - if (bind_epoch < up_from && - osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && - pg_shard_manager.is_booting()) { - INFO("osd.{}: activating...", whoami); - fut = pg_shard_manager.set_active().then([this] { - beacon_timer.arm_periodic( - std::chrono::seconds(local_conf()->osd_beacon_report_interval)); - // timer continuation rearms when complete - tick_timer.arm( - std::chrono::seconds(TICK_INTERVAL)); - }); - } - } else { - if (pg_shard_manager.is_prestop()) { - got_stop_ack(); - return seastar::now(); + + co_await pg_shard_manager.update_map(std::move(o)); + if (get_shard_services().get_up_epoch() == 0 && + osdmap->is_up(whoami) && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs()) { + co_await pg_shard_manager.set_up_epoch(osdmap->get_epoch()); + if (!boot_epoch) { + boot_epoch = osdmap->get_epoch(); } } - return fut.then([this] { - return update_heartbeat_peers(); - }).then([FNAME, this] { - return check_osdmap_features().then([FNAME, this] { - // yay! - INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up" - " to {} epoch to pgs", whoami, osdmap->get_epoch()); - return pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch()); - }); - }); - }).then([FNAME, m, this] { - if (pg_shard_manager.is_active()) { - INFO("osd.{}: now active", whoami); - if (!osdmap->exists(whoami) || - osdmap->is_stop(whoami)) { - return shutdown(); - } - if (should_restart()) { - return restart(); - } else if (!pg_shard_manager.is_stopping()) { - /* - * TODO: Missing start_waiting_for_healthy() counterpart. - * Only subscribe to the next map until implemented. - * See https://tracker.ceph.com/issues/66832 - */ - return get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false); - } else { - return seastar::now(); - } - } else if (pg_shard_manager.is_preboot()) { - INFO("osd.{}: now preboot", whoami); + } - if (m->get_source().is_mon()) { - return _preboot( - m->cluster_osdmap_trim_lower_bound, m->newest_map); - } else { - INFO("osd.{}: start_boot", whoami); - return start_boot(); - } + if (osdmap->is_up(whoami)) { + const auto up_from = osdmap->get_up_from(whoami); + INFO("osd.{}: map e {} marked me up: up_from {}, bind_epoch {}, state {}", + whoami, osdmap->get_epoch(), up_from, bind_epoch, + pg_shard_manager.get_osd_state_string()); + if (bind_epoch < up_from && + osdmap->get_addrs(whoami) == public_msgr->get_myaddrs() && + pg_shard_manager.is_booting()) { + INFO("osd.{}: activating...", whoami); + co_await pg_shard_manager.set_active(); + beacon_timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_beacon_report_interval)); + // timer continuation rearms when complete + tick_timer.arm(std::chrono::seconds(TICK_INTERVAL)); + } + co_await update_heartbeat_peers(); + co_await check_osdmap_features(); + // yay! + INFO("osd.{}: committed_osd_maps: broadcasting osdmaps up" + " to {} epoch to pgs", whoami, osdmap->get_epoch()); + co_await pg_shard_manager.broadcast_map_to_pgs(osdmap->get_epoch()); + } else { + if (pg_shard_manager.is_prestop()) { + got_stop_ack(); + } + } + + if (pg_shard_manager.is_active()) { + INFO("osd.{}: now active", whoami); + if (!osdmap->exists(whoami) || osdmap->is_stop(whoami)) { + co_await shutdown(); + } else if (should_restart()) { + co_await restart(); + } else if (!pg_shard_manager.is_stopping()) { + /* + * TODO: Missing start_waiting_for_healthy() counterpart. + * Only subscribe to the next map until implemented. + * See https://tracker.ceph.com/issues/66832 + */ + co_await get_shard_services().osdmap_subscribe(osdmap->get_epoch() + 1, false); + } + } else if (pg_shard_manager.is_preboot()) { + INFO("osd.{}: now preboot", whoami); + if (m->get_source().is_mon()) { + co_await _preboot(m->cluster_osdmap_trim_lower_bound, m->newest_map); } else { - INFO("osd.{}: now {}", whoami, - pg_shard_manager.get_osd_state_string()); - // XXX - return seastar::now(); + INFO("osd.{}: start_boot", whoami); + co_await start_boot(); } - }); + } + INFO("osd.{}: now {}", whoami, pg_shard_manager.get_osd_state_string()); } seastar::future<> OSD::handle_osd_op(