]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: rework OpSequencer for out-of-order op execution. 43011/head
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 1 Sep 2021 12:57:54 +0000 (12:57 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 16 Sep 2021 15:58:03 +0000 (15:58 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/common/operation.h
src/crimson/osd/osd_operation_sequencer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index 3106da5c05c9e8ba10fc3b1cc9cfe2e1b2c994d8..3559f6591555dac6e274f3442bea0ab5e82014eb 100644 (file)
@@ -314,6 +314,12 @@ std::ostream &operator<<(std::ostream &, const Operation &op);
  * Maintains a set of lists of all active ops.
  */
 class OperationRegistryI {
+  using op_list_member_option = boost::intrusive::member_hook<
+    Operation,
+    registry_hook_t,
+    &Operation::registry_hook
+    >;
+
   friend class Operation;
   seastar::timer<seastar::lowres_clock> shutdown_timer;
   seastar::promise<> shutdown;
@@ -323,6 +329,11 @@ protected:
   virtual bool registries_empty() const = 0;
 
 public:
+  using op_list = boost::intrusive::list<
+    Operation,
+    op_list_member_option,
+    boost::intrusive::constant_time_size<false>>;
+
   template <typename T, typename... Args>
   typename T::IRef create_operation(Args&&... args) {
     typename T::IRef op = new T(std::forward<Args>(args)...);
@@ -346,16 +357,6 @@ public:
 
 template <size_t NUM_REGISTRIES>
 class OperationRegistryT : public OperationRegistryI {
-  using op_list_member_option = boost::intrusive::member_hook<
-    Operation,
-    registry_hook_t,
-    &Operation::registry_hook
-    >;
-  using op_list = boost::intrusive::list<
-    Operation,
-    op_list_member_option,
-    boost::intrusive::constant_time_size<false>>;
-
   std::array<
     op_list,
     NUM_REGISTRIES
@@ -379,6 +380,13 @@ protected:
                         return opl.empty();
                       });
   }
+public:
+  template <size_t REGISTRY_INDEX>
+  const op_list& get_registry() const {
+    static_assert(
+      REGISTRY_INDEX < std::tuple_size<decltype(registries)>::value);
+    return registries[REGISTRY_INDEX];
+  }
 };
 
 class PipelineExitBarrierI {
index 4a80a92307f5dd34ec315ae0d2fb6938fa1493bf..dd89f2d4223fc9428fba1f56502432f444a00113 100644 (file)
@@ -25,6 +25,42 @@ namespace crimson::osd {
 // their id which is monotonically increasing and unique on per PG basis, so we
 // can keep an op waiting in the case explained above.
 class OpSequencer {
+  bool resequencing{false};
+
+  static bool is_unfinished(const ClientRequest& this_op) {
+    // TODO: kill the tombstone; reuse op status tracking.
+    return !this_op.finished;
+  }
+
+  std::uint64_t get_prev_id(const ClientRequest& this_op,
+                            const OSDOperationRegistry& registry) {
+    // an alternative to iterating till the registy's beginning could be
+    // holding a pointer to next(last_completed).
+    constexpr auto type_idx = static_cast<size_t>(ClientRequest::type);
+    for (auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op);
+         it != std::begin(registry.get_registry<type_idx>());
+         --it) {
+      // we're iterating over the operation registry of all ClientRequests.
+      // this list aggrates every single instance in the system, and thus
+      // we need to skip operations coming from different client's session
+      // or targeting different PGs.
+      // as this is supposed to happen on cold paths only, the overhead is
+      // a thing we can live with.
+      auto* maybe_prev_op = std::addressof(static_cast<const ClientRequest&>(*it));
+      if (maybe_prev_op->same_session_and_pg(this_op)) {
+        if (is_unfinished(*maybe_prev_op)) {
+          return maybe_prev_op->get_id();
+        } else {
+          // an early exited one
+        }
+      }
+    }
+    // the prev op of this session targeting the same PG as this_op must has
+    // been completed and hence already has been removed from the list, that's
+    // the only way we got here
+    return last_completed_id;
+  }
+
 public:
   template <typename HandleT,
             typename FuncT,
@@ -32,61 +68,96 @@ public:
   seastar::futurize_t<Result>
   start_op(const ClientRequest& op,
            HandleT& handle,
+           const OSDOperationRegistry& registry,
            FuncT&& do_op) {
-    const uint64_t prev_op = op.get_prev_id();
-    const uint64_t this_op = op.get_id();
+    ::crimson::get_logger(ceph_subsys_osd).debug(
+      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
     auto have_green_light = seastar::make_ready_future<>();
-    assert(prev_op < this_op);
-    if (last_issued == prev_op) {
-      // starting a new op, let's advance the last_issued!
-      last_issued = this_op;
+    if (last_started_id < op.get_id()) {
+      // starting a new op, let's advance the last_started!
+      last_started_id = op.get_id();
     }
-    if (prev_op != last_unblocked) {
-      // this implies that there are some blocked ops before me, so i have to
-      // wait until they are unblocked.
+    if (__builtin_expect(resequencing, false)) {
+      // this implies that there was a reset condition and there may me some
+      // older ops before me, so i have to wait until they are unblocked.
       //
       // i should leave the current pipeline stage when waiting for the blocked
       // ones, so that the following ops can be queued up here. we cannot let
       // the seastar scheduler to determine the order of performing these ops,
       // once they are unblocked after the first op of the same pg interval is
       // scheduled.
-      assert(prev_op > last_unblocked);
+      const auto prev_id = get_prev_id(op, registry);
+      assert(prev_id >= last_unblocked_id);
       handle.exit();
       ::crimson::get_logger(ceph_subsys_osd).debug(
-        "OpSequencer::start_op: {} waiting ({} > {})",
-        op, prev_op, last_unblocked);
-      have_green_light = unblocked.wait([prev_op, this] {
+        "OpSequencer::start_op: {} resequencing ({} >= {})",
+        op, prev_id, last_unblocked_id);
+      have_green_light = unblocked.wait([&op, &registry, this] {
         // wait until the previous op is unblocked
-        return last_unblocked == prev_op;
+        const bool unblocking =
+          get_prev_id(op, registry) == last_unblocked_id;
+        if (unblocking) {
+          // stop resequencing if everything is handled which means there is no
+          // operation after us. the range could be minimized by snapshotting
+          // `last_started` on `maybe_reset()`.
+          // `<=` is to handle the situation when `last_started` has finished out-
+          // of-the-order.
+          resequencing = !(last_started_id <= op.get_id());
+        }
+        return unblocking;
       });
     }
-    return have_green_light.then([this_op, do_op=std::move(do_op), this]() mutable {
+    return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable {
       auto result = seastar::futurize_invoke(std::move(do_op));
       // unblock the next one
-      last_unblocked = this_op;
+      last_unblocked_id = op.get_id();
       unblocked.broadcast();
       return result;
     });
   }
-  uint64_t get_last_issued() const {
-    return last_issued;
+  void finish_op_in_order(ClientRequest& op) {
+    ::crimson::get_logger(ceph_subsys_osd).debug(
+      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
+    assert(op.get_id() > last_completed_id);
+    last_completed_id = op.get_id();
+    op.finished = true;
   }
-  void finish_op(const ClientRequest& op) {
-    assert(op.get_id() > last_completed);
-    last_completed = op.get_id();
+  void finish_op_out_of_order(ClientRequest& op,
+                              const OSDOperationRegistry& registry) {
+    ::crimson::get_logger(ceph_subsys_osd).debug(
+      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
+    op.finished = true;
+    // fix the `last_unblocked_id`. otherwise we wouldn't be able to leave
+    // the wait loop in `start_op()` as any concrete value of `last_unblocked_id`
+    // can wake at most one blocked operation (the successor of `op` if there is any)
+    // and above we lowered this number to 0 (`get_prev_id()` there would never return
+    // a value matching `last_unblocked_id`).
+    if (last_unblocked_id == op.get_id()) {
+      last_unblocked_id = get_prev_id(op, registry);
+    }
   }
   void maybe_reset(const ClientRequest& op) {
+    ::crimson::get_logger(ceph_subsys_osd).debug(
+      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
+      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
     const auto op_id = op.get_id();
     // pg interval changes, so we need to reenqueue the previously unblocked
-    // ops by rewinding the "last_unblock" pointer
-    if (op_id <= last_unblocked) {
+    // ops by rewinding the "last_unblock" ID.
+    if (op_id <= last_unblocked_id) {
       ::crimson::get_logger(ceph_subsys_osd).debug(
         "OpSequencer::maybe_reset:{}  {} <= {}, resetting to {}",
-        op, op_id, last_unblocked, last_completed);
-      last_unblocked = last_completed;
+        op, op_id, last_unblocked_id, last_completed_id);
+      last_unblocked_id = last_completed_id;
+      resequencing = true;
     }
   }
   void abort() {
+    ::crimson::get_logger(ceph_subsys_osd).debug(
+      "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}",
+      __func__, last_started_id, last_unblocked_id, last_completed_id);
     // all blocked ops should be canceled, likely due to the osd is not primary
     // anymore.
     unblocked.broken();
@@ -95,18 +166,18 @@ private:
   //          /--- unblocked (in pg pipeline)
   //         |      /--- blocked
   //         V      V
-  // |////|.....|.......| <--- last_issued
+  // |////|.....|.......| <--- last_started
   //      ^     ^       ^
   //      |     |       \- prev_op
   //      |      \--- last_unblocked
   //      last_completed
   //
   // the id of last op which is issued
-  uint64_t last_issued = 0;
+  std::uint64_t last_started_id = 0;
   // the id of last op which is unblocked
-  uint64_t last_unblocked = 0;
+  std::uint64_t last_unblocked_id = 0;
   // the id of last op which is completed
-  uint64_t last_completed = 0;
+  std::uint64_t last_completed_id = 0;
   seastar::condition_variable unblocked;
 
   friend fmt::formatter<OpSequencer>;
@@ -137,9 +208,9 @@ struct fmt::formatter<crimson::osd::OpSequencer> {
               FormatContext& ctx)
   {
     return fmt::format_to(ctx.out(),
-                          "(last_completed={},last_unblocked={},last_issued={})",
-                          sequencer.last_completed,
-                          sequencer.last_unblocked,
-                          sequencer.last_issued);
+                          "(last_completed={},last_unblocked={},last_started={})",
+                          sequencer.last_completed_id,
+                          sequencer.last_unblocked_id,
+                          sequencer.last_started_id);
   }
 };
index 68d64ab5ce1c5681f58a14bc0a0591d7bd9aecf5..d5d90f4a2061f822ae18095fa02fc3f544705f38 100644 (file)
@@ -37,7 +37,7 @@ ClientRequest::~ClientRequest()
 
 void ClientRequest::print(std::ostream &lhs) const
 {
-  lhs << "m=[" << *m << "], prev_op_id=" << prev_op_id;
+  lhs << "m=[" << *m << "]";
 }
 
 void ClientRequest::dump_detail(Formatter *f) const
@@ -54,6 +54,12 @@ ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
   return pg.client_request_pg_pipeline;
 }
 
+bool ClientRequest::same_session_and_pg(const ClientRequest& other_op) const
+{
+  return &get_osd_priv(conn.get()) == &get_osd_priv(other_op.conn.get()) &&
+         m->get_spg() == other_op.m->get_spg();
+}
+
 bool ClientRequest::is_pg_op() const
 {
   return std::any_of(
@@ -61,22 +67,10 @@ bool ClientRequest::is_pg_op() const
     [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
 }
 
-void ClientRequest::may_set_prev_op()
-{
-  // set prev_op_id if it's not set yet
-  if (__builtin_expect(!prev_op_id.has_value(), true)) {
-    prev_op_id.emplace(sequencer.get_last_issued());
-  }
-}
-
 template <typename FuncT>
 ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
 {
-  may_set_prev_op();
-  return sequencer.start_op(*this, handle, std::forward<FuncT>(func))
-  .then_interruptible([this] {
-    sequencer.finish_op(*this);
-  });
+  return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func));
 }
 
 seastar::future<> ClientRequest::start()
@@ -101,12 +95,14 @@ seastar::future<> ClientRequest::start()
           if (m->finish_decode()) {
             m->clear_payload();
           }
-          const bool has_pg_op = is_pg_op();
-          auto interruptible_do_op = interruptor::wrap_function([=] {
+          return with_sequencer(interruptor::wrap_function([pgref, this] {
             PG &pg = *pgref;
             if (pg.can_discard_op(*m)) {
-              return interruptible_future<>(
-                osd.send_incremental_map(conn, m->get_map_epoch()));
+              return osd.send_incremental_map(
+                conn, m->get_map_epoch()).then([this] {
+                  sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
+                  return interruptor::now();
+              });
             }
             return with_blocking_future_interruptible<IOInterruptCondition>(
               handle.enter(pp(pg).await_map)
@@ -119,21 +115,22 @@ seastar::future<> ClientRequest::start()
             }).then_interruptible([this, &pg]() {
               return with_blocking_future_interruptible<IOInterruptCondition>(
                 pg.wait_for_active_blocker.wait());
-            }).then_interruptible([this,
-                                   has_pg_op,
-                                   pgref=std::move(pgref)]() mutable {
-              return (has_pg_op ?
-                      process_pg_op(pgref) :
-                      process_op(pgref));
+            }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
+              if (is_pg_op()) {
+                return process_pg_op(pgref).then_interruptible([this] {
+                  sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
+                });
+              } else {
+                return process_op(pgref).then_interruptible([this] {
+                  // NOTE: this assumes process_op() handles everything
+                  // in-order which I'm not sure about.
+                  sequencer.finish_op_in_order(*this);
+                });
+              }
             });
+          })).then_interruptible([pgref] {
+            return seastar::stop_iteration::yes;
           });
-          // keep the ordering of non-pg ops when across pg internvals
-          return (has_pg_op ?
-                  interruptible_do_op() :
-                  with_sequencer(std::move(interruptible_do_op)))
-            .then_interruptible([pgref]() {
-              return seastar::stop_iteration::yes;
-            });
        }, [this, pgref](std::exception_ptr eptr) {
           if (should_abort_request(*this, std::move(eptr))) {
             sequencer.abort();
index 3f14acd1cd7af1e1aa5b2fc45321ab0ae6a0b0c9..9563c10226060d13c8a032b625da48fedadb0986 100644 (file)
@@ -57,10 +57,7 @@ public:
 
 public:
   seastar::future<> start();
-  uint64_t get_prev_id() const {
-    assert(prev_op_id.has_value());
-    return *prev_op_id;
-  }
+  bool same_session_and_pg(const ClientRequest& other_op) const;
 
 private:
   template <typename FuncT>
@@ -80,7 +77,10 @@ private:
   PGPipeline &pp(PG &pg);
 
   class OpSequencer& sequencer;
-  std::optional<uint64_t> prev_op_id;
+  // a tombstone used currently by OpSequencer. In the future it's supposed
+  // to be replaced with a reusage of OpTracking facilities.
+  bool finished = false;
+  friend class OpSequencer;
 
   template <typename Errorator>
   using interruptible_errorator =
@@ -89,7 +89,6 @@ private:
       Errorator>;
 private:
   bool is_misdirected(const PG& pg) const;
-  void may_set_prev_op();
 };
 
 }