]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: simplify the management of OpsExecuter's life-time. 41554/head
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 26 May 2021 13:20:52 +0000 (13:20 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 26 May 2021 14:30:27 +0000 (14:30 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index d78354a4e2451c49a8302e17f6a384077e9a8dde..c84b9b87965299a6e1014b519d169b56b22027bd 100644 (file)
@@ -12,6 +12,7 @@
 #include <seastar/core/chunked_fifo.hh>
 #include <seastar/core/future.hh>
 #include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include "common/dout.h"
 #include "common/static_ptr.h"
@@ -35,7 +36,7 @@ namespace crimson::osd {
 class PG;
 
 // OpsExecuter -- a class for executing ops targeting a certain object.
-class OpsExecuter {
+class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
   using call_errorator = crimson::errorator<
     crimson::stateful_ec,
     crimson::ct_error::enoent,
@@ -244,19 +245,10 @@ public:
   }
 
   template <class Func>
-  struct RollbackHelper {
-    interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
-    ObjectContextRef get_obc() const {
-      return ox.obc;
-    }
-    OpsExecuter& ox;
-    Func func;
-  };
+  struct RollbackHelper;
 
   template <class Func>
-  RollbackHelper<Func> create_rollbacker(Func&& func) {
-    return {*this, std::forward<Func>(func)};
-  }
+  RollbackHelper<Func> create_rollbacker(Func&& func);
 
   interruptible_errorated_future<osd_op_errorator>
   execute_op(OSDOp& osd_op);
@@ -369,6 +361,24 @@ OpsExecuter::flush_changes_n_do_ops_effects(Ref<PG> pg, MutFunc&& mut_func) &&
   }
 }
 
+template <class Func>
+struct OpsExecuter::RollbackHelper {
+  interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
+  ObjectContextRef get_obc() const {
+    assert(ox);
+    return ox->obc;
+  }
+  seastar::lw_shared_ptr<OpsExecuter> ox;
+  Func func;
+};
+
+template <class Func>
+inline OpsExecuter::RollbackHelper<Func>
+OpsExecuter::create_rollbacker(Func&& func) {
+  return {shared_from_this(), std::forward<Func>(func)};
+}
+
+
 template <class Func>
 OpsExecuter::interruptible_future<>
 OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
@@ -393,14 +403,15 @@ OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
   // typically append them before any write. If OpsExecuter hasn't
   // seen any modifying operation, `obc` is supposed to be kept
   // unchanged.
-  const auto need_rollback = ox.has_seen_write();
+  assert(ox);
+  const auto need_rollback = ox->has_seen_write();
   crimson::get_logger(ceph_subsys_osd).debug(
     "{}: object {} got error {}, need_rollback={}",
     __func__,
-    ox.obc->get_oid(),
+    ox->obc->get_oid(),
     e,
     need_rollback);
-  return need_rollback ? func(*ox.obc) : interruptor::now();
+  return need_rollback ? func(*ox->obc) : interruptor::now();
 }
 
 // PgOpsExecuter -- a class for executing ops targeting a certain PG.
