]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../client_request: factor out with_pg_interruptible
authorSamuel Just <sjust@redhat.com>
Mon, 5 Feb 2024 00:04:49 +0000 (16:04 -0800)
committerMatan Breizman <mbreizma@redhat.com>
Thu, 16 May 2024 11:39:18 +0000 (14:39 +0300)
Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit b521cc3d822c39084f72c4f22f621928a184d731)

src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index 00ed92adf650a803e31e5314baac1619afeef475..9b9de50c149d3176512e77eeb8258c1178d9a226 100644 (file)
@@ -98,7 +98,72 @@ bool ClientRequest::is_pg_op() const
     [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
 }
 
-seastar::future<> ClientRequest::with_pg_process(Ref<PG> pgref)
+ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptible(
+  Ref<PG> pgref, const unsigned this_instance_id, instance_handle_t &ihref)
+{
+  LOG_PREFIX(ClientRequest::with_pg_process);
+  DEBUGDPP(
+    "{}: same_interval_since: {}",
+    *pgref, *this, pgref->get_interval_start_epoch());
+
+  DEBUGDPP("{} start", *pgref, *this);
+  PG &pg = *pgref;
+  if (pg.can_discard_op(*m)) {
+    return shard_services->send_incremental_map(
+      std::ref(get_foreign_connection()), m->get_map_epoch()
+    ).then([FNAME, this, this_instance_id, pgref] {
+      DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
+      pgref->client_request_orderer.remove_request(*this);
+      complete_request();
+      return interruptor::now();
+    });
+  }
+  DEBUGDPP("{}.{}: entering await_map stage",
+          *pgref, *this, this_instance_id);
+  return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
+  ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
+    DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
+            pg, *this, this_instance_id);
+    return ihref.enter_blocker(
+      *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
+      m->get_min_epoch(), nullptr);
+  }).then_interruptible(
+    [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
+    DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+            pg, *this, this_instance_id, map_epoch);
+    return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
+  }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
+    DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
+            pg, *this, this_instance_id);
+    return ihref.enter_blocker(
+      *this,
+      pg.wait_for_active_blocker,
+      &decltype(pg.wait_for_active_blocker)::wait);
+  }).then_interruptible(
+    [FNAME, this, pgref, this_instance_id, &ihref]() mutable
+    -> interruptible_future<> {
+    DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
+            *pgref, *this, this_instance_id);
+    if (is_pg_op()) {
+      return process_pg_op(pgref);
+    } else {
+      return process_op(ihref, pgref, this_instance_id);
+    }
+  }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
+    DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
+            *pgref, *this, this_instance_id);
+    return ihref.handle.complete();
+  }).then_interruptible([FNAME, this, this_instance_id, pgref] {
+    DEBUGDPP("{}.{}: process[_pg]_op complete,"
+            "removing request from orderer",
+            *pgref, *this, this_instance_id);
+    pgref->client_request_orderer.remove_request(*this);
+    complete_request();
+  });
+}
+
+seastar::future<> ClientRequest::with_pg_process(
+  Ref<PG> pgref)
 {
   ceph_assert_always(shard_services);
   LOG_PREFIX(ClientRequest::with_pg_process);
@@ -110,61 +175,8 @@ seastar::future<> ClientRequest::with_pg_process(Ref<PG> pgref)
   auto instance_handle = get_instance_handle();
   auto &ihref = *instance_handle;
   return interruptor::with_interruption(
-    [FNAME, this, pgref, this_instance_id, &ihref]() mutable {
-      DEBUGDPP("{} start", *pgref, *this);
-      PG &pg = *pgref;
-      if (pg.can_discard_op(*m)) {
-       return shard_services->send_incremental_map(
-         std::ref(get_foreign_connection()), m->get_map_epoch()
-       ).then([FNAME, this, this_instance_id, pgref] {
-         DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
-         pgref->client_request_orderer.remove_request(*this);
-         complete_request();
-         return interruptor::now();
-       });
-      }
-      DEBUGDPP("{}.{}: entering await_map stage",
-              *pgref, *this, this_instance_id);
-      return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
-      ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
-       DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
-                pg, *this, this_instance_id);
-       return ihref.enter_blocker(
-         *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
-         m->get_min_epoch(), nullptr);
-      }).then_interruptible(
-       [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
-       DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
-                pg, *this, this_instance_id, map_epoch);
-       return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
-      }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
-       DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
-                pg, *this, this_instance_id);
-       return ihref.enter_blocker(
-         *this,
-         pg.wait_for_active_blocker,
-         &decltype(pg.wait_for_active_blocker)::wait);
-       }).then_interruptible(
-         [FNAME, this, pgref, this_instance_id, &ihref]() mutable
-         -> interruptible_future<> {
-       DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
-                *pgref, *this, this_instance_id);
-       if (is_pg_op()) {
-         return process_pg_op(pgref);
-       } else {
-         return process_op(ihref, pgref, this_instance_id);
-       }
-      }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
-       DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
-                *pgref, *this, this_instance_id);
-        return ihref.handle.complete();
-      }).then_interruptible([FNAME, this, this_instance_id, pgref] {
-       DEBUGDPP("{}.{}: process[_pg]_op complete,"
-                "removing request from orderer",
-                *pgref, *this, this_instance_id);
-       pgref->client_request_orderer.remove_request(*this);
-       complete_request();
-      });
+    [this, pgref, this_instance_id, &ihref]() mutable {
+      return with_pg_process_interruptible(pgref, this_instance_id, ihref);
     }, [FNAME, this, this_instance_id, pgref](std::exception_ptr eptr) {
       DEBUGDPP("{}.{}: interrupted due to {}",
               *pgref, *this, this_instance_id, eptr);
index 3bc81fd4d44410ad98523de5b2830182f45266df..67c3c9b1228d0fb75044a58c00b2fde32b8b3194 100644 (file)
@@ -253,6 +253,9 @@ public:
     r_conn = make_local_shared_foreign(std::move(conn));
   }
 
+  interruptible_future<> with_pg_process_interruptible(
+    Ref<PG> pgref, const unsigned instance_id, instance_handle_t &ihref);
+
   seastar::future<> with_pg_process(Ref<PG> pg);
 
 public: