]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
src/crimson/common: decouple handle from stage
authorSamuel Just <sjust@redhat.com>
Mon, 18 Jan 2021 22:06:18 +0000 (14:06 -0800)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 20:48:39 +0000 (12:48 -0800)
We're going to introduce stages with other properties.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/common/operation.cc
src/crimson/common/operation.h
src/crimson/osd/osd_operations/background_recovery.h
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/peering_event.h
src/crimson/osd/osd_operations/pg_advance_map.h
src/crimson/osd/osd_operations/replicated_request.h

index 1657b7a5ebaf9a0cc2437eebdf7e01a11acf14a3..0a4c1a0cd0f3fcfeb6bdfd2bc3a445c76a7970e6 100644 (file)
@@ -62,29 +62,7 @@ void AggregateBlocker::dump_detail(ceph::Formatter *f) const
   f->close_section();
 }
 
-void OrderedPipelinePhase::Handle::exit()
-{
-  if (phase) {
-    phase->mutex.unlock();
-    phase = nullptr;
-  }
-}
-
-blocking_future<> OrderedPipelinePhase::Handle::enter(
-  OrderedPipelinePhase &new_phase)
-{
-  auto fut = new_phase.mutex.lock();
-  exit();
-  phase = &new_phase;
-  return new_phase.make_blocking_future(std::move(fut));
-}
-
-OrderedPipelinePhase::Handle::~Handle()
-{
-  exit();
-}
-
-void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const
+void OrderedExclusivePhase::dump_detail(ceph::Formatter* f) const
 {
 }
 
index 8eaefbdbfeabf1fa3083c3bdcf72a56556d37e84..d81ff0075e633d27b10d56eefa125b950fa23968 100644 (file)
@@ -43,8 +43,10 @@ class blocking_future_detail {
   blocking_future_detail(Blocker *b, Fut &&f)
     : blocker(b), fut(std::move(f)) {}
 
-  template <typename V, typename U>
-  friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+  template <typename V, typename... U>
+  friend blocking_future_detail<seastar::future<V>>
+  make_ready_blocking_future(U&&... args);
+
   template <typename V, typename Exception>
   friend blocking_future_detail<seastar::future<V>>
   make_exception_blocking_future(Exception&& e);
@@ -68,11 +70,11 @@ public:
 template <typename T=void>
 using blocking_future = blocking_future_detail<seastar::future<T>>;
 
-template <typename V, typename U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+template <typename V=void, typename... U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
   return blocking_future<V>(
     nullptr,
-    seastar::make_ready_future<V>(std::forward<U>(args)));
+    seastar::make_ready_future<V>(std::forward<U>(args)...));
 }
 
 template <typename V, typename Exception>
@@ -275,6 +277,87 @@ protected:
   }
 };
 
