class Operation;
class Blocker;
-/**
- * Provides an abstraction for registering and unregistering a blocker
- * for the duration of a future becoming available.
- */
-template <typename Fut>
-class blocking_future_detail {
-// just as a scaffolding for the transition from blocking_future
-public:
- friend class Blocker;
- Blocker *blocker;
- Fut fut;
- blocking_future_detail(Blocker *b, Fut &&f)
- : blocker(b), fut(std::move(f)) {}
-
- template <typename V, typename... U>
- friend blocking_future_detail<seastar::future<V>>
- make_ready_blocking_future(U&&... args);
-
- template <typename V, typename Exception>
- friend blocking_future_detail<seastar::future<V>>
- make_exception_blocking_future(Exception&& e);
-
- template <typename U>
- friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
-
- template <typename InterruptCond, typename T>
- friend blocking_future_detail<
- ::crimson::interruptible::interruptible_future<InterruptCond>>
- join_blocking_interruptible_futures(T&& t);
-
- template <typename U>
- friend class blocking_future_detail;
-
-public:
- template <typename F>
- auto then(F &&f) && {
- using result = decltype(std::declval<Fut>().then(f));
- return blocking_future_detail<seastar::futurize_t<result>>(
- blocker,
- std::move(fut).then(std::forward<F>(f)));
- }
- template <typename InterruptCond, typename F>
- auto then_interruptible(F &&f) && {
- using result = decltype(std::declval<Fut>().then_interruptible(f));
- return blocking_future_detail<
- typename ::crimson::interruptible::interruptor<
- InterruptCond>::template futurize<result>::type>(
- blocker,
- std::move(fut).then_interruptible(std::forward<F>(f)));
- }
-};
-
-template <typename T=void>
-using blocking_future = blocking_future_detail<seastar::future<T>>;
-
-template <typename InterruptCond, typename T = void>
-using blocking_interruptible_future = blocking_future_detail<
- ::crimson::interruptible::interruptible_future<InterruptCond, T>>;
-
-template <typename InterruptCond, typename V, typename U>
-blocking_interruptible_future<InterruptCond, V>
-make_ready_blocking_interruptible_future(U&& args) {
- return blocking_interruptible_future<InterruptCond, V>(
- nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)));
-}
-
-template <typename InterruptCond, typename V, typename Exception>
-blocking_interruptible_future<InterruptCond, V>
-make_exception_blocking_interruptible_future(Exception&& e) {
- return blocking_interruptible_future<InterruptCond, V>(
- nullptr,
- seastar::make_exception_future<InterruptCond, V>(e));
-}
-
-template <typename V=void, typename... U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&&... args) {
- return blocking_future<V>(
- nullptr,
- seastar::make_ready_future<V>(std::forward<U>(args)...));
-}
-
-template <typename V, typename Exception>
-blocking_future_detail<seastar::future<V>>
-make_exception_blocking_future(Exception&& e) {
- return blocking_future<V>(
- nullptr,
- seastar::make_exception_future<V>(e));
-}
namespace detail {
void dump_time_event(const char* name,
*/
class Blocker {
public:
- template <typename T>
- blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
- return blocking_future<T>(this, std::move(f));
- }
-
- template <typename InterruptCond, typename T>
- blocking_interruptible_future<InterruptCond, T>
- make_blocking_future(
- crimson::interruptible::interruptible_future<InterruptCond, T> &&f) {
- return blocking_interruptible_future<InterruptCond, T>(
- this, std::move(f));
- }
- template <typename InterruptCond, typename T = void>
- blocking_interruptible_future<InterruptCond, T>
- make_blocking_interruptible_future(seastar::future<T> &&f) {
- return blocking_interruptible_future<InterruptCond, T>(
- this,
- ::crimson::interruptible::interruptor<InterruptCond>::make_interruptible(
- std::move(f)));
- }
-
void dump(ceph::Formatter *f) const;
virtual ~Blocker() = default;
friend class Trigger;
};
-class AggregateBlocker : public BlockerT<AggregateBlocker> {
- std::vector<Blocker*> parent_blockers;
-public:
- AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
- : parent_blockers(std::move(parent_blockers)) {}
- static constexpr const char *type_name = "AggregateBlocker";
-private:
- void dump_detail(ceph::Formatter *f) const final;
-};
-
-template <typename T>
-blocking_future<> join_blocking_futures(T &&t) {
- std::vector<Blocker*> blockers;
- blockers.reserve(t.size());
- for (auto &&bf: t) {
- blockers.push_back(bf.blocker);
- bf.blocker = nullptr;
- }
- auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
- return agg->make_blocking_future(
- seastar::parallel_for_each(
- std::forward<T>(t),
- [](auto &&bf) {
- return std::move(bf.fut);
- }).then([agg=std::move(agg)] {
- return seastar::make_ready_future<>();
- }));
-}
-
-template <typename InterruptCond, typename T>
-blocking_interruptible_future<InterruptCond>
-join_blocking_interruptible_futures(T&& t) {
- std::vector<Blocker*> blockers;
- blockers.reserve(t.size());
- for (auto &&bf: t) {
- blockers.push_back(bf.blocker);
- bf.blocker = nullptr;
- }
- auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
- return agg->make_blocking_future(
- ::crimson::interruptible::interruptor<InterruptCond>::parallel_for_each(
- std::forward<T>(t),
- [](auto &&bf) {
- return std::move(bf.fut);
- }).then_interruptible([agg=std::move(agg)] {
- return seastar::make_ready_future<>();
- }));
-}
-
-
/**
* Common base for all crimson-osd operations. Mainly provides
* an interface for registering ops in flight and dumping
* that phase after placing itself in the queue for the next one to preserve
* ordering.
*/
- template <typename T>
- blocking_future<> enter(T &t) {
- /* Strictly speaking, we probably want the blocker to be registered on
- * the previous stage until wait_barrier() resolves and on the next
- * until enter() resolves, but blocking_future will need some refactoring
- * to permit that. TODO
- */
- return t.make_blocking_future(
- wait_barrier().then([this, &t] {
- auto fut = t.enter();
- exit();
- return std::move(fut).then([this](auto &&barrier_ref) {
- barrier = std::move(barrier_ref);
- return seastar::now();
- });
- })
- );
- }
-
template <typename OpT, typename T>
seastar::future<>
enter(T &stage, typename T::BlockingEvent::template Trigger<OpT>&& t) {