struct AggregateBlockingEvent {
struct TriggerI {
template <class FutureT>
- decltype(auto) maybe_record_blocking(FutureT&& fut,
- const typename T::Blocker& blocker) {
+ auto maybe_record_blocking(FutureT&& fut,
+ const typename T::Blocker& blocker) {
// AggregateBlockingEvent is supposed to be used on relatively cold
// paths (recovery), so we don't need to worry about the dynamic
// polymothps / dynamic memory's overhead.
seastar::future<>
enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
return wait_barrier().then([this, &stage, t=std::move(t)] () mutable {
- auto fut = stage.enter();
- t.maybe_record_blocking(fut, stage);
+ auto fut = t.maybe_record_blocking(stage.enter(t), stage);
exit();
return std::move(fut).then(
[this, t=std::move(t)](auto &&barrier_ref) mutable {
}
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() {
+ template <class... IgnoreArgs>
+ seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
return mutex.lock().then([this] {
return PipelineExitBarrierI::Ref(new ExitBarrier{this});
});
*/
template <class T>
class OrderedConcurrentPhaseT : public PipelineStageIT<T> {
+ using base_t = PipelineStageIT<T>;
+public:
+ struct BlockingEvent : base_t::BlockingEvent {
+ using base_t::BlockingEvent::BlockingEvent;
+
+ struct ExitBarrierEvent : TimeEvent<ExitBarrierEvent> {};
+
+ template <class OpT>
+ struct Trigger : base_t::BlockingEvent::template Trigger<OpT> {
+ using base_t::BlockingEvent::template Trigger<OpT>::Trigger;
+
+ template <class FutureT>
+ decltype(auto) maybe_record_exit_barrier(FutureT&& fut) {
+ if (!fut.available()) {
+ exit_barrier_event.trigger(this->op);
+ }
+ return std::forward<FutureT>(fut);
+ }
+
+ ExitBarrierEvent exit_barrier_event;
+ };
+ };
+
+private:
void dump_detail(ceph::Formatter *f) const final {}
+ template <class TriggerT>
class ExitBarrier final : public PipelineExitBarrierI {
OrderedConcurrentPhaseT *phase;
std::optional<seastar::future<>> barrier;
+ TriggerT trigger;
public:
ExitBarrier(
OrderedConcurrentPhaseT *phase,
- seastar::future<> &&barrier) : phase(phase), barrier(std::move(barrier)) {}
+ seastar::future<> &&barrier,
+ TriggerT& trigger) : phase(phase), barrier(std::move(barrier)), trigger(trigger) {}
seastar::future<> wait() final {
assert(phase);
assert(barrier);
auto ret = std::move(*barrier);
barrier = std::nullopt;
- return ret;
+ return trigger.maybe_record_exit_barrier(std::move(ret));
}
void exit() final {
};
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() {
+ template <class TriggerT>
+ seastar::future<PipelineExitBarrierI::Ref> enter(TriggerT& t) {
return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
- new ExitBarrier{this, mutex.lock()});
+ new ExitBarrier<TriggerT>{this, mutex.lock(), t});
}
private:
};
public:
- seastar::future<PipelineExitBarrierI::Ref> enter() {
+ template <class... IgnoreArgs>
+ seastar::future<PipelineExitBarrierI::Ref> enter(IgnoreArgs&&...) {
return seastar::make_ready_future<PipelineExitBarrierI::Ref>(
new ExitBarrier);
}
ClientRequest::PGPipeline::GetOBC::BlockingEvent::Backend,
ClientRequest::PGPipeline::Process::BlockingEvent::Backend,
ClientRequest::PGPipeline::WaitRepop::BlockingEvent::Backend,
+ ClientRequest::PGPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent::Backend,
ClientRequest::PGPipeline::SendReply::BlockingEvent::Backend,
ClientRequest::CompletionEvent::Backend
{
const ClientRequest::PGPipeline::WaitRepop& blocker) override {
}
+ void handle(ClientRequest::PGPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent& ev,
+ const Operation& op) override {
+ }
+
void handle(ClientRequest::PGPipeline::SendReply::BlockingEvent& ev,
const Operation& op,
const ClientRequest::PGPipeline::SendReply& blocker) override {