epoch_t same_interval_since = pgref->get_interval_start_epoch();
logger().debug("{} same_interval_since: {}", *this, same_interval_since);
const bool has_pg_op = is_pg_op();
- auto interruptible_do_op =
- interruptor::wrap_function([this, has_pg_op, pgref] {
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- return interruptible_future<>(
- osd.send_incremental_map(conn, m->get_map_epoch()));
- }
+ auto interruptible_do_op = interruptor::wrap_function([=] {
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return interruptible_future<>(
+ osd.send_incremental_map(conn, m->get_map_epoch()));
+ }
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ handle.enter(pp(pg).await_map)
+ ).then_interruptible([this, &pg] {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+ }).then_interruptible([this, &pg](auto map) {
return with_blocking_future_interruptible<IOInterruptCondition>(
- handle.enter(pp(pg).await_map)
- ).then_interruptible([this, &pg] {
- return with_blocking_future_interruptible<IOInterruptCondition>(
- pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
- }).then_interruptible([this, &pg](auto map) {
- return with_blocking_future_interruptible<IOInterruptCondition>(
- handle.enter(pp(pg).wait_for_active));
- }).then_interruptible([this, &pg]() {
- return with_blocking_future_interruptible<IOInterruptCondition>(
- pg.wait_for_active_blocker.wait());
- }).then_interruptible([this, has_pg_op, pgref=std::move(pgref)]() mutable {
- if (m->finish_decode()) {
- m->clear_payload();
- }
- return (has_pg_op ?
- process_pg_op(pgref) :
- process_op(pgref));
- });
+ handle.enter(pp(pg).wait_for_active));
+ }).then_interruptible([this, &pg]() {
+ return with_blocking_future_interruptible<IOInterruptCondition>(
+ pg.wait_for_active_blocker.wait());
+ }).then_interruptible([this,
+ has_pg_op,
+ pgref=std::move(pgref)]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ return (has_pg_op ?
+ process_pg_op(pgref) :
+ process_op(pgref));
});
+ });
// keep the ordering of non-pg ops when across pg internvals
return (has_pg_op ?
interruptible_do_op() :