[](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
}
-seastar::future<> ClientRequest::with_pg_process(Ref<PG> pgref)
+ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptible(
+ Ref<PG> pgref, const unsigned this_instance_id, instance_handle_t &ihref)
+{
+ LOG_PREFIX(ClientRequest::with_pg_process);
+ DEBUGDPP(
+ "{}: same_interval_since: {}",
+ *pgref, *this, pgref->get_interval_start_epoch());
+
+ DEBUGDPP("{} start", *pgref, *this);
+ PG &pg = *pgref;
+ if (pg.can_discard_op(*m)) {
+ return shard_services->send_incremental_map(
+ std::ref(get_foreign_connection()), m->get_map_epoch()
+ ).then([FNAME, this, this_instance_id, pgref] {
+ DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
+ return interruptor::now();
+ });
+ }
+ DEBUGDPP("{}.{}: entering await_map stage",
+ *pgref, *this, this_instance_id);
+ return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
+ ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
+ DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
+ pg, *this, this_instance_id);
+ return ihref.enter_blocker(
+ *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
+ m->get_min_epoch(), nullptr);
+ }).then_interruptible(
+ [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
+ DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+ pg, *this, this_instance_id, map_epoch);
+ return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
+ }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
+ DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
+ pg, *this, this_instance_id);
+ return ihref.enter_blocker(
+ *this,
+ pg.wait_for_active_blocker,
+ &decltype(pg.wait_for_active_blocker)::wait);
+ }).then_interruptible(
+ [FNAME, this, pgref, this_instance_id, &ihref]() mutable
+ -> interruptible_future<> {
+ DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
+ *pgref, *this, this_instance_id);
+ if (is_pg_op()) {
+ return process_pg_op(pgref);
+ } else {
+ return process_op(ihref, pgref, this_instance_id);
+ }
+ }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
+ DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
+ *pgref, *this, this_instance_id);
+ return ihref.handle.complete();
+ }).then_interruptible([FNAME, this, this_instance_id, pgref] {
+ DEBUGDPP("{}.{}: process[_pg]_op complete,"
+ "removing request from orderer",
+ *pgref, *this, this_instance_id);
+ pgref->client_request_orderer.remove_request(*this);
+ complete_request();
+ });
+}
+
+seastar::future<> ClientRequest::with_pg_process(
+ Ref<PG> pgref)
{
ceph_assert_always(shard_services);
LOG_PREFIX(ClientRequest::with_pg_process);
auto instance_handle = get_instance_handle();
auto &ihref = *instance_handle;
return interruptor::with_interruption(
- [FNAME, this, pgref, this_instance_id, &ihref]() mutable {
- DEBUGDPP("{} start", *pgref, *this);
- PG &pg = *pgref;
- if (pg.can_discard_op(*m)) {
- return shard_services->send_incremental_map(
- std::ref(get_foreign_connection()), m->get_map_epoch()
- ).then([FNAME, this, this_instance_id, pgref] {
- DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
- pgref->client_request_orderer.remove_request(*this);
- complete_request();
- return interruptor::now();
- });
- }
- DEBUGDPP("{}.{}: entering await_map stage",
- *pgref, *this, this_instance_id);
- return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
- ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
- DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
- pg, *this, this_instance_id);
- return ihref.enter_blocker(
- *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
- m->get_min_epoch(), nullptr);
- }).then_interruptible(
- [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
- DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
- pg, *this, this_instance_id, map_epoch);
- return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
- }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
- DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
- pg, *this, this_instance_id);
- return ihref.enter_blocker(
- *this,
- pg.wait_for_active_blocker,
- &decltype(pg.wait_for_active_blocker)::wait);
- }).then_interruptible(
- [FNAME, this, pgref, this_instance_id, &ihref]() mutable
- -> interruptible_future<> {
- DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
- *pgref, *this, this_instance_id);
- if (is_pg_op()) {
- return process_pg_op(pgref);
- } else {
- return process_op(ihref, pgref, this_instance_id);
- }
- }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
- DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
- *pgref, *this, this_instance_id);
- return ihref.handle.complete();
- }).then_interruptible([FNAME, this, this_instance_id, pgref] {
- DEBUGDPP("{}.{}: process[_pg]_op complete,"
- "removing request from orderer",
- *pgref, *this, this_instance_id);
- pgref->client_request_orderer.remove_request(*this);
- complete_request();
- });
+ [this, pgref, this_instance_id, &ihref]() mutable {
+ return with_pg_process_interruptible(pgref, this_instance_id, ihref);
}, [FNAME, this, this_instance_id, pgref](std::exception_ptr eptr) {
DEBUGDPP("{}.{}: interrupted due to {}",
*pgref, *this, this_instance_id, eptr);