]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: submit_transaction() refactoring 45794/head
authorMatan Breizman <mbreizma@redhat.com>
Mon, 4 Apr 2022 12:24:54 +0000 (12:24 +0000)
committerMatan Breizman <mbreizma@redhat.com>
Sun, 1 May 2022 10:42:31 +0000 (10:42 +0000)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 81a7b50d2ce62d5faff9ec1dd2027caedd04f73c..75366d0ac15fc69214cf2642a5bd7f58afdc47dc 100644 (file)
@@ -676,6 +676,39 @@ OpsExecuter::do_execute_op(OSDOp& osd_op)
   }
 }
 
+void OpsExecuter::fill_op_params_bump_pg_version()
+{
+  osd_op_params->req_id = msg->get_reqid();
+  osd_op_params->mtime = msg->get_mtime();
+  osd_op_params->at_version = pg->next_version();
+  osd_op_params->pg_trim_to = pg->get_pg_trim_to();
+  osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
+  osd_op_params->last_complete = pg->get_info().last_complete;
+  if (user_modify) {
+    osd_op_params->user_at_version = osd_op_params->at_version.version;
+  }
+}
+
+std::vector<pg_log_entry_t> OpsExecuter::prepare_transaction(
+  const std::vector<OSDOp>& ops)
+{
+  std::vector<pg_log_entry_t> log_entries;
+  log_entries.emplace_back(obc->obs.exists ?
+      pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
+    obc->obs.oi.soid, osd_op_params->at_version, obc->obs.oi.version,
+    osd_op_params->user_modify ? osd_op_params->at_version.version : 0,
+    osd_op_params->req_id, osd_op_params->mtime,
+    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
+  if (op_info.allows_returnvec()) {
+    // also the per-op values are recorded in the pg log
+    log_entries.back().set_op_returns(ops);
+    logger().debug("{} op_returns: {}",
+                   __func__, log_entries.back().op_returns);
+  }
+  log_entries.back().clean_regions = std::move(osd_op_params->clean_regions);
+  return log_entries;
+}
+
 // Defined here because there is a circular dependency between OpsExecuter and PG
 uint32_t OpsExecuter::get_pool_stripe_width() const {
   return pg->get_pool().info.get_stripe_width();
index 659c682da3a65946ac53b6e6e1d8bfc9b7cd8484..61776526dadcf2305c3cfef0f0d9dabe21236b20 100644 (file)
@@ -257,7 +257,11 @@ public:
   using rep_op_fut_t =
     interruptible_future<rep_op_fut_tuple>;
   template <typename MutFunc>
-  rep_op_fut_t flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&;
+  rep_op_fut_t flush_changes_n_do_ops_effects(const std::vector<OSDOp>& ops,
+    MutFunc&& mut_func) &&;
+  std::vector<pg_log_entry_t> prepare_transaction(
+    const std::vector<OSDOp>& ops);
+  void fill_op_params_bump_pg_version();
 
   const hobject_t &get_target() const {
     return obc->obs.oi.soid;
@@ -323,7 +327,9 @@ auto OpsExecuter::with_effect_on_obc(
 
 template <typename MutFunc>
 OpsExecuter::rep_op_fut_t
-OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
+OpsExecuter::flush_changes_n_do_ops_effects(
+  const std::vector<OSDOp>& ops,
+  MutFunc&& mut_func) &&
 {
   const bool want_mutate = !txn.empty();
   // osd_op_params are instantiated by every wr-like operation.
@@ -334,12 +340,12 @@ OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
        seastar::now(),
        interruptor::make_interruptible(osd_op_errorator::now()));
   if (want_mutate) {
-    osd_op_params->req_id = msg->get_reqid();
-    osd_op_params->mtime = msg->get_mtime();
+    fill_op_params_bump_pg_version();
+    auto log_entries = prepare_transaction(ops);
     auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
                                                     std::move(obc),
                                                     std::move(*osd_op_params),
-                                                    user_modify);
+                                                    std::move(log_entries));
     maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
        std::move(submitted),
        osd_op_ierrorator::future<>(std::move(all_completed)));
index d51b939956e27616acc5af2900d54cf1269028ac..a9caa5a3dd45f5cae113dd67f42622e5cf99e7a7 100644 (file)
@@ -571,11 +571,10 @@ seastar::future<> PG::WaitForActiveBlocker::stop()
 std::tuple<PG::interruptible_future<>,
            PG::interruptible_future<>>
 PG::submit_transaction(
-  const OpInfo& op_info,
-  const std::vector<OSDOp>& ops,
   ObjectContextRef&& obc,
   ceph::os::Transaction&& txn,
-  osd_op_params_t&& osd_op_p)
+  osd_op_params_t&& osd_op_p,
+  std::vector<pg_log_entry_t>&& log_entries)
 {
   if (__builtin_expect(stopping, false)) {
     return {seastar::make_exception_future<>(
@@ -589,21 +588,6 @@ PG::submit_transaction(
     throw crimson::common::actingset_changed(is_primary());
   }
 
-  std::vector<pg_log_entry_t> log_entries;
-  log_entries.emplace_back(obc->obs.exists ?
-                     pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
-                   obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
-                   osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
-                   osd_op_p.req_id, osd_op_p.mtime,
-                    op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
-  // TODO: refactor the submit_transaction
-  if (op_info.allows_returnvec()) {
-    // also the per-op values are recorded in the pg log
-    log_entries.back().set_op_returns(ops);
-    logger().debug("{} op_returns: {}",
-                   __func__, log_entries.back().op_returns);
-  }
-  log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
   peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
   peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);
@@ -628,19 +612,6 @@ PG::submit_transaction(
   }));
 }
 
-void PG::fill_op_params_bump_pg_version(
-  osd_op_params_t& osd_op_p,
-  const bool user_modify)
-{
-  osd_op_p.at_version = next_version();
-  osd_op_p.pg_trim_to = get_pg_trim_to();
-  osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
-  osd_op_p.last_complete = get_info().last_complete;
-  if (user_modify) {
-    osd_op_p.user_at_version = osd_op_p.at_version.version;
-  }
-}
-
 PG::interruptible_future<> PG::repair_object(
   const hobject_t& oid,
   eversion_t& v) 
@@ -661,7 +632,6 @@ PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
 PG::do_osd_ops_execute(
   seastar::lw_shared_ptr<OpsExecuter> ox,
   std::vector<OSDOp>& ops,
-  const OpInfo &op_info,
   SuccessFunc&& success_func,
   FailureFunc&& failure_func)
 {
@@ -677,26 +647,24 @@ PG::do_osd_ops_execute(
       ox->get_target(),
       ceph_osd_op_name(osd_op.op.op));
     return ox->execute_op(osd_op);
-  }).safe_then_interruptible([this, ox, &op_info, &ops] {
+  }).safe_then_interruptible([this, ox, &ops] {
     logger().debug(
       "do_osd_ops_execute: object {} all operations successful",
       ox->get_target());
     peering_state.apply_op_stats(ox->get_target(), ox->get_stats());
-    return std::move(*ox).flush_changes_n_do_ops_effects(
-      [this, &op_info, &ops] (auto&& txn,
-                              auto&& obc,
-                              auto&& osd_op_p,
-                              bool user_modify) {
+    return std::move(*ox).flush_changes_n_do_ops_effects(ops,
+      [this] (auto&& txn,
+              auto&& obc,
+              auto&& osd_op_p,
+              auto&& log_entries) {
        logger().debug(
          "do_osd_ops_execute: object {} submitting txn",
          obc->get_oid());
-        fill_op_params_bump_pg_version(osd_op_p, user_modify);
        return submit_transaction(
-          op_info,
-          ops,
           std::move(obc),
           std::move(txn),
-          std::move(osd_op_p));
+          std::move(osd_op_p),
+          std::move(log_entries));
     });
   }).safe_then_unpack_interruptible(
     [success_func=std::move(success_func), rollbacker, this, failure_func_ptr]
@@ -752,7 +720,6 @@ PG::do_osd_ops(
     seastar::make_lw_shared<OpsExecuter>(
       Ref<PG>{this}, obc, op_info, *m),
     m->ops,
-    op_info,
     [this, m, obc, may_write = op_info.may_write(),
      may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
       // TODO: should stop at the first op which returns a negative retval,
@@ -820,7 +787,6 @@ PG::do_osd_ops(
       seastar::make_lw_shared<OpsExecuter>(
         Ref<PG>{this}, std::move(obc), op_info, msg_params),
       ops,
-      std::as_const(op_info),
       std::move(success_func),
       std::move(failure_func));
   });
index 18aa9ede7b9380075ee97633d86a3be4766707aa..f8284771e76afe21d90da0832cba05404dafdfd0 100644 (file)
@@ -599,17 +599,15 @@ private:
   do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
     seastar::lw_shared_ptr<OpsExecuter> ox,
     std::vector<OSDOp>& ops,
-    const OpInfo &op_info,
     SuccessFunc&& success_func,
     FailureFunc&& failure_func);
   interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
   std::tuple<interruptible_future<>, interruptible_future<>>
   submit_transaction(
-    const OpInfo& op_info,
-    const std::vector<OSDOp>& ops,
     ObjectContextRef&& obc,
     ceph::os::Transaction&& txn,
-    osd_op_params_t&& oop);
+    osd_op_params_t&& oop,
+    std::vector<pg_log_entry_t>&& log_entries);
   interruptible_future<> repair_object(
     const hobject_t& oid,
     eversion_t& v);