/// Waits for exit barrier
virtual std::optional<seastar::future<>> wait() = 0;
- /// Releases pipeline resources, after or without waiting
+ /// Releases pipeline resources.
+ /// If wait() has been called,
+ /// must release after the wait future is resolved.
virtual ~PipelineExitBarrierI() {}
};
template <typename OpT, typename T>
std::optional<seastar::future<>>
- do_enter_maybe_sync(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
+ do_enter_maybe_sync(
+ T &stage,
+ typename T::BlockingEvent::template Trigger<OpT>&& t,
+ PipelineExitBarrierI::Ref&& moved_barrier) {
+ assert(!barrier);
if constexpr (!T::is_enter_sync) {
auto fut = t.maybe_record_blocking(stage.enter(t), stage);
- return std::move(fut).then(
- [this, t=std::move(t)](auto &&barrier_ref) {
- exit();
+ return std::move(fut
+ ).then([this, t=std::move(t),
+ moved_barrier=std::move(moved_barrier)](auto &&barrier_ref) {
+ // destruct moved_barrier and unlock after entered
+ assert(!barrier);
barrier = std::move(barrier_ref);
return seastar::now();
});
} else {
auto barrier_ref = stage.enter(t);
- exit();
+ // destruct moved_barrier and unlock after entered
barrier = std::move(barrier_ref);
return std::nullopt;
}
enter_maybe_sync(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {
assert(stage.core == seastar::this_shard_id());
auto wait_fut = wait_barrier();
+ auto moved_barrier = std::move(barrier);
+ barrier.reset();
if (wait_fut.has_value()) {
return wait_fut.value(
- ).then([this, &stage, t=std::move(t)]() mutable {
- auto ret = do_enter_maybe_sync<OpT, T>(stage, std::move(t));
+ ).then([this, &stage, t=std::move(t),
+ moved_barrier=std::move(moved_barrier)]() mutable {
+ auto ret = do_enter_maybe_sync<OpT, T>(
+ stage, std::move(t), std::move(moved_barrier));
if constexpr (!T::is_enter_sync) {
return std::move(ret.value());
} else {
}
});
} else {
- return do_enter_maybe_sync<OpT, T>(stage, std::move(t));
+ return do_enter_maybe_sync<OpT, T>(
+ stage, std::move(t), std::move(moved_barrier));
}
}
*/
seastar::future<> complete() {
auto ret = wait_barrier();
+ auto moved_barrier = std::move(barrier);
barrier.reset();
- return ret ? std::move(ret.value()) : seastar::now();
+ if (ret) {
+ return std::move(ret.value()
+ ).then([moved_barrier=std::move(moved_barrier)] {
+ // destruct moved_barrier and unlock after wait()
+ });
+ } else {
+ return seastar::now();
+ }
}
/**
phase->mutex.unlock();
});
} else {
- // wait() has been called
+ // wait() has been called, must unlock
+ // after the wait() future is resolved.
phase->mutex.unlock();
}
}