]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: bring SnapTrimObjSubEvent
authorRadosław Zarzyński <rzarzyns@redhat.com>
Tue, 6 Dec 2022 21:20:15 +0000 (22:20 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 28 Feb 2023 16:22:04 +0000 (16:22 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_external_tracking.h
src/crimson/osd/osd_operations/common/pg_pipeline.h
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h
src/crimson/osd/pg.h

index 181276b07acc49802aaa5b1e7ffe643318fe0b27..8ef44ee9e78947569a18b0798e5b7a5658b6903b 100644 (file)
@@ -46,6 +46,7 @@ enum class OperationTypeCode {
   logmissing_request,
   logmissing_request_reply,
   snaptrim_event,
+  snaptrimobj_subevent,
   last_op
 };
 
@@ -62,6 +63,7 @@ static constexpr const char* const OP_NAMES[] = {
   "logmissing_request",
   "logmissing_request_reply",
   "snaptrim_event",
+  "snaptrimobj_subevent",
 };
 
 // prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
index 2e0cbd01c92a2d9bd75b75fbc5ab1ba7d0c218bf..4b6dbf4b71007fe62a351d8c9fe3057e0795c7c3 100644 (file)
@@ -297,4 +297,11 @@ struct EventBackendRegistry<osd::PGAdvanceMap> {
   }
 };
 
+template <>
+struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
+  static std::tuple<> get_backends() {
+    return {};
+  }
+};
+
 } // namespace crimson
