std::chrono::seconds(TICK_INTERVAL));
}
}
-
+ // yay!
+ consume_map(osdmap->get_epoch());
if (state.is_active()) {
logger().info("osd.{}: now active", whoami);
if (!osdmap->exists(whoami)) {
return do_peering_event(m->get_spg(), std::move(evt));
}
+
+void OSD::consume_map(epoch_t epoch)
+{
+ // todo: m-to-n: broadcast this news to all shards
+ auto first = waiting_peering.lower_bound(epoch);
+ auto last = waiting_peering.end();
+ std::for_each(first, last,
+ [epoch, this](auto& blocked_requests) {
+ blocked_requests.second.set_value(epoch);
+ });
+ waiting_peering.erase(first, last);
+}
+
seastar::future<>
OSD::do_peering_event(spg_t pgid,
std::unique_ptr<PGPeeringEvent> evt)
{
if (auto pg = pgs.find(pgid); pg != pgs.end()) {
- return advance_pg_to(pg->second, osdmap->get_epoch()).then(
- [pg, evt=std::move(evt)]() mutable {
+ return wait_for_map(evt->get_epoch_sent()).then(
+ [pg=pg->second, this](epoch_t epoch) {
+ return advance_pg_to(pg, epoch);
+ }).then([pg, evt=std::move(evt)]() mutable {
return pg->second->do_peering_event(std::move(evt));
}).then([pg=pg->second, this] {
return _send_alive(pg->get_need_up_thru());
}
}
+seastar::future<epoch_t> OSD::wait_for_map(epoch_t epoch)
+{
+ const auto mine = osdmap->get_epoch();
+ if (mine >= epoch) {
+ return seastar::make_ready_future<epoch_t>(mine);
+ } else {
+ logger().info("evt epoch is {}, i have {}, will wait", epoch, mine);
+ return waiting_peering[epoch].get_shared_future();
+ }
+}
+
seastar::future<> OSD::advance_pg_to(Ref<PG> pg, epoch_t to)
{
auto from = pg->get_osdmap_epoch();
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/shared_future.hh>
#include <seastar/core/timer.hh>
#include "crimson/common/simple_lru.h"
version_t last,
Ref<MOSDMap> m);
+ // order the promises in descending order of the waited osdmap epoch,
+ // so we can access all the waiters expecting a map whose epoch is less
+ // than a given epoch
+ using waiting_peering_t = std::map<epoch_t, seastar::shared_promise<epoch_t>,
+ std::greater<epoch_t>>;
+ waiting_peering_t waiting_peering;
+ // wait for an osdmap whose epoch is greater or equal to given epoch
+ seastar::future<epoch_t> wait_for_map(epoch_t epoch);
+ void consume_map(epoch_t epoch);
seastar::future<> do_peering_event(spg_t pgid,
std::unique_ptr<PGPeeringEvent> evt);
seastar::future<> advance_pg_to(Ref<PG> pg, epoch_t to);