]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/common/operation: add OrderedConcurrentPhase
authorSamuel Just <sjust@redhat.com>
Tue, 19 Jan 2021 03:46:30 +0000 (19:46 -0800)
committerSamuel Just <sjust@redhat.com>
Mon, 1 Feb 2021 20:48:39 +0000 (12:48 -0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/common/operation.cc
src/crimson/common/operation.h

index 0a4c1a0cd0f3fcfeb6bdfd2bc3a445c76a7970e6..7f28ce31f0733aef329c71678c0874aef08fbf1a 100644 (file)
@@ -66,4 +66,8 @@ void OrderedExclusivePhase::dump_detail(ceph::Formatter* f) const
 {
 }
 
+void OrderedConcurrentPhase::dump_detail(ceph::Formatter* f) const
+{
+}
+
 }
index d81ff0075e633d27b10d56eefa125b950fa23968..07806768a938fe3228286ecfa1755f1aeabe4357 100644 (file)
@@ -414,4 +414,66 @@ private:
   seastar::shared_mutex mutex;
 };
 
+/**
+ * Permits multiple ops to inhabit the stage concurrently, but ensures that
+ * they will proceed to the next stage in the order in which they called
+ * enter.
+ */
+class OrderedConcurrentPhase : public PipelineStageI {
+  void dump_detail(ceph::Formatter *f) const final;
+  const char *get_type_name() const final {
+    return name;
+  }
+
+  class ExitBarrier : public PipelineExitBarrierI {
+    OrderedConcurrentPhase *phase;
+    std::optional<seastar::future<>> barrier;
+  public:
+    ExitBarrier(
+      OrderedConcurrentPhase *phase,
+      seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
+
+    seastar::future<> wait() final {
+      assert(phase);
+      assert(barrier);
+      auto ret = std::move(*barrier);
+      barrier = std::nullopt;
+      return ret;
+    }
+
+    void exit() final {
+      if (barrier) {
+       static_cast<void>(
+         std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
+       barrier = std::nullopt;
+       phase = nullptr;
+      }
+      if (phase) {
+       phase->mutex.unlock();
+       phase = nullptr;
+      }
+    }
+
+    void cancel() final {
+      exit();
+    }
+
+    ~ExitBarrier() final {
+      cancel();
+    }
+  };
+
+public:
+  seastar::future<PipelineExitBarrierI::Ref> enter() final {
+    return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
+      new ExitBarrier{this, mutex.lock()});
+  }
+
+  OrderedConcurrentPhase(const char *name) : name(name) {}
+
+private:
+  const char * name;
+  seastar::shared_mutex mutex;
+};
+
 }