return with_blocking_future(
handle.enter(pp(pg).wait_for_active));
}).then([this, &pg]() mutable {
- return pg.wait_for_active();
+ return with_blocking_future(pg.wait_for_active_blocker.wait());
}).then([this, &pg]() mutable {
if (m->finish_decode()) {
m->clear_payload();
osdmap->get_pool_name(pgid.pool())),
osdmap,
this,
- this)
+ this),
+ wait_for_active_blocker(this)
{
peering_state.set_backend_predicates(
new ReadablePredicate(pg_whoami),
void PG::on_activate_complete()
{
- active_promise.set_value();
- active_promise = {};
+ wait_for_active_blocker.on_active();
if (peering_state.needs_recovery()) {
shard_services.start_operation<LocalPeeringEvent>(
return os;
}
-seastar::future<> PG::wait_for_active()
+void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
{
- logger().debug("wait_for_active: {}", peering_state.get_pg_state_string());
- if (peering_state.is_active()) {
- return seastar::now();
+ f->dump_stream("pgid") << pg->pgid;
+}
+
+void PG::WaitForActiveBlocker::on_active()
+{
+ p.set_value();
+ p = {};
+}
+
+blocking_future<> PG::WaitForActiveBlocker::wait()
+{
+ if (pg->peering_state.is_active()) {
+ return make_blocking_future(seastar::now());
} else {
- return active_promise.get_shared_future();
+ return make_blocking_future(p.get_shared_future());
}
}
PeeringState peering_state;
eversion_t projected_last_update;
- seastar::shared_promise<> active_promise;
- seastar::future<> wait_for_active();
+ class WaitForActiveBlocker : public BlockerT<WaitForActiveBlocker> {
+ PG *pg;
+
+ const spg_t pgid;
+ seastar::shared_promise<> p;
+
+ protected:
+ void dump_detail(Formatter *f) const;
+
+ public:
+ static constexpr const char *type_name = "WaitForActiveBlocker";
+
+ WaitForActiveBlocker(PG *pg) : pg(pg) {}
+ void on_active();
+ blocking_future<> wait();
+ } wait_for_active_blocker;
friend std::ostream& operator<<(std::ostream&, const PG& pg);
friend class ClientRequest;