+class PipelineExitBarrierI {
+public:
+  using Ref = std::unique_ptr<PipelineExitBarrierI>;
+
+  /// Waits for exit barrier
+  virtual seastar::future<> wait() = 0;
+
+  /// Releases pipeline stage, can only be called after wait
+  virtual void exit() = 0;
+
+  /// Releases pipeline resources without waiting on barrier
+  virtual void cancel() = 0;
+
+  /// Must ensure that resources are released, likely by calling cancel()
+  virtual ~PipelineExitBarrierI() {}
+};
+
+class PipelineStageI : public Blocker {
+public:
+  virtual seastar::future<PipelineExitBarrierI::Ref> enter() = 0;
+};
+
+class PipelineHandle {
+  PipelineExitBarrierI::Ref barrier;
+
+  auto wait_barrier() {
+    return barrier ? barrier->wait() : seastar::now();
+  }
+
+public:
+  PipelineHandle() = default;
+
+  PipelineHandle(const PipelineHandle&) = delete;
+  PipelineHandle(PipelineHandle&&) = default;
+  PipelineHandle &operator=(const PipelineHandle&) = delete;
+  PipelineHandle &operator=(PipelineHandle&&) = default;
+
+  /**
+     * Returns a future which unblocks when the handle has entered the passed
+     * OrderedPipelinePhase.  If already in a phase, enter will also release
+     * that phase after placing itself in the queue for the next one to preserve
+     * ordering.
+     */
+  template <typename T>
+  blocking_future<> enter(T &t) {
+    /* Strictly speaking, we probably want the blocker to be registered on
+     * the previous stage until wait_barrier() resolves and on the next
+     * until enter() resolves, but blocking_future will need some refactoring
+     * to permit that.  TODO
+     */
+    return t.make_blocking_future(
+      wait_barrier().then([this, &t] {
+       auto fut = t.enter();
+       exit();
+       return std::move(fut).then([this](auto &&barrier_ref) {
+         barrier = std::move(barrier_ref);
+         return seastar::now();
+       });
+      })
+    );
+  }
+
+  /**
+   * Completes pending exit barrier without entering a new one.
+   */
+  seastar::future<> complete() {
+    auto ret = wait_barrier();
+    barrier.reset();
+    return ret;
+  }
+
+  /**
+   * Exits current phase, skips exit barrier, should only be used for op
+   * failure.  Permitting the handle to be destructed as the same effect.
+   */
+  void exit() {
+    barrier.reset();
+  }
+
+};
+
 /**
  * Ensures that at most one op may consider itself in the phase at a time.
  * Ops will see enter() unblock in the order in which they tried to enter
@@ -282,45 +365,49 @@ protected:
  * resolve) a new phase prior to exiting the previous one will ensure that
  * the op ordering is preserved.
  */