index a5c0c9fbcb3d344b835496b94790d9156aeb94e2..58fa07b8b4d2537c5c978d0a0d2f71799664ce32 100644 (file)
@@ -12,6 +12,7 @@ class CommonPGPipeline {
 protected:
   friend class InternalClientRequest;
   friend class SnapTrimEvent;
+  friend class SnapTrimObjSubEvent;
 
   struct WaitForActive : OrderedExclusivePhaseT<WaitForActive> {
     static constexpr auto type_name = "CommonPGPipeline:::wait_for_active";
index c3ad65257f6f576723d14a7a3596dee5b9bd6dad..dd089b8747cdb42ab195add356b81fec64a8035b 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "crimson/osd/osd_operations/snaptrim_event.h"
 #include "crimson/osd/pg.h"
+#include "include/expected.hpp"
 
 namespace {
   seastar::logger& logger() {
@@ -19,6 +20,13 @@ namespace crimson {
       return {};
     }
   };
+
+  template <>
+  struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
+    static std::tuple<> get_backends() {
+      return {};
+    }
+  };
 }
 
 namespace crimson::osd {
@@ -136,6 +144,11 @@ seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
         }
         for (const auto& object : to_trim) {
           logger().debug("{}: trimming {}", *this, object);
+          auto [op, fut] = shard_services.start_operation<SnapTrimObjSubEvent>(
+            pg,
+            object,
+            snapid);
+          subop_blocker.emplace_back(op->get_id(), std::move(fut));
         }
         return subop_blocker.wait_completion().then([] {
           return seastar::make_ready_future<seastar::stop_iteration>(
@@ -151,4 +164,336 @@ seastar::future<seastar::stop_iteration> SnapTrimEvent::with_pg(
   }, pg);
 }
 
+
+CommonPGPipeline& SnapTrimObjSubEvent::pp()
+{
+  return pg->request_pg_pipeline;
+}
+
+seastar::future<> SnapTrimObjSubEvent::start()
+{
+  logger().debug("{}: start", *this);
+
+  IRef ref = this;
+  auto maybe_delay = seastar::now();
+  if (auto delay = 0; delay) {
+    maybe_delay = seastar::sleep(
+      std::chrono::milliseconds(std::lround(delay * 1000)));
+  }
+  return maybe_delay.then([this] {
+    return with_pg(pg->get_shard_services(), pg);
+  }).finally([ref=std::move(ref), this] {
+    logger().debug("{}: complete", *ref);
+    return handle.complete();
+  });
+}
+
+tl::expected<SnapTrimObjSubEvent::remove_or_update_ret_t, int>
+SnapTrimObjSubEvent::remove_or_update(
+  ObjectContextRef obc,
+  ObjectContextRef head_obc)
+{
+  ceph::os::Transaction txn{};
+  std::vector<pg_log_entry_t> log_entries{};
+
+  SnapSet& snapset = obc->ssc->snapset;
+  auto citer = snapset.clone_snaps.find(coid.snap);
+  if (citer == snapset.clone_snaps.end()) {
+    logger().error("{}: No clone_snaps in snapset {} for object {}",
+                   *this, snapset, coid);
+    return tl::unexpected{-ENOENT};
+  }
+  const auto& old_snaps = citer->second;
+  if (old_snaps.empty()) {
+    logger().error("{}: no object info snaps for object {}",
+                   *this, coid);
+    return tl::unexpected{-ENOENT};
+  }
+  if (snapset.seq == 0) {
+    logger().error("{}: no snapset.seq for object {}",
+                   *this, coid);
+    return tl::unexpected{-ENOENT};
+  }
+  const OSDMapRef& osdmap = pg->get_osdmap();
+  std::set<snapid_t> new_snaps;
+  for (const auto& old_snap : old_snaps) {
+    if (!osdmap->in_removed_snaps_queue(pg->get_info().pgid.pgid.pool(),
+                                        old_snap)
+        && old_snap != snap_to_trim) {
+      new_snaps.insert(old_snap);
+    }
+  }
+
+  std::vector<snapid_t>::iterator p = snapset.clones.end();
+  if (new_snaps.empty()) {
+    p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
+    if (p == snapset.clones.end()) {
+      logger().error("{}: Snap {} not in clones",
+                     *this, coid.snap);
+      return tl::unexpected{-ENOENT};
+    }
+  }
+  int64_t num_objects_before_trim = delta_stats.num_objects;
+  osd_op_p.at_version = pg->next_version();
+  object_info_t &coi = obc->obs.oi;
+  if (new_snaps.empty()) {
+    // remove clone
+    logger().info("{}: {} snaps {} -> {} ... deleting",
+                  *this, coid, old_snaps, new_snaps);
+
+    // ...from snapset
+    assert(p != snapset.clones.end());
+
+    snapid_t last = coid.snap;
+    delta_stats.num_bytes -= snapset.get_clone_bytes(last);
+
+    if (p != snapset.clones.begin()) {
+      // not the oldest... merge overlap into next older clone
+      std::vector<snapid_t>::iterator n = p - 1;
+      hobject_t prev_coid = coid;
+      prev_coid.snap = *n;
+
+      // does the classical OSD really need is_present_clone(prev_coid)?
+      delta_stats.num_bytes -= snapset.get_clone_bytes(*n);
+      snapset.clone_overlap[*n].intersection_of(
+       snapset.clone_overlap[*p]);
+      delta_stats.num_bytes += snapset.get_clone_bytes(*n);
+    }
+    delta_stats.num_objects--;
+    if (coi.is_dirty()) {
+      delta_stats.num_objects_dirty--;
+    }
+    if (coi.is_omap()) {
+      delta_stats.num_objects_omap--;
+    }
+    if (coi.is_whiteout()) {
+      logger().debug("{}: trimming whiteout on {}",
+                     *this, coid);
+      delta_stats.num_whiteouts--;
+    }
+    delta_stats.num_object_clones--;
+
+    obc->obs.exists = false;
+
+    snapset.clones.erase(p);
+    snapset.clone_overlap.erase(last);
+    snapset.clone_size.erase(last);
+    snapset.clone_snaps.erase(last);
+
+    log_entries.emplace_back(
+      pg_log_entry_t{
+       pg_log_entry_t::DELETE,
+       coid,
+        osd_op_p.at_version,
+       coi.version,
+       0,
+       osd_reqid_t(),
+       coi.mtime, // will be replaced in `apply_to()`
+       0}
+      );
+    txn.remove(
+      pg->get_collection_ref()->get_cid(),
+      ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
+
+    coi = object_info_t(coid);
+
+  } else {
+    // save adjusted snaps for this object
+    logger().info("{}: {} snaps {} -> {}",
+                  *this, coid, old_snaps, new_snaps);
+    snapset.clone_snaps[coid.snap] =
+      std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend());
+    // we still do a 'modify' event on this object just to trigger a
+    // snapmapper.update ... :(
+
+    coi.prior_version = coi.version;
+    coi.version = osd_op_p.at_version;
+    ceph::bufferlist bl;
+    encode(coi, bl, pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+    txn.setattr(
+      pg->get_collection_ref()->get_cid(),
+      ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
+      OI_ATTR,
+      bl);
+    log_entries.emplace_back(
+      pg_log_entry_t{
+       pg_log_entry_t::MODIFY,
+       coid,
+       coi.version,
+       coi.prior_version,
+       0,
+       osd_reqid_t(),
+       coi.mtime,
+       0}
+      );
+  }
+
+  osd_op_p.at_version = pg->next_version();
+
+  // save head snapset
+  logger().debug("{}: {} new snapset {} on {}",
+                 *this, coid, snapset, head_obc->obs.oi);
+  const auto head_oid = coid.get_head();
+  if (snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
+    // NOTE: this arguably constitutes minor interference with the
+    // tiering agent if this is a cache tier since a snap trim event
+    // is effectively evicting a whiteout we might otherwise want to
+    // keep around.
+    logger().info("{}: {} removing {}",
+                  *this, coid, head_oid);
+    log_entries.emplace_back(
+      pg_log_entry_t{
+       pg_log_entry_t::DELETE,
+       head_oid,
+       osd_op_p.at_version,
+       head_obc->obs.oi.version,
+       0,
+       osd_reqid_t(),
+       coi.mtime, // will be replaced in `apply_to()`
+       0}
+      );
+    logger().info("{}: remove snap head", *this);
+    object_info_t& oi = head_obc->obs.oi;
+    delta_stats.num_objects--;
+    if (oi.is_dirty()) {
+      delta_stats.num_objects_dirty--;
+    }
+    if (oi.is_omap()) {
+      delta_stats.num_objects_omap--;
+    }
+    if (oi.is_whiteout()) {
+      logger().debug("{}: trimming whiteout on {}",
+                     *this, oi.soid);
+      delta_stats.num_whiteouts--;
+    }
+    head_obc->obs.exists = false;
+    head_obc->obs.oi = object_info_t(head_oid);
+    txn.remove(pg->get_collection_ref()->get_cid(),
+              ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
+  } else {
+    snapset.snaps.clear();
+    logger().info("{}: writing updated snapset on {}, snapset is {}",
+                  *this, head_oid, snapset);
+    log_entries.emplace_back(
+      pg_log_entry_t{
+       pg_log_entry_t::MODIFY,
+       head_oid,
+       osd_op_p.at_version,
+       head_obc->obs.oi.version,
+       0,
+       osd_reqid_t(),
+       coi.mtime,
+       0}
+      );
+
+    head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
+    head_obc->obs.oi.version = osd_op_p.at_version;
+
+    std::map<std::string, ceph::bufferlist, std::less<>> attrs;
+    ceph::bufferlist bl;
+    encode(snapset, bl);
+    attrs[SS_ATTR] = std::move(bl);
+
+    bl.clear();
+    encode(head_obc->obs.oi, bl,
+           pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+    attrs[OI_ATTR] = std::move(bl);
+    txn.setattrs(
+      pg->get_collection_ref()->get_cid(),
+      ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
+      attrs);
+  }
+
+  // Stats reporting - Set number of objects trimmed
+  if (num_objects_before_trim > delta_stats.num_objects) {
+    int64_t num_objects_trimmed =
+      num_objects_before_trim - delta_stats.num_objects;
+    //add_objects_trimmed_count(num_objects_trimmed);
+  }
+  return std::make_pair(std::move(txn), std::move(log_entries));
+}
+
+seastar::future<> SnapTrimObjSubEvent::with_pg(
+  ShardServices &shard_services, Ref<PG> _pg)
+{
+  return interruptor::with_interruption([this] {
+    return enter_stage<interruptor>(
+      pp().wait_for_active
+    ).then_interruptible([this] {
+      return with_blocking_event<PGActivationBlocker::BlockingEvent,
+                                 interruptor>([this] (auto&& trigger) {
+        return pg->wait_for_active_blocker.wait(std::move(trigger));
+      });
+    }).then_interruptible([this] {
+      return enter_stage<interruptor>(
+        pp().recover_missing);
+    }).then_interruptible([this] {
+      //return do_recover_missing(pg, get_target_oid());
+      return seastar::now();
+    }).then_interruptible([this] {
+      return enter_stage<interruptor>(
+        pp().get_obc);
+    }).then_interruptible([this] {
+      logger().debug("{}: getting obc for {}", *this, coid);
+      // end of commonality
+      // with_cone_obc lock both clone's and head's obcs
+      return pg->obc_loader.with_clone_obc<RWState::RWWRITE>(coid, [this](auto clone_obc) {
+        logger().debug("{}: got clone_obc={}", *this, clone_obc);
+        return enter_stage<interruptor>(
+          pp().process
+        ).then_interruptible([this, clone_obc=std::move(clone_obc)]() mutable {
+          logger().debug("{}: processing clone_obc={}", *this, clone_obc);
+          auto head_obc = clone_obc->head;
+          return interruptor::async([=, this]() mutable {
+            if (auto ret = remove_or_update(clone_obc, head_obc);
+                !ret.has_value()) {
+              logger().error("{}: trimmig error {}",
+                             *this, ret.error());
+             //pg->state_set(PG_STATE_SNAPTRIM_ERROR);
+            } else {
+              auto [txn, log_entries] = std::move(ret).value();
+              auto [submitted, all_completed] = pg->submit_transaction(
+                std::move(clone_obc),
+                std::move(txn),
+                std::move(osd_op_p),
+                std::move(log_entries));
+              submitted.get();
+              all_completed.get();
+            }
+          }).then_interruptible([this] {
+            return enter_stage<interruptor>(
+              wait_repop
+            );
+          }).then_interruptible([this, clone_obc=std::move(clone_obc)] {
+            return PG::load_obc_iertr::now();
+          });
+        });
+      }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
+        return seastar::now();
+      }));
+    }).then_interruptible([] {
+      // end of commonality
+      return seastar::now();
+    });
+  }, [this](std::exception_ptr eptr) {
+    // TODO: better debug output
+    logger().debug("{}: interrupted {}", *this, eptr);
+  }, pg);
+}
+
+void SnapTrimObjSubEvent::print(std::ostream &lhs) const
+{
+  lhs << "SnapTrimObjSubEvent("
+      << "coid=" << coid
+      << " snapid=" << snap_to_trim
+      << ")";
+}
+
+void SnapTrimObjSubEvent::dump_detail(Formatter *f) const
+{
+  f->open_object_section("SnapTrimObjSubEvent");
+  f->dump_stream("coid") << coid;
+  f->close_section();
+}
+
 } // namespace crimson::osd
index bfda550f999621ecda6e0d9b61011e4d5e9a5a39..4ec54af66a39034e453592bdf5ef3aee047eaaca 100644 (file)
@@ -80,8 +80,69 @@ public:
   > tracking_events;
 };
 
+// remove single object. a SnapTrimEvent can create multiple subrequests.
+// the division of labour is needed because of the restriction that an Op
+// cannot revisite a pipeline's stage it already saw.
+class SnapTrimObjSubEvent : public PhasedOperationT<SnapTrimObjSubEvent> {
+public:
+  static constexpr OperationTypeCode type =
+    OperationTypeCode::snaptrimobj_subevent;
+
+  SnapTrimObjSubEvent(
+    Ref<PG> pg,
+    const hobject_t& coid,
+    snapid_t snap_to_trim)
+  : pg(std::move(pg)),
+    coid(coid),
+    snap_to_trim(snap_to_trim) {
+  }
+
+  void print(std::ostream &) const final;
+  void dump_detail(ceph::Formatter* f) const final;
+  seastar::future<> start();
+  seastar::future<> with_pg(
+    ShardServices &shard_services, Ref<PG> pg);
+
+  CommonPGPipeline& pp();
+
+private:
+  object_stat_sum_t delta_stats;
+
+  using remove_or_update_ret_t =
+    std::pair<ceph::os::Transaction, std::vector<pg_log_entry_t>>;
+  tl::expected<remove_or_update_ret_t, int>
+  remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc);
+
+  // we don't need to synchronize with other instances started by
+  // SnapTrimEvent; it's here for the sake of op tracking.
+  struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> {
+    static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop";
+  } wait_repop;
+
+  Ref<PG> pg;
+  PipelineHandle handle;
+  osd_op_params_t osd_op_p;
+  const hobject_t coid;
+  const snapid_t snap_to_trim;
+
+public:
+  PipelineHandle& get_handle() { return handle; }
+
+  std::tuple<
+    StartEvent,
+    CommonPGPipeline::WaitForActive::BlockingEvent,
+    PGActivationBlocker::BlockingEvent,
+    CommonPGPipeline::RecoverMissing::BlockingEvent,
+    CommonPGPipeline::GetOBC::BlockingEvent,
+    CommonPGPipeline::Process::BlockingEvent,
+    WaitRepop::BlockingEvent,
+    CompletionEvent
+  > tracking_events;
+};
+
 } // namespace crimson::osd
 
 #if FMT_VERSION >= 90000
 template <> struct fmt::formatter<crimson::osd::SnapTrimEvent> : fmt::ostream_formatter {};
+template <> struct fmt::formatter<crimson::osd::SnapTrimObjSubEvent> : fmt::ostream_formatter {};
 #endif
index f8be8e1cb5ed2ac620d1f1d962c2e123f10437a4..48e4d1fd210c70c05ac701c8d3425e2244f3ef52 100644 (file)
@@ -721,6 +721,7 @@ private:
   friend class InternalClientRequest;
   friend class WatchTimeoutRequest;
   friend class SnapTrimEvent;
+  friend class SnapTrimObjSubEvent;
 private:
   seastar::future<bool> find_unfound() {
     return seastar::make_ready_future<bool>(true);