index fee497a45b4121ce0337474d3e4f03b3c8690d0f..6a5bd95d917b8d723a4014b3e4d33d35408fe619 100644 (file)
@@ -657,28 +657,29 @@ PG::interruptible_future<> PG::repair_object(
 template <class Ret, class SuccessFunc, class FailureFunc>
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ret>>
 PG::do_osd_ops_execute(
-  OpsExecuter&& ox,
+  seastar::lw_shared_ptr<OpsExecuter> ox,
   std::vector<OSDOp>& ops,
   const OpInfo &op_info,
   SuccessFunc&& success_func,
   FailureFunc&& failure_func)
 {
-  auto rollbacker = ox.create_rollbacker([this] (auto& obc) {
+  assert(ox);
+  auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
     return reload_obc(obc).handle_error_interruptible(
       load_obc_ertr::assert_all{"can't live with object state messed up"});
   });
   auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
-  return interruptor::do_for_each(ops, [&ox](OSDOp& osd_op) {
+  return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) {
     logger().debug(
       "do_osd_ops_execute: object {} - handling op {}",
-      ox.get_target(),
+      ox->get_target(),
       ceph_osd_op_name(osd_op.op.op));
-    return ox.execute_op(osd_op);
-  }).safe_then_interruptible([this, &ox, &op_info, &ops] {
+    return ox->execute_op(osd_op);
+  }).safe_then_interruptible([this, ox, &op_info, &ops] {
     logger().debug(
       "do_osd_ops_execute: object {} all operations successful",
-      ox.get_target());
-    return std::move(ox).flush_changes_n_do_ops_effects(
+      ox->get_target());
+    return std::move(*ox).flush_changes_n_do_ops_effects(
       Ref<PG>{this},
       [this, &op_info, &ops] (auto&& txn,
                               auto&& obc,
@@ -745,10 +746,11 @@ PG::do_osd_ops(
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
   }
-  auto ox = std::make_unique<OpsExecuter>(
-    std::move(obc), op_info, get_pool().info, get_backend(), *m);
   return do_osd_ops_execute<Ref<MOSDOpReply>>(
-    std::move(*ox), m->ops, op_info,
+    seastar::make_lw_shared<OpsExecuter>(
+      std::move(obc), op_info, get_pool().info, get_backend(), *m),
+    m->ops,
+    op_info,
     [this, m, rvec = op_info.allows_returnvec()] {
       // TODO: should stop at the first op which returns a negative retval,
       //       cmpext uses it for returning the index of first unmatched byte
@@ -776,13 +778,7 @@ PG::do_osd_ops(
         peering_state.get_info().last_update,
         peering_state.get_info().last_user_version);
       return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
-    }
-  ).safe_then_unpack_interruptible(
-    [ox_deleter=std::move(ox)](auto submitted, auto all_completed) mutable {
-    return do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<Ref<MOSDOpReply>>>(
-      std::move(submitted),
-      all_completed.finally([ox_deleter=std::move(ox_deleter)] {}));
-  });
+    });
 }
 
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<>>
@@ -794,20 +790,13 @@ PG::do_osd_ops(
   do_osd_ops_success_func_t success_func,
   do_osd_ops_failure_func_t failure_func)
 {
-  auto ox = std::make_unique<OpsExecuter>(
-    std::move(obc), op_info, get_pool().info, get_backend(), msg_params);
   return do_osd_ops_execute<void>(
-    std::move(*ox),
+    seastar::make_lw_shared<OpsExecuter>(
+      std::move(obc), op_info, get_pool().info, get_backend(), msg_params),
     ops,
     std::as_const(op_info),
     std::move(success_func),
-    std::move(failure_func)
-  ).safe_then_unpack_interruptible(
-    [ox_deleter=std::move(ox)](auto submitted, auto all_completed) mutable {
-    return do_osd_ops_iertr::make_ready_future<pg_rep_op_fut_t<>>(
-      std::move(submitted),
-      all_completed.finally([ox_deleter=std::move(ox_deleter)] {}));
-  });
+    std::move(failure_func));
 }
 
 PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
index ad42c4252f89a1de3127fb0aedafebb1230aa490..d0e96c3c58f2e2a25307b3f587e28d1fabd2ff68 100644 (file)
@@ -590,7 +590,7 @@ private:
     do_osd_ops_failure_func_t failure_func);
   template <class Ret, class SuccessFunc, class FailureFunc>
   do_osd_ops_iertr::future<pg_rep_op_fut_t<Ret>> do_osd_ops_execute(
-    OpsExecuter&& ox,
+    seastar::lw_shared_ptr<OpsExecuter> ox,
     std::vector<OSDOp>& ops,
     const OpInfo &op_info,
     SuccessFunc&& success_func,