]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: drop the nested interruptors in snap trimming
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 13 Dec 2022 18:30:28 +0000 (18:30 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 28 Feb 2023 16:22:05 +0000 (16:22 +0000)
`crimson::interruptible` does not support that. See the
`DISABLED_nested_interruptors` unit test.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h

index 505092bcb53c6f63e5fbce19aafd8a5385ef1fe7..1e7dcda761afe1ee115aa0d0c976491b8b711045 100644 (file)
@@ -48,13 +48,11 @@ void SnapTrimEvent::SubOpBlocker::emplace_back(Args&&... args)
   subops.emplace_back(std::forward<Args>(args)...);
 };
 
-seastar::future<> SnapTrimEvent::SubOpBlocker::wait_completion()
+SnapTrimEvent::interruptible_future<>
+SnapTrimEvent::SubOpBlocker::wait_completion()
 {
-  auto rng = subops | std::views::values;
-  return seastar::when_all_succeed(
-    std::begin(rng), std::end(rng)
-  ).then([] (auto&&...) {
-    return seastar::now();
+  return interruptor::do_for_each(subops, [](auto&& kv) {
+    return std::move(kv.second);
   });
 }
 
@@ -149,7 +147,10 @@ seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
             pg,
             object,
             snapid);
-          subop_blocker.emplace_back(op->get_id(), std::move(fut));
+          subop_blocker.emplace_back(
+            op->get_id(),
+            std::move(fut).handle_error_interruptible(crimson::ct_error::assert_all{})
+          );
         }
         return enter_stage<interruptor>(
           wait_subop
@@ -194,7 +195,8 @@ CommonPGPipeline& SnapTrimObjSubEvent::pp()
   return pg->request_pg_pipeline;
 }
 
-seastar::future<> SnapTrimObjSubEvent::start()
+SnapTrimObjSubEvent::remove_or_update_iertr::future<>
+SnapTrimObjSubEvent::start()
 {
   logger().debug("{}: start", *this);
 
@@ -478,66 +480,59 @@ SnapTrimObjSubEvent::remove_or_update(
   });
 }
 
-seastar::future<> SnapTrimObjSubEvent::with_pg(
+SnapTrimObjSubEvent::remove_or_update_iertr::future<>
+SnapTrimObjSubEvent::with_pg(
   ShardServices &shard_services, Ref<PG> _pg)
 {
-  return interruptor::with_interruption([this] {
+  return enter_stage<interruptor>(
+    pp().wait_for_active
+  ).then_interruptible([this] {
+    return with_blocking_event<PGActivationBlocker::BlockingEvent,
+                               interruptor>([this] (auto&& trigger) {
+      return pg->wait_for_active_blocker.wait(std::move(trigger));
+    });
+  }).then_interruptible([this] {
     return enter_stage<interruptor>(
-      pp().wait_for_active
-    ).then_interruptible([this] {
-      return with_blocking_event<PGActivationBlocker::BlockingEvent,
-                                 interruptor>([this] (auto&& trigger) {
-        return pg->wait_for_active_blocker.wait(std::move(trigger));
-      });
-    }).then_interruptible([this] {
-      return enter_stage<interruptor>(
-        pp().recover_missing);
-    }).then_interruptible([this] {
-      //return do_recover_missing(pg, get_target_oid());
-      return seastar::now();
-    }).then_interruptible([this] {
+      pp().recover_missing);
+  }).then_interruptible([] {
+    //return do_recover_missing(pg, get_target_oid());
+    return seastar::now();
+  }).then_interruptible([this] {
+    return enter_stage<interruptor>(
+      pp().get_obc);
+  }).then_interruptible([this] {
+    logger().debug("{}: getting obc for {}", *this, coid);
+    // end of commonality
+    // with_cone_obc lock both clone's and head's obcs
+    return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
+      logger().debug("{}: got clone_obc={}", *this, fmt::ptr(clone_obc.get()));
       return enter_stage<interruptor>(
-        pp().get_obc);
-    }).then_interruptible([this] {
-      logger().debug("{}: getting obc for {}", *this, coid);
-      // end of commonality
-      // with_cone_obc lock both clone's and head's obcs
-      return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
-        logger().debug("{}: got clone_obc={}", *this, clone_obc);
-        return enter_stage<interruptor>(
-          pp().process
-        ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
-          logger().debug("{}: processing clone_obc={}", *this, clone_obc);
-          return remove_or_update(
-            clone_obc, clone_obc->head
-          ).safe_then_unpack_interruptible([clone_obc, this]
-                                           (auto&& txn, auto&& log_entries) mutable {
-            auto [submitted, all_completed] = pg->submit_transaction(
-              std::move(clone_obc),
-              std::move(txn),
-              std::move(osd_op_p),
-              std::move(log_entries));
-            return submitted.then_interruptible(
-              [all_completed=std::move(all_completed), this] () mutable {
-              return enter_stage<interruptor>(
-                wait_repop
-              ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
-                return std::move(all_completed);
-              });
+        pp().process
+      ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
+        logger().debug("{}: processing clone_obc={}", *this, fmt::ptr(clone_obc.get()));
+        return remove_or_update(
+          clone_obc, clone_obc->head
+        ).safe_then_unpack_interruptible([clone_obc, this]
+                                         (auto&& txn, auto&& log_entries) mutable {
+          auto [submitted, all_completed] = pg->submit_transaction(
+            std::move(clone_obc),
+            std::move(txn),
+            std::move(osd_op_p),
+            std::move(log_entries));
+          return submitted.then_interruptible(
+            [all_completed=std::move(all_completed), this] () mutable {
+            return enter_stage<interruptor>(
+              wait_repop
+            ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
+              return std::move(all_completed);
             });
           });
         });
-      }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
-        return seastar::now();
-      }));
-    }).then_interruptible([] {
-      // end of commonality
+      });
+    }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
       return seastar::now();
-    });
-  }, [this](std::exception_ptr eptr) {
-    // TODO: better debug output
-    logger().debug("{}: interrupted {}", *this, eptr);
-  }, pg);
+    }));
+  });
 }
 
 void SnapTrimObjSubEvent::print(std::ostream &lhs) const
