]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/: move operation out of osd/ into common/
authorSamuel Just <sjust@redhat.com>
Sat, 9 Jan 2021 02:55:08 +0000 (02:55 +0000)
committerSamuel Just <sjust@redhat.com>
Tue, 19 Jan 2021 20:55:42 +0000 (20:55 +0000)
Seastore will need to use this as well.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/CMakeLists.txt
src/crimson/common/operation.cc [new file with mode: 0644]
src/crimson/common/operation.h [new file with mode: 0644]
src/crimson/osd/osd_operation.cc
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operations/compound_peering_request.cc
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h
src/crimson/osd/recovery_backend.h
src/crimson/osd/shard_services.h

index 26f7293365ec4bf425336802d82bf4c722a84e84..44e65a238f319907a2d082032f5e4b9be434cf9d 100644 (file)
@@ -18,6 +18,7 @@ set(crimson_common_srcs
   common/formatter.cc
   common/perf_counters_collection.cc
   common/log.cc
+  common/operation.cc
   common/throttle.cc
   common/tri_mutex.cc)
 
diff --git a/src/crimson/common/operation.cc b/src/crimson/common/operation.cc
new file mode 100644 (file)
index 0000000..1657b7a
--- /dev/null
@@ -0,0 +1,91 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "operation.h"
+#include "common/Formatter.h"
+
+namespace crimson {
+
+void Operation::dump(ceph::Formatter* f)
+{
+  f->open_object_section("operation");
+  f->dump_string("type", get_type_name());
+  f->dump_unsigned("id", id);
+  {
+    f->open_object_section("detail");
+    dump_detail(f);
+    f->close_section();
+  }
+  f->open_array_section("blockers");
+  for (auto &blocker : blockers) {
+    blocker->dump(f);
+  }
+  f->close_section();
+  f->close_section();
+}
+
+void Operation::dump_brief(ceph::Formatter* f)
+{
+  f->open_object_section("operation");
+  f->dump_string("type", get_type_name());
+  f->dump_unsigned("id", id);
+  f->close_section();
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
+  lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
+  rhs.print(lhs);
+  lhs << ")";
+  return lhs;
+}
+
+void Blocker::dump(ceph::Formatter* f) const
+{
+  f->open_object_section("blocker");
+  f->dump_string("op_type", get_type_name());
+  {
+    f->open_object_section("detail");
+    dump_detail(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void AggregateBlocker::dump_detail(ceph::Formatter *f) const
+{
+  f->open_array_section("parent_blockers");
+  for (auto b : parent_blockers) {
+    f->open_object_section("parent_blocker");
+    b->dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void OrderedPipelinePhase::Handle::exit()
+{
+  if (phase) {
+    phase->mutex.unlock();
+    phase = nullptr;
+  }
+}
+
+blocking_future<> OrderedPipelinePhase::Handle::enter(
+  OrderedPipelinePhase &new_phase)
+{
+  auto fut = new_phase.mutex.lock();
+  exit();
+  phase = &new_phase;
+  return new_phase.make_blocking_future(std::move(fut));
+}
+
+OrderedPipelinePhase::Handle::~Handle()
+{
+  exit();
+}
+
+void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const
+{
+}
+
+}
diff --git a/src/crimson/common/operation.h b/src/crimson/common/operation.h
new file mode 100644 (file)
index 0000000..1f555e4
--- /dev/null
@@ -0,0 +1,330 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <algorithm>
+#include <array>
+#include <set>
+#include <vector>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/future.hh>
+#include <seastar/core/timer.hh>
+#include <seastar/core/lowres_clock.hh>
+#include <seastar/core/future-util.hh>
+
+#include "include/ceph_assert.h"
+
+namespace ceph {
+  class Formatter;
+}
+
+namespace crimson {
+
+using registry_hook_t = boost::intrusive::list_member_hook<
+  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
+
+class Operation;
+class Blocker;
+
+/**
+ * Provides an abstraction for registering and unregistering a blocker
+ * for the duration of a future becoming available.
+ */
+template <typename Fut>
+class blocking_future_detail {
+  friend class Operation;
+  friend class Blocker;
+  Blocker *blocker;
+  Fut fut;
+  blocking_future_detail(Blocker *b, Fut &&f)
+    : blocker(b), fut(std::move(f)) {}
+
+  template <typename V, typename U>
+  friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
+  template <typename V, typename Exception>
+  friend blocking_future_detail<seastar::future<V>>
+  make_exception_blocking_future(Exception&& e);
+
+  template <typename U>
+  friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
+
+  template <typename U>
+  friend class blocking_future_detail;
+
+public:
+  template <typename F>
+  auto then(F &&f) && {
+    using result = decltype(std::declval<Fut>().then(f));
+    return blocking_future_detail<seastar::futurize_t<result>>(
+      blocker,
+      std::move(fut).then(std::forward<F>(f)));
+  }
+};
+
+template <typename T=void>
+using blocking_future = blocking_future_detail<seastar::future<T>>;
+
+template <typename V, typename U>
+blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
+  return blocking_future<V>(
+    nullptr,
+    seastar::make_ready_future<V>(std::forward<U>(args)));
+}
+
+template <typename V, typename Exception>
+blocking_future_detail<seastar::future<V>>
+make_exception_blocking_future(Exception&& e) {
+  return blocking_future<V>(
+    nullptr,
+    seastar::make_exception_future<V>(e));
+}
+
+/**
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
+ */
+class Blocker {
+public:
+  template <typename T>
+  blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
+    return blocking_future<T>(this, std::move(f));
+  }
+  void dump(ceph::Formatter *f) const;
+  virtual ~Blocker() = default;
+
+private:
+  virtual void dump_detail(ceph::Formatter *f) const = 0;
+  virtual const char *get_type_name() const = 0;
+};
+
+template <typename T>
+class BlockerT : public Blocker {
+public:
+  virtual ~BlockerT() = default;
+private:
+  const char *get_type_name() const final {
+    return T::type_name;
+  }
+};
+
+class AggregateBlocker : public BlockerT<AggregateBlocker> {
+  std::vector<Blocker*> parent_blockers;
+public:
+  AggregateBlocker(std::vector<Blocker*> &&parent_blockers)
+    : parent_blockers(std::move(parent_blockers)) {}
+  static constexpr const char *type_name = "AggregateBlocker";
+private:
+  void dump_detail(ceph::Formatter *f) const final;
+};
+
+template <typename T>
+blocking_future<> join_blocking_futures(T &&t) {
+  std::vector<Blocker*> blockers;
+  blockers.reserve(t.size());
+  for (auto &&bf: t) {
+    blockers.push_back(bf.blocker);
+    bf.blocker = nullptr;
+  }
+  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
+  return agg->make_blocking_future(
+    seastar::parallel_for_each(
+      std::forward<T>(t),
+      [](auto &&bf) {
+       return std::move(bf.fut);
+      }).then([agg=std::move(agg)] {
+       return seastar::make_ready_future<>();
+      }));
+}
+
+
+/**
+ * Common base for all crimson-osd operations.  Mainly provides
+ * an interface for registering ops in flight and dumping
+ * diagnostic information.
+ */
+class Operation : public boost::intrusive_ref_counter<
+  Operation, boost::thread_unsafe_counter> {
+ public:
+  uint64_t get_id() const {
+    return id;
+  }
+
+  virtual unsigned get_type() const = 0;
+  virtual const char *get_type_name() const = 0;
+  virtual void print(std::ostream &) const = 0;
+
+  template <typename T>
+  seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
+    if (f.fut.available()) {
+      return std::move(f.fut);
+    }
+    assert(f.blocker);
+    add_blocker(f.blocker);
+    return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
+      clear_blocker(blocker);
+      return std::move(arg);
+    });
+  }
+
+  void dump(ceph::Formatter *f);
+  void dump_brief(ceph::Formatter *f);
+  virtual ~Operation() = default;
+
+ private:
+  virtual void dump_detail(ceph::Formatter *f) const = 0;
+
+  registry_hook_t registry_hook;
+
+  std::vector<Blocker*> blockers;
+  uint64_t id = 0;
+  void set_id(uint64_t in_id) {
+    id = in_id;
+  }
+
+  void add_blocker(Blocker *b) {
+    blockers.push_back(b);
+  }
+
+  void clear_blocker(Blocker *b) {
+    auto iter = std::find(blockers.begin(), blockers.end(), b);
+    if (iter != blockers.end()) {
+      blockers.erase(iter);
+    }
+  }
+
+  friend class OperationRegistryI;
+  template <size_t>
+  friend class OperationRegistryT;
+};
+using OperationRef = boost::intrusive_ptr<Operation>;
+
+std::ostream &operator<<(std::ostream &, const Operation &op);
+
+/**
+ * Maintains a set of lists of all active ops.
+ */
+class OperationRegistryI {
+  friend class Operation;
+  seastar::timer<seastar::lowres_clock> shutdown_timer;
+  seastar::promise<> shutdown;
+
+protected:
+  virtual void do_register(Operation *op) = 0;
+  virtual bool registries_empty() const = 0;
+
+public:
+  template <typename T, typename... Args>
+  typename T::IRef create_operation(Args&&... args) {
+    typename T::IRef op = new T(std::forward<Args>(args)...);
+    do_register(&*op);
+    return op;
+  }
+
+  seastar::future<> stop() {
+    shutdown_timer.set_callback([this] {
+      if (registries_empty()) {
+       shutdown.set_value();
+       shutdown_timer.cancel();
+      }
+    });
+    shutdown_timer.arm_periodic(
+      std::chrono::milliseconds(100/*TODO: use option instead*/));
+    return shutdown.get_future();
+  }
+};
+
+
+template <size_t NUM_REGISTRIES>
+class OperationRegistryT : public OperationRegistryI {
+  using op_list_member_option = boost::intrusive::member_hook<
+    Operation,
+    registry_hook_t,
+    &Operation::registry_hook
+    >;
+  using op_list = boost::intrusive::list<
+    Operation,
+    op_list_member_option,
+    boost::intrusive::constant_time_size<false>>;
+
+  std::array<
+    op_list,
+    NUM_REGISTRIES
+  > registries;
+
+  std::array<
+    uint64_t,
+    NUM_REGISTRIES
+  > op_id_counters = {};
+
+protected:
+  void do_register(Operation *op) final {
+    registries[op->get_type()].push_back(*op);
+    op->set_id(op_id_counters[op->get_type()]++);
+  }
+
+  bool registries_empty() const final {
+    return std::all_of(registries.begin(),
+                      registries.end(),
+                      [](auto& opl) {
+                        return opl.empty();
+                      });
+  }
+};
+
+/**
+ * Ensures that at most one op may consider itself in the phase at a time.
+ * Ops will see enter() unblock in the order in which they tried to enter
+ * the phase.  entering (though not necessarily waiting for the future to
+ * resolve) a new phase prior to exiting the previous one will ensure that
+ * the op ordering is preserved.
+ */
+class OrderedPipelinePhase : public Blocker {
+private:
+  void dump_detail(ceph::Formatter *f) const final;
+  const char *get_type_name() const final {
+    return name;
+  }
+
+public:
+  /**
+   * Used to encapsulate pipeline residency state.
+   */
+  class Handle {
+    OrderedPipelinePhase *phase = nullptr;
+
+  public:
+    Handle() = default;
+
+    Handle(const Handle&) = delete;
+    Handle(Handle&&) = delete;
+    Handle &operator=(const Handle&) = delete;
+    Handle &operator=(Handle&&) = delete;
+
+    /**
+     * Returns a future which unblocks when the handle has entered the passed
+     * OrderedPipelinePhase.  If already in a phase, enter will also release
+     * that phase after placing itself in the queue for the next one to preserve
+     * ordering.
+     */
+    blocking_future<> enter(OrderedPipelinePhase &phase);
+
+    /**
+     * Releases the current phase if there is one.  Called in ~Handle().
+     */
+    void exit();
+
+    ~Handle();
+  };
+
+  OrderedPipelinePhase(const char *name) : name(name) {}
+
+private:
+  const char * name;
+  seastar::shared_mutex mutex;
+};
+
+}
index b5f3c3cbbc846e9945bf8ec6e5fa36e19205e18e..0ab3c2cf74d1442345635cb6ba2281740d76a49c 100644 (file)
@@ -6,62 +6,6 @@
 
 namespace crimson::osd {
 
-void Operation::dump(ceph::Formatter* f)
-{
-  f->open_object_section("operation");
-  f->dump_string("type", get_type_name());
-  f->dump_unsigned("id", id);
-  {
-    f->open_object_section("detail");
-    dump_detail(f);
-    f->close_section();
-  }
-  f->open_array_section("blockers");
-  for (auto &blocker : blockers) {
-    blocker->dump(f);
-  }
-  f->close_section();
-  f->close_section();
-}
-
-void Operation::dump_brief(ceph::Formatter* f)
-{
-  f->open_object_section("operation");
-  f->dump_string("type", get_type_name());
-  f->dump_unsigned("id", id);
-  f->close_section();
-}
-
-std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
-  lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
-  rhs.print(lhs);
-  lhs << ")";
-  return lhs;
-}
-
-void Blocker::dump(ceph::Formatter* f) const
-{
-  f->open_object_section("blocker");
-  f->dump_string("op_type", get_type_name());
-  {
-    f->open_object_section("detail");
-    dump_detail(f);
-    f->close_section();
-  }
-  f->close_section();
-}
-
-void AggregateBlocker::dump_detail(ceph::Formatter *f) const
-{
-  f->open_array_section("parent_blockers");
-  for (auto b : parent_blockers) {
-    f->open_object_section("parent_blocker");
-    b->dump(f);
-    f->close_section();
-  }
-  f->close_section();
-}
-
 OperationThrottler::OperationThrottler(ConfigProxy &conf)
   : scheduler(crimson::osd::scheduler::make_scheduler(conf))
 {
@@ -129,31 +73,4 @@ void OperationThrottler::handle_conf_change(
   update_from_config(conf);
 }
 
-
-void OrderedPipelinePhase::Handle::exit()
-{
-  if (phase) {
-    phase->mutex.unlock();
-    phase = nullptr;
-  }
-}
-
-blocking_future<> OrderedPipelinePhase::Handle::enter(
-  OrderedPipelinePhase &new_phase)
-{
-  auto fut = new_phase.mutex.lock();
-  exit();
-  phase = &new_phase;
-  return new_phase.make_blocking_future(std::move(fut));
-}
-
-OrderedPipelinePhase::Handle::~Handle()
-{
-  exit();
-}
-
-void OrderedPipelinePhase::dump_detail(ceph::Formatter* f) const
-{
-}
-
 }
