}
template <class T>
-seastar::future<> PeeringEvent<T>::start()
+seastar::future<> PeeringEvent<T>::with_pg(
+ ShardServices &shard_services, Ref<PG> pg)
{
- logger().debug("{}: start", *this);
-
- typename T::IRef ref = static_cast<T*>(this);
- auto maybe_delay = seastar::now();
- if (delay) {
- maybe_delay = seastar::sleep(
- std::chrono::milliseconds(std::lround(delay * 1000)));
+ if (!pg) {
+ logger().warn("{}: pg absent, did not create", *this);
+ on_pg_absent();
+ handle.exit();
+ return complete_rctx_no_pg();
}
- return maybe_delay.then([this] {
- return get_pg();
- }).then([this](Ref<PG> pg) {
- if (!pg) {
- logger().warn("{}: pg absent, did not create", *this);
- on_pg_absent();
- handle.exit();
- return complete_rctx_no_pg();
- }
- using interruptor = typename T::interruptor;
- return interruptor::with_interruption([this, pg] {
- logger().debug("{}: pg present", *this);
- return this->template enter_stage<interruptor>(
- pp(*pg).await_map
- ).then_interruptible([this, pg] {
- return this->template with_blocking_event<PG_OSDMapGate::OSDMapBlocker::BlockingEvent>(
- [this, pg] (auto&& trigger) {
- return pg->osdmap_gate.wait_for_map(std::move(trigger),
- evt.get_epoch_sent());
+
+ using interruptor = typename T::interruptor;
+ return interruptor::with_interruption([this, pg, &shard_services] {
+ logger().debug("{}: pg present", *this);
+ return this->template enter_stage<interruptor>(pp(*pg).await_map
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), evt.get_epoch_sent());
});
- }).then_interruptible([this, pg](auto) {
- return this->template enter_stage<interruptor>(pp(*pg).process);
- }).then_interruptible([this, pg] {
- // TODO: likely we should synchronize also with the pg log-based
- // recovery.
- return this->template enter_stage<interruptor>(BackfillRecovery::bp(*pg).process);
- }).then_interruptible([this, pg] {
- pg->do_peering_event(evt, ctx);
- handle.exit();
- return complete_rctx(pg);
- }).then_interruptible([this, pg] () -> typename T::template interruptible_future<> {
+ }).then_interruptible([this, pg](auto) {
+ return this->template enter_stage<interruptor>(pp(*pg).process);
+ }).then_interruptible([this, pg] {
+ // TODO: likely we should synchronize also with the pg log-based
+ // recovery.
+ return this->template enter_stage<interruptor>(
+ BackfillRecovery::bp(*pg).process);
+ }).then_interruptible([this, pg] {
+ pg->do_peering_event(evt, ctx);
+ handle.exit();
+ return complete_rctx(pg);
+ }).then_interruptible([pg, &shard_services]()
+ -> typename T::template interruptible_future<> {
if (!pg->get_need_up_thru()) {
return seastar::now();
}
return shard_services.send_alive(pg->get_same_interval_since());
- }).then_interruptible([this] {
+ }).then_interruptible([&shard_services] {
return shard_services.send_pg_temp();
});
- },
- [this](std::exception_ptr ep) {
- logger().debug("{}: interrupted with {}", *this, ep);
- return seastar::now();
- },
- pg);
- }).finally([ref=std::move(ref)] {
- logger().debug("{}: complete", *ref);
- });
+ }, [this](std::exception_ptr ep) {
+ logger().debug("{}: interrupted with {}", *this, ep);
+ }, pg);
}
template <class T>
std::move(ctx));
}
-ConnectionPipeline &RemotePeeringEvent::cp()
+ConnectionPipeline &RemotePeeringEvent::get_connection_pipeline()
{
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
-RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
-{
- return osd.peering_request_osd_pipeline;
-}
-
void RemotePeeringEvent::on_pg_absent()
{
if (auto& e = get_event().get_event();
if (pg) {
return PeeringEvent::complete_rctx(pg);
} else {
- logger().debug("{}: OSDState is {}", *this, osd.state);
- return osd.state.when_active().then([this] {
- assert(osd.state.is_active());
- return shard_services.dispatch_context_messages(std::move(ctx));
- });
+ return shard_services.dispatch_context_messages(std::move(ctx));
}
}
seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
{
- logger().debug("{}: OSDState is {}", *this, osd.state);
- return osd.state.when_active().then([this] {
- assert(osd.state.is_active());
- return shard_services.dispatch_context_messages(std::move(ctx));
- });
+ return shard_services.dispatch_context_messages(std::move(ctx));
}
-seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
+seastar::future<> LocalPeeringEvent::start()
{
- return enter_stage<>(op().await_active).then([this] {
- return osd.state.when_active();
- }).then([this] {
- return enter_stage<>(cp().await_map);
- }).then([this] {
- using OSDMapBlockingEvent =
- OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
- return with_blocking_event<OSDMapBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.osdmap_gate.wait_for_map(std::move(trigger),
- evt.get_epoch_sent());
- });
- }).then([this](auto epoch) {
- logger().debug("{}: got map {}", *this, epoch);
- return enter_stage<>(cp().get_pg);
- }).then([this] {
- return with_blocking_event<PGMap::PGCreationBlockingEvent>(
- [this] (auto&& trigger) {
- return osd.get_or_create_pg(std::move(trigger),
- pgid,
- evt.get_epoch_sent(),
- std::move(evt.create_info));
- });
+ logger().debug("{}: start", *this);
+
+ IRef ref = this;
+ auto maybe_delay = seastar::now();
+ if (delay) {
+ maybe_delay = seastar::sleep(
+ std::chrono::milliseconds(std::lround(delay * 1000)));
+ }
+ return maybe_delay.then([this] {
+ return with_pg(shard_services, pg);
+ }).finally([ref=std::move(ref)] {
+ logger().debug("{}: complete", *ref);
});
}
-seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
- return seastar::make_ready_future<Ref<PG>>(pg);
-}
LocalPeeringEvent::~LocalPeeringEvent() {}
complete_rctx(Ref<PG>);
virtual seastar::future<> complete_rctx_no_pg() { return seastar::now();}
- virtual seastar::future<Ref<PG>> get_pg() = 0;
public:
template <typename... Args>
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
- seastar::future<> start();
+ seastar::future<> with_pg(
+ ShardServices &shard_services, Ref<PG> pg);
};
class RemotePeeringEvent : public PeeringEvent<RemotePeeringEvent> {
protected:
- OSD &osd;
crimson::net::ConnectionRef conn;
void on_pg_absent() final;
PeeringEvent::interruptible_future<> complete_rctx(Ref<PG> pg) override;
seastar::future<> complete_rctx_no_pg() override;
- seastar::future<Ref<PG>> get_pg() final;
public:
class OSDPipeline {
};
template <typename... Args>
- RemotePeeringEvent(OSD &osd, crimson::net::ConnectionRef conn, Args&&... args) :
+ RemotePeeringEvent(crimson::net::ConnectionRef conn, Args&&... args) :
PeeringEvent(std::forward<Args>(args)...),
- osd(osd),
conn(conn)
{}
#endif
CompletionEvent
> tracking_events;
-private:
- ConnectionPipeline &cp();
- OSDPipeline &op();
+
+ static constexpr bool can_create() { return true; }
+ auto get_create_info() { return std::move(evt.create_info); }
+ spg_t get_pgid() const {
+ return pgid;
+ }
+ ConnectionPipeline &get_connection_pipeline();
+ PipelineHandle &get_handle() { return handle; }
+ epoch_t get_epoch() const { return evt.get_epoch_sent(); }
};
class LocalPeeringEvent final : public PeeringEvent<LocalPeeringEvent> {
protected:
- seastar::future<Ref<PG>> get_pg() final;
-
Ref<PG> pg;
public:
pg(pg)
{}
+ seastar::future<> start();
virtual ~LocalPeeringEvent();
std::tuple<