]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson: convert client_request, internal_client_request, snaptrim_event to use obc...
authorSamuel Just <sjust@redhat.com>
Thu, 24 Oct 2024 23:39:46 +0000 (16:39 -0700)
committerSamuel Just <sjust@redhat.com>
Fri, 13 Dec 2024 20:32:26 +0000 (12:32 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/internal_client_request.cc
src/crimson/osd/osd_operations/internal_client_request.h
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h

index ee78b5632100221fa4ceda21030a4c04be114136..fcd0f318db2ef1cec7a19e93a050a2f21ee2fb4a 100644 (file)
@@ -101,7 +101,7 @@ PerShardPipeline &ClientRequest::get_pershard_pipeline(
   return shard_services.get_client_request_pipeline();
 }
 
-ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
+CommonPGPipeline &ClientRequest::client_pp(PG &pg)
 {
   return pg.request_pg_pipeline;
 }
@@ -140,6 +140,15 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib
 
   DEBUGDPP("{} start", *pgref, *this);
   PG &pg = *pgref;
+
+  DEBUGDPP("{}.{}: entering wait_pg_ready stage",
+          *pgref, *this, this_instance_id);
+  // The prior stage is OrderedExclusive (PerShardPipeline::create_or_wait_pg)
+  // and wait_pg_ready is OrderedConcurrent.  This transition, therefore, cannot
+  // block and using enter_stage_sync is legal and more efficient than
+  // enter_stage.
+  ihref.enter_stage_sync(client_pp(pg).wait_pg_ready, *this);
+
   if (!m->get_hobj().get_key().empty()) {
     // There are no users of locator. It was used to ensure that multipart-upload
     // parts would end up in the same PG so that they could be clone_range'd into
@@ -156,28 +165,22 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib
     DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
     co_return;
   }
-  DEBUGDPP("{}.{}: entering await_map stage",
-          *pgref, *this, this_instance_id);
-  co_await ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this);
-  DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
-          pg, *this, this_instance_id);
+
   auto map_epoch = co_await interruptor::make_interruptible(
     ihref.enter_blocker(
       *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
       m->get_min_epoch(), nullptr));
 
-  DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+  DEBUGDPP("{}.{}: waited for epoch {}, waiting for active",
           pg, *this, this_instance_id, map_epoch);
-  co_await ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
-
-  DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
-          pg, *this, this_instance_id);
   co_await interruptor::make_interruptible(
     ihref.enter_blocker(
       *this,
       pg.wait_for_active_blocker,
       &decltype(pg.wait_for_active_blocker)::wait));
 
