]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd/pg: set PG::projected_last_update when submitting transactions
author: Xuehan Xu <xuxuehan@qianxin.com>
Sat, 19 Aug 2023 07:08:20 +0000 (15:08 +0800)
committer: Xuehan Xu <xuxuehan@qianxin.com>
Tue, 7 Nov 2023 09:10:55 +0000 (17:10 +0800)

Otherwise, the epoch of PG::projected_last_update is always outdated,
which causes assertion failures in the backfilling code.

Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 040870203bd950ea0d4785a0eeb127a6224d0c09..868a7a3fb13d25c1b7ff3c2cd82768b9984df900 100644 (file)
@@ -797,7 +797,7 @@ void OpsExecuter::fill_op_params_bump_pg_version()
 {
   osd_op_params->req_id = msg->get_reqid();
   osd_op_params->mtime = msg->get_mtime();
-  osd_op_params->at_version = pg->next_version();
+  osd_op_params->at_version = pg->get_next_version();
   osd_op_params->pg_trim_to = pg->get_pg_trim_to();
   osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
   osd_op_params->last_complete = pg->get_info().last_complete;
@@ -820,6 +820,7 @@ std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
     osd_op_params->req_id,
     osd_op_params->mtime,
     op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+  osd_op_params->at_version.version++;
   if (op_info.allows_returnvec()) {
     // also the per-op values are recorded in the pg log
     log_entries.back().set_op_returns(ops);
@@ -957,6 +958,7 @@ std::unique_ptr<OpsExecuter::CloningContext> OpsExecuter::execute_clone(
     initial_obs.oi.mtime, // will be replaced in `apply_to()`
     0
   };
+  osd_op_params->at_version.version++;
   encode(cloned_snaps, cloning_ctx->log_entry.snaps);
 
   // TODO: update most recent clone_overlap and usage stats
@@ -1012,7 +1014,7 @@ std::pair<object_info_t, ObjectContextRef> OpsExecuter::prepare_clone(
   const hobject_t& coid)
 {
   object_info_t static_snap_oi(coid);
-  static_snap_oi.version = pg->next_version();
+  static_snap_oi.version = osd_op_params->at_version;
   static_snap_oi.prior_version = obc->obs.oi.version;
   static_snap_oi.copy_user_bits(obc->obs.oi);
   if (static_snap_oi.is_whiteout()) {
index ffd43d736ad393e038af37d03ecaa65440c3e9b7..263cdab37a8a0d4305af8ba610fe70343e61d114 100644 (file)
@@ -211,8 +211,7 @@ SnapTrimObjSubEvent::remove_or_update_iertr::future<>
 SnapTrimObjSubEvent::remove_clone(
   ObjectContextRef obc,
   ObjectContextRef head_obc,
-  ceph::os::Transaction& txn,
-  std::vector<pg_log_entry_t>& log_entries
+  ceph::os::Transaction& txn
 ) {
   const auto p = std::find(
     head_obc->ssc->snapset.clones.begin(),
@@ -259,17 +258,14 @@ SnapTrimObjSubEvent::remove_clone(
   head_obc->ssc->snapset.clone_size.erase(last);
   head_obc->ssc->snapset.clone_snaps.erase(last);
 
-  log_entries.emplace_back(
-    pg_log_entry_t{
-      pg_log_entry_t::DELETE,
-      coid,
-      osd_op_p.at_version,
-      obc->obs.oi.version,
-      0,
-      osd_reqid_t(),
-      obc->obs.oi.mtime, // will be replaced in `apply_to()`
-      0}
-    );
+  add_log_entry(
+    pg_log_entry_t::DELETE,
+    coid,
+    obc->obs.oi.version,
+    0,
+    osd_reqid_t(),
+    obc->obs.oi.mtime, // will be replaced in `apply_to()`
+    0);
   txn.remove(
     pg->get_collection_ref()->get_cid(),
     ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
@@ -280,8 +276,7 @@ SnapTrimObjSubEvent::remove_clone(
 void SnapTrimObjSubEvent::remove_head_whiteout(
   ObjectContextRef obc,
   ObjectContextRef head_obc,
-  ceph::os::Transaction& txn,
-  std::vector<pg_log_entry_t>& log_entries
+  ceph::os::Transaction& txn
 ) {
   // NOTE: this arguably constitutes minor interference with the
   // tiering agent if this is a cache tier since a snap trim event
@@ -290,17 +285,14 @@ void SnapTrimObjSubEvent::remove_head_whiteout(
   const auto head_oid = coid.get_head();
   logger().info("{}: {} removing {}",
                 *this, coid, head_oid);
-  log_entries.emplace_back(
-    pg_log_entry_t{
-      pg_log_entry_t::DELETE,
-      head_oid,
-      osd_op_p.at_version,
-      head_obc->obs.oi.version,
-      0,
-      osd_reqid_t(),
-      obc->obs.oi.mtime, // will be replaced in `apply_to()`
-      0}
-    );
+  add_log_entry(
+    pg_log_entry_t::DELETE,
+    head_oid,
+    head_obc->obs.oi.version,
+    0,
+    osd_reqid_t(),
+    obc->obs.oi.mtime, // will be replaced in `apply_to()`
+    0);
   logger().info("{}: remove snap head", *this);
   object_info_t& oi = head_obc->obs.oi;
   delta_stats.num_objects--;
@@ -326,8 +318,7 @@ SnapTrimObjSubEvent::adjust_snaps(
   ObjectContextRef obc,
   ObjectContextRef head_obc,
   const std::set<snapid_t>& new_snaps,
-  ceph::os::Transaction& txn,
-  std::vector<pg_log_entry_t>& log_entries
+  ceph::os::Transaction& txn
 ) {
   head_obc->ssc->snapset.clone_snaps[coid.snap] =
     std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend());
@@ -345,17 +336,14 @@ SnapTrimObjSubEvent::adjust_snaps(
     ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
     OI_ATTR,
     bl);
-  log_entries.emplace_back(
-    pg_log_entry_t{
-      pg_log_entry_t::MODIFY,
-      coid,
-      obc->obs.oi.version,
-      obc->obs.oi.prior_version,
-      0,
-      osd_reqid_t(),
-      obc->obs.oi.mtime,
-      0}
-    );
+  add_log_entry(
+    pg_log_entry_t::MODIFY,
+    coid,
+    obc->obs.oi.prior_version,
+    0,
+    osd_reqid_t(),
+    obc->obs.oi.mtime,
+    0);
   return OpsExecuter::snap_map_modify(
     coid, new_snaps, pg->snap_mapper, pg->osdriver, txn);
 }
@@ -363,23 +351,19 @@ SnapTrimObjSubEvent::adjust_snaps(
 void SnapTrimObjSubEvent::update_head(
   ObjectContextRef obc,
   ObjectContextRef head_obc,
-  ceph::os::Transaction& txn,
-  std::vector<pg_log_entry_t>& log_entries
+  ceph::os::Transaction& txn
 ) {
   const auto head_oid = coid.get_head();
   logger().info("{}: writing updated snapset on {}, snapset is {}",
                 *this, head_oid, head_obc->ssc->snapset);
-  log_entries.emplace_back(
-    pg_log_entry_t{
-      pg_log_entry_t::MODIFY,
-      head_oid,
-      osd_op_p.at_version,
-      head_obc->obs.oi.version,
-      0,
-      osd_reqid_t(),
-      obc->obs.oi.mtime,
-      0}
-    );
+  add_log_entry(
+    pg_log_entry_t::MODIFY,
+    head_oid,
+    head_obc->obs.oi.version,
+    0,
+    osd_reqid_t(),
+    obc->obs.oi.mtime,
+    0);
 
   head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
   head_obc->obs.oi.version = osd_op_p.at_version;
@@ -399,8 +383,7 @@ void SnapTrimObjSubEvent::update_head(
     attrs);
 }
 
-SnapTrimObjSubEvent::remove_or_update_iertr::future<
-  SnapTrimObjSubEvent::remove_or_update_ret_t>
+SnapTrimObjSubEvent::remove_or_update_iertr::future<ceph::os::Transaction>
 SnapTrimObjSubEvent::remove_or_update(
   ObjectContextRef obc,
   ObjectContextRef head_obc)
@@ -433,45 +416,41 @@ SnapTrimObjSubEvent::remove_or_update(
   }
 
   return seastar::do_with(ceph::os::Transaction{}, [=, this](auto &txn) {
-  std::vector<pg_log_entry_t> log_entries{};
-
-  int64_t num_objects_before_trim = delta_stats.num_objects;
-  osd_op_p.at_version = pg->next_version();
-  auto ret = remove_or_update_iertr::now();
-  if (new_snaps.empty()) {
-    // remove clone from snapset
-    logger().info("{}: {} snaps {} -> {} ... deleting",
-                  *this, coid, old_snaps, new_snaps);
-    ret = remove_clone(obc, head_obc, txn, log_entries);
-  } else {
-    // save adjusted snaps for this object
-    logger().info("{}: {} snaps {} -> {}",
-                  *this, coid, old_snaps, new_snaps);
-    ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries);
-  }
-  return std::move(ret).si_then(
-    [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable {
-    osd_op_p.at_version = pg->next_version();
-
-    // save head snapset
-    logger().debug("{}: {} new snapset {} on {}",
-                   *this, coid, head_obc->ssc->snapset, head_obc->obs.oi);
-    if (head_obc->ssc->snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
-      remove_head_whiteout(obc, head_obc, txn, log_entries);
+    int64_t num_objects_before_trim = delta_stats.num_objects;
+    osd_op_p.at_version = pg->get_next_version();
+    auto ret = remove_or_update_iertr::now();
+    if (new_snaps.empty()) {
+      // remove clone from snapset
+      logger().info("{}: {} snaps {} -> {} ... deleting",
+                   *this, coid, old_snaps, new_snaps);
+      ret = remove_clone(obc, head_obc, txn);
     } else {
-      update_head(obc, head_obc, txn, log_entries);
-    }
-    // Stats reporting - Set number of objects trimmed
-    if (num_objects_before_trim > delta_stats.num_objects) {
-      //int64_t num_objects_trimmed =
-      //  num_objects_before_trim - delta_stats.num_objects;
-      //add_objects_trimmed_count(num_objects_trimmed);
+      // save adjusted snaps for this object
+      logger().info("{}: {} snaps {} -> {}",
+                   *this, coid, old_snaps, new_snaps);
+      ret = adjust_snaps(obc, head_obc, new_snaps, txn);
     }
-  }).si_then(
-    [&txn, log_entries=std::move(log_entries)] () mutable {
-    return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>(
-      std::make_pair(std::move(txn), std::move(log_entries)));
-  });
+    return std::move(ret).si_then(
+      [&txn, obc, num_objects_before_trim,
+      head_obc=std::move(head_obc), this]() mutable {
+      // save head snapset
+      logger().debug("{}: {} new snapset {} on {}",
+                    *this, coid, head_obc->ssc->snapset, head_obc->obs.oi);
+      if (head_obc->ssc->snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
+       remove_head_whiteout(obc, head_obc, txn);
+      } else {
+       update_head(obc, head_obc, txn);
+      }
+      // Stats reporting - Set number of objects trimmed
+      if (num_objects_before_trim > delta_stats.num_objects) {
+       //int64_t num_objects_trimmed =
+       //  num_objects_before_trim - delta_stats.num_objects;
+       //add_objects_trimmed_count(num_objects_trimmed);
+      }
+    }).si_then(
+      [&txn] () mutable {
+      return std::move(txn);
+    });
   });
 }
 
@@ -509,8 +488,7 @@ SnapTrimObjSubEvent::start()
         logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
         return remove_or_update(
           clone_obc, head_obc
-        ).safe_then_unpack_interruptible([clone_obc, this]
-                                         (auto&& txn, auto&& log_entries) mutable {
+        ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable {
           auto [submitted, all_completed] = pg->submit_transaction(
             std::move(clone_obc),
             std::move(txn),
index afb24952a045cf8c3c97ed538b47bbf876a5a631..288bd207e98553dc9fbfb499ca634342670177a6 100644 (file)
@@ -147,28 +147,22 @@ private:
   remove_or_update_iertr::future<> remove_clone(
     ObjectContextRef obc,
     ObjectContextRef head_obc,
-    ceph::os::Transaction& txn,
-    std::vector<pg_log_entry_t>& log_entries);
+    ceph::os::Transaction& txn);
   void remove_head_whiteout(
     ObjectContextRef obc,
     ObjectContextRef head_obc,
-    ceph::os::Transaction& txn,
-    std::vector<pg_log_entry_t>& log_entries);
+    ceph::os::Transaction& txn);
   interruptible_future<> adjust_snaps(
     ObjectContextRef obc,
     ObjectContextRef head_obc,
     const std::set<snapid_t>& new_snaps,
-    ceph::os::Transaction& txn,
-    std::vector<pg_log_entry_t>& log_entries);
+    ceph::os::Transaction& txn);
   void update_head(
     ObjectContextRef obc,
     ObjectContextRef head_obc,
-    ceph::os::Transaction& txn,
-    std::vector<pg_log_entry_t>& log_entries);
+    ceph::os::Transaction& txn);
 
-  using remove_or_update_ret_t =
-    std::pair<ceph::os::Transaction, std::vector<pg_log_entry_t>>;
-  remove_or_update_iertr::future<remove_or_update_ret_t>
+  remove_or_update_iertr::future<ceph::os::Transaction>
   remove_or_update(ObjectContextRef obc, ObjectContextRef head_obc);
 
   // we don't need to synchronize with other instances started by
@@ -177,11 +171,32 @@ private:
     static constexpr auto type_name = "SnapTrimObjSubEvent::wait_repop";
   } wait_repop;
 
+  void add_log_entry(
+    int _op,
+    const hobject_t& _soid,
+    const eversion_t& pv,
+    version_t uv,
+    const osd_reqid_t& rid,
+    const utime_t& mt,
+    int return_code) {
+    log_entries.emplace_back(
+      _op,
+      _soid,
+      osd_op_p.at_version,
+      pv,
+      uv,
+      rid,
+      mt,
+      return_code);
+    osd_op_p.at_version.version++;
+  }
+
   Ref<PG> pg;
   PipelineHandle handle;
   osd_op_params_t osd_op_p;
   const hobject_t coid;
   const snapid_t snap_to_trim;
+  std::vector<pg_log_entry_t> log_entries;
 
 public:
   PipelineHandle& get_handle() { return handle; }
index 42052a4c84d9d9f0250a31bf25d01d3024e9fa42..9aa92359151fe80009358bcd64965d6f0cc7d546 100644 (file)
@@ -764,6 +764,10 @@ PG::submit_transaction(
   peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);
 
+  ceph_assert(!log_entries.empty());
+  ceph_assert(log_entries.rbegin()->version >= projected_last_update);
+  projected_last_update = log_entries.rbegin()->version;
+
   auto [submitted, all_completed] = backend->mutate_object(
       peering_state.get_acting_recovery_backfill(),
       std::move(obc),
@@ -933,7 +937,7 @@ seastar::future<> PG::submit_error_log(
   mempool::osd_pglog::list<pg_log_entry_t> log_entries;
   log_entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR,
                                        obc->obs.oi.soid,
-                                       next_version(),
+                                       get_next_version(),
                                        eversion_t(), 0,
                                        reqid, utime_t(),
                                        -e.value()));
index 6a0231e452f60f2b8cc06228ed2f046b47fc9b12..02e7ef3804ef8e6a28f8707ee3053f3cc15e57b0 100644 (file)
@@ -611,9 +611,9 @@ private:
 
 public:
   cached_map_t get_osdmap() { return peering_state.get_osdmap(); }
-  eversion_t next_version() {
+  eversion_t get_next_version() {
     return eversion_t(get_osdmap_epoch(),
-                     ++projected_last_update.version);
+                     projected_last_update.version + 1);
   }
   ShardServices& get_shard_services() final {
     return shard_services;