From: Samuel Just Date: Mon, 18 Jan 2021 22:06:18 +0000 (-0800) Subject: src/crimson/common: decouple handle from stage X-Git-Tag: v17.1.0~3095^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=86348b48b30b0b0fd8f5b5d8d70ea5c9dd74d00a;p=ceph.git src/crimson/common: decouple handle from stage We're going to introduce stages with other properties. Signed-off-by: Samuel Just --- diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc index 1657b7a5ebaf9..0a4c1a0cd0f3f 100644 --- a/src/crimson/common/operation.cc +++ b/src/crimson/common/operation.cc @@ -62,29 +62,7 @@ void AggregateBlocker::dump_detail(ceph::Formatter *f) const f->close_section(); } -void OrderedPipelinePhase::Handle::exit() -{ - if (phase) { - phase->mutex.unlock(); - phase = nullptr; - } -} - -blocking_future<> OrderedPipelinePhase::Handle::enter( - OrderedPipelinePhase &new_phase) -{ - auto fut = new_phase.mutex.lock(); - exit(); - phase = &new_phase; - return new_phase.make_blocking_future(std::move(fut)); -} - -OrderedPipelinePhase::Handle::~Handle() -{ - exit(); -} - -void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const +void OrderedExclusivePhase::dump_detail(ceph::Formatter* f) const { } diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h index 8eaefbdbfeabf..d81ff0075e633 100644 --- a/src/crimson/common/operation.h +++ b/src/crimson/common/operation.h @@ -43,8 +43,10 @@ class blocking_future_detail { blocking_future_detail(Blocker *b, Fut &&f) : blocker(b), fut(std::move(f)) {} - template - friend blocking_future_detail> make_ready_blocking_future(U&& args); + template + friend blocking_future_detail> + make_ready_blocking_future(U&&... args); + template friend blocking_future_detail> make_exception_blocking_future(Exception&& e); @@ -68,11 +70,11 @@ public: template using blocking_future = blocking_future_detail>; -template -blocking_future_detail> make_ready_blocking_future(U&& args) { +template +blocking_future_detail> make_ready_blocking_future(U&&... args) { return blocking_future( nullptr, - seastar::make_ready_future(std::forward(args))); + seastar::make_ready_future(std::forward(args)...)); } template @@ -275,6 +277,87 @@ protected: } }; +class PipelineExitBarrierI { +public: + using Ref = std::unique_ptr; + + /// Waits for exit barrier + virtual seastar::future<> wait() = 0; + + /// Releases pipeline stage, can only be called after wait + virtual void exit() = 0; + + /// Releases pipeline resources without waiting on barrier + virtual void cancel() = 0; + + /// Must ensure that resources are released, likely by calling cancel() + virtual ~PipelineExitBarrierI() {} +}; + +class PipelineStageI : public Blocker { +public: + virtual seastar::future enter() = 0; +}; + +class PipelineHandle { + PipelineExitBarrierI::Ref barrier; + + auto wait_barrier() { + return barrier ? barrier->wait() : seastar::now(); + } + +public: + PipelineHandle() = default; + + PipelineHandle(const PipelineHandle&) = delete; + PipelineHandle(PipelineHandle&&) = default; + PipelineHandle &operator=(const PipelineHandle&) = delete; + PipelineHandle &operator=(PipelineHandle&&) = default; + + /** + * Returns a future which unblocks when the handle has entered the passed + * OrderedPipelinePhase. If already in a phase, enter will also release + * that phase after placing itself in the queue for the next one to preserve + * ordering. + */ + template + blocking_future<> enter(T &t) { + /* Strictly speaking, we probably want the blocker to be registered on + * the previous stage until wait_barrier() resolves and on the next + * until enter() resolves, but blocking_future will need some refactoring + * to permit that. TODO + */ + return t.make_blocking_future( + wait_barrier().then([this, &t] { + auto fut = t.enter(); + exit(); + return std::move(fut).then([this](auto &&barrier_ref) { + barrier = std::move(barrier_ref); + return seastar::now(); + }); + }) + ); + } + + /** + * Completes pending exit barrier without entering a new one. + */ + seastar::future<> complete() { + auto ret = wait_barrier(); + barrier.reset(); + return ret; + } + + /** + * Exits current phase, skips exit barrier, should only be used for op + * failure. Permitting the handle to be destructed as the same effect. + */ + void exit() { + barrier.reset(); + } + +}; + /** * Ensures that at most one op may consider itself in the phase at a time. * Ops will see enter() unblock in the order in which they tried to enter @@ -282,45 +365,49 @@ protected: * resolve) a new phase prior to exiting the previous one will ensure that * the op ordering is preserved. */ -class OrderedPipelinePhase : public Blocker { -private: +class OrderedExclusivePhase : public PipelineStageI { void dump_detail(ceph::Formatter *f) const final; const char *get_type_name() const final { return name; } -public: - /** - * Used to encapsulate pipeline residency state. - */ - class Handle { - OrderedPipelinePhase *phase = nullptr; - + class ExitBarrier : public PipelineExitBarrierI { + OrderedExclusivePhase *phase; public: - Handle() = default; + ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {} - Handle(const Handle&) = delete; - Handle(Handle&&) = default; - Handle &operator=(const Handle&) = delete; - Handle &operator=(Handle&&) = default; + seastar::future<> wait() final { + return seastar::now(); + } - /** - * Returns a future which unblocks when the handle has entered the passed - * OrderedPipelinePhase. If already in a phase, enter will also release - * that phase after placing itself in the queue for the next one to preserve - * ordering. - */ - blocking_future<> enter(OrderedPipelinePhase &phase); + void exit() final { + if (phase) { + phase->exit(); + phase = nullptr; + } + } - /** - * Releases the current phase if there is one. Called in ~Handle(). - */ - void exit(); + void cancel() final { + exit(); + } - ~Handle(); + ~ExitBarrier() final { + cancel(); + } }; - OrderedPipelinePhase(const char *name) : name(name) {} + void exit() { + mutex.unlock(); + } + +public: + seastar::future enter() final { + return mutex.lock().then([this] { + return PipelineExitBarrierI::Ref(new ExitBarrier{this}); + }); + } + + OrderedExclusivePhase(const char *name) : name(name) {} private: const char * name; diff --git a/src/crimson/osd/osd_operations/background_recovery.h b/src/crimson/osd/osd_operations/background_recovery.h index 37e46c588a84d..b9adb87d9c6d9 100644 --- a/src/crimson/osd/osd_operations/background_recovery.h +++ b/src/crimson/osd/osd_operations/background_recovery.h @@ -86,7 +86,7 @@ private: class BackfillRecovery final : public BackgroundRecovery { public: class BackfillRecoveryPipeline { - OrderedPipelinePhase process = { + OrderedExclusivePhase process = { "BackfillRecovery::PGPipeline::process" }; friend class BackfillRecovery; @@ -104,7 +104,7 @@ public: private: boost::intrusive_ptr evt; - OrderedPipelinePhase::Handle handle; + PipelineHandle handle; seastar::future do_recovery() override; }; diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index ea3124a93e532..099a8067ee662 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -18,32 +18,32 @@ class ClientRequest final : public OperationT { crimson::net::ConnectionRef conn; Ref m; OpInfo op_info; - OrderedPipelinePhase::Handle handle; + PipelineHandle handle; public: class ConnectionPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "ClientRequest::ConnectionPipeline::await_map" }; - OrderedPipelinePhase get_pg = { + OrderedExclusivePhase get_pg = { "ClientRequest::ConnectionPipeline::get_pg" }; friend class ClientRequest; }; class PGPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "ClientRequest::PGPipeline::await_map" }; - OrderedPipelinePhase wait_for_active = { + OrderedExclusivePhase wait_for_active = { "ClientRequest::PGPipeline::wait_for_active" }; - OrderedPipelinePhase recover_missing = { + OrderedExclusivePhase recover_missing = { "ClientRequest::PGPipeline::recover_missing" }; - OrderedPipelinePhase get_obc = { + OrderedExclusivePhase get_obc = { "ClientRequest::PGPipeline::get_obc" }; - OrderedPipelinePhase process = { + OrderedExclusivePhase process = { "ClientRequest::PGPipeline::process" }; friend class ClientRequest; diff --git a/src/crimson/osd/osd_operations/peering_event.h b/src/crimson/osd/osd_operations/peering_event.h index 3a6c0678c46ac..45c57f19d834c 100644 --- a/src/crimson/osd/osd_operations/peering_event.h +++ b/src/crimson/osd/osd_operations/peering_event.h @@ -26,10 +26,10 @@ public: static constexpr OperationTypeCode type = OperationTypeCode::peering_event; class PGPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "PeeringEvent::PGPipeline::await_map" }; - OrderedPipelinePhase process = { + OrderedExclusivePhase process = { "PeeringEvent::PGPipeline::process" }; friend class PeeringEvent; @@ -37,7 +37,7 @@ public: }; protected: - OrderedPipelinePhase::Handle handle; + PipelineHandle handle; PGPipeline &pp(PG &pg); ShardServices &shard_services; @@ -102,10 +102,10 @@ protected: public: class ConnectionPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "PeeringRequest::ConnectionPipeline::await_map" }; - OrderedPipelinePhase get_pg = { + OrderedExclusivePhase get_pg = { "PeeringRequest::ConnectionPipeline::get_pg" }; friend class RemotePeeringEvent; diff --git a/src/crimson/osd/osd_operations/pg_advance_map.h b/src/crimson/osd/osd_operations/pg_advance_map.h index 1b27037eb413f..f19c8631ede90 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.h +++ b/src/crimson/osd/osd_operations/pg_advance_map.h @@ -25,7 +25,7 @@ public: static constexpr OperationTypeCode type = OperationTypeCode::pg_advance_map; protected: - OrderedPipelinePhase::Handle handle; + PipelineHandle handle; OSD &osd; Ref pg; diff --git a/src/crimson/osd/osd_operations/replicated_request.h b/src/crimson/osd/osd_operations/replicated_request.h index 8e9cfc9fe4189..17e9e53794476 100644 --- a/src/crimson/osd/osd_operations/replicated_request.h +++ b/src/crimson/osd/osd_operations/replicated_request.h @@ -21,19 +21,19 @@ class PG; class RepRequest final : public OperationT { public: class ConnectionPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "RepRequest::ConnectionPipeline::await_map" }; - OrderedPipelinePhase get_pg = { + OrderedExclusivePhase get_pg = { "RepRequest::ConnectionPipeline::get_pg" }; friend RepRequest; }; class PGPipeline { - OrderedPipelinePhase await_map = { + OrderedExclusivePhase await_map = { "RepRequest::PGPipeline::await_map" }; - OrderedPipelinePhase process = { + OrderedExclusivePhase process = { "RepRequest::PGPipeline::process" }; friend RepRequest; @@ -52,7 +52,7 @@ private: OSD &osd; crimson::net::ConnectionRef conn; Ref req; - OrderedPipelinePhase::Handle handle; + PipelineHandle handle; }; }