blocking_future_detail(Blocker *b, Fut &&f)
: blocker(b), fut(std::move(f)) {}
- template <typename V, typename U>
- friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+ template <typename V, typename... U>
+ friend blocking_future_detail<seastar::future<V>>
+ make_ready_blocking_future(U&&... args);
+
template <typename V, typename Exception>
friend blocking_future_detail<seastar::future<V>>
make_exception_blocking_future(Exception&& e);
template <typename T=void>
using blocking_future = blocking_future_detail<seastar::future<T>>;
-template <typename V, typename U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+template <typename V=void, typename... U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
return blocking_future<V>(
nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)));
+ seastar::make_ready_future<V>(std::forward<U>(args)...));
}
template <typename V, typename Exception>
}
};
+class PipelineExitBarrierI {
+public:
+ using Ref = std::unique_ptr<PipelineExitBarrierI>;
+
+ /// Waits for exit barrier
+ virtual seastar::future<> wait() = 0;
+
+ /// Releases pipeline stage, can only be called after wait
+ virtual void exit() = 0;
+
+ /// Releases pipeline resources without waiting on barrier
+ virtual void cancel() = 0;
+
+ /// Must ensure that resources are released, likely by calling cancel()
+ virtual ~PipelineExitBarrierI() {}
+};
+
+class PipelineStageI : public Blocker {
+public:
+ virtual seastar::future<PipelineExitBarrierI::Ref> enter() = 0;
+};
+
+class PipelineHandle {
+ PipelineExitBarrierI::Ref barrier;
+
+ auto wait_barrier() {
+ return barrier ? barrier->wait() : seastar::now();
+ }
+
+public:
+ PipelineHandle() = default;
+
+ PipelineHandle(const PipelineHandle&) = delete;
+ PipelineHandle(PipelineHandle&&) = default;
+ PipelineHandle &operator=(const PipelineHandle&) = delete;
+ PipelineHandle &operator=(PipelineHandle&&) = default;
+
+ /**
+ * Returns a future which unblocks when the handle has entered the passed
+ * OrderedPipelinePhase. If already in a phase, enter will also release
+ * that phase after placing itself in the queue for the next one to preserve
+ * ordering.
+ */
+ template <typename T>
+ blocking_future<> enter(T &t) {
+ /* Strictly speaking, we probably want the blocker to be registered on
+ * the previous stage until wait_barrier() resolves and on the next
+ * until enter() resolves, but blocking_future will need some refactoring
+ * to permit that. TODO
+ */
+ return t.make_blocking_future(
+ wait_barrier().then([this, &t] {
+ auto fut = t.enter();
+ exit();
+ return std::move(fut).then([this](auto &&barrier_ref) {
+ barrier = std::move(barrier_ref);
+ return seastar::now();
+ });
+ })
+ );
+ }
+
+ /**
+ * Completes pending exit barrier without entering a new one.
+ */
+ seastar::future<> complete() {
+ auto ret = wait_barrier();
+ barrier.reset();
+ return ret;
+ }
+
+ /**
+ * Exits current phase, skips exit barrier, should only be used for op
+ * failure. Permitting the handle to be destructed as the same effect.
+ */
+ void exit() {
+ barrier.reset();
+ }
+
+};
+
/**
* Ensures that at most one op may consider itself in the phase at a time.
* Ops will see enter() unblock in the order in which they tried to enter
* resolve) a new phase prior to exiting the previous one will ensure that
* the op ordering is preserved.
*/
-class OrderedPipelinePhase : public Blocker {
-private:
+class OrderedExclusivePhase : public PipelineStageI {
void dump_detail(ceph::Formatter *f) const final;
const char *get_type_name() const final {
return name;
}
-public:
- /**
- * Used to encapsulate pipeline residency state.
- */
- class Handle {
- OrderedPipelinePhase *phase = nullptr;
-
+ class ExitBarrier : public PipelineExitBarrierI {
+ OrderedExclusivePhase *phase;
public:
- Handle() = default;
+ ExitBarrier(OrderedExclusivePhase *phase) : phase(phase) {}
- Handle(const Handle&) = delete;
- Handle(Handle&&) = default;
- Handle &operator=(const Handle&) = delete;
- Handle &operator=(Handle&&) = default;
+ seastar::future<> wait() final {
+ return seastar::now();
+ }
- /**
- * Returns a future which unblocks when the handle has entered the passed
- * OrderedPipelinePhase. If already in a phase, enter will also release
- * that phase after placing itself in the queue for the next one to preserve
- * ordering.
- */
- blocking_future<> enter(OrderedPipelinePhase &phase);
+ void exit() final {
+ if (phase) {
+ phase->exit();
+ phase = nullptr;
+ }
+ }
- /**
- * Releases the current phase if there is one. Called in ~Handle().
- */
- void exit();
+ void cancel() final {
+ exit();
+ }
- ~Handle();
+ ~ExitBarrier() final {
+ cancel();
+ }
};
- OrderedPipelinePhase(const char *name) : name(name) {}
+ void exit() {
+ mutex.unlock();
+ }
+
+public:
+ seastar::future<PipelineExitBarrierI::Ref> enter() final {
+ return mutex.lock().then([this] {
+ return PipelineExitBarrierI::Ref(new ExitBarrier{this});
+ });
+ }
+
+ OrderedExclusivePhase(const char *name) : name(name) {}
private:
const char * name;
crimson::net::ConnectionRef conn;
Ref<MOSDOp> m;
OpInfo op_info;
- OrderedPipelinePhase::Handle handle;
+ PipelineHandle handle;
public:
class ConnectionPipeline {
- OrderedPipelinePhase await_map = {
+ OrderedExclusivePhase await_map = {
"ClientRequest::ConnectionPipeline::await_map"
};
- OrderedPipelinePhase get_pg = {
+ OrderedExclusivePhase get_pg = {
"ClientRequest::ConnectionPipeline::get_pg"
};
friend class ClientRequest;
};
class PGPipeline {
- OrderedPipelinePhase await_map = {
+ OrderedExclusivePhase await_map = {
"ClientRequest::PGPipeline::await_map"
};
- OrderedPipelinePhase wait_for_active = {
+ OrderedExclusivePhase wait_for_active = {
"ClientRequest::PGPipeline::wait_for_active"
};
- OrderedPipelinePhase recover_missing = {
+ OrderedExclusivePhase recover_missing = {
"ClientRequest::PGPipeline::recover_missing"
};
- OrderedPipelinePhase get_obc = {
+ OrderedExclusivePhase get_obc = {
"ClientRequest::PGPipeline::get_obc"
};
- OrderedPipelinePhase process = {
+ OrderedExclusivePhase process = {
"ClientRequest::PGPipeline::process"
};
friend class ClientRequest;