+  co_await ihref.enter_stage<interruptor>(client_pp(pg).get_obc, *this);
+
   if (int res = op_info.set_from_op(&*m, *pg.get_osdmap());
       res != 0) {
     co_await reply_op_error(pgref, res);
@@ -345,7 +348,13 @@ ClientRequest::process_op(
   instance_handle_t &ihref, Ref<PG> pg, unsigned this_instance_id)
 {
   LOG_PREFIX(ClientRequest::process_op);
-  ihref.enter_stage_sync(client_pp(*pg).recover_missing, *this);
+  ihref.obc_orderer = pg->obc_loader.get_obc_orderer(m->get_hobj());
+  auto obc_manager = pg->obc_loader.get_obc_manager(
+    *(ihref.obc_orderer),
+    m->get_hobj());
+  co_await ihref.enter_stage<interruptor>(
+    ihref.obc_orderer->obc_pp().process, *this);
+
   if (!pg->is_primary()) {
     DEBUGDPP(
       "Skipping recover_missings on non primary pg for soid {}",
@@ -365,16 +374,6 @@ ClientRequest::process_op(
     }
   }
 
-  /**
-   * The previous stage of recover_missing is a concurrent phase.
-   * Checking for already_complete requests must done exclusively.
-   * Since get_obc is also an exclusive stage, we can merge both stages into
-   * a single stage and avoid stage switching overhead.
-   */
-  DEBUGDPP("{}.{}: entering check_already_complete_get_obc",
-          *pg, *this, this_instance_id);
-  co_await ihref.enter_stage<interruptor>(
-    client_pp(*pg).check_already_complete_get_obc, *this);
   DEBUGDPP("{}.{}: checking already_complete",
           *pg, *this, this_instance_id);
   auto completed = co_await pg->already_complete(m->get_reqid());
@@ -402,12 +401,6 @@ ClientRequest::process_op(
   DEBUGDPP("{}.{}: past scrub blocker, getting obc",
           *pg, *this, this_instance_id);
 
-  auto obc_manager = pg->obc_loader.get_obc_manager(m->get_hobj());
-
-  // initiate load_and_lock in order, but wait concurrently
-  ihref.enter_stage_sync(
-      client_pp(*pg).lock_obc, *this);
-
   int load_err = co_await pg->obc_loader.load_and_lock(
     obc_manager, pg->get_lock_type(op_info)
   ).si_then([]() -> int {
@@ -425,13 +418,8 @@ ClientRequest::process_op(
     co_return;
   }
 
-  DEBUGDPP("{}.{}: got obc {}, entering process stage",
+  DEBUGDPP("{}.{}: obc {} loaded and locked, calling do_process",
           *pg, *this, this_instance_id, obc_manager.get_obc()->obs);
-  co_await ihref.enter_stage<interruptor>(
-    client_pp(*pg).process, *this);
-
-  DEBUGDPP("{}.{}: in process stage, calling do_process",
-          *pg, *this, this_instance_id);
   co_await do_process(
     ihref, pg, obc_manager.get_obc(), this_instance_id
   );
@@ -553,12 +541,14 @@ ClientRequest::do_process(
        std::move(ox), m->ops);
       co_await std::move(submitted);
     }
-    co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+    co_await ihref.enter_stage<interruptor>(
+      ihref.obc_orderer->obc_pp().wait_repop, *this);
 
     co_await std::move(all_completed);
   }
 
-  co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
+  co_await ihref.enter_stage<interruptor>(
+    ihref.obc_orderer->obc_pp().send_reply, *this);
 
   if (ret) {
     int err = -ret->value();
index 6d1043e27835eaf88f5063b568f9d77b68e5e9a3..bbf4ed33e451a6ecea6c07634a5a7d3a01c37b16 100644 (file)
@@ -11,6 +11,7 @@
 #include "osd/osd_op_util.h"
 #include "crimson/net/Connection.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/object_context_loader.h"
 #include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request_common.h"
@@ -93,20 +94,18 @@ public:
     // don't leave any references on the source core, so we just bypass it by using
     // intrusive_ptr instead.
     using ref_t = boost::intrusive_ptr<instance_handle_t>;
+    std::optional<ObjectContextLoader::Orderer> obc_orderer;
     PipelineHandle handle;
 
     std::tuple<
-      PGPipeline::AwaitMap::BlockingEvent,
+      CommonPGPipeline::WaitPGReady::BlockingEvent,
       PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
-      PGPipeline::WaitForActive::BlockingEvent,
       PGActivationBlocker::BlockingEvent,
-      PGPipeline::RecoverMissing::BlockingEvent,
+      CommonPGPipeline::GetOBC::BlockingEvent,
+      CommonOBCPipeline::Process::BlockingEvent,
       scrub::PGScrubber::BlockingEvent,
-      PGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
-      PGPipeline::LockOBC::BlockingEvent,
-      PGPipeline::Process::BlockingEvent,
-      PGPipeline::WaitRepop::BlockingEvent,
-      PGPipeline::SendReply::BlockingEvent,
+      CommonOBCPipeline::WaitRepop::BlockingEvent,
+      CommonOBCPipeline::SendReply::BlockingEvent,
       CompletionEvent
       > pg_tracking_events;
 
@@ -293,7 +292,7 @@ private:
       unsigned this_instance_id);
   bool is_pg_op() const;
 
-  PGPipeline &client_pp(PG &pg);
+  CommonPGPipeline &client_pp(PG &pg);
 
   template <typename Errorator>
   using interruptible_errorator =
index 58f3bafe41092ee874e31916859bbb08750e6631..b8f7646bc7433659ced647d737f5ca85bd709516 100644 (file)
@@ -57,7 +57,12 @@ InternalClientRequest::with_interruption()
   LOG_PREFIX(InternalClientRequest::with_interruption);
   assert(pg->is_active());
 
-  co_await enter_stage<interruptor>(client_pp().recover_missing);
+  obc_orderer = pg->obc_loader.get_obc_orderer(get_target_oid());
+  auto obc_manager = pg->obc_loader.get_obc_manager(
+    *obc_orderer,
+    get_target_oid());
+
+  co_await enter_stage<interruptor>(obc_orderer->obc_pp().process);
 
   bool unfound = co_await do_recover_missing(
     pg, get_target_oid(), osd_reqid_t());
@@ -67,10 +72,8 @@ InternalClientRequest::with_interruption()
       std::make_error_code(std::errc::operation_canceled),
       fmt::format("{} is unfound, drop it!", get_target_oid()));
   }
-  co_await enter_stage<interruptor>(
-    client_pp().check_already_complete_get_obc);
 
-  DEBUGI("{}: getting obc lock", *this);
+  DEBUGI("{}: generating ops", *this);
 
   auto osd_ops = create_osd_ops();
 
@@ -80,21 +83,12 @@ InternalClientRequest::with_interruption()
     std::as_const(osd_ops), pg->get_pgid().pgid, *pg->get_osdmap());
   assert(ret == 0);
 
-  auto obc_manager = pg->obc_loader.get_obc_manager(get_target_oid());
-
-  // initiate load_and_lock in order, but wait concurrently
-  enter_stage_sync(client_pp().lock_obc);
-
   co_await pg->obc_loader.load_and_lock(
     obc_manager, pg->get_lock_type(op_info)
   ).handle_error_interruptible(
     crimson::ct_error::assert_all("unexpected error")
   );
 
-  DEBUGDPP("{}: got obc {}, entering process stage",
-          *pg, *this, obc_manager.get_obc()->obs);
-  co_await enter_stage<interruptor>(client_pp().process);
-
   auto params = get_do_osd_ops_params();
   OpsExecuter ox(
     pg, obc_manager.get_obc(), op_info, params, params.get_connection(),
@@ -114,6 +108,9 @@ InternalClientRequest::with_interruption()
     std::move(ox), osd_ops);
 
   co_await std::move(submitted);
+
+  co_await enter_stage<interruptor>(obc_orderer->obc_pp().wait_repop);
+
   co_await std::move(completed);
 
   DEBUGDPP("{}: complete", *pg, *this);
index cc457f35412e70cf034c3b8762aba3e20a4e9875..1cfde4ab080ecafd4afa6e41324ae031a1cdb49b 100644 (file)
@@ -4,6 +4,7 @@
 #pragma once
 
 #include "crimson/common/type_helpers.h"
+#include "crimson/osd/object_context_loader.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/osd/osd_operations/client_request_common.h"
 #include "crimson/osd/pg.h"
@@ -48,6 +49,7 @@ private:
   Ref<PG> pg;
   epoch_t start_epoch;
   OpInfo op_info;
+  std::optional<ObjectContextLoader::Orderer> obc_orderer;
   PipelineHandle handle;
 
 public:
@@ -55,12 +57,8 @@ public:
 
   std::tuple<
     StartEvent,
-    CommonPGPipeline::WaitForActive::BlockingEvent,
-    PGActivationBlocker::BlockingEvent,
-    CommonPGPipeline::RecoverMissing::BlockingEvent,
-    CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
-    CommonPGPipeline::LockOBC::BlockingEvent,
-    CommonPGPipeline::Process::BlockingEvent,
+    CommonOBCPipeline::Process::BlockingEvent,
+    CommonOBCPipeline::WaitRepop::BlockingEvent,
     CompletionEvent
   > tracking_events;
 };
index 459c98bb9c0f6e1720b5fd62dcac38f6c8f87356..f8fb7aef6f26ef4fb7df5ff0ee4490fdf9a624f2 100644 (file)
@@ -388,20 +388,24 @@ SnapTrimObjSubEvent::remove_or_update(
 SnapTrimObjSubEvent::snap_trim_obj_subevent_ret_t
 SnapTrimObjSubEvent::start()
 {
+  obc_orderer = pg->obc_loader.get_obc_orderer(
+    coid);
+
   ceph_assert(pg->is_active_clean());
 
-  auto exit_handle = seastar::defer([this] {
-    logger().debug("{}: exit", *this);
-    handle.exit();
+  auto exit_handle = seastar::defer([this, opref = IRef(this)] {
+    logger().debug("{}: exit", *opref);
+    std::ignore = handle.complete().then([opref = std::move(opref)] {});
   });
 
   co_await enter_stage<interruptor>(
-    client_pp().check_already_complete_get_obc);
+    obc_orderer->obc_pp().process);
 
   logger().debug("{}: getting obc for {}", *this, coid);
 
 
   auto obc_manager = pg->obc_loader.get_obc_manager(
+    *obc_orderer,
     coid, false /* resolve_oid */);
 
   co_await pg->obc_loader.load_and_lock(
@@ -413,7 +417,6 @@ SnapTrimObjSubEvent::start()
 
   logger().debug("{}: got obc={}", *this, obc_manager.get_obc()->get_oid());
 
-  co_await enter_stage<interruptor>(client_pp().process);
   auto all_completed = interruptor::now();
   {
     // as with PG::submit_executer, we need to build the pg log entries
@@ -440,12 +443,11 @@ SnapTrimObjSubEvent::start()
     co_await std::move(submitted);
   }
 
-  co_await enter_stage<interruptor>(client_pp().wait_repop);
+  co_await enter_stage<interruptor>(obc_orderer->obc_pp().wait_repop);
 
   co_await std::move(all_completed);
 
   logger().debug("{}: completed", *this);
-  co_await interruptor::make_interruptible(handle.complete());
 }
 
 void SnapTrimObjSubEvent::print(std::ostream &lhs) const
index fc99138d62f4158c9639592297c5c62c318efcdb..a2b4d3575684bc467e273f000f1929ac24ddcbfd 100644 (file)
@@ -6,6 +6,7 @@
 #include <iostream>
 #include <seastar/core/future.hh>
 
+#include "crimson/osd/object_context_loader.h"
 #include "crimson/osd/osdmap_gate.h"
 #include "crimson/osd/osd_operation.h"
 #include "crimson/common/subop_blocker.h"
@@ -154,6 +155,7 @@ private:
   }
 
   Ref<PG> pg;
+  std::optional<ObjectContextLoader::Orderer> obc_orderer;
   PipelineHandle handle;
   osd_op_params_t osd_op_p;
   const hobject_t coid;
@@ -165,9 +167,8 @@ public:
 
   std::tuple<
     StartEvent,
-    CommonPGPipeline::CheckAlreadyCompleteGetObc::BlockingEvent,
-    CommonPGPipeline::Process::BlockingEvent,
-    CommonPGPipeline::WaitRepop::BlockingEvent,
+    CommonOBCPipeline::Process::BlockingEvent,
+    CommonOBCPipeline::WaitRepop::BlockingEvent,
     CompletionEvent
   > tracking_events;
 };