]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/osd_operation: add support for AggregateBlocker
authorSamuel Just <sjust@redhat.com>
Thu, 16 Jan 2020 19:41:25 +0000 (11:41 -0800)
committerXuehan Xu <xxhdx1985126@163.com>
Sun, 26 Apr 2020 07:46:35 +0000 (15:46 +0800)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h

index f906db29934ff5b340dc718013f91c66060a9bed..b5f3c3cbbc846e9945bf8ec6e5fa36e19205e18e 100644 (file)
@@ -51,6 +51,16 @@ void Blocker::dump(ceph::Formatter* f) const
   f->close_section();
 }
 
+void AggregateBlocker::dump_detail(ceph::Formatter *f) const
+{
+  f->open_array_section("parent_blockers");
+  for (auto b : parent_blockers) {
+    f->open_object_section("parent_blocker");
+    b->dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
 
 OperationThrottler::OperationThrottler(ConfigProxy &conf)
   : scheduler(crimson::osd::scheduler::make_scheduler(conf))
index a4382c09586c4e4f71877a2605ee7b95db13980f..6e368384d576c966361fc5353a5fc1ccfd654af8 100644 (file)
@@ -60,21 +60,39 @@ class Blocker;
  * Provides an abstraction for registering and unregistering a blocker
  * for the duration of a future becoming available.
  */
-template <typename... T>
-class blocking_future {
+template <typename Fut>
+class blocking_future_detail {
   friend class Operation;
   friend class Blocker;
   Blocker *blocker;
-  seastar::future<T...> fut;
-  blocking_future(Blocker *b, seastar::future<T...> &&f)
+  Fut fut;
+  blocking_future_detail(Blocker *b, Fut &&f)
     : blocker(b), fut(std::move(f)) {}
 
   template <typename... V, typename... U>
-  friend blocking_future<V...> make_ready_blocking_future(U&&... args);
+  friend blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args);
+
+  template <typename U>
+  friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+  template <typename U>
+  friend class blocking_future_detail;
+
+public:
+  template <typename F>
+  auto then(F &&f) && {
+    using result = decltype(std::declval<Fut>().then(f));
+    return blocking_future_detail<seastar::futurize_t<result>>(
+      blocker,
+      std::move(fut).then(std::forward<F>(f)));
+  }
 };
 
+template <typename... T>
+using blocking_future = blocking_future_detail<seastar::future<T...>>;
+
 template <typename... V, typename... U>
-blocking_future<V...> make_ready_blocking_future(U&&... args) {
+blocking_future_detail<seastar::future<V...>> make_ready_blocking_future(U&&... args) {
   return blocking_future<V...>(
     nullptr,
     seastar::make_ready_future<V...>(std::forward<U>(args)...));
@@ -91,7 +109,7 @@ protected:
 public:
   template <typename... T>
   blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
-    return blocking_future(this, std::move(f));
+    return blocking_future<T...>(this, std::move(f));
   }
 
   void dump(ceph::Formatter *f) const;
@@ -111,6 +129,36 @@ public:
   virtual ~BlockerT() = default;
 };
 
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+  vector<Blocker*> parent_blockers;
+protected:
+  void dump_detail(ceph::Formatter *f) const final;
+public:
+  AggregateBlocker(vector<Blocker*> &&parent_blockers)
+    : parent_blockers(std::move(parent_blockers)) {}
+  static constexpr const char *type_name = "AggregateBlocker";
+};
+
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+  vector<Blocker*> blockers;
+  blockers.reserve(t.size());
+  for (auto &&bf: t) {
+    blockers.push_back(bf.blocker);
+    bf.blocker = nullptr;
+  }
+  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+  return agg->make_blocking_future(
+    seastar::parallel_for_each(
+      std::forward<T>(t),
+      [](auto &&bf) {
+       return std::move(bf.fut);
+      }).then([agg=std::move(agg)] {
+       return seastar::now();
+      }));
+}
+
+
 /**
  * Common base for all crimson-osd operations.  Mainly provides
  * an interface for registering ops in flight and dumping