});
}
-PGPeeringPipeline &BackfillRecovery::bp(PG &pg)
+PGPeeringPipeline &BackfillRecovery::peering_pp(PG &pg)
{
return pg.peering_request_pg_pipeline;
}
// the backfill pipeline protects the non-reentrant process_event() of our
// boost::statechart machine from a second entry via the implementation of
// BackfillListener.
// additionally, this stage serves to synchronize with PeeringEvent.
- bp(*pg).process
+ peering_pp(*pg).process
).then_interruptible([this] {
pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
return seastar::make_ready_future<bool>(false);
boost::intrusive_ptr<const boost::statechart::event_base> evt;
PipelineHandle handle;
- static PGPeeringPipeline &bp(PG &pg);
+ static PGPeeringPipeline &peering_pp(PG &pg);
interruptible_future<bool> do_recovery() override;
};
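For context, the comment above is the point of the `process` stage: entry into the stage is exclusive, so the statechart's process_event() cannot be entered a second time while a backfill event is being dispatched. Below is a minimal, self-contained sketch of that idea; OrderedExclusiveStage and enter_and_run are invented stand-ins for crimson's seastar-based pipeline phases, not its actual API.

#include <iostream>
#include <mutex>

// Invented stand-in: one occupant at a time, like an exclusive pipeline stage.
struct OrderedExclusiveStage {
  std::mutex m;
};

template <typename F>
void enter_and_run(OrderedExclusiveStage &stage, F &&f) {
  std::lock_guard<std::mutex> lock(stage.m);  // blocks until the stage is free
  f();                                        // runs with exclusive access
}

int main() {
  OrderedExclusiveStage process;  // analogous to peering_pp(pg).process
  enter_and_run(process, [] {
    std::cout << "dispatch_backfill_event() cannot be entered twice\n";
  });
}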
return get_osd_priv(conn.get()).client_request_conn_pipeline;
}
-ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
+ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
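The rename is easiest to read against the two pipeline members a PG carries: the accessor above returns the client-request pipeline, while peering_pp() returns the peering one. The following is a reduced sketch of that pattern using invented types (PGSketch, ClientPGPipeline, PGPeeringPipelineSketch) rather than crimson's real definitions; only the member names mirror the code above.

#include <iostream>

struct ClientPGPipeline       { const char *name = "request_pg_pipeline"; };
struct PGPeeringPipelineSketch { const char *name = "peering_request_pg_pipeline"; };

struct PGSketch {
  ClientPGPipeline request_pg_pipeline;                 // client-request stages
  PGPeeringPipelineSketch peering_request_pg_pipeline;  // peering stages
};

// Named accessors make explicit which of the two members is meant.
ClientPGPipeline &client_pp(PGSketch &pg) { return pg.request_pg_pipeline; }
PGPeeringPipelineSketch &peering_pp(PGSketch &pg) {
  return pg.peering_request_pg_pipeline;
}

int main() {
  PGSketch pg;
  std::cout << client_pp(pg).name << '\n' << peering_pp(pg).name << '\n';
}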
return interruptor::now();
});
}
- return ihref.enter_stage<interruptor>(pp(pg).await_map, *this
+ return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
).then_interruptible([this, this_instance_id, &pg, &ihref] {
logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
return ihref.enter_blocker(
m->get_min_epoch(), nullptr);
}).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
- return ihref.enter_stage<interruptor>(pp(pg).wait_for_active, *this);
+ return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
}).then_interruptible([this, this_instance_id, &pg, &ihref]() {
logger().debug(
"{}.{}: after wait_for_active stage", *this, this_instance_id);
ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
return ihref.enter_stage<interruptor>(
- pp(*pg).recover_missing,
+ client_pp(*pg).recover_missing,
*this
).then_interruptible(
[this, pg]() mutable {
reply->set_reply_versions(completed->version, completed->user_version);
return conn->send(std::move(reply));
} else {
- return ihref.enter_stage<interruptor>(pp(*pg).get_obc, *this
+ return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
).then_interruptible(
[this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
logger().debug("{}: in get_obc stage", *this);
[this, pg, &ihref](auto obc) mutable {
logger().debug("{}: got obc {}", *this, obc->obs);
return ihref.enter_stage<interruptor>(
- pp(*pg).process, *this
+ client_pp(*pg).process, *this
).then_interruptible([this, pg, obc, &ihref]() mutable {
return do_process(ihref, pg, obc);
});
return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
[this, pg, &ihref](auto submitted, auto all_completed) mutable {
return submitted.then_interruptible([this, pg, &ihref] {
- return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
+ return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
}).then_interruptible(
[this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
return all_completed.safe_then_interruptible(
[this, pg, &ihref](MURef<MOSDOpReply> reply) {
- return ihref.enter_stage<interruptor>(pp(*pg).send_reply, *this
+ return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
).then_interruptible(
[this, reply=std::move(reply)]() mutable {
logger().debug("{}: sending response", *this);
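For orientation, the chain above relies on do_osd_ops() yielding two futures: one resolving at submission (gating entry into wait_repop) and one once all replicas have acked (gating send_reply). A condensed, runnable sketch of that two-future contract, with do_osd_ops_sketch as a hypothetical stand-in built on std::future rather than crimson's interruptible futures:

#include <future>
#include <iostream>
#include <utility>

// Hypothetical helper: first future resolves at submission, second when
// all replicas have acknowledged the repop.
std::pair<std::future<void>, std::future<void>> do_osd_ops_sketch() {
  std::promise<void> submitted, completed;
  submitted.set_value();   // pretend the repop was queued immediately
  completed.set_value();   // pretend all replicas already acked
  return {submitted.get_future(), completed.get_future()};
}

int main() {
  auto [submitted, all_completed] = do_osd_ops_sketch();
  submitted.wait();        // only now enter wait_repop
  all_completed.wait();    // only now enter send_reply
  std::cout << "reply sent after all replicas acked\n";
}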
Ref<PG> &pg);
bool is_pg_op() const;
- PGPipeline &pp(PG &pg);
+ PGPipeline &client_pp(PG &pg);
template <typename Errorator>
using interruptible_errorator =
{
}
-CommonPGPipeline& InternalClientRequest::pp()
+CommonPGPipeline& InternalClientRequest::client_pp()
{
return pg->request_pg_pipeline;
}
logger().debug("{}: in repeat", *this);
return interruptor::with_interruption([this]() mutable {
return enter_stage<interruptor>(
- pp().wait_for_active
+ client_pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().recover_missing);
+ client_pp().recover_missing);
}).then_interruptible([this] {
return do_recover_missing(pg, get_target_oid());
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().get_obc);
+ client_pp().get_obc);
}).then_interruptible([this] () -> PG::load_obc_iertr::future<> {
logger().debug("{}: getting obc lock", *this);
return seastar::do_with(create_osd_ops(),
assert(ret == 0);
return pg->with_locked_obc(get_target_oid(), op_info,
[&osd_ops, this](auto obc) {
- return enter_stage<interruptor>(pp().process).then_interruptible(
+ return enter_stage<interruptor>(client_pp().process
+ ).then_interruptible(
[obc=std::move(obc), &osd_ops, this] {
return pg->do_osd_ops(
std::move(obc),
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
- CommonPGPipeline& pp();
+ CommonPGPipeline& client_pp();
seastar::future<> do_process();
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
-ClientRequest::PGPipeline &LogMissingRequest::pp(PG &pg)
+ClientRequest::PGPipeline &LogMissingRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
> tracking_events;
private:
- ClientRequest::PGPipeline &pp(PG &pg);
+ ClientRequest::PGPipeline &client_pp(PG &pg);
crimson::net::ConnectionRef conn;
  // must be after `conn` to ensure the ConnectionPipeline stays alive
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
-ClientRequest::PGPipeline &LogMissingRequestReply::pp(PG &pg)
+ClientRequest::PGPipeline &LogMissingRequestReply::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
> tracking_events;
private:
- ClientRequest::PGPipeline &pp(PG &pg);
+ ClientRequest::PGPipeline &client_pp(PG &pg);
crimson::net::ConnectionRef conn;
  // must be after `conn` to ensure the ConnectionPipeline stays alive
template <class T>
-PGPeeringPipeline &PeeringEvent<T>::pp(PG &pg)
+PGPeeringPipeline &PeeringEvent<T>::peering_pp(PG &pg)
{
return pg.peering_request_pg_pipeline;
}
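Since PeeringEvent<T> is a template, the renamed accessor is defined once and instantiated per concrete event type, yet every instantiation returns the same per-PG pipeline. A minimal sketch of that shape; the *Sketch types are invented here, and the concrete subclasses are merely modeled on crimson's remote/local peering events.

#include <iostream>

struct PGPeeringPipelineSketch {};

struct PGSketch {
  PGPeeringPipelineSketch peering_request_pg_pipeline;
};

// One accessor definition serves every concrete event type.
template <class T>
struct PeeringEventSketch {
  static PGPeeringPipelineSketch &peering_pp(PGSketch &pg) {
    return pg.peering_request_pg_pipeline;
  }
};

struct RemotePeeringEventSketch
  : PeeringEventSketch<RemotePeeringEventSketch> {};
struct LocalPeeringEventSketch
  : PeeringEventSketch<LocalPeeringEventSketch> {};

int main() {
  PGSketch pg;
  // Both instantiations alias the same pipeline object; prints 1.
  std::cout << (&RemotePeeringEventSketch::peering_pp(pg) ==
                &LocalPeeringEventSketch::peering_pp(pg)) << '\n';
}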
using interruptor = typename T::interruptor;
return interruptor::with_interruption([this, pg, &shard_services] {
logger().debug("{}: pg present", *this);
- return this->template enter_stage<interruptor>(pp(*pg).await_map
+ return this->template enter_stage<interruptor>(peering_pp(*pg).await_map
).then_interruptible([this, pg] {
return this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
std::move(trigger), evt.get_epoch_sent());
});
}).then_interruptible([this, pg](auto) {
- return this->template enter_stage<interruptor>(pp(*pg).process);
+ return this->template enter_stage<interruptor>(peering_pp(*pg).process);
}).then_interruptible([this, pg, &shard_services] {
return pg->do_peering_event(evt, ctx
).then_interruptible([this, pg, &shard_services] {
static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
protected:
- PGPeeringPipeline &pp(PG &pg);
+ PGPeeringPipeline &peering_pp(PG &pg);
PeeringCtx ctx;
pg_shard_t from;
return get_osd_priv(conn.get()).replicated_request_conn_pipeline;
}
-ClientRequest::PGPipeline &RepRequest::pp(PG &pg)
+ClientRequest::PGPipeline &RepRequest::client_pp(PG &pg)
{
return pg.request_pg_pipeline;
}
IRef ref = this;
return interruptor::with_interruption([this, pg] {
logger().debug("{}: pg present", *this);
- return this->template enter_stage<interruptor>(pp(*pg).await_map
+ return this->template enter_stage<interruptor>(client_pp(*pg).await_map
).then_interruptible([this, pg] {
return this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
> tracking_events;
private:
- ClientRequest::PGPipeline &pp(PG &pg);
+ ClientRequest::PGPipeline &client_pp(PG &pg);
crimson::net::ConnectionRef conn;
PipelineHandle handle;
});
}
-CommonPGPipeline& SnapTrimEvent::pp()
+CommonPGPipeline& SnapTrimEvent::client_pp()
{
return pg->request_pg_pipeline;
}
{
return interruptor::with_interruption([&shard_services, this] {
return enter_stage<interruptor>(
- pp().wait_for_active
+ client_pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().recover_missing);
+ client_pp().recover_missing);
}).then_interruptible([] {
//return do_recover_missing(pg, get_target_oid());
return seastar::now();
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().get_obc);
+ client_pp().get_obc);
}).then_interruptible([this] {
return pg->snaptrim_mutex.lock(*this);
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().process);
+ client_pp().process);
}).then_interruptible([&shard_services, this] {
return interruptor::async([this] {
std::vector<hobject_t> to_trim;
}
-CommonPGPipeline& SnapTrimObjSubEvent::pp()
+CommonPGPipeline& SnapTrimObjSubEvent::client_pp()
{
return pg->request_pg_pipeline;
}
ShardServices &shard_services, Ref<PG> _pg)
{
return enter_stage<interruptor>(
- pp().wait_for_active
+ client_pp().wait_for_active
).then_interruptible([this] {
return with_blocking_event<PGActivationBlocker::BlockingEvent,
interruptor>([this] (auto&& trigger) {
});
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().recover_missing);
+ client_pp().recover_missing);
}).then_interruptible([] {
//return do_recover_missing(pg, get_target_oid());
return seastar::now();
}).then_interruptible([this] {
return enter_stage<interruptor>(
- pp().get_obc);
+ client_pp().get_obc);
}).then_interruptible([this] {
logger().debug("{}: getting obc for {}", *this, coid);
// end of commonality
[this](auto head_obc, auto clone_obc) {
logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid());
return enter_stage<interruptor>(
- pp().process
+ client_pp().process
).then_interruptible(
[this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
ShardServices &shard_services, Ref<PG> pg);
private:
- CommonPGPipeline& pp();
+ CommonPGPipeline& client_pp();
// based on 998cb8c141bb89aafae298a9d5e130fbd78fe5f2
struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
remove_or_update_iertr::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);
- CommonPGPipeline& pp();
+ CommonPGPipeline& client_pp();
private:
object_stat_sum_t delta_stats;