]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: manage log submission atomicity independently of pipeline stages
authorSamuel Just <sjust@redhat.com>
Thu, 24 Oct 2024 23:39:18 +0000 (16:39 -0700)
committerSamuel Just <sjust@redhat.com>
Wed, 11 Dec 2024 02:02:01 +0000 (18:02 -0800)
It's important to construct and submit log entries atomically because
the submission order needs to match the entry numbering.  Previously,
this occurred under the pg-wide exclusive process stage.  Shortly, each
obc will have its own process stage, so we need to ensure that atomicity
seperately from the pipeline stages.  Introduce a simple mutex.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 7f78457167d03eb77fa1175c7cf0a7b3a64da30a..459c98bb9c0f6e1720b5fd62dcac38f6c8f87356 100644 (file)
@@ -414,21 +414,31 @@ SnapTrimObjSubEvent::start()
   logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
 
   co_await enter_stage<interruptor>(client_pp().process);
+  auto all_completed = interruptor::now();
+  {
+    // as with PG::submit_executer, we need to build the pg log entries
+    // and submit the transaction atomically
+    co_await interruptor::make_interruptible(pg->submit_lock.lock());
+    auto unlocker = seastar::defer([this] {
+      pg->submit_lock.unlock();
+    });
 
-  logger().debug("{}: processing obc={}", *this, obc_manager.get_obc()->get_oid());
-
-  auto txn = co_await remove_or_update(
-    obc_manager.get_obc(), obc_manager.get_head_obc());
-
-  auto [submitted, all_completed] = co_await pg->submit_transaction(
-    ObjectContextRef(obc_manager.get_obc()),
-    nullptr,
-    std::move(txn),
-    std::move(osd_op_p),
-    std::move(log_entries)
-  );
-
-  co_await std::move(submitted);
+    logger().debug("{}: calling remove_or_update obc={}",
+                  *this, obc_manager.get_obc()->get_oid());
+
+    auto txn = co_await remove_or_update(
+      obc_manager.get_obc(), obc_manager.get_head_obc());
+
+    auto submitted = interruptor::now();
+    std::tie(submitted, all_completed) = co_await pg->submit_transaction(
+      ObjectContextRef(obc_manager.get_obc()),
+      nullptr,
+      std::move(txn),
+      std::move(osd_op_p),
+      std::move(log_entries)
+    );
+    co_await std::move(submitted);
+  }
 
   co_await enter_stage<interruptor>(client_pp().wait_repop);
 
index a276b0e43661a4c6069e2de6fbffc439a4e237c8..c677961fe0fc47093d535a881cecae7636e9b067 100644 (file)
@@ -1039,6 +1039,12 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   const std::error_code e,
   ceph_tid_t rep_tid)
 {
+  // as with submit_executer, need to ensure that log numbering and submission
+  // are atomic
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
   LOG_PREFIX(PG::submit_error_log);
   DEBUGDPP("{} rep_tid: {} error: {}",
           *this, *m, rep_tid, e);
@@ -1156,8 +1162,15 @@ PG::submit_executer_fut PG::submit_executer(
   OpsExecuter &&ox,
   const std::vector<OSDOp>& ops) {
   LOG_PREFIX(PG::submit_executer);
-  // transaction must commit at this point
-  return std::move(
+
+  // we need to build the pg log entries and submit the transaction
+  // atomically to ensure log ordering
+  co_await interruptor::make_interruptible(submit_lock.lock());
+  auto unlocker = seastar::defer([this] {
+    submit_lock.unlock();
+  });
+
+  auto [submitted, completed] = co_await std::move(
     ox
   ).flush_changes_n_do_ops_effects(
     ops,
@@ -1177,6 +1190,10 @@ PG::submit_executer_fut PG::submit_executer(
        std::move(osd_op_p),
        std::move(log_entries));
     });
+
+  co_return std::make_tuple(
+    std::move(submitted).then_interruptible([unlocker=std::move(unlocker)] {}),
+    std::move(completed));
 }
 
 PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
index 15aeec0e4f35c0fd93c3b73d3ec91a603cb7f6b3..2adaf69f26b3016da468db96824be31782d48c94 100644 (file)
@@ -663,6 +663,7 @@ private:
     const OpInfo &op_info,
     std::vector<OSDOp>& ops);
 
+  seastar::shared_mutex submit_lock;
   using submit_executer_ret = std::tuple<
     interruptible_future<>,
     interruptible_future<>>;