We'll want to supply this as part of with_pg_operation etc.
Signed-off-by: Samuel Just <sjust@redhat.com>
PeeringState::RequestScrub scrub_request{m->deep, m->repair};
return start_pg_operation<RemotePeeringEvent>(
conn,
- shard_services,
from_shard,
pgid,
PGPeeringEvent{m->epoch, m->epoch, scrub_request}).second;
std::unique_ptr<PGPeeringEvent> evt(m->get_event());
(void) start_pg_operation<RemotePeeringEvent>(
conn,
- shard_services,
pg_shard_t{from, m->get_spg().shard},
m->get_spg(),
std::move(*evt));
RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
PeeringEvent::interruptible_future<>
- complete_rctx(Ref<crimson::osd::PG> pg) final {
+ complete_rctx(
+ ShardServices &shard_services,
+ Ref<crimson::osd::PG> pg) final {
logger().debug("{}: submitting ctx transaction", *this);
state->ctx.accept_buffered_messages(ctx);
state = {};
auto op = osd.start_pg_operation<PeeringSubEvent>(
state,
conn,
- osd.get_shard_services(),
pg_shard_t(),
pgid,
m->epoch,
{
if (!pg) {
logger().warn("{}: pg absent, did not create", *this);
- on_pg_absent();
+ on_pg_absent(shard_services);
that()->get_handle().exit();
- return complete_rctx_no_pg();
+ return complete_rctx_no_pg(shard_services);
}
using interruptor = typename T::interruptor;
// recovery.
return this->template enter_stage<interruptor>(
BackfillRecovery::bp(*pg).process);
- }).then_interruptible([this, pg] {
+ }).then_interruptible([this, pg, &shard_services] {
pg->do_peering_event(evt, ctx);
that()->get_handle().exit();
- return complete_rctx(pg);
+ return complete_rctx(shard_services, pg);
}).then_interruptible([pg, &shard_services]()
-> typename T::template interruptible_future<> {
if (!pg->get_need_up_thru()) {
}
template <class T>
-void PeeringEvent<T>::on_pg_absent()
+void PeeringEvent<T>::on_pg_absent(ShardServices &)
{
logger().debug("{}: pg absent, dropping", *this);
}
template <class T>
typename PeeringEvent<T>::template interruptible_future<>
-PeeringEvent<T>::complete_rctx(Ref<PG> pg)
+PeeringEvent<T>::complete_rctx(ShardServices &shard_services, Ref<PG> pg)
{
logger().debug("{}: submitting ctx", *this);
return shard_services.dispatch_context(
return get_osd_priv(conn.get()).peering_request_conn_pipeline;
}
-void RemotePeeringEvent::on_pg_absent()
+void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
{
if (auto& e = get_event().get_event();
e.dynamic_type() == MQuery::static_type()) {
}
}
-RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
+RemotePeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(
+ ShardServices &shard_services,
+ Ref<PG> pg)
{
if (pg) {
- return PeeringEvent::complete_rctx(pg);
+ return PeeringEvent::complete_rctx(shard_services, pg);
} else {
return shard_services.dispatch_context_messages(std::move(ctx));
}
}
-seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
+seastar::future<> RemotePeeringEvent::complete_rctx_no_pg(
+ ShardServices &shard_services)
{
return shard_services.dispatch_context_messages(std::move(ctx));
}
std::chrono::milliseconds(std::lround(delay * 1000)));
}
return maybe_delay.then([this] {
- return with_pg(shard_services, pg);
+ return with_pg(pg->get_shard_services(), pg);
}).finally([ref=std::move(ref)] {
logger().debug("{}: complete", *ref);
});
protected:
PGPeeringPipeline &pp(PG &pg);
- ShardServices &shard_services;
PeeringCtx ctx;
pg_shard_t from;
spg_t pgid;
return evt;
}
- virtual void on_pg_absent();
+ virtual void on_pg_absent(ShardServices &);
virtual typename PeeringEvent::template interruptible_future<>
- complete_rctx(Ref<PG>);
+ complete_rctx(ShardServices &, Ref<PG>);
- virtual seastar::future<> complete_rctx_no_pg() { return seastar::now();}
+ virtual seastar::future<> complete_rctx_no_pg(
+ ShardServices &shard_services
+ ) { return seastar::now();}
public:
template <typename... Args>
PeeringEvent(
- ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+ const pg_shard_t &from, const spg_t &pgid,
Args&&... args) :
- shard_services(shard_services),
from(from),
pgid(pgid),
evt(std::forward<Args>(args)...)
{}
template <typename... Args>
PeeringEvent(
- ShardServices &shard_services, const pg_shard_t &from, const spg_t &pgid,
+ const pg_shard_t &from, const spg_t &pgid,
float delay, Args&&... args) :
- shard_services(shard_services),
from(from),
pgid(pgid),
delay(delay),
// must be after conn due to ConnectionPipeline's life-time
PipelineHandle handle;
- void on_pg_absent() final;
- PeeringEvent::interruptible_future<> complete_rctx(Ref<PG> pg) override;
- seastar::future<> complete_rctx_no_pg() override;
+ void on_pg_absent(ShardServices &) final;
+ PeeringEvent::interruptible_future<> complete_rctx(
+ ShardServices &shard_services,
+ Ref<PG> pg) override;
+ seastar::future<> complete_rctx_no_pg(
+ ShardServices &shard_services
+ ) override;
public:
class OSDPipeline {
[this, epoch=get_osdmap_epoch()]() {
return shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
epoch,
check_readable_timer.set_callback([last_peering_reset, this] {
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
last_peering_reset,
__func__);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
__func__);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
" for pg: {}", __func__, pgid);
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
float(0.001),
renew_lease_timer.set_callback([last_peering_reset, this] {
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
last_peering_reset,
epoch_t epoch = get_osdmap_epoch();
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
epoch,
void start_peering_event_operation(T &&evt, float delay = 0) {
(void) shard_services.start_operation<LocalPeeringEvent>(
this,
- shard_services,
pg_whoami,
pgid,
delay,
pg->get_pgid());
(void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(pg),
- pg->get_shard_services(),
pg->get_pg_whoami(),
pg->get_pgid(),
pg->get_osdmap_epoch(),
pg->get_pgid());
(void) pg->get_shard_services().start_operation<LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(pg),
- pg->get_shard_services(),
pg->get_pg_whoami(),
pg->get_pgid(),
pg->get_osdmap_epoch(),
using LocalPeeringEvent = crimson::osd::LocalPeeringEvent;
std::ignore = pg->get_shard_services().start_operation<LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(pg),
- pg->get_shard_services(),
pg->get_pg_whoami(),
pg->get_pgid(),
pg->get_osdmap_epoch(),
std::ignore = m.get_connection()->send(std::move(reply));
shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
static_cast<crimson::osd::PG*>(&pg),
- shard_services,
pg.get_pg_whoami(),
pg.get_pgid(),
pg.get_osdmap_epoch(),
std::ignore = shard_services.start_operation<crimson::osd::LocalPeeringEvent>(
// TODO: abstract start_background_recovery
static_cast<crimson::osd::PG*>(&pg),
- shard_services,
pg.get_pg_whoami(),
pg.get_pgid(),
pg.get_osdmap_epoch(),