]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: split PG::do_osd_ops() to facilitate InternalClientRequest.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 29 Mar 2021 17:03:19 +0000 (17:03 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 10 May 2021 16:01:32 +0000 (18:01 +0200)
This commit brings `PG::do_osd_ops_execute()` a subset of
`PG::do_osd_ops()`; it handles the ops execution through
`OpsExecuter` and the `submit_transaction()` but it stays
indepedent from `MOSDOp` and `MOSDOpReply`. This trait
facilitates the `InternalClientRequest`.

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index ced84b9e8e09ffee63b4f767ca6e197c47f64ac2..8a2f92cdcaa1f8e1b1f6ab727c19d0f189d01703 100644 (file)
 #include "crimson/osd/pg_interval_interrupt_condition.h"
 #include "crimson/osd/shard_services.h"
 
-class PG;
-class PGLSFilter;
 class OSDOp;
 
 namespace crimson::osd {
+class PG;
 
 // OpsExecuter -- a class for executing ops targeting a certain object.
 class OpsExecuter {
index 44ebcd3c8829d88db106f681d314c5ee056b04d4..23f8e55b6a4784c01dd6983fbd10092990198b6a 100644 (file)
@@ -698,43 +698,36 @@ PG::interruptible_future<> PG::repair_object(
   return std::move(fut);
 }
 
-PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>> 
-PG::do_osd_ops(
+template <class Ret, class SuccessFunc, class FailureFunc>
+PG::do_osd_ops_iertr::future<Ret> PG::do_osd_ops_execute(
+  OpsExecuter&& ox,
+  std::vector<OSDOp> ops,
   Ref<MOSDOp> m,
   ObjectContextRef obc,
-  const OpInfo &op_info)
+  const OpInfo &op_info,
+  SuccessFunc&& success_func,
+  FailureFunc&& failure_func)
 {
-  if (__builtin_expect(stopping, false)) {
-    throw crimson::common::system_shutdown_exception();
-  }
-
-  using osd_op_ierrorator = OpsExecuter::osd_op_ierrorator;
-  using osd_op_errorator = OpsExecuter::osd_op_errorator;
-  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
-                                                   : m->get_hobj();
-  auto ox = std::make_unique<OpsExecuter>(
-    obc, op_info, get_pool().info, get_backend(), *m);
-  return interruptor::do_for_each(
-    m->ops.begin(), m->ops.end(), [m, ox = ox.get()](OSDOp& osd_op) {
+  return interruptor::do_for_each(ops, [m, &ox](OSDOp& osd_op) {
     logger().debug(
-      "do_osd_ops: {} - object {} - handling op {}",
+      "do_osd_ops_execute: {} - object {} - handling op {}",
       *m,
-      ox->get_target(),
+      ox.get_target(),
       ceph_osd_op_name(osd_op.op.op));
-    return ox->execute_op(osd_op);
-  }).safe_then_interruptible([this, m, ox = ox.get(), &op_info] {
+    return ox.execute_op(osd_op);
+  }).safe_then_interruptible([this, m, &ox, &op_info] {
     logger().debug(
-      "do_osd_ops: {} - object {} all operations successful",
+      "do_osd_ops_execute: {} - object {} all operations successful",
       *m,
-      ox->get_target());
-    return std::move(*ox).flush_changes_n_do_ops_effects(
+      ox.get_target());
+    return std::move(ox).flush_changes_n_do_ops_effects(
       Ref<PG>{this},
       [this, m, &op_info] (auto&& txn,
                           auto&& obc,
                           auto&& osd_op_p,
-                           bool user_modify) -> osd_op_ierrorator::future<> {
+                           bool user_modify) {
        logger().debug(
-         "do_osd_ops: {} - object {} submitting txn",
+         "do_osd_ops_execute: {} - object {} submitting txn",
          *m,
          obc->get_oid());
         fill_op_params_bump_pg_version(osd_op_p, std::move(m), user_modify);
@@ -743,41 +736,76 @@ PG::do_osd_ops(
           std::move(obc),
           std::move(txn),
           std::move(osd_op_p));
-      });
-  }).safe_then_interruptible_tuple([this, m, obc, rvec = op_info.allows_returnvec()]()
-    -> PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>> {
-    // TODO: should stop at the first op which returns a negative retval,
-    //       cmpext uses it for returning the index of first unmatched byte
-    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
-    if (result > 0 && !rvec) {
-      result = 0;
-    }
-    auto reply = make_message<MOSDOpReply>(m.get(),
-                                           result,
-                                           get_osdmap_epoch(),
-                                           0,
-                                           false);
-    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
-    logger().debug(
-      "do_osd_ops: {} - object {} sending reply",
-      *m,
-      obc->obs.oi.soid);
-    return PG::do_osd_ops_ertr::make_ready_future<Ref<MOSDOpReply>>(
-      std::move(reply));
+    });
+  }).safe_then_interruptible_tuple([success_func=std::move(success_func)] {
+    return std::move(success_func)();
   }, crimson::ct_error::object_corrupted::handle([m, obc, this] {
-    return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version)
-    .then_interruptible([] {
-      return PG::do_osd_ops_ertr::future<Ref<MOSDOpReply>>(
-      crimson::ct_error::eagain::make());
+    return repair_object(m, obc->obs.oi.soid, obc->obs.oi.version).then_interruptible([] {
+      return do_osd_ops_iertr::future<Ret>{crimson::ct_error::eagain::make()};
     });
-  }), osd_op_errorator::all_same_way([ox = std::move(ox),
-                                      m,
-                                      obc,
-                                      this] (const std::error_code& e) {
-    return handle_failed_op(e, std::move(obc), *ox, *m);
+  }), OpsExecuter::osd_op_errorator::all_same_way(
+    [&ox, m, obc, failure_func=std::move(failure_func), this] (const std::error_code& e) mutable {
+    const bool need_reload_obc = ox.has_seen_write();
+    logger().debug(
+      "do_osd_ops_execute: {} - object {} got error {}, {}; need_reload_obc {}",
+      m,
+      obc->obs.oi.soid,
+      e.value(),
+      e.message(),
+      need_reload_obc);
+    return (
+      need_reload_obc ? reload_obc(*obc)
+                      : interruptor::make_interruptible(load_obc_ertr::now())
+    ).safe_then_interruptible([&e, failure_func=std::move(failure_func)] {
+      return std::move(failure_func)(e);
+    }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
   }));
 }
 
+PG::do_osd_ops_iertr::future<Ref<MOSDOpReply>>
+PG::do_osd_ops(
+  Ref<MOSDOp> m,
+  ObjectContextRef obc,
+  const OpInfo &op_info)
+{
+  if (__builtin_expect(stopping, false)) {
+    throw crimson::common::system_shutdown_exception();
+  }
+  auto ox = std::make_unique<OpsExecuter>(
+    obc, op_info, get_pool().info, get_backend(), *m);
+  return do_osd_ops_execute<Ref<MOSDOpReply>>(
+    std::move(*ox), m->ops, m, obc, op_info,
+    [this, m, obc, rvec = op_info.allows_returnvec()] {
+      // TODO: should stop at the first op which returns a negative retval,
+      //       cmpext uses it for returning the index of first unmatched byte
+      int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
+      if (result > 0 && !rvec) {
+        result = 0;
+      }
+      auto reply = make_message<MOSDOpReply>(m.get(),
+                                             result,
+                                             get_osdmap_epoch(),
+                                             0,
+                                             false);
+      reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+      logger().debug(
+        "do_osd_ops: {} - object {} sending reply",
+        *m,
+        obc->obs.oi.soid);
+      return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(
+        std::move(reply));
+    },
+    [m, this] (const std::error_code& e) {
+      auto reply = make_message<MOSDOpReply>(
+        m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+      reply->set_enoent_reply_versions(
+        peering_state.get_info().last_update,
+        peering_state.get_info().last_user_version);
+      return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    }
+  ).finally([ox_deleter=std::move(ox)] {});
+}
+
 PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
 {
   if (__builtin_expect(stopping, false)) {
index 9de11f7dda9e47a9250f1ebc77c1e2bdb6350c5f..373f92562cc570341735e93daf5f2810f9ff1e1b 100644 (file)
@@ -24,6 +24,7 @@
 #include "crimson/os/futurized_collection.h"
 #include "crimson/osd/backfill_state.h"
 #include "crimson/osd/pg_interval_interrupt_condition.h"
+#include "crimson/osd/ops_executer.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
@@ -577,6 +578,15 @@ private:
     Ref<MOSDOp> m,
     ObjectContextRef obc,
     const OpInfo &op_info);
+  template <class Ret, class SuccessFunc, class FailureFunc>
+  do_osd_ops_iertr::future<Ret> do_osd_ops_execute(
+    OpsExecuter&& ox,
+    std::vector<OSDOp> ops,
+    Ref<MOSDOp> m,
+    ObjectContextRef obc,
+    const OpInfo &op_info,
+    SuccessFunc&& success_func,
+    FailureFunc&& failure_func);
   interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
   interruptible_future<> submit_transaction(const OpInfo& op_info,
                                       ObjectContextRef&& obc,