return pg.repop_pipeline;
}
+RepRequest::interruptible_future<> RepRequest::with_pg_interruptible(
+ Ref<PG> pg)
+{
+ LOG_PREFIX(RepRequest::with_pg_interruptible);
+ DEBUGI("{}", *this);
+ return this->template enter_stage<interruptor>(repop_pipeline(*pg).process
+ ).then_interruptible([this, pg] {
+ return this->template with_blocking_event<
+ PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+ >([this, pg](auto &&trigger) {
+ return pg->osdmap_gate.wait_for_map(
+ std::move(trigger), req->min_epoch);
+ });
+ }).then_interruptible([this, pg] (auto) {
+ return pg->handle_rep_op(req);
+ });
+}
+
seastar::future<> RepRequest::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
DEBUGI("{}", *this);
IRef ref = this;
return interruptor::with_interruption([this, pg] {
- LOG_PREFIX(RepRequest::with_pg);
- DEBUGI("{}: pg present", *this);
- return this->template enter_stage<interruptor>(repop_pipeline(*pg).process
- ).then_interruptible([this, pg] {
- return this->template with_blocking_event<
- PG_OSDMapGate::OSDMapBlocker::BlockingEvent
- >([this, pg](auto &&trigger) {
- return pg->osdmap_gate.wait_for_map(
- std::move(trigger), req->min_epoch);
- });
- }).then_interruptible([this, pg] (auto) {
- return pg->handle_rep_op(req);
- });
+ return with_pg_interruptible(pg);
}, [](std::exception_ptr) {
return seastar::now();
}, pg, pg->get_osdmap_epoch()).finally([this, ref=std::move(ref)] {