From ad14ecedb60f535c639df10aa685f945abab87f6 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Mon, 18 Jan 2021 19:46:30 -0800 Subject: [PATCH] crimson/common/operation: add OrderedConcurrentPhase Signed-off-by: Samuel Just --- src/crimson/common/operation.cc | 4 +++ src/crimson/common/operation.h | 62 +++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc index 0a4c1a0cd0f3f..7f28ce31f0733 100644 --- a/src/crimson/common/operation.cc +++ b/src/crimson/common/operation.cc @@ -66,4 +66,8 @@ void OrderedExclusivePhase::dump_detail(ceph::Formatter* f) const { } +void OrderedConcurrentPhase::dump_detail(ceph::Formatter* f) const +{ +} + } diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h index d81ff0075e633..07806768a938f 100644 --- a/src/crimson/common/operation.h +++ b/src/crimson/common/operation.h @@ -414,4 +414,66 @@ private: seastar::shared_mutex mutex; }; +/** + * Permits multiple ops to inhabit the stage concurrently, but ensures that + * they will proceed to the next stage in the order in which they called + * enter. + */ +class OrderedConcurrentPhase : public PipelineStageI { + void dump_detail(ceph::Formatter *f) const final; + const char *get_type_name() const final { + return name; + } + + class ExitBarrier : public PipelineExitBarrierI { + OrderedConcurrentPhase *phase; + std::optional> barrier; + public: + ExitBarrier( + OrderedConcurrentPhase *phase, + seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {} + + seastar::future<> wait() final { + assert(phase); + assert(barrier); + auto ret = std::move(*barrier); + barrier = std::nullopt; + return ret; + } + + void exit() final { + if (barrier) { + static_cast( + std::move(*barrier).then([phase=this->phase] { phase->mutex.unlock(); })); + barrier = std::nullopt; + phase = nullptr; + } + if (phase) { + phase->mutex.unlock(); + phase = nullptr; + } + } + + void cancel() final { + exit(); + } + + ~ExitBarrier() final { + cancel(); + } + }; + +public: + seastar::future enter() final { + return seastar::make_ready_future( + new ExitBarrier{this, mutex.lock()}); + } + + OrderedConcurrentPhase(const char *name) : name(name) {} + +private: + const char * name; + seastar::shared_mutex mutex; +}; + } -- 2.39.5