#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "crimson/common/coroutine.h"
#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/osd/pg.h"
DEBUGDPP("{} start", *pgref, *this);
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
- return shard_services->send_incremental_map(
- std::ref(get_foreign_connection()), m->get_map_epoch()
- ).then([FNAME, this, this_instance_id, pgref] {
- DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
- pgref->client_request_orderer.remove_request(*this);
- complete_request();
- return interruptor::now();
- });
+ co_await interruptor::make_interruptible(
+ shard_services->send_incremental_map(
+ std::ref(get_foreign_connection()), m->get_map_epoch()
+ ));
+ DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
+ co_return;
}
DEBUGDPP("{}.{}: entering await_map stage",
*pgref, *this, this_instance_id);
- return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
- ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
- DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
- pg, *this, this_instance_id);
- return ihref.enter_blocker(
+ co_await ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this);
+ DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
+ pg, *this, this_instance_id);
+ auto map_epoch = co_await interruptor::make_interruptible(
+ ihref.enter_blocker(
*this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
- m->get_min_epoch(), nullptr);
- }).then_interruptible(
- [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
- DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
- pg, *this, this_instance_id, map_epoch);
- return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
- }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
- DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
- pg, *this, this_instance_id);
- return ihref.enter_blocker(
+ m->get_min_epoch(), nullptr));
+
+ DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+ pg, *this, this_instance_id, map_epoch);
+ co_await ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
+
+ DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
+ pg, *this, this_instance_id);
+ co_await interruptor::make_interruptible(
+ ihref.enter_blocker(
*this,
pg.wait_for_active_blocker,
- &decltype(pg.wait_for_active_blocker)::wait);
- }).then_interruptible(
- [FNAME, this, pgref, this_instance_id, &ihref]() mutable
- -> interruptible_future<> {
- DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
- *pgref, *this, this_instance_id);
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(ihref, pgref, this_instance_id);
- }
- }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
- DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
- *pgref, *this, this_instance_id);
- return ihref.handle.complete();
- }).then_interruptible([FNAME, this, this_instance_id, pgref] {
- DEBUGDPP("{}.{}: process[_pg]_op complete,"
- "removing request from orderer",
- *pgref, *this, this_instance_id);
- pgref->client_request_orderer.remove_request(*this);
- complete_request();
- });
+ &decltype(pg.wait_for_active_blocker)::wait));
+
+ DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
+ *pgref, *this, this_instance_id);
+
+ co_await (is_pg_op() ? process_pg_op(pgref) :
+ process_op(ihref, pgref, this_instance_id));
+
+ DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
+ *pgref, *this, this_instance_id);
+ co_await interruptor::make_interruptible(ihref.handle.complete());
+
+ DEBUGDPP("{}.{}: process[_pg]_op complete,"
+ "removing request from orderer",
+ *pgref, *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
}
seastar::future<> ClientRequest::with_pg_process(