]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: refactor OperationRepeatSequencer
authorKefu Chai <kchai@redhat.com>
Mon, 22 Feb 2021 02:17:23 +0000 (10:17 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 24 Feb 2021 08:53:07 +0000 (16:53 +0800)
* extract the OpSequencer out from OperationRepeatSequencer
* refactor OpSequencer so we don't need to track the ops using a map,
  only track the last op and last pg interval for better performance
  and smaller memory footprint.

Signed-off-by: Kefu Chai <kchai@redhat.com>
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_sequencer.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h

index 10410ec72065109080f2b90a26e473e7e3b0d182..a0482077b57130d0753c9ea8927e8f504f578faa 100644 (file)
@@ -12,7 +12,7 @@
 namespace crimson::osd {
 
 struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
-  OperationRepeatSequencer<ClientRequest> op_sequencer;
+  OperationRepeatSequencer op_sequencer;
   ClientRequest::ConnectionPipeline client_request_conn_pipeline;
   RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
   RepRequest::ConnectionPipeline replicated_request_conn_pipeline;
index 455d55414249536271a80c828a31954e2d96a03d..1e438f90dbaa498073261c3395057529a52afd37 100644 (file)
@@ -53,13 +53,7 @@ public:
   virtual ~OperationT() = default;
 
 private:
-  epoch_t interval_start_epoch = 0;
-  using ops_seq_iter_t =
-    typename OperationRepeatSequencer<T>::ops_sequence_t::iterator;
-  std::optional<ops_seq_iter_t> pos;
   virtual void dump_detail(ceph::Formatter *f) const = 0;
-  template <typename>
-  friend class OperationRepeatSequencer;
 };
 
 /**
index 86662e5108b2da6765564e7947edce321b4a61e4..7e3ac7bd9209e758101dc2b5c43a11b71564d4ab 100644 (file)
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
 // vim: ts=8 sw=2 smarttab
 
 #pragma once
 
 #include <map>
-
+#include <seastar/core/condition-variable.hh>
 #include "crimson/common/operation.h"
 #include "osd/osd_types.h"
 
 namespace crimson::osd {
 
-template <typename>
-struct OperationComparator;
-
-template <typename T>
-class OperationRepeatSequencer {
+// when PG interval changes, we are supposed to interrupt all in-flight ops.
+// but in the order in which the ops are interrupted are not determined
+// because they are scheduled by the seastar scheduler, if we just interrupt
+// them at seeing a different interval when moving to a new continuation. but
+// we are supposed to replay the ops from the same client targeting the same
+// PG in the exact order that they are received.
+//
+// the way how we address this problem is to set up a blocker which blocks an
+// op until the preceding op is unblocked if the blocked one is issued in a new
+// pg interval.
+//
+// here, the ops from the same client are grouped by PG, and then ordered by
+// their id which is monotonically increasing and unique on per PG basis, so we
+// can keep an op waiting in the case explained above.
+class OpSequencer {
 public:
-  using OpRef = boost::intrusive_ptr<T>;
-  using ops_sequence_t = std::map<OpRef, seastar::promise<>>;
-
-  template <typename Func, typename HandleT, typename Result = std::invoke_result_t<Func>>
-  seastar::futurize_t<Result> start_op(
-      HandleT& handle,
-      epoch_t same_interval_since,
-      OpRef& op,
-      const spg_t& pgid,
-      Func&& func) {
-    auto& ops = pg_ops[pgid];
-    if (!op->pos) {
-      [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>());
-      assert(inserted);
-      op->pos = it;
+  template <typename HandleT,
+            typename FuncT,
+            typename Result = std::invoke_result_t<FuncT>>
+  seastar::futurize_t<Result>
+  start_op(HandleT& handle,
+           uint64_t prev_op,
+           uint64_t this_op,
+           FuncT&& do_op) {
+    auto have_green_light = seastar::make_ready_future<>();
+    assert(prev_op < this_op);
+    if (last_issued == prev_op) {
+      // starting a new op, let's advance the last_issued!
+      last_issued = this_op;
     }
-
-    auto curr_op_pos = *(op->pos);
-    const bool first = (curr_op_pos == ops.begin());
-    auto prev_op_pos = first ? curr_op_pos : std::prev(curr_op_pos);
-    auto prev_ops_drained = seastar::now();
-    if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch;
-        !first && same_interval_since > prev_interval) {
-      // need to wait for previous operations,
-      // release the current pipepline stage
+    if (prev_op != last_unblocked) {
+      // this implies that there are some blocked ops before me, so i have to
+      // wait until they are unblocked.
+      //
+      // i should leave the current pipeline stage when waiting for the blocked
+      // ones, so that the following ops can be queued up here. we cannot let
+      // the seastar scheduler to determine the order of performing these ops,
+      // once they are unblocked after the first op of the same pg interval is
+      // scheduled.
+      assert(prev_op > last_unblocked);
       handle.exit();
-      auto& [prev_op, prev_op_complete] = *prev_op_pos;
       ::crimson::get_logger(ceph_subsys_osd).debug(
-          "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}",
-          *op, same_interval_since, prev_op, prev_interval);
-      prev_ops_drained = prev_op_complete.get_future();
-    } else {
-      assert(same_interval_since == prev_interval || first);
+        "OpSequencer::start_op: {} waiting ({} > {})",
+        this_op, prev_op, last_unblocked);
+      have_green_light = unblocked.wait([prev_op, this] {
+        // wait until the previous op is unblocked
+        return last_unblocked == prev_op;
+      });
     }
-    return prev_ops_drained.then(
-      [op, same_interval_since, func=std::forward<Func>(func)]() mutable {
-      op->interval_start_epoch = same_interval_since;
-      auto fut = seastar::futurize_invoke(func);
-      auto curr_op_pos = *(op->pos);
-      curr_op_pos->second.set_value();
-      curr_op_pos->second = seastar::promise<>();
-      return fut;
+    return have_green_light.then([this_op, do_op=std::move(do_op), this] {
+      auto result = seastar::futurize_invoke(do_op);
+      // unblock the next one
+      last_unblocked = this_op;
+      unblocked.broadcast();
+      return result;
     });
   }
-
-  void finish_op(OpRef& op, const spg_t& pgid, bool interrutped) {
-    assert(op->pos);
-    auto curr_op_pos = *(op->pos);
-    if (interrutped) {
-      curr_op_pos->second.set_value();
+  uint64_t get_last_issued() const {
+    return last_issued;
+  }
+  void finish_op(uint64_t op_id) {
+    assert(op_id > last_completed);
+    last_completed = op_id;
+  }
+  void maybe_reset(uint64_t op_id) {
+    // pg interval changes, so we need to reenqueue the previously unblocked
+    // ops by rewinding the "last_unblock" pointer
+    if (op_id <= last_unblocked) {
+      last_unblocked = last_completed;
     }
-    pg_ops.at(pgid).erase(curr_op_pos);
+  }
+  void abort() {
+    // all blocked ops should be canceled, likely due to the osd is not primary
+    // anymore.
+    unblocked.broken();
   }
 private:
-  std::map<spg_t, std::map<OpRef, seastar::promise<>, OperationComparator<T>>> pg_ops;
+  //          /--- unblocked (in pg pipeline)
+  //         |      /--- blocked
+  //         V      V
+  // |////|.....|.......| <--- last_issued
+  //      ^     ^       ^
+  //      |     |       \- prev_op
+  //      |      \--- last_unblocked
+  //      last_completed
+  //
+  // the id of last op which is issued
+  uint64_t last_issued = 0;
+  // the id of last op which is unblocked
+  uint64_t last_unblocked = 0;
+  // the id of last op which is completed
+  uint64_t last_completed = 0;
+  seastar::condition_variable unblocked;
 };
-template <typename T>
-struct OperationComparator {
-  bool operator()(
-    const typename OperationRepeatSequencer<T>::OpRef& left,
-    const typename OperationRepeatSequencer<T>::OpRef& right) const {
-    return left->get_id() < right->get_id();
+
+class OperationRepeatSequencer {
+public:
+  OpSequencer& get(const spg_t& pgid) {
+    return pg_ops.at(pgid);
   }
+  OpSequencer& operator[](const spg_t& pgid) {
+    // TODO: trim pg_ops if there are too many empty sequencers
+    return pg_ops[pgid];
+  }
+private:
+  std::map<spg_t, OpSequencer> pg_ops;
 };
-
 } // namespace crimson::osd
index ffdfe205914255ff73081d1d0029458a59a00ad2..1f59a907080ff01a83ead2204e4a5fde8b991a31 100644 (file)
@@ -23,7 +23,11 @@ namespace crimson::osd {
 
 ClientRequest::ClientRequest(
   OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
-  : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).op_sequencer)
+  : osd(osd),
+    conn(conn),
+    m(m),
+    sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()]),
+    prev_op_id(sequencer.get_last_issued())
 {}
 
 ClientRequest::~ClientRequest()
@@ -76,8 +80,8 @@ seastar::future<> ClientRequest::start()
       }).then([this, opref](Ref<PG> pgref) mutable {
        epoch_t same_interval_since = pgref->get_interval_start_epoch();
        logger().debug("{} same_interval_since: {}", *this, same_interval_since);
-       return ors.start_op(
-         handle, same_interval_since, opref, pgref->get_pgid(),
+       return sequencer.start_op(
+         handle, prev_op_id, opref->get_id(),
          [this, opref, pgref] {
          PG &pg = *pgref;
          if (pg.can_discard_op(*m)) {
@@ -105,19 +109,23 @@ seastar::future<> ClientRequest::start()
              return process_op(pgref);
            }
          });
-       }).then([this, opref, pgref]() mutable {
-         ors.finish_op(opref, pgref->get_pgid(), false);
-         return seastar::stop_iteration::yes;
+        }).then([this] {
+          sequencer.finish_op(get_id());
+          return seastar::stop_iteration::yes;
         }).handle_exception_type(
-          [opref, pgref, this](crimson::common::actingset_changed& e) mutable {
+          [this](crimson::common::actingset_changed& e) mutable {
           if (e.is_primary()) {
             logger().debug("operation restart, acting set changed");
+            sequencer.maybe_reset(get_id());
             return seastar::stop_iteration::no;
           } else {
-            ors.finish_op(opref, pgref->get_pgid(), true);
+            sequencer.abort();
             logger().debug("operation abort, up primary changed");
             return seastar::stop_iteration::yes;
           }
+        }).handle_exception_type(
+          [](seastar::broken_condition_variable&) {
+          return seastar::stop_iteration::yes;
         });
       });
     });
index 33bb35ee38e41547fc2db9a4a9e5e27502a0a9d9..a88c3b331c2d0917da7b57eb9449857dcbcb16d1 100644 (file)
@@ -74,7 +74,9 @@ private:
   ConnectionPipeline &cp();
   PGPipeline &pp(PG &pg);
 
-  OperationRepeatSequencer<ClientRequest>& ors;
+  OpSequencer& sequencer;
+  const uint64_t prev_op_id;
+
 private:
   bool is_misdirected(const PG& pg) const;
 };