-class OrderedPipelinePhase : public Blocker {
-private:
+class OrderedExclusivePhase : public PipelineStageI {
   void dump_detail(ceph::Formatter *f) const final;
   const char *get_type_name() const final {
     return name;
   }
 
-public:
-  /**
-   * Used to encapsulate pipeline residency state.
-   */
-  class Handle {
-    OrderedPipelinePhase *phase = nullptr;
-
+  class ExitBarrier : public PipelineExitBarrierI {
+    OrderedExclusivePhase *phase;
   public:
-    Handle() = default;
+    ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {}
 
-    Handle(const Handle&) = delete;
-    Handle(Handle&&) = default;
-    Handle &operator=(const Handle&) = delete;
-    Handle &operator=(Handle&&) = default;
+    seastar::future<> wait() final {
+      return seastar::now();
+    }
 
-    /**
-     * Returns a future which unblocks when the handle has entered the passed
-     * OrderedPipelinePhase.  If already in a phase, enter will also release
-     * that phase after placing itself in the queue for the next one to preserve
-     * ordering.
-     */
-    blocking_future<> enter(OrderedPipelinePhase &phase);
+    void exit() final {
+      if (phase) {
+       phase->exit();
+       phase = nullptr;
+      }
+    }
 
-    /**
-     * Releases the current phase if there is one.  Called in ~Handle().
-     */
-    void exit();
+    void cancel() final {
+      exit();
+    }
 
-    ~Handle();
+    ~ExitBarrier() final {
+      cancel();
+    }
   };
 
-  OrderedPipelinePhase(const char *name) : name(name) {}
+  void exit() {
+    mutex.unlock();
+  }
+
+public:
+  seastar::future<PipelineExitBarrierI::Ref> enter() final {
+    return mutex.lock().then([this] {
+      return PipelineExitBarrierI::Ref(new ExitBarrier{this});
+    });
+  }
+
+  OrderedExclusivePhase(const char *name) : name(name) {}
 
 private:
   const char * name;
index 37e46c588a84da3f26ef1de3601e511c1f0162ba..b9adb87d9c6d9a9f156aee684595e35e57ff9360 100644 (file)
@@ -86,7 +86,7 @@ private:
 class BackfillRecovery final : public BackgroundRecovery {
 public:
   class BackfillRecoveryPipeline {
-    OrderedPipelinePhase process = {
+    OrderedExclusivePhase process = {
       "BackfillRecovery::PGPipeline::process"
     };
     friend class BackfillRecovery;
@@ -104,7 +104,7 @@ public:
 
 private:
   boost::intrusive_ptr<const boost::statechart::event_base> evt;
-  OrderedPipelinePhase::Handle handle;
+  PipelineHandle handle;
   seastar::future<bool> do_recovery() override;
 };
 
index ea3124a93e532d0e2e1d86c27b683ab137990fa3..099a8067ee6621349b6fe73290c0314dc529c3e1 100644 (file)
@@ -18,32 +18,32 @@ class ClientRequest final : public OperationT<ClientRequest> {
   crimson::net::ConnectionRef conn;
   Ref<MOSDOp> m;
   OpInfo op_info;
-  OrderedPipelinePhase::Handle handle;
+  PipelineHandle handle;
 
 public:
   class ConnectionPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "ClientRequest::ConnectionPipeline::await_map"
     };
-    OrderedPipelinePhase get_pg = {
+    OrderedExclusivePhase get_pg = {
       "ClientRequest::ConnectionPipeline::get_pg"
     };
     friend class ClientRequest;
   };
   class PGPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "ClientRequest::PGPipeline::await_map"
     };
-    OrderedPipelinePhase wait_for_active = {
+    OrderedExclusivePhase wait_for_active = {
       "ClientRequest::PGPipeline::wait_for_active"
     };
-    OrderedPipelinePhase recover_missing = {
+    OrderedExclusivePhase recover_missing = {
       "ClientRequest::PGPipeline::recover_missing"
     };
-    OrderedPipelinePhase get_obc = {
+    OrderedExclusivePhase get_obc = {
       "ClientRequest::PGPipeline::get_obc"
     };
-    OrderedPipelinePhase process = {
+    OrderedExclusivePhase process = {
       "ClientRequest::PGPipeline::process"
     };
     friend class ClientRequest;
index 3a6c0678c46ac752cf8e2443a397be72bcb4b4d5..45c57f19d834c522b4e4633982711dace08e46dc 100644 (file)
@@ -26,10 +26,10 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::peering_event;
 
   class PGPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "PeeringEvent::PGPipeline::await_map"
     };
-    OrderedPipelinePhase process = {
+    OrderedExclusivePhase process = {
       "PeeringEvent::PGPipeline::process"
     };
     friend class PeeringEvent;
@@ -37,7 +37,7 @@ public:
   };
 
 protected:
-  OrderedPipelinePhase::Handle handle;
+  PipelineHandle handle;
   PGPipeline &pp(PG &pg);
 
   ShardServices &shard_services;
@@ -102,10 +102,10 @@ protected:
 
 public:
   class ConnectionPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "PeeringRequest::ConnectionPipeline::await_map"
     };
-    OrderedPipelinePhase get_pg = {
+    OrderedExclusivePhase get_pg = {
       "PeeringRequest::ConnectionPipeline::get_pg"
     };
     friend class RemotePeeringEvent;
index 1b27037eb413ffedca16fdb4c98ff4c555b34907..f19c8631ede90c2de863a729796fa06423131abf 100644 (file)
@@ -25,7 +25,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map;
 
 protected:
-  OrderedPipelinePhase::Handle handle;
+  PipelineHandle handle;
 
   OSD &osd;
   Ref<PG> pg;
index 8e9cfc9fe41893563ad12a0d35746f49c6395c24..17e9e537944761ec40da12c4e37f4dcba750b1d8 100644 (file)
@@ -21,19 +21,19 @@ class PG;
 class RepRequest final : public OperationT<RepRequest> {
 public:
   class ConnectionPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "RepRequest::ConnectionPipeline::await_map"
     };
-    OrderedPipelinePhase get_pg = {
+    OrderedExclusivePhase get_pg = {
       "RepRequest::ConnectionPipeline::get_pg"
     };
     friend RepRequest;
   };
   class PGPipeline {
-    OrderedPipelinePhase await_map = {
+    OrderedExclusivePhase await_map = {
       "RepRequest::PGPipeline::await_map"
     };
-    OrderedPipelinePhase process = {
+    OrderedExclusivePhase process = {
       "RepRequest::PGPipeline::process"
     };
     friend RepRequest;
@@ -52,7 +52,7 @@ private:
   OSD &osd;
   crimson::net::ConnectionRef conn;
   Ref<MOSDRepOp> req;
-  OrderedPipelinePhase::Handle handle;
+  PipelineHandle handle;
 };
 
 }