From 12a19749e306fb0b9aad8861253369d5ccb17c93 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 16 Jan 2020 11:41:25 -0800 Subject: [PATCH] crimson/osd/osd_operation: add support for AggregateBlocker Signed-off-by: Samuel Just --- src/crimson/osd/osd_operation.cc | 10 ++++++ src/crimson/osd/osd_operation.h | 62 ++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc index f906db29934..b5f3c3cbbc8 100644 --- a/src/crimson/osd/osd_operation.cc +++ b/src/crimson/osd/osd_operation.cc @@ -51,6 +51,16 @@ void Blocker::dump(ceph::Formatter* f) const f->close_section(); } +void AggregateBlocker::dump_detail(ceph::Formatter *f) const +{ + f->open_array_section("parent_blockers"); + for (auto b : parent_blockers) { + f->open_object_section("parent_blocker"); + b->dump(f); + f->close_section(); + } + f->close_section(); +} OperationThrottler::OperationThrottler(ConfigProxy &conf) : scheduler(crimson::osd::scheduler::make_scheduler(conf)) diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h index a4382c09586..6e368384d57 100644 --- a/src/crimson/osd/osd_operation.h +++ b/src/crimson/osd/osd_operation.h @@ -60,21 +60,39 @@ class Blocker; * Provides an abstraction for registering and unregistering a blocker * for the duration of a future becoming available. */ -template -class blocking_future { +template +class blocking_future_detail { friend class Operation; friend class Blocker; Blocker *blocker; - seastar::future fut; - blocking_future(Blocker *b, seastar::future &&f) + Fut fut; + blocking_future_detail(Blocker *b, Fut &&f) : blocker(b), fut(std::move(f)) {} template - friend blocking_future make_ready_blocking_future(U&&... args); + friend blocking_future_detail> make_ready_blocking_future(U&&... args); + + template + friend blocking_future_detail> join_blocking_futures(U &&u); + + template + friend class blocking_future_detail; + +public: + template + auto then(F &&f) && { + using result = decltype(std::declval().then(f)); + return blocking_future_detail>( + blocker, + std::move(fut).then(std::forward(f))); + } }; +template +using blocking_future = blocking_future_detail>; + template -blocking_future make_ready_blocking_future(U&&... args) { +blocking_future_detail> make_ready_blocking_future(U&&... args) { return blocking_future( nullptr, seastar::make_ready_future(std::forward(args)...)); @@ -91,7 +109,7 @@ protected: public: template blocking_future make_blocking_future(seastar::future &&f) { - return blocking_future(this, std::move(f)); + return blocking_future(this, std::move(f)); } void dump(ceph::Formatter *f) const; @@ -111,6 +129,36 @@ public: virtual ~BlockerT() = default; }; +class AggregateBlocker : public BlockerT { + vector parent_blockers; +protected: + void dump_detail(ceph::Formatter *f) const final; +public: + AggregateBlocker(vector &&parent_blockers) + : parent_blockers(std::move(parent_blockers)) {} + static constexpr const char *type_name = "AggregateBlocker"; +}; + +template +blocking_future<> join_blocking_futures(T &&t) { + vector blockers; + blockers.reserve(t.size()); + for (auto &&bf: t) { + blockers.push_back(bf.blocker); + bf.blocker = nullptr; + } + auto agg = std::make_unique(std::move(blockers)); + return agg->make_blocking_future( + seastar::parallel_for_each( + std::forward(t), + [](auto &&bf) { + return std::move(bf.fut); + }).then([agg=std::move(agg)] { + return seastar::now(); + })); +} + + /** * Common base for all crimson-osd operations. Mainly provides * an interface for registering ops in flight and dumping -- 2.39.5