index 5178749b0ddabb382e2a8c2d2733974f3a8f799e..85f5be2edf031a60362a9a0506c63fc5f5f9487b 100644 (file)
@@ -3,25 +3,9 @@
 
 #pragma once
 
-#include <algorithm>
-#include <array>
-#include <set>
-#include <vector>
-#include <boost/intrusive/list.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/smart_ptr/intrusive_ref_counter.hpp>
-#include <seastar/core/shared_mutex.hh>
-#include <seastar/core/future.hh>
-#include <seastar/core/timer.hh>
-#include <seastar/core/lowres_clock.hh>
-
-#include "include/ceph_assert.h"
+#include "crimson/common/operation.h"
 #include "crimson/osd/scheduler/scheduler.h"
 
-namespace ceph {
-  class Formatter;
-}
-
 namespace crimson::osd {
 
 enum class OperationTypeCode {
@@ -52,195 +36,14 @@ static_assert(
   (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) ==
   static_cast<int>(OperationTypeCode::last_op));
 
-class OperationRegistry;
-
-using registry_hook_t = boost::intrusive::list_member_hook<
-  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
-
-class Operation;
-class Blocker;
-
-/**
- * Provides an abstraction for registering and unregistering a blocker
- * for the duration of a future becoming available.
- */
-template <typename Fut>
-class blocking_future_detail {
-  friend class Operation;
-  friend class Blocker;
-  Blocker *blocker;
-  Fut fut;
-  blocking_future_detail(Blocker *b, Fut &&f)
-    : blocker(b), fut(std::move(f)) {}
-
-  template <typename V, typename U>
-  friend blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args);
-  template <typename V, typename Exception>
-  friend blocking_future_detail<seastar::future<V>>
-  make_exception_blocking_future(Exception&& e);
-
-  template <typename U>
-  friend blocking_future_detail<seastar::future<>> join_blocking_futures(U &&u);
-
-  template <typename U>
-  friend class blocking_future_detail;
-
-public:
-  template <typename F>
-  auto then(F &&f) && {
-    using result = decltype(std::declval<Fut>().then(f));
-    return blocking_future_detail<seastar::futurize_t<result>>(
-      blocker,
-      std::move(fut).then(std::forward<F>(f)));
-  }
-};
-
-template <typename T=void>
-using blocking_future = blocking_future_detail<seastar::future<T>>;
-
-template <typename V, typename U>
-blocking_future_detail<seastar::future<V>> make_ready_blocking_future(U&& args) {
-  return blocking_future<V>(
-    nullptr,
-    seastar::make_ready_future<V>(std::forward<U>(args)));
-}
-
-template <typename V, typename Exception>
-blocking_future_detail<seastar::future<V>>
-make_exception_blocking_future(Exception&& e) {
-  return blocking_future<V>(
-    nullptr,
-    seastar::make_exception_future<V>(e));
-}
-
-/**
- * Provides an interface for dumping diagnostic information about
- * why a particular op is not making progress.
- */
-class Blocker {
-public:
-  template <typename T>
-  blocking_future<T> make_blocking_future(seastar::future<T> &&f) {
-    return blocking_future<T>(this, std::move(f));
-  }
-  void dump(ceph::Formatter *f) const;
-  virtual ~Blocker() = default;
-
-private:
-  virtual void dump_detail(ceph::Formatter *f) const = 0;
-  virtual const char *get_type_name() const = 0;
-};
-
-template <typename T>
-class BlockerT : public Blocker {
-public:
-  virtual ~BlockerT() = default;
-private:
-  const char *get_type_name() const final {
-    return T::type_name;
-  }
-};
-
-class AggregateBlocker : public BlockerT<AggregateBlocker> {
-  vector<Blocker*> parent_blockers;
-public:
-  AggregateBlocker(vector<Blocker*> &&parent_blockers)
-    : parent_blockers(std::move(parent_blockers)) {}
-  static constexpr const char *type_name = "AggregateBlocker";
-private:
-  void dump_detail(ceph::Formatter *f) const final;
-};
-
-template <typename T>
-blocking_future<> join_blocking_futures(T &&t) {
-  vector<Blocker*> blockers;
-  blockers.reserve(t.size());
-  for (auto &&bf: t) {
-    blockers.push_back(bf.blocker);
-    bf.blocker = nullptr;
-  }
-  auto agg = std::make_unique<AggregateBlocker>(std::move(blockers));
-  return agg->make_blocking_future(
-    seastar::parallel_for_each(
-      std::forward<T>(t),
-      [](auto &&bf) {
-       return std::move(bf.fut);
-      }).then([agg=std::move(agg)] {
-       return seastar::make_ready_future<>();
-      }));
-}
-
-
-/**
- * Common base for all crimson-osd operations.  Mainly provides
- * an interface for registering ops in flight and dumping
- * diagnostic information.
- */
-class Operation : public boost::intrusive_ref_counter<
-  Operation, boost::thread_unsafe_counter> {
- public:
-  uint64_t get_id() const {
-    return id;
-  }
-
-  virtual OperationTypeCode get_type() const = 0;
-  virtual const char *get_type_name() const = 0;
-  virtual void print(std::ostream &) const = 0;
-
-  template <typename T>
-  seastar::future<T> with_blocking_future(blocking_future<T> &&f) {
-    if (f.fut.available()) {
-      return std::move(f.fut);
-    }
-    assert(f.blocker);
-    add_blocker(f.blocker);
-    return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
-      clear_blocker(blocker);
-      return std::move(arg);
-    });
-  }
-
-  void dump(ceph::Formatter *f);
-  void dump_brief(ceph::Formatter *f);
-  virtual ~Operation() = default;
-
- private:
-  virtual void dump_detail(ceph::Formatter *f) const = 0;
-
- private:
-  registry_hook_t registry_hook;
-
-  std::vector<Blocker*> blockers;
-  uint64_t id = 0;
-  void set_id(uint64_t in_id) {
-    id = in_id;
-  }
-
-  void add_blocker(Blocker *b) {
-    blockers.push_back(b);
-  }
-
-  void clear_blocker(Blocker *b) {
-    auto iter = std::find(blockers.begin(), blockers.end(), b);
-    if (iter != blockers.end()) {
-      blockers.erase(iter);
-    }
-  }
-
-  friend class OperationRegistry;
-};
-using OperationRef = boost::intrusive_ptr<Operation>;
-
-std::ostream &operator<<(std::ostream &, const Operation &op);
-
 template <typename T>
 class OperationT : public Operation {
 public:
   static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
   using IRef = boost::intrusive_ptr<T>;
 
-  OperationTypeCode get_type() const final {
-    return T::type;
+  unsigned get_type() const final {
+    return static_cast<unsigned>(T::type);
   }
 
   const char *get_type_name() const final {
@@ -256,54 +59,9 @@ private:
 /**
  * Maintains a set of lists of all active ops.
  */
-class OperationRegistry {
-  friend class Operation;
-  using op_list_member_option = boost::intrusive::member_hook<
-    Operation,
-    registry_hook_t,
-    &Operation::registry_hook
-    >;
-  using op_list = boost::intrusive::list<
-    Operation,
-    op_list_member_option,
-    boost::intrusive::constant_time_size<false>>;
-
-  std::array<
-    op_list,
-    static_cast<int>(OperationTypeCode::last_op)
-  > registries;
-
-  std::array<
-    uint64_t,
-    static_cast<int>(OperationTypeCode::last_op)
-  > op_id_counters = {};
-
-  seastar::timer<seastar::lowres_clock> shutdown_timer;
-  seastar::promise<> shutdown;
-public:
-  template <typename T, typename... Args>
-  typename T::IRef create_operation(Args&&... args) {
-    typename T::IRef op = new T(std::forward<Args>(args)...);
-    registries[static_cast<int>(T::type)].push_back(*op);
-    op->set_id(op_id_counters[static_cast<int>(T::type)]++);
-    return op;
-  }
-
-  seastar::future<> stop() {
-    shutdown_timer.set_callback([this] {
-       if (std::all_of(registries.begin(),
-                       registries.end(),
-                       [](auto& opl) {
-                         return opl.empty();
-                       })) {
-         shutdown.set_value();
-         shutdown_timer.cancel();
-       }
-      });
-    shutdown_timer.arm_periodic(std::chrono::milliseconds(100/*TODO: use option instead*/));
-    return shutdown.get_future();
-  }
-};
+using OSDOperationRegistry = OperationRegistryT<
+  static_cast<size_t>(OperationTypeCode::last_op)
+  >;
 
 /**
  * Throttles set of currently running operations
@@ -372,56 +130,4 @@ private:
   void release_throttle();
 };
 
-/**
- * Ensures that at most one op may consider itself in the phase at a time.
- * Ops will see enter() unblock in the order in which they tried to enter
- * the phase.  entering (though not necessarily waiting for the future to
- * resolve) a new phase prior to exiting the previous one will ensure that
- * the op ordering is preserved.
- */
-class OrderedPipelinePhase : public Blocker {
-private:
-  void dump_detail(ceph::Formatter *f) const final;
-  const char *get_type_name() const final {
-    return name;
-  }
-
-public:
-  /**
-   * Used to encapsulate pipeline residency state.
-   */
-  class Handle {
-    OrderedPipelinePhase *phase = nullptr;
-
-  public:
-    Handle() = default;
-
-    Handle(const Handle&) = delete;
-    Handle(Handle&&) = delete;
-    Handle &operator=(const Handle&) = delete;
-    Handle &operator=(Handle&&) = delete;
-
-    /**
-     * Returns a future which unblocks when the handle has entered the passed
-     * OrderedPipelinePhase.  If already in a phase, enter will also release
-     * that phase after placing itself in the queue for the next one to preserve
-     * ordering.
-     */
-    blocking_future<> enter(OrderedPipelinePhase &phase);
-
-    /**
-     * Releases the current phase if there is one.  Called in ~Handle().
-     */
-    void exit();
-
-    ~Handle();
-  };
-
-  OrderedPipelinePhase(const char *name) : name(name) {}
-
-private:
-  const char * name;
-  seastar::shared_mutex mutex;
-};
-
 }
index e55760096e4e82ff2dbf543736908218f113515f..6290b9c771629c2a4b8759a24853d143add1beba 100644 (file)
@@ -57,13 +57,13 @@ public:
   }
 };
 
