if (is_pg_op()) {
return process_pg_op(pgref);
} else {
- return process_op(
- ihref,
- pgref
- ).then_interruptible([](auto){});
+ return process_op(ihref, pgref);
}
}).then_interruptible([this, this_instance_id, pgref] {
logger().debug("{}.{}: after process*", *this, this_instance_id);
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
reply->set_reply_versions(eversion_t(), 0);
reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
- return conn->send(std::move(reply)).then([] {
- return seastar::make_ready_future<ClientRequest::seq_mode_t>
- (seq_mode_t::OUT_OF_ORDER);
- });
+ return conn->send(std::move(reply));
}
-ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
+ClientRequest::interruptible_future<>
ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
return ihref.enter_stage<interruptor>(
}).then_interruptible([this, pg, &ihref]() mutable {
return pg->already_complete(m->get_reqid()).then_interruptible(
[this, pg, &ihref](auto completed) mutable
- -> PG::load_obc_iertr::future<seq_mode_t> {
+ -> PG::load_obc_iertr::future<> {
if (completed) {
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), completed->err, pg->get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
reply->set_reply_versions(completed->version, completed->user_version);
- return conn->send(std::move(reply)).then([] {
- return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
- });
+ return conn->send(std::move(reply));
} else {
return ihref.enter_stage<interruptor>(pp(*pg).get_obc, *this
).then_interruptible(
- [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
+ [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
logger().debug("{}: got obc lock", *this);
op_info.set_from_op(&*m, *pg->get_osdmap());
- // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
- // `future<void>`.
- return seastar::do_with(
- seq_mode_t{},
- [this, &pg, &ihref](seq_mode_t& mode) {
- return pg->with_locked_obc(
- m->get_hobj(), op_info,
- [this, pg, &mode, &ihref](auto obc) mutable {
- return ihref.enter_stage<interruptor>(pp(*pg).process, *this
- ).then_interruptible(
- [this, pg, obc, &mode, &ihref]() mutable {
- return do_process(ihref, pg, obc
- ).then_interruptible([&mode] (seq_mode_t _mode) {
- mode = _mode;
- return seastar::now();
- });
- });
- }).safe_then_interruptible([&mode] {
- return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
- });
- });
- });
+ return pg->with_locked_obc(
+ m->get_hobj(), op_info,
+ [this, pg, &ihref](auto obc) mutable {
+ return ihref.enter_stage<interruptor>(pp(*pg).process, *this
+ ).then_interruptible([this, pg, obc, &ihref]() mutable {
+ return do_process(ihref, pg, obc);
+ });
+ });
+ });
}
- });
- }).safe_then_interruptible([pg] (const seq_mode_t mode) {
- return seastar::make_ready_future<seq_mode_t>(mode);
- }, PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) {
- logger().error("ClientRequest saw error code {}", code);
- assert(code.value() > 0);
- return reply_op_error(pg, -code.value());
+ });
+ }).handle_error_interruptible(
+ PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) {
+ logger().error("ClientRequest saw error code {}", code);
+ assert(code.value() > 0);
+ return reply_op_error(pg, -code.value());
}));
}
-ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
+ClientRequest::interruptible_future<>
ClientRequest::do_process(
instance_handle_t &ihref,
Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
logger().trace("do_process: dropping misdirected op");
- return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
+ return seastar::now();
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
return reply_op_error(pg, -EAGAIN);
// drop op on the floor; the client will handle returning EIO
if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
logger().debug("discarding op due to pool EIO flag");
- return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
+ return seastar::now();
} else {
logger().debug("replying EIO due to pool EIO flag");
return reply_op_error(pg, -EIO);
).then_interruptible(
[this, reply=std::move(reply)]() mutable {
logger().debug("{}: sending response", *this);
- return conn->send(std::move(reply)).then([] {
- return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
- });
+ return conn->send(std::move(reply));
});
}, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
return process_op(ihref, pg);