]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: update snaps based on log entries on primary and replica
authorSamuel Just <sjust@redhat.com>
Wed, 24 Jul 2024 05:10:58 +0000 (22:10 -0700)
committerMatan Breizman <mbreizma@redhat.com>
Wed, 4 Sep 2024 09:58:59 +0000 (09:58 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
src/crimson/osd/ec_backend.cc
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_backend.h
src/crimson/osd/replicated_backend.cc

index 26f7e5dedde0d9baaff90466029e1a0dc25a8152..32eaaf02b3f3723822d11dcd82808a78283b0697 100644 (file)
@@ -32,6 +32,6 @@ ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
                              std::vector<pg_log_entry_t>&& log_entries)
 {
   // todo
-  return {seastar::now(),
-         seastar::make_ready_future<crimson::osd::acked_peers_t>()};
+  return make_ready_future<rep_op_ret_t>(seastar::now(),
+         seastar::make_ready_future<crimson::osd::acked_peers_t>());
 }
index 7c46754866799bb290425aab0e70442373411de1..e5982680ca85503a3e44f414044b09800ca9e142 100644 (file)
@@ -1020,14 +1020,6 @@ OpsExecuter::flush_clone_metadata(
   update_clone_overlap();
   if (cloning_ctx) {
     std::move(*cloning_ctx).apply_to(log_entries, *obc);
-    const auto& coid = log_entries.front().soid;
-    const auto& cloned_snaps = obc->ssc->snapset.clone_snaps[coid.snap];
-    maybe_snap_mapped = snap_map_clone(
-      coid,
-      std::set<snapid_t>{std::begin(cloned_snaps), std::end(cloned_snaps)},
-      snap_mapper,
-      osdriver,
-      txn);
   }
   if (snapc.seq > obc->ssc->snapset.seq) {
      // update snapset with latest snap context
index 834266ce68f052463414169aff29f4d6d33a0009..812e246826688ed4456d2092d3b9540bb81ee3fd 100644 (file)
@@ -272,6 +272,7 @@ private:
     OSDriver& osdriver,
     ceph::os::Transaction& txn);
 
+public:
   static interruptible_future<> snap_map_remove(
     const hobject_t& soid,
     SnapMapper& snap_mapper,
@@ -290,6 +291,7 @@ private:
     OSDriver& osdriver,
     ceph::os::Transaction& txn);
 
+private:
   // this gizmo could be wrapped in std::optional for the sake of lazy
   // initialization. we don't need it for ops that doesn't have effect
   // TODO: verify the init overhead of chunked_fifo
@@ -534,14 +536,17 @@ OpsExecuter::flush_changes_n_do_ops_effects(
       if (auto log_rit = log_entries.rbegin(); log_rit != log_entries.rend()) {
         ceph_assert(log_rit->version == osd_op_params->at_version);
       }
-      auto [submitted, all_completed] =
-        std::forward<MutFunc>(mut_func)(std::move(txn),
-                                        std::move(obc),
-                                        std::move(*osd_op_params),
-                                        std::move(log_entries));
-      return interruptor::make_ready_future<rep_op_fut_tuple>(
-       std::move(submitted),
-       osd_op_ierrorator::future<>(std::move(all_completed)));
+      return std::forward<MutFunc>(mut_func)(std::move(txn),
+                                            std::move(obc),
+                                            std::move(*osd_op_params),
+                                            std::move(log_entries)
+      ).then_interruptible([](auto p) {
+       auto &submitted = std::get<0>(p);
+       auto &all_completed = std::get<1>(p);
+       return interruptor::make_ready_future<rep_op_fut_tuple>(
+         std::move(submitted),
+         osd_op_ierrorator::future<>(std::move(all_completed)));
+      });
     });
   }
   apply_stats();
index f1247c3fa08f2312747cd26c74b2d7b4789a8d0f..a8001e75c4c8f7427bdf50d60e5955ccfa46bf30 100644 (file)
@@ -409,20 +409,25 @@ SnapTrimObjSubEvent::start()
          logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
          return remove_or_update(
            clone_obc, head_obc
-         ).safe_then_interruptible([clone_obc, this](auto&& txn) mutable {
-           auto [submitted, all_completed] = pg->submit_transaction(
+         ).safe_then_interruptible(
+           [clone_obc, this](auto&& txn) mutable {
+           return pg->submit_transaction(
              std::move(clone_obc),
              std::move(txn),
              std::move(osd_op_p),
-             std::move(log_entries));
-           return submitted.then_interruptible(
-             [this, all_completed=std::move(all_completed)]() mutable {
-               return enter_stage<interruptor>(
-                 client_pp().wait_repop
-               ).then_interruptible([all_completed=std::move(all_completed)]() mutable{
-                 return std::move(all_completed);
+             std::move(log_entries)
+           ).then_interruptible([this](auto p) {
+             auto &submitted = std::get<0>(p);
+             auto &all_completed = std::get<1>(p);
+             return submitted.then_interruptible(
+               [this, all_completed=std::move(all_completed)]() mutable {
+                 return enter_stage<interruptor>(
+                   client_pp().wait_repop
+                 ).then_interruptible([all_completed=std::move(all_completed)]() mutable{
+                   return std::move(all_completed);
+                 });
                });
-             });
+           });
          });
        });
     },
