crimson/osd/pg: SnapTrimEvent to support interrupts 56998/head
author    Xuehan Xu <xuxuehan@qianxin.com>
          Thu, 25 Apr 2024 05:56:25 +0000 (13:56 +0800)
committer Xuehan Xu <xuxuehan@qianxin.com>
          Sun, 28 Apr 2024 08:57:47 +0000 (16:57 +0800)
SnapTrimEvent operations are scheduled from `PG::on_active_actmap()`
using a `seastar::do_until` loop. This commit replaces that loop with an
`interruptor::repeat`, and SnapTrimEvent operations are now scheduled via
`start_operation_may_interrupt`.
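
In outline, the change to the scheduling loop looks roughly like this
(condensed from the pg.cc hunk below; error handling and state
bookkeeping omitted):

  // before: a plain, uninterruptible seastar loop driving start_operation()
  std::ignore = seastar::do_until(
    [this] { return snap_trimq.empty(); },
    [this] { return shard_services.start_operation<SnapTrimEvent>(...); });

  // after: an interruptible loop with interruptible scheduling
  std::ignore = interruptor::with_interruption([this] {
    return interruptor::repeat([this] {
      return shard_services.start_operation_may_interrupt<
        interruptor, SnapTrimEvent>(...);
    });
  }, [this](std::exception_ptr) { /* e.g. interval change */ }, pg_ref);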

Previously, `SnapTrimEvent::start` handled interruptions by returning
`crimson::ct_error::eagain::make()`. Now the errorator is returned
directly via `snap_trim_event_ret_t` and interrupts the loop described
above.
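
Concretely, the type change in snaptrim_event.h (shown in full below)
amounts to dropping the `eagain` extension from the errorator:

  // before
  using snap_trim_ertr = remove_or_update_ertr::extend<
    crimson::ct_error::eagain>;
  using snap_trim_event_ret_t = snap_trim_ertr::future<seastar::stop_iteration>;

  // after
  using snap_trim_iertr = remove_or_update_iertr;
  using snap_trim_event_ret_t = snap_trim_iertr::future<seastar::stop_iteration>;

On the caller side this removes the `crimson::ct_error::eagain::handle(...)`
branch in `PG::on_active_actmap()`; only `enoent` is still handled, now via
`handle_error_interruptible` (see the pg.cc hunk below).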

As a result, interruptions originating from interval changes are now
supported by SnapTrimEvent.

Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index d75b8e4461a892655c1722bd63724d4acd67e7bb..71b4cb810c15a4b3c037578d2c130d8da2900b49 100644 (file)
@@ -64,101 +64,96 @@ SnapTrimEvent::snap_trim_event_ret_t
 SnapTrimEvent::start()
 {
   ShardServices &shard_services = pg->get_shard_services();
-  return interruptor::with_interruption([&shard_services, this] {
+  return enter_stage<interruptor>(
+    client_pp().wait_for_active
+  ).then_interruptible([this] {
+    return with_blocking_event<PGActivationBlocker::BlockingEvent,
+                              interruptor>([this] (auto&& trigger) {
+      return pg->wait_for_active_blocker.wait(std::move(trigger));
+    });
+  }).then_interruptible([this] {
     return enter_stage<interruptor>(
-      client_pp().wait_for_active
-    ).then_interruptible([this] {
-      return with_blocking_event<PGActivationBlocker::BlockingEvent,
-                                 interruptor>([this] (auto&& trigger) {
-        return pg->wait_for_active_blocker.wait(std::move(trigger));
-      });
-    }).then_interruptible([this] {
-      return enter_stage<interruptor>(
-        client_pp().recover_missing);
-    }).then_interruptible([] {
-      //return do_recover_missing(pg, get_target_oid());
-      return seastar::now();
-    }).then_interruptible([this] {
-      return enter_stage<interruptor>(
-        client_pp().get_obc);
-    }).then_interruptible([this] {
-      return pg->background_process_lock.lock_with_op(*this);
-    }).then_interruptible([this] {
-      return enter_stage<interruptor>(
-        client_pp().process);
-    }).then_interruptible([&shard_services, this] {
-      return interruptor::async([this] {
-        using crimson::common::local_conf;
-        const auto max =
-          local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
-        // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
-        // the nullopt below and erase snapid.
-        auto to_trim = snap_mapper.get_next_objects_to_trim(
-          snapid,
-          max);
-        if (!to_trim.has_value()) {
-          return std::vector<hobject_t>{};
-        }
-        logger().debug("{}: async almost done line {}", *this, __LINE__);
-        return std::move(*to_trim);
-      }).then_interruptible([&shard_services, this] (const auto& to_trim) {
-        if (to_trim.empty()) {
-          // the legit ENOENT -> done
-          logger().debug("{}: to_trim is empty! Stopping iteration", *this);
-         pg->background_process_lock.unlock();
-          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
-            seastar::stop_iteration::yes);
-        }
-        return [&shard_services, this](const auto &to_trim) {
-         for (const auto& object : to_trim) {
-           logger().debug("{}: trimming {}", *this, object);
-           subop_blocker.emplace_back(
-             shard_services.start_operation_may_interrupt<
-             interruptor, SnapTrimObjSubEvent>(
-               pg,
-               object,
-               snapid));
-         }
+      client_pp().recover_missing);
+  }).then_interruptible([] {
+    //return do_recover_missing(pg, get_target_oid());
+    return seastar::now();
+  }).then_interruptible([this] {
+    return enter_stage<interruptor>(
+      client_pp().get_obc);
+  }).then_interruptible([this] {
+    return pg->background_process_lock.lock_with_op(*this);
+  }).then_interruptible([this] {
+    return enter_stage<interruptor>(
+      client_pp().process);
+  }).then_interruptible([&shard_services, this] {
+    return interruptor::async([this] {
+      using crimson::common::local_conf;
+      const auto max =
+       local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
+      // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
+      // the nullopt below and erase snapid.
+      auto to_trim = snap_mapper.get_next_objects_to_trim(
+       snapid,
+       max);
+      if (!to_trim.has_value()) {
+       return std::vector<hobject_t>{};
+      }
+      logger().debug("{}: async almost done line {}", *this, __LINE__);
+      return std::move(*to_trim);
+    }).then_interruptible([&shard_services, this] (const auto& to_trim) {
+      if (to_trim.empty()) {
+       // the legit ENOENT -> done
+       logger().debug("{}: to_trim is empty! Stopping iteration", *this);
+       pg->background_process_lock.unlock();
+       return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
+         seastar::stop_iteration::yes);
+      }
+      return [&shard_services, this](const auto &to_trim) {
+       for (const auto& object : to_trim) {
+         logger().debug("{}: trimming {}", *this, object);
+         subop_blocker.emplace_back(
+           shard_services.start_operation_may_interrupt<
+           interruptor, SnapTrimObjSubEvent>(
+             pg,
+             object,
+             snapid));
+       }
+       return interruptor::now();
+      }(to_trim).then_interruptible([this] {
+       return enter_stage<interruptor>(wait_subop);
+      }).then_interruptible([this] {
+       logger().debug("{}: awaiting completion", *this);
+       return subop_blocker.interruptible_wait_completion();
+      }).finally([this] {
+       pg->background_process_lock.unlock();
+      }).si_then([this] {
+       if (!needs_pause) {
          return interruptor::now();
-       }(to_trim).then_interruptible([this] {
-         return enter_stage<interruptor>(wait_subop);
-       }).then_interruptible([this] {
-          logger().debug("{}: awaiting completion", *this);
-          return subop_blocker.interruptible_wait_completion();
-        }).finally([this] {
-         pg->background_process_lock.unlock();
-       }).si_then([this] {
-          if (!needs_pause) {
-            return interruptor::now();
-          }
-          // let's know operators we're waiting
-          return enter_stage<interruptor>(
-            wait_trim_timer
-          ).then_interruptible([this] {
-            using crimson::common::local_conf;
-            const auto time_to_sleep =
-              local_conf().template get_val<double>("osd_snap_trim_sleep");
-            logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
-            // TODO: this logic should be more sophisticated and distinguish
-            // between SSDs, HDDs and the hybrid case
-            return seastar::sleep(
-              std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
-          });
-        }).si_then([this] {
-          logger().debug("{}: all completed", *this);
-          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
-            seastar::stop_iteration::no);
-        });
-      }).si_then([this](auto stop) {
-        return handle.complete().then([stop] {
-          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
-        });
+       }
+       // let's know operators we're waiting
+       return enter_stage<interruptor>(
+         wait_trim_timer
+       ).then_interruptible([this] {
+         using crimson::common::local_conf;
+         const auto time_to_sleep =
+           local_conf().template get_val<double>("osd_snap_trim_sleep");
+         logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
+         // TODO: this logic should be more sophisticated and distinguish
+         // between SSDs, HDDs and the hybrid case
+         return seastar::sleep(
+           std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
+       });
+      }).si_then([this] {
+       logger().debug("{}: all completed", *this);
+       return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
+         seastar::stop_iteration::no);
+      });
+    }).si_then([this](auto stop) {
+      return handle.complete().then([stop] {
+       return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
       });
     });
-  }, [this](std::exception_ptr eptr) -> snap_trim_event_ret_t {
-    logger().debug("{}: interrupted {}", *this, eptr);
-    return crimson::ct_error::eagain::make();
-  }, pg).finally([this] {
+  }).finally([this] {
     // This SnapTrimEvent op lifetime is maintained within
     // PerShardState::start_operation() implementation.
     logger().debug("{}: exit", *this);
index 66132543b13d29aedbc9fa9401d6f24a7f213ed4..9d7cde724ef781919b5673c19c510e31d52651c2 100644 (file)
@@ -35,12 +35,9 @@ public:
   using remove_or_update_iertr =
     crimson::interruptible::interruptible_errorator<
       IOInterruptCondition, remove_or_update_ertr>;
-  using snap_trim_ertr = remove_or_update_ertr::extend<
-    crimson::ct_error::eagain>;
-  using snap_trim_iertr = remove_or_update_iertr::extend<
-    crimson::ct_error::eagain>;
+  using snap_trim_iertr = remove_or_update_iertr;
   using snap_trim_event_ret_t =
-    snap_trim_ertr::future<seastar::stop_iteration>;
+    snap_trim_iertr::future<seastar::stop_iteration>;
   using snap_trim_obj_subevent_ret_t =
       remove_or_update_iertr::future<>;
 
index 93fadb8b949374176b40ebfc6dc1fddba3209aa4..4962fe7631112b66361ed12833fb1a08e3cf1f89 100644 (file)
@@ -479,55 +479,75 @@ Context *PG::on_clean()
   return nullptr;
 }
 
+PG::interruptible_future<seastar::stop_iteration> PG::trim_snap(
+  snapid_t to_trim,
+  bool needs_pause)
+{
+  return interruptor::repeat([this, to_trim, needs_pause] {
+    logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
+                   *this, to_trim);
+    return shard_services.start_operation_may_interrupt<
+      interruptor, SnapTrimEvent>(
+      this,
+      snap_mapper,
+      to_trim,
+      needs_pause
+    ).second.handle_error_interruptible(
+      crimson::ct_error::enoent::handle([this] {
+        logger().error("{}: ENOENT saw, trimming stopped", *this);
+        peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
+        publish_stats_to_osd();
+        return seastar::make_ready_future<seastar::stop_iteration>(
+          seastar::stop_iteration::yes);
+      })
+    );
+  }).then_interruptible([this, trimmed=to_trim] {
+    logger().debug("{}: trimmed snap={}", *this, trimmed);
+    snap_trimq.erase(trimmed);
+    return seastar::make_ready_future<seastar::stop_iteration>(
+      seastar::stop_iteration::no);
+  });
+}
+
 void PG::on_active_actmap()
 {
   logger().debug("{}: {} snap_trimq={}", *this, __func__, snap_trimq);
   peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
   if (peering_state.is_active() && peering_state.is_clean()) {
+    if (peering_state.state_test(PG_STATE_SNAPTRIM)) {
+      logger().debug("{}: {} already trimming.", *this, __func__);
+      return;
+    }
     // loops until snap_trimq is empty or SNAPTRIM_ERROR.
     Ref<PG> pg_ref = this;
-    std::ignore = seastar::do_until(
-      [this] { return snap_trimq.empty()
-                      || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR);
-      },
-      [this] {
-        peering_state.state_set(PG_STATE_SNAPTRIM);
+    std::ignore = interruptor::with_interruption([this] {
+      return interruptor::repeat(
+        [this]() -> interruptible_future<seastar::stop_iteration> {
+          if (snap_trimq.empty()
+              || peering_state.state_test(PG_STATE_SNAPTRIM_ERROR)) {
+            return seastar::make_ready_future<seastar::stop_iteration>(
+              seastar::stop_iteration::yes);
+          }
+          peering_state.state_set(PG_STATE_SNAPTRIM);
+          publish_stats_to_osd();
+          const auto to_trim = snap_trimq.range_start();
+          const auto needs_pause = !snap_trimq.empty();
+          return trim_snap(to_trim, needs_pause);
+        }
+      ).finally([this] {
+        logger().debug("{}: PG::on_active_actmap() finished trimming",
+                       *this);
+        peering_state.state_clear(PG_STATE_SNAPTRIM);
+        peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
         publish_stats_to_osd();
-        const auto to_trim = snap_trimq.range_start();
-        snap_trimq.erase(to_trim);
-        const auto needs_pause = !snap_trimq.empty();
-        return seastar::repeat([to_trim, needs_pause, this] {
-          logger().debug("{}: going to start SnapTrimEvent, to_trim={}",
-                         *this, to_trim);
-          return shard_services.start_operation<SnapTrimEvent>(
-            this,
-            snap_mapper,
-            to_trim,
-            needs_pause
-          ).second.handle_error(
-            crimson::ct_error::enoent::handle([this] {
-              logger().error("{}: ENOENT saw, trimming stopped", *this);
-              peering_state.state_set(PG_STATE_SNAPTRIM_ERROR);
-              publish_stats_to_osd();
-              return seastar::make_ready_future<seastar::stop_iteration>(
-                seastar::stop_iteration::yes);
-            }), crimson::ct_error::eagain::handle([this] {
-              logger().info("{}: EAGAIN saw, trimming restarted", *this);
-              return seastar::make_ready_future<seastar::stop_iteration>(
-                seastar::stop_iteration::no);
-            })
-          );
-        }).then([this, trimmed=to_trim] {
-          logger().debug("{}: trimmed snap={}", *this, trimmed);
-        });
-      }
-    ).finally([this, pg_ref=std::move(pg_ref)] {
-      logger().debug("{}: PG::on_active_actmap() finished trimming",
-                     *this);
+      });
+    }, [this](std::exception_ptr eptr) {
+      logger().debug("{}: snap trimming interrupted", *this);
       peering_state.state_clear(PG_STATE_SNAPTRIM);
-      peering_state.state_clear(PG_STATE_SNAPTRIM_ERROR);
-      publish_stats_to_osd();
-    });
+    }, pg_ref);
+  } else {
+    logger().debug("{}: pg not clean, skipping snap trim");
+    assert(!peering_state.state_test(PG_STATE_SNAPTRIM));
   }
 }
 
index 9f49422bd1d06c8ffb914337705a4231ea510df6..d705a71bb785d1f4051895cf8288779dabddf25d 100644 (file)
@@ -603,6 +603,9 @@ private:
     const hobject_t& oid,
     eversion_t& v);
   void check_blocklisted_obc_watchers(ObjectContextRef &obc);
+  interruptible_future<seastar::stop_iteration> trim_snap(
+    snapid_t to_trim,
+    bool needs_pause);
 
 private:
   PG_OSDMapGate osdmap_gate;