sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
});
} else {
- return process_op(pgref).then_interruptible([this] {
- // NOTE: this assumes process_op() handles everything
- // in-order which I'm not sure about.
- sequencer.finish_op_in_order(*this);
+ return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) {
+ if (mode == seq_mode_t::IN_ORDER) {
+ sequencer.finish_op_in_order(*this);
+ } else {
+ assert(mode == seq_mode_t::OUT_OF_ORDER);
+ sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
+ }
});
}
});
});
}
-ClientRequest::interruptible_future<>
+ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
ClientRequest::process_op(Ref<PG> &pg)
{
return with_blocking_future_interruptible<interruptor::condition>(
}).then_interruptible([this, pg]() mutable {
return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
[this, pg](bool completed, int ret) mutable
- -> PG::load_obc_iertr::future<> {
+ -> PG::load_obc_iertr::future<seq_mode_t> {
if (completed) {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), ret, pg->get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
- return conn->send(std::move(reply));
+ return conn->send(std::move(reply)).then([] {
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
+ });
} else {
return with_blocking_future_interruptible<interruptor::condition>(
handle.enter(pp(*pg).get_obc)).then_interruptible(
- [this, pg]() mutable -> PG::load_obc_iertr::future<> {
+ [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
logger().debug("{}: got obc lock", *this);
op_info.set_from_op(&*m, *pg->get_osdmap());
- return pg->with_locked_obc(m->get_hobj(), op_info,
- [this, pg](auto obc) mutable {
- return with_blocking_future_interruptible<interruptor::condition>(
- handle.enter(pp(*pg).process)
- ).then_interruptible([this, pg, obc]() mutable {
- return do_process(pg, obc);
+ // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
+ // `future<void>`.
+ return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
+ return pg->with_locked_obc(m->get_hobj(), op_info,
+ [this, pg, &mode](auto obc) mutable {
+ return with_blocking_future_interruptible<interruptor::condition>(
+ handle.enter(pp(*pg).process)
+ ).then_interruptible([this, pg, obc, &mode]() mutable {
+ return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
+ mode = _mode;
+ return seastar::now();
+ });
+ });
+ }).safe_then_interruptible([&mode] {
+ return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
});
});
});
}
});
- }).safe_then_interruptible([pg=std::move(pg)] {
- return seastar::now();
+ }).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) {
+ return seastar::make_ready_future<seq_mode_t>(mode);
}, PG::load_obc_ertr::all_same_way([](auto &code) {
logger().error("ClientRequest saw error code {}", code);
- return seastar::now();
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
}));
}
-ClientRequest::interruptible_future<>
+ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
logger().trace("do_process: dropping misdirected op");
- return seastar::now();
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -EAGAIN, pg->get_osdmap_epoch(),
m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
- return conn->send(std::move(reply));
+ return conn->send(std::move(reply)).then([] {
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
+ });
}
}
return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
return with_blocking_future_interruptible<interruptor::condition>(
handle.enter(pp(*pg).send_reply)).then_interruptible(
[this, reply=std::move(reply)]() mutable{
- return conn->send(std::move(reply));
+ return conn->send(std::move(reply)).then([] {
+ return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
+ });
});
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
return process_op(pg);