seastar::shared_mutex mutex;
};
+/**
+ * Permits multiple ops to inhabit the stage concurrently, but ensures that
+ * they will proceed to the next stage in the order in which they called
+ * enter.
+ */
+class OrderedConcurrentPhase : public PipelineStageI {
+ void dump_detail(ceph::Formatter *f) const final;
+ const char *get_type_name() const final {
+ return name;
+ }
+
+ class ExitBarrier : public PipelineExitBarrierI {
+ OrderedConcurrentPhase *phase;
+ std::optional<seastar::future<>> barrier;
+ public:
+ ExitBarrier(
+ OrderedConcurrentPhase *phase,
+ seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
+
+ seastar::future<> wait() final {
+ assert(phase);
+ assert(barrier);
+ auto ret = std::move(*barrier);
+ barrier = std::nullopt;
+ return ret;
+ }
+
+ void exit() final {
+ if (barrier) {
+ static_cast<void>(
+ std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); }));
+ barrier = std::nullopt;
+ phase = nullptr;
+ }
+ if (phase) {
+ phase->mutex.unlock();
+ phase = nullptr;
+ }
+ }
+
+ void cancel() final {
+ exit();
+ }
+
+ ~ExitBarrier() final {
+ cancel();
+ }
+ };
+
+public:
+ seastar::future<PipelineExitBarrierI::Ref> enter() final {
+ return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
+ new ExitBarrier{this, mutex.lock()});
+ }
+
+ OrderedConcurrentPhase(const char *name) : name(name) {}
+
+private:
+ const char * name;
+ seastar::shared_mutex mutex;
+};
+
}