index 902faf9c58409a4db67c9a061e8c8a593813e3df..13b42723ea281267325a49eac170fe63caf413d0 100644 (file)
@@ -54,14 +54,15 @@ private:
   struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
     static constexpr const char* type_name = "CompoundOpBlocker";
 
-    using id_done_t = std::pair<crimson::Operation::id_t, seastar::future<>>;
+    using id_done_t = std::pair<crimson::Operation::id_t,
+                                interruptible_future<>>;
 
     void dump_detail(Formatter *f) const final;
 
     template <class... Args>
     void emplace_back(Args&&... args);
 
-    seastar::future<> wait_completion();
+    interruptible_future<> wait_completion();
   private:
     std::vector<id_done_t> subops;
   } subop_blocker;
@@ -106,6 +107,12 @@ public:
 // cannot revisite a pipeline's stage it already saw.
 class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
 public:
+  using remove_or_update_ertr =
+    crimson::errorator<crimson::ct_error::enoent>;
+  using remove_or_update_iertr =
+    crimson::interruptible::interruptible_errorator<
+      IOInterruptCondition, remove_or_update_ertr>;
+
   static constexpr OperationTypeCode type =
     OperationTypeCode::snaptrimobj_subevent;
 
@@ -120,8 +127,8 @@ public:
 
   void print(std::ostream &) const final;
   void dump_detail(ceph::Formatter* f) const final;
-  seastar::future<> start();
-  seastar::future<> with_pg(
+  remove_or_update_iertr::future<> start();
+  remove_or_update_iertr::future<> with_pg(
     ShardServices &shard_services, Ref<PG> pg);
 
   CommonPGPipeline& pp();
@@ -129,12 +136,6 @@ public:
 private:
   object_stat_sum_t delta_stats;
 
-  using remove_or_update_ertr =
-    crimson::errorator<crimson::ct_error::enoent>;
-  using remove_or_update_iertr =
-    crimson::interruptible::interruptible_errorator<
-      IOInterruptCondition, remove_or_update_ertr>;
-
   remove_or_update_iertr::future<> remove_clone(
     ObjectContextRef obc,
     ceph::os::Transaction& txn,