-std::vector<OperationRef> handle_pg_create(
+std::vector<crimson::OperationRef> handle_pg_create(
   OSD &osd,
   crimson::net::ConnectionRef conn,
   compound_state_ref state,
   Ref<MOSDPGCreate2> m)
 {
-  std::vector<OperationRef> ret;
+  std::vector<crimson::OperationRef> ret;
   for (auto& [pgid, when] : m->pgs) {
     const auto &[created, created_stamp] = when;
     auto q = m->pg_extra.find(pgid);
@@ -100,11 +100,12 @@ std::vector<OperationRef> handle_pg_create(
   return ret;
 }
 
-struct SubOpBlocker : BlockerT<SubOpBlocker> {
+struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
   static constexpr const char * type_name = "CompoundOpBlocker";
 
-  std::vector<OperationRef> subops;
-  SubOpBlocker(std::vector<OperationRef> &&subops) : subops(subops) {}
+  std::vector<crimson::OperationRef> subops;
+  SubOpBlocker(std::vector<crimson::OperationRef> &&subops)
+    : subops(subops) {}
 
   virtual void dump_detail(Formatter *f) const {
     f->open_array_section("dependent_operations");
index 7d70b5e8ffbfd15fca876ec35d80422e67fdc7e3..19c8962f6df4fbd12ec00fd3f56871afc3c42c1b 100644 (file)
@@ -36,7 +36,7 @@ void PGRecovery::start_pglogbased_recovery()
     pg->get_osdmap_epoch());
 }
 
-crimson::osd::blocking_future<bool>
+crimson::blocking_future<bool>
 PGRecovery::start_recovery_ops(size_t max_to_start)
 {
   assert(pg->is_primary());
@@ -51,13 +51,13 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
   assert(!pg->is_backfilling());
   assert(!pg->get_peering_state().is_deleting());
 
-  std::vector<crimson::osd::blocking_future<>> started;
+  std::vector<crimson::blocking_future<>> started;
   started.reserve(max_to_start);
   max_to_start -= start_primary_recovery_ops(max_to_start, &started);
   if (max_to_start > 0) {
     max_to_start -= start_replica_recovery_ops(max_to_start, &started);
   }
-  return crimson::osd::join_blocking_futures(std::move(started)).then(
+  return crimson::join_blocking_futures(std::move(started)).then(
     [this] {
     bool done = !pg->get_peering_state().needs_recovery();
     if (done) {
@@ -94,7 +94,7 @@ PGRecovery::start_recovery_ops(size_t max_to_start)
 
 size_t PGRecovery::start_primary_recovery_ops(
   size_t max_to_start,
-  std::vector<crimson::osd::blocking_future<>> *out)
+  std::vector<crimson::blocking_future<>> *out)
 {
   if (!pg->is_recovering()) {
     return 0;
@@ -169,7 +169,7 @@ size_t PGRecovery::start_primary_recovery_ops(
 
 size_t PGRecovery::start_replica_recovery_ops(
   size_t max_to_start,
-  std::vector<crimson::osd::blocking_future<>> *out)
+  std::vector<crimson::blocking_future<>> *out)
 {
   if (!pg->is_recovering()) {
     return 0;
@@ -254,7 +254,7 @@ size_t PGRecovery::start_replica_recovery_ops(
   return started;
 }
 
-crimson::osd::blocking_future<> PGRecovery::recover_missing(
+crimson::blocking_future<> PGRecovery::recover_missing(
   const hobject_t &soid, eversion_t need)
 {
   if (pg->get_peering_state().get_missing_loc().is_deleted(soid)) {
@@ -274,7 +274,7 @@ crimson::osd::blocking_future<> PGRecovery::recover_missing(
 size_t PGRecovery::prep_object_replica_deletes(
   const hobject_t& soid,
   eversion_t need,
-  std::vector<crimson::osd::blocking_future<>> *in_progress)
+  std::vector<crimson::blocking_future<>> *in_progress)
 {
   in_progress->push_back(
     pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
@@ -292,7 +292,7 @@ size_t PGRecovery::prep_object_replica_deletes(
 size_t PGRecovery::prep_object_replica_pushes(
   const hobject_t& soid,
   eversion_t need,
-  std::vector<crimson::osd::blocking_future<>> *in_progress)
+  std::vector<crimson::blocking_future<>> *in_progress)
 {
   in_progress->push_back(
     pg->get_recovery_backend()->add_recovering(soid).make_blocking_future(
index 86f259de5423499ea84259c20351b6bafe82e987..19303e58d923f21668144044d5b75158403c7852 100644 (file)
@@ -22,7 +22,7 @@ public:
   virtual ~PGRecovery() {}
   void start_pglogbased_recovery();
 
-  crimson::osd::blocking_future<bool> start_recovery_ops(size_t max_to_start);
+  crimson::blocking_future<bool> start_recovery_ops(size_t max_to_start);
   void on_backfill_reserved();
   void dispatch_backfill_event(
     boost::intrusive_ptr<const boost::statechart::event_base> evt);
@@ -32,24 +32,24 @@ private:
   PGRecoveryListener* pg;
   size_t start_primary_recovery_ops(
     size_t max_to_start,
-    std::vector<crimson::osd::blocking_future<>> *out);
+    std::vector<crimson::blocking_future<>> *out);
   size_t start_replica_recovery_ops(
     size_t max_to_start,
-    std::vector<crimson::osd::blocking_future<>> *out);
+    std::vector<crimson::blocking_future<>> *out);
 
   std::vector<pg_shard_t> get_replica_recovery_order() const {
     return pg->get_replica_recovery_order();
   }
-  crimson::osd::blocking_future<> recover_missing(
+  crimson::blocking_future<> recover_missing(
     const hobject_t &soid, eversion_t need);
   size_t prep_object_replica_deletes(
     const hobject_t& soid,
     eversion_t need,
-    std::vector<crimson::osd::blocking_future<>> *in_progress);
+    std::vector<crimson::blocking_future<>> *in_progress);
   size_t prep_object_replica_pushes(
     const hobject_t& soid,
     eversion_t need,
-    std::vector<crimson::osd::blocking_future<>> *in_progress);
+    std::vector<crimson::blocking_future<>> *in_progress);
 
   void on_local_recover(
     const hobject_t& soid,
index cb0ae9f20565f15828c6284fb4209d20d6a7f644..2216fdeed68407ae0b693c649f5d67fa89bb4537 100644 (file)
@@ -127,7 +127,7 @@ protected:
     object_stat_sum_t stat;
   };
 
-  class WaitForObjectRecovery : public crimson::osd::BlockerT<WaitForObjectRecovery> {
+  class WaitForObjectRecovery : public crimson::BlockerT<WaitForObjectRecovery> {
     seastar::shared_promise<> readable, recovered, pulled;
     std::map<pg_shard_t, seastar::shared_promise<>> pushes;
   public:
@@ -146,7 +146,7 @@ protected:
     seastar::future<> wait_for_recovered() {
       return recovered.get_shared_future();
     }
-    crimson::osd::blocking_future<>
+    crimson::blocking_future<>
     wait_for_recovered_blocking() {
       return make_blocking_future(
          recovered.get_shared_future());
index 2957639c6347139c69aecdba27215e2db4848c1b..01e0e91e471e5dee166e7692128a3ba13761659a 100644 (file)
@@ -88,7 +88,7 @@ public:
   }
 
   // Op Management
-  OperationRegistry registry;
+  OSDOperationRegistry registry;
   OperationThrottler throttler;
 
   template <typename T, typename... Args>