instance_handle_t &ihref, Ref<PG> pg, unsigned this_instance_id)
{
LOG_PREFIX(ClientRequest::process_op);
- return ihref.enter_stage<interruptor>(
+ co_await ihref.enter_stage<interruptor>(
client_pp(*pg).recover_missing, *this
- ).then_interruptible([pg, this]() mutable {
- return recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
- }).then_interruptible([FNAME, this, pg, this_instance_id, &ihref]() mutable {
- DEBUGDPP("{}.{}: checking already_complete",
+ );
+ co_await recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
+
+ DEBUGDPP("{}.{}: checking already_complete",
+ *pg, *this, this_instance_id);
+ auto completed = co_await pg->already_complete(m->get_reqid());
+
+ if (completed) {
+ DEBUGDPP("{}.{}: already completed, sending reply",
*pg, *this, this_instance_id);
- return pg->already_complete(m->get_reqid()).then_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref](auto completed) mutable
- -> PG::load_obc_iertr::future<> {
- if (completed) {
- DEBUGDPP("{}.{}: already completed, sending reply",
- *pg, *this, this_instance_id);
- auto reply = crimson::make_message<MOSDOpReply>(
- m.get(), completed->err, pg->get_osdmap_epoch(),
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
- reply->set_reply_versions(completed->version, completed->user_version);
- // TODO: gate the crosscore sending
- return get_foreign_connection().send_with_throttling(std::move(reply));
- } else {
- DEBUGDPP("{}.{}: not completed, entering get_obc stage",
- *pg, *this, this_instance_id);
- return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
- ).then_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref]() mutable
- -> PG::load_obc_iertr::future<> {
- DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
+ auto reply = crimson::make_message<MOSDOpReply>(
+ m.get(), completed->err, pg->get_osdmap_epoch(),
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
+ reply->set_reply_versions(completed->version, completed->user_version);
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply))
+ );
+ co_return;
+ }
+
+ DEBUGDPP("{}.{}: not completed, entering get_obc stage",
+ *pg, *this, this_instance_id);
+ co_await ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this);
+
+ DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
+ *pg, *this, this_instance_id);
+ if (int res = op_info.set_from_op(&*m, *pg->get_osdmap());
+ res != 0) {
+ co_await reply_op_error(pg, res);
+ co_return;
+ }
+ co_await ihref.enter_blocker(
+ *this, pg->scrubber, &decltype(pg->scrubber)::wait_scrub,
+ m->get_hobj());
+
+ DEBUGDPP("{}.{}: past scrub blocker, getting obc",
+ *pg, *this, this_instance_id);
+ co_await pg->with_locked_obc(
+ m->get_hobj(), op_info,
+ [FNAME, this, pg, this_instance_id, &ihref] (
+ auto head, auto obc
+ ) -> interruptible_future<> {
+ DEBUGDPP("{}.{}: got obc {}, entering process stage",
+ *pg, *this, this_instance_id, obc->obs);
+ return ihref.enter_stage<interruptor>(
+ client_pp(*pg).process, *this
+ ).then_interruptible(
+ [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
+ DEBUGDPP("{}.{}: in process stage, calling do_process",
*pg, *this, this_instance_id);
- if (int res = op_info.set_from_op(&*m, *pg->get_osdmap());
- res != 0) {
- return reply_op_error(pg, res);
- }
- return ihref.enter_blocker(
- *this,
- pg->scrubber,
- &decltype(pg->scrubber)::wait_scrub,
- m->get_hobj()
- ).then_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref]() mutable {
- DEBUGDPP("{}.{}: past scrub blocker, getting obc",
- *pg, *this, this_instance_id);
- return pg->with_locked_obc(
- m->get_hobj(), op_info,
- [FNAME, this, pg, this_instance_id, &ihref](
- auto head, auto obc) mutable {
- DEBUGDPP("{}.{}: got obc {}, entering process stage",
- *pg, *this, this_instance_id, obc->obs);
- return ihref.enter_stage<interruptor>(
- client_pp(*pg).process, *this
- ).then_interruptible(
- [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
- DEBUGDPP("{}.{}: in process stage, calling do_process",
- *pg, *this, this_instance_id);
- return do_process(ihref, pg, obc, this_instance_id);
- });
- });
- });
- });
- }
- });
- }).handle_error_interruptible(
+ return do_process(ihref, pg, obc, this_instance_id);
+ });
+ }
+ ).handle_error_interruptible(
PG::load_obc_ertr::all_same_way(
- [FNAME, this, pg=std::move(pg), this_instance_id](const auto &code) {
- DEBUGDPP("{}.{}: saw error code {}",
- *pg, *this, this_instance_id, code);
- assert(code.value() > 0);
- return reply_op_error(pg, -code.value());
- }));
+ [FNAME, this, pg=std::move(pg), this_instance_id](
+ const auto &code
+ ) -> interruptible_future<> {
+ DEBUGDPP("{}.{}: saw error code {}",
+ *pg, *this, this_instance_id, code);
+ assert(code.value() > 0);
+ return reply_op_error(pg, -code.value());
+ })
+ );
}
ClientRequest::interruptible_future<>