return interruptor::with_interruption([this, pgref]() mutable {
epoch_t same_interval_since = pgref->get_interval_start_epoch();
logger().debug("{} same_interval_since: {}", *this, same_interval_since);
- return with_sequencer(
- interruptor::wrap_function([this, pgref]() -> interruptible_future<> {
+ const bool has_pg_op = is_pg_op();
+ auto interruptible_do_op =
+ interruptor::wrap_function([this, has_pg_op, pgref]() -> interruptible_future<> {
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return osd.send_incremental_map(conn, m->get_map_epoch());
}).then_interruptible([this, &pg]() {
return with_blocking_future_interruptible<IOInterruptCondition>(
pg.wait_for_active_blocker.wait());
- }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
+ }).then_interruptible([this, has_pg_op, pgref=std::move(pgref)]() mutable {
if (m->finish_decode()) {
m->clear_payload();
}
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(pgref);
- }
+ return (has_pg_op ?
+ process_pg_op(pgref) :
+ process_op(pgref));
});
- })
- ).then_interruptible([pgref]() {
- return seastar::stop_iteration::yes;
- });
+ });
+ // keep the ordering of non-pg ops when across pg internvals
+ return (has_pg_op ?
+ interruptible_do_op() :
+ with_sequencer(std::move(interruptible_do_op)))
+ .then_interruptible([pgref]() {
+ return seastar::stop_iteration::yes;
+ });
}, [this, pgref](std::exception_ptr eptr) {
if (should_abort_request(*this, std::move(eptr))) {
sequencer.abort();