]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: add osd_operation
authorSamuel Just <sjust@redhat.com>
Tue, 11 Jun 2019 00:29:55 +0000 (17:29 -0700)
committerSamuel Just <sjust@redhat.com>
Thu, 20 Jun 2019 20:51:22 +0000 (13:51 -0700)
Adds basic machinery for registering in progress operations along
with blocking conditions.

Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/osd_operation.cc [new file with mode: 0644]
src/crimson/osd/osd_operation.h [new file with mode: 0644]

index ae9a463d142650aab3cae300a6372d50b131dcce..77ac3858b69e98090d978188282ce6dfee09832d 100644 (file)
@@ -10,6 +10,7 @@ add_executable(crimson-osd
   pg_meta.cc
   replicated_backend.cc
   shard_services.cc
+  osd_operation.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
diff --git a/src/crimson/osd/osd_operation.cc b/src/crimson/osd/osd_operation.cc
new file mode 100644 (file)
index 0000000..69ec5b7
--- /dev/null
@@ -0,0 +1,79 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "osd_operation.h"
+
+namespace ceph::osd {
+
+void Operation::dump(Formatter *f)
+{
+  f->open_object_section("operation");
+  f->dump_string("type", get_type_name());
+  f->dump_unsigned("id", id);
+  {
+    f->open_object_section("detail");
+    dump_detail(f);
+    f->close_section();
+  }
+  f->open_array_section("blockers");
+  for (auto &blocker : blockers) {
+    blocker->dump(f);
+  }
+  f->close_section();
+  f->close_section();
+}
+
+void Operation::dump_brief(Formatter *f)
+{
+  f->open_object_section("operation");
+  f->dump_string("type", get_type_name());
+  f->dump_unsigned("id", id);
+  f->close_section();
+}
+
+std::ostream &operator<<(std::ostream &lhs, const Operation &rhs) {
+  lhs << rhs.get_type_name() << "(id=" << rhs.get_id() << ", detail=";
+  rhs.print(lhs);
+  lhs << ")";
+  return lhs;
+}
+
+void Blocker::dump(Formatter *f) const
+{
+  f->open_object_section("blocker");
+  f->dump_string("op_type", get_type_name());
+  {
+    f->open_object_section("detail");
+    dump_detail(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void OrderedPipelinePhase::Handle::exit()
+{
+  if (phase) {
+    phase->mutex.unlock();
+    phase = nullptr;
+  }
+}
+
+blocking_future<> OrderedPipelinePhase::Handle::enter(
+  OrderedPipelinePhase &new_phase)
+{
+  auto fut = new_phase.mutex.lock();
+  exit();
+  phase = &new_phase;
+  return new_phase.make_blocking_future(std::move(fut));
+}
+
+OrderedPipelinePhase::Handle::~Handle()
+{
+  exit();
+}
+
+void OrderedPipelinePhase::dump_detail(Formatter *f) const
+{
+}
+
+}
diff --git a/src/crimson/osd/osd_operation.h b/src/crimson/osd/osd_operation.h
new file mode 100644 (file)
index 0000000..73ce754
--- /dev/null
@@ -0,0 +1,269 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <seastar/core/shared_mutex.hh>
+#include <seastar/core/future.hh>
+
+#include <vector>
+#include <array>
+#include <algorithm>
+#include <set>
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+
+#include "common/Formatter.h"
+
+namespace ceph::osd {
+
+enum class OperationTypeCode {
+  client_request = 0,
+  peering_event,
+  compound_peering_request,
+  last_op
+};
+
+static constexpr const char* const OP_NAMES[] = {
+  "client_write",
+  "peering_event",
+  "compound_peering_request"
+};
+
+// prevent the addition of OperationTypeCode-s with no matching OP_NAMES entry:
+static_assert(
+  (sizeof(OP_NAMES)/sizeof(OP_NAMES[0])) ==
+  static_cast<int>(OperationTypeCode::last_op));
+
+class OperationRegistry;
+
+using registry_hook_t = boost::intrusive::list_member_hook<
+  boost::intrusive::link_mode<boost::intrusive::auto_unlink>>;
+
+class Operation;
+class Blocker;
+
+/**
+ * Provides an abstraction for registering and unregistering a blocker
+ * for the duration of a future becoming available.
+ */
+template <typename... T>
+class blocking_future {
+  friend class Operation;
+  friend class Blocker;
+  Blocker *blocker;
+  seastar::future<T...> fut;
+  blocking_future(Blocker *b, seastar::future<T...> &&f)
+    : blocker(b), fut(std::move(f)) {}
+
+  template <typename... V, typename... U>
+  friend blocking_future<V...> make_ready_blocking_future(U&&... args);
+};
+
+template <typename... V, typename... U>
+blocking_future<V...> make_ready_blocking_future(U&&... args) {
+  return blocking_future<V...>(
+    nullptr,
+    seastar::make_ready_future<V...>(std::forward<U>(args)...));
+}
+
+/**
+ * Provides an interface for dumping diagnostic information about
+ * why a particular op is not making progress.
+ */
+class Blocker {
+protected:
+  virtual void dump_detail(Formatter *f) const = 0;
+
+public:
+  template <typename... T>
+  blocking_future<T...> make_blocking_future(seastar::future<T...> &&f) {
+    return blocking_future(this, std::move(f));
+  }
+
+  void dump(Formatter *f) const;
+
+  virtual const char *get_type_name() const = 0;
+
+  virtual ~Blocker() = default;
+};
+
+template <typename T>
+class BlockerT : public Blocker {
+public:
+  const char *get_type_name() const final {
+    return T::type_name;
+  }
+
+  virtual ~BlockerT() = default;
+};
+
+/**
+ * Common base for all crimson-osd operations.  Mainly provides
+ * an interface for registering ops in flight and dumping
+ * diagnostic information.
+ */
+class Operation : public boost::intrusive_ref_counter<
+  Operation, boost::thread_unsafe_counter> {
+  friend class OperationRegistry;
+  registry_hook_t registry_hook;
+
+  std::vector<Blocker*> blockers;
+  uint64_t id = 0;
+  void set_id(uint64_t in_id) {
+    id = in_id;
+  }
+protected:
+  virtual void dump_detail(Formatter *f) const = 0;
+
+public:
+  uint64_t get_id() const {
+    return id;
+  }
+
+  virtual OperationTypeCode get_type() const = 0;
+  virtual const char *get_type_name() const = 0;
+  virtual void print(std::ostream &) const = 0;
+
+  void add_blocker(Blocker *b) {
+    blockers.push_back(b);
+  }
+
+  void clear_blocker(Blocker *b) {
+    auto iter = std::find(blockers.begin(), blockers.end(), b);
+    if (iter != blockers.end()) {
+      blockers.erase(iter);
+    }
+  }
+
+  template <typename... T>
+  seastar::future<T...> with_blocking_future(blocking_future<T...> &&f) {
+    if (f.fut.available() || f.fut.failed()) {
+      return std::move(f.fut);
+    }
+    ceph_assert(f.blocker);
+    add_blocker(f.blocker);
+    return std::move(f.fut).then_wrapped([this, blocker=f.blocker](auto &&arg) {
+      clear_blocker(blocker);
+      return std::move(arg);
+    });
+  }
+
+  void dump(Formatter *f);
+  void dump_brief(Formatter *f);
+  virtual ~Operation() = default;
+};
+using OperationRef = boost::intrusive_ptr<Operation>;
+
+std::ostream &operator<<(std::ostream &, const Operation &op);
+
+template <typename T>
+class OperationT : public Operation {
+
+protected:
+  virtual void dump_detail(Formatter *f) const = 0;
+
+public:
+  static constexpr const char *type_name = OP_NAMES[static_cast<int>(T::type)];
+  using IRef = boost::intrusive_ptr<T>;
+
+  OperationTypeCode get_type() const final {
+    return T::type;
+  }
+
+  const char *get_type_name() const final {
+    return T::type_name;
+  }
+
+  virtual ~OperationT() = default;
+};
+
+/**
+ * Maintains a set of lists of all active ops.
+ */
+class OperationRegistry {
+  friend class Operation;
+  using op_list_member_option = boost::intrusive::member_hook<
+    Operation,
+    registry_hook_t,
+    &Operation::registry_hook
+    >;
+  using op_list = boost::intrusive::list<
+    Operation,
+    op_list_member_option,
+    boost::intrusive::constant_time_size<false>>;
+
+  std::array<
+    op_list,
+    static_cast<int>(OperationTypeCode::last_op)
+  > registries;
+
+  std::array<
+    uint64_t,
+    static_cast<int>(OperationTypeCode::last_op)
+  > op_id_counters = {};
+
+public:
+  template <typename T, typename... Args>
+  typename T::IRef create_operation(Args&&... args) {
+    typename T::IRef op = new T(std::forward<Args>(args)...);
+    registries[static_cast<int>(T::type)].push_back(*op);
+    op->set_id(op_id_counters[static_cast<int>(T::type)]++);
+    return op;
+  }
+};
+
+/**
+ * Ensures that at most one op may consider itself in the phase at a time.
+ * Ops will see enter() unblock in the order in which they tried to enter
+ * the phase.  entering (though not necessarily waiting for the future to
+ * resolve) a new phase prior to exiting the previous one will ensure that
+ * the op ordering is preserved.
+ */
+class OrderedPipelinePhase : public Blocker {
+  const char * name;
+
+protected:
+  virtual void dump_detail(Formatter *f) const final;
+  const char *get_type_name() const final {
+    return name;
+  }
+
+public:
+  seastar::shared_mutex mutex;
+
+  /**
+   * Used to encapsulate pipeline residency state.
+   */
+  class Handle {
+    OrderedPipelinePhase *phase = nullptr;
+
+  public:
+    Handle() = default;
+
+    Handle(const Handle&) = delete;
+    Handle(Handle&&) = delete;
+    Handle &operator=(const Handle&) = delete;
+    Handle &operator=(Handle&&) = delete;
+
+    /**
+     * Returns a future which unblocks when the handle has entered the passed
+     * OrderedPipelinePhase.  If already in a phase, enter will also release
+     * that phase after placing itself in the queue for the next one to preserve
+     * ordering.
+     */
+    blocking_future<> enter(OrderedPipelinePhase &phase);
+
+    /**
+     * Releases the current phase if there is one.  Called in ~Handle().
+     */
+    void exit();
+
+    ~Handle();
+  };
+
+  OrderedPipelinePhase(const char *name) : name(name) {}
+};
+
+}