]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: introduce PG::run_executer,submit_executer
authorSamuel Just <sjust@redhat.com>
Thu, 26 Sep 2024 21:10:06 +0000 (14:10 -0700)
committerSamuel Just <sjust@redhat.com>
Tue, 15 Oct 2024 03:37:26 +0000 (20:37 -0700)
These are intended to replace do_osd_ops*.  The implementation
is simpler and does not involve passing success and failure
callbacks.  It also moves responsibility for dealing with
the MOSDOpReply and client related error handling over to
ClientRequest.

do_osd_op* will be removed once users are switched over.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 26d1fa883bbce56e3f9714906533e5a9771301a4..bb5c1e9000bafebe08db049d00304796a52d063b 100644 (file)
@@ -13,6 +13,9 @@
 #include <boost/range/numeric.hpp>
 #include <fmt/format.h>
 #include <fmt/ostream.h>
+
+#include <seastar/util/defer.hh>
+
 #include "include/utime_fmt.h"
 
 #include "common/hobject.h"
@@ -1251,6 +1254,82 @@ PG::interruptible_future<eversion_t> PG::submit_error_log(
   });
 }
 
+PG::run_executer_fut PG::run_executer(
+  seastar::lw_shared_ptr<OpsExecuter> ox,
+  ObjectContextRef obc,
+  const OpInfo &op_info,
+  std::vector<OSDOp>& ops)
+{
+  LOG_PREFIX(PG::run_executer);
+  auto rollbacker = ox->create_rollbacker(
+    [stored_obc=duplicate_obc(obc)](auto &obc) mutable {
+      obc->update_from(*stored_obc);
+    });
+  auto rollback_on_error = seastar::defer([&rollbacker] {
+    rollbacker.rollback_obc_if_modified();
+  });
+
+  for (auto &op: ops) {
+    DEBUGDPP("object {} handle op {}", *this, ox->get_target(), op);
+    co_await ox->execute_op(op);
+  }
+  DEBUGDPP("object {} all operations successful", *this, ox->get_target());
+
+  // check for full
+  if ((ox->delta_stats.num_bytes > 0 ||
+       ox->delta_stats.num_objects > 0) &&
+      get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL)) {
+    const auto& m = ox->get_message();
+    if (m.get_reqid().name.is_mds() ||   // FIXME: ignore MDS for now
+       m.has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
+      INFODPP("full, but proceeding due to FULL_FORCE, or MDS", *this);
+    } else if (m.has_flag(CEPH_OSD_FLAG_FULL_TRY)) {
+      // they tried, they failed.
+      INFODPP("full, replying to FULL_TRY op", *this);
+      if (get_pgpool().info.has_flag(pg_pool_t::FLAG_FULL_QUOTA)) {
+       co_await run_executer_fut(
+         crimson::ct_error::edquot::make());
+      } else {
+       co_await run_executer_fut(
+         crimson::ct_error::enospc::make());
+      }
+    } else {
+      // drop request
+      INFODPP("full, dropping request (bad client)", *this);
+      co_await run_executer_fut(
+       crimson::ct_error::eagain::make());
+    }
+  }
+  rollback_on_error.cancel();
+}
+
+PG::submit_executer_fut PG::submit_executer(
+  seastar::lw_shared_ptr<OpsExecuter> ox,
+  const std::vector<OSDOp>& ops)
+{
+  LOG_PREFIX(PG::submit_executer);
+  // transaction must commit at this point
+  return std::move(
+    *ox
+  ).flush_changes_n_do_ops_effects(
+    ops,
+    snap_mapper,
+    osdriver,
+    [FNAME, this](auto&& txn,
+                 auto&& obc,
+                 auto&& osd_op_p,
+                 auto&& log_entries) {
+      DEBUGDPP("object {} submitting txn", *this, obc->get_oid());
+      mutate_object(obc, txn, osd_op_p);
+      return submit_transaction(
+       std::move(obc),
+       std::move(txn),
+       std::move(osd_op_p),
+       std::move(log_entries));
+    });
+}
+
+
 PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
index 5bd5c3aeff849b4afc130e5a442ee0c6fd4f714e..c91f93171dbc168559150551788325e3204e1646 100644 (file)
@@ -645,6 +645,33 @@ private:
     }
   } background_process_lock;
 
+  using run_executer_ertr = crimson::compound_errorator_t<
+    OpsExecuter::osd_op_errorator,
+    crimson::errorator<
+      crimson::ct_error::edquot,
+      crimson::ct_error::eagain,
+      crimson::ct_error::enospc
+      >
+    >;
+  using run_executer_iertr = crimson::interruptible::interruptible_errorator<
+    ::crimson::osd::IOInterruptCondition,
+    run_executer_ertr>;
+  using run_executer_fut = run_executer_iertr::future<>;
+  run_executer_fut run_executer(
+    seastar::lw_shared_ptr<OpsExecuter> ox,
+    ObjectContextRef obc,
+    const OpInfo &op_info,
+    std::vector<OSDOp>& ops);
+
+  using submit_executer_ret = std::tuple<
+    interruptible_future<>,
+    interruptible_future<>>;
+  using submit_executer_fut = interruptible_future<
+    submit_executer_ret>;
+  submit_executer_fut submit_executer(
+    seastar::lw_shared_ptr<OpsExecuter> ox,
+    const std::vector<OSDOp>& ops);
+
   using do_osd_ops_ertr = crimson::errorator<
    crimson::ct_error::eagain>;
   using do_osd_ops_iertr =