}
}
// yay!
- consume_map(osdmap->get_epoch());
+ return consume_map(osdmap->get_epoch());
+ }).then([m, this] {
if (state.is_active()) {
logger().info("osd.{}: now active", whoami);
if (!osdmap->exists(whoami)) {
return do_peering_event(m->get_spg(), std::move(evt));
}
-
-void OSD::consume_map(epoch_t epoch)
+seastar::future<> OSD::consume_map(epoch_t epoch)
{
// todo: m-to-n: broadcast this news to all shards
- auto first = waiting_peering.lower_bound(epoch);
- auto last = waiting_peering.end();
- std::for_each(first, last,
- [epoch, this](auto& blocked_requests) {
- blocked_requests.second.set_value(epoch);
- });
- waiting_peering.erase(first, last);
+ return seastar::parallel_for_each(pgs.begin(), pgs.end(), [=](auto& pg) {
+ return advance_pg_to(pg.second, epoch);
+ }).then([epoch, this] {
+ auto first = waiting_peering.lower_bound(epoch);
+ auto last = waiting_peering.end();
+ std::for_each(first, last, [epoch, this](auto& blocked_requests) {
+ blocked_requests.second.set_value(epoch);
+ });
+ waiting_peering.erase(first, last);
+ return seastar::now();
+ });
}
seastar::future<>
return _send_alive(pg->get_need_up_thru());
});
} else {
+ logger().warn("pg not found: {}", pgid);
// todo: handle_pg_query_nopg()
return seastar::now();
}
waiting_peering_t waiting_peering;
// wait for an osdmap whose epoch is greater or equal to given epoch
seastar::future<epoch_t> wait_for_map(epoch_t epoch);
- void consume_map(epoch_t epoch);
+ seastar::future<> consume_map(epoch_t epoch);
seastar::future<> do_peering_event(spg_t pgid,
std::unique_ptr<PGPeeringEvent> evt);
seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);