index 291385de64991f4f159e875fe8a8b78a0f8e09eb..cd362a0c9f04765dff1fb0107772b4d70c2f071a 100644 (file)
@@ -894,8 +894,9 @@ void PG::mutate_object(
   }
 }
 
-std::tuple<PG::interruptible_future<>,
-           PG::interruptible_future<>>
+PG::interruptible_future<
+  std::tuple<PG::interruptible_future<>,
+             PG::interruptible_future<>>>
 PG::submit_transaction(
   ObjectContextRef&& obc,
   ceph::os::Transaction&& txn,
@@ -903,9 +904,10 @@ PG::submit_transaction(
   std::vector<pg_log_entry_t>&& log_entries)
 {
   if (__builtin_expect(stopping, false)) {
-    return {seastar::make_exception_future<>(
-              crimson::common::system_shutdown_exception()),
-            seastar::now()};
+    co_return std::make_tuple(
+        interruptor::make_interruptible(seastar::make_exception_future<>(
+          crimson::common::system_shutdown_exception())),
+        interruptor::now());
   }
 
   epoch_t map_epoch = get_osdmap_epoch();
@@ -917,7 +919,7 @@ PG::submit_transaction(
   ceph_assert(log_entries.rbegin()->version >= projected_last_update);
   projected_last_update = log_entries.rbegin()->version;
 
-  auto [submitted, all_completed] = backend->submit_transaction(
+  auto [submitted, all_completed] = co_await backend->submit_transaction(
       peering_state.get_acting_recovery_backfill(),
       obc->obs.oi.soid,
       std::move(txn),
@@ -925,16 +927,19 @@ PG::submit_transaction(
       peering_state.get_last_peering_reset(),
       map_epoch,
       std::move(log_entries));
-  return std::make_tuple(std::move(submitted), all_completed.then_interruptible(
-    [this, last_complete=peering_state.get_info().last_complete,
+  co_return std::make_tuple(
+    std::move(submitted),
+    all_completed.then_interruptible(
+      [this, last_complete=peering_state.get_info().last_complete,
       at_version=osd_op_p.at_version](auto acked) {
-    for (const auto& peer : acked) {
-      peering_state.update_peer_last_complete_ondisk(
-        peer.shard, peer.last_complete_ondisk);
-    }
-    peering_state.complete_write(at_version, last_complete);
-    return seastar::now();
-  }));
+      for (const auto& peer : acked) {
+        peering_state.update_peer_last_complete_ondisk(
+          peer.shard, peer.last_complete_ondisk);
+      }
+      peering_state.complete_write(at_version, last_complete);
+      return seastar::now();
+    })
+  );
 }
 
 PG::interruptible_future<> PG::repair_object(
@@ -1453,6 +1458,11 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   std::vector<pg_log_entry_t> log_entries;
   decode(log_entries, p);
   update_stats(req->pg_stats);
+
+  co_await update_snap_map(
+    log_entries,
+    txn);
+
   log_operation(std::move(log_entries),
                 req->pg_trim_to,
                 req->version,
@@ -1477,6 +1487,54 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
   );
   co_return;
 }
+
+PG::interruptible_future<> PG::update_snap_map(
+  const std::vector<pg_log_entry_t> &log_entries,
+  ObjectStore::Transaction& t)
+{
+  LOG_PREFIX(PG::update_snap_map);
+  for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
+    OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
+    if (i->soid.snap < CEPH_MAXSNAP) {
+      if (i->is_delete()) {
+       co_await OpsExecuter::snap_map_remove(
+         i->soid,
+         snap_mapper,
+         osdriver,
+         t);
+      } else if (i->is_update()) {
+       ceph_assert(i->snaps.length() > 0);
+       vector<snapid_t> snaps;
+       bufferlist snapbl = i->snaps;
+       auto p = snapbl.cbegin();
+       try {
+         decode(snaps, p);
+       } catch (...) {
+         ERRORDPP("Failed to decode snaps on {}", *this, *i);
+         snaps.clear();
+       }
+       set<snapid_t> _snaps(snaps.begin(), snaps.end());
+       
+       if (i->is_clone() || i->is_promote()) {
+         co_await OpsExecuter::snap_map_clone(
+           i->soid,
+           _snaps,
+           snap_mapper,
+           osdriver,
+           t);
+       } else if (i->is_modify()) {
+         co_await OpsExecuter::snap_map_modify(
+           i->soid,
+           _snaps,
+           snap_mapper,
+           osdriver,
+           t);
+       } else {
+         ceph_assert(i->is_clean());
+       }
+      }
+    }
+  }
 }
 
 void PG::log_operation(
index b1e5e1fa22aa227565a52856952a50edbc40dad5..d4d3bb92e00347c62cc180836777b6a17b2db6e4 100644 (file)
@@ -591,6 +591,9 @@ public:
 
   interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
   void update_stats(const pg_stat_t &stat);
+  interruptible_future<> update_snap_map(
+    const std::vector<pg_log_entry_t> &log_entries,
+    ObjectStore::Transaction& t);
   void log_operation(
     std::vector<pg_log_entry_t>&& logv,
     const eversion_t &trim_to,
@@ -673,7 +676,8 @@ private:
     SuccessFunc&& success_func,
     FailureFunc&& failure_func);
   interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
-  std::tuple<interruptible_future<>, interruptible_future<>>
+  interruptible_future<
+    std::tuple<interruptible_future<>, interruptible_future<>>>
   submit_transaction(
     ObjectContextRef&& obc,
     ceph::os::Transaction&& txn,
index 21dce24b899ec2e9e4c9b442b05879e91d528cc8..fa1f1405ffe0f4fcfa9d65f94bcc657379583558 100644 (file)
@@ -60,9 +60,10 @@ public:
   using interruptible_future =
     ::crimson::interruptible::interruptible_future<
       ::crimson::osd::IOInterruptCondition, T>;
-  using rep_op_fut_t =
+  using rep_op_ret_t = 
     std::tuple<interruptible_future<>,
               interruptible_future<crimson::osd::acked_peers_t>>;
+  using rep_op_fut_t = interruptible_future<rep_op_ret_t>;
   PGBackend(shard_id_t shard, CollectionRef coll,
             crimson::osd::ShardServices &shard_services,
             DoutPrefixProvider &dpp);
index 6ec8d30d59678b2d61299e96d6036f5172cbcbfa..cbb8c883e075273345278d9400608edf12601bb5 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "messages/MOSDRepOpReply.h"
 
+#include "crimson/common/coroutine.h"
 #include "crimson/common/exception.h"
 #include "crimson/common/log.h"
 #include "crimson/os/futurized_store.h"
@@ -38,13 +39,16 @@ ReplicatedBackend::_read(const hobject_t& hoid,
 ReplicatedBackend::rep_op_fut_t
 ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
                                       const hobject_t& hoid,
-                                      ceph::os::Transaction&& txn,
-                                      osd_op_params_t&& osd_op_p,
+                                      ceph::os::Transaction&& t,
+                                      osd_op_params_t&& opp,
                                       epoch_t min_epoch, epoch_t map_epoch,
-                                     std::vector<pg_log_entry_t>&& log_entries)
+                                     std::vector<pg_log_entry_t>&& logv)
 {
   LOG_PREFIX(ReplicatedBackend::submit_transaction);
   DEBUGDPP("object {}", dpp, hoid);
+  auto log_entries = std::move(logv);
+  auto txn = std::move(t);
+  auto osd_op_p = std::move(opp);
 
   const ceph_tid_t tid = shard_services.get_tid();
   auto pending_txn =
@@ -89,6 +93,8 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
     }
   }
 
+  co_await pg.update_snap_map(log_entries, txn);
+
   pg.log_operation(
     std::move(log_entries),
     osd_op_p.pg_trim_to,
@@ -99,8 +105,8 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
     false);
 
   auto all_completed = interruptor::make_interruptible(
-    shard_services.get_store().do_transaction(coll, std::move(txn))
-  ).then_interruptible([FNAME, this,
+      shard_services.get_store().do_transaction(coll, std::move(txn))
+   ).then_interruptible([FNAME, this,
                        peers=pending_txn->second.weak_from_this()] {
     if (!peers) {
       // for now, only actingset_changed can cause peers
@@ -117,13 +123,14 @@ ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
   }).then_interruptible([pending_txn, this] {
     auto acked_peers = std::move(pending_txn->second.acked_peers);
     pending_trans.erase(pending_txn);
-    return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
+    return seastar::make_ready_future<
+      crimson::osd::acked_peers_t>(std::move(acked_peers));
   });
 
   auto sends_complete = seastar::when_all_succeed(
     sends->begin(), sends->end()
   ).finally([sends=std::move(sends)] {});
-  return {std::move(sends_complete), std::move(all_completed)};
+  co_return std::make_tuple(std::move(sends_complete), std::move(all_completed));
 }
 
 void ReplicatedBackend::on_actingset_changed(bool same_primary)