crimson/osd: replace OpSequencer with simpler intrusive_list based implementation
author    Samuel Just <sjust@redhat.com>
date      Fri, 29 Apr 2022 01:16:32 +0000 (01:16 +0000)
committer Samuel Just <sjust@redhat.com>
date      Fri, 6 May 2022 06:11:34 +0000 (23:11 -0700)
Signed-off-by: Samuel Just <sjust@redhat.com>
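
The gist of the change: the per-connection OpSequencers map (one condition-variable-driven OpSequencer per PG) is dropped; instead each PG keeps its in-flight ClientRequests on a boost::intrusive list and, on interval change, re-walks that list in arrival order. A minimal standalone sketch of the new ordering mechanism follows (the Request/Orderer names mirror the patch, but the pipeline, refcounting, and seastar plumbing are omitted — this is an illustration, not the crimson code):

    #include <boost/intrusive/list.hpp>
    #include <cassert>
    #include <cstdio>

    struct Request {
      int id;
      boost::intrusive::list_member_hook<> hook;  // links this op into Orderer::list
      explicit Request(int id) : id(id) {}
    };

    // Requests are linked in arrival order. On a PG interval change we can
    // walk the list front to back and restart each request, which preserves
    // per-client submission order without any condition-variable gating.
    class Orderer {
      using list_t = boost::intrusive::list<
        Request,
        boost::intrusive::member_hook<
          Request, boost::intrusive::list_member_hook<>, &Request::hook>>;
      list_t list;

    public:
      void add_request(Request& r) {
        assert(!r.hook.is_linked());
        list.push_back(r);
      }
      void remove_request(Request& r) {
        list.erase(list_t::s_iterator_to(r));
      }
      void requeue() {
        for (auto& r : list) {
          std::printf("restarting request %d\n", r.id);  // in arrival order
        }
      }
    };

    int main() {
      Request a{1}, b{2};
      Orderer orderer;
      orderer.add_request(a);
      orderer.add_request(b);
      orderer.requeue();        // restarts 1, then 2
      orderer.remove_request(b);
      orderer.remove_request(a);
    }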
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/osd_operation_sequencer.h [deleted file]
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osd_operations/replicated_request.h
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 32d677d9652f49b91135369f3c57ab9dc4904681..69edf94b88fe5c0d9aa6ca18835b9efffb95c140 100644 (file)
@@ -5,7 +5,6 @@
 
 #include "crimson/net/Connection.h"
 #include "crimson/osd/osd_operation.h"
-#include "crimson/osd/osd_operation_sequencer.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_operations/peering_event.h"
 #include "crimson/osd/osd_operations/replicated_request.h"
@@ -13,7 +12,6 @@
 namespace crimson::osd {
 
 struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
-  OpSequencers op_sequencer;
   ConnectionPipeline client_request_conn_pipeline;
   ConnectionPipeline peering_request_conn_pipeline;
   ConnectionPipeline replicated_request_conn_pipeline;
diff --git a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h
deleted file mode 100644 (file)
index dd89f2d..0000000
+++ /dev/null
@@ -1,216 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
-// vim: ts=8 sw=2 smarttab
-
-#pragma once
-
-#include <map>
-#include <fmt/format.h>
-#include <seastar/core/condition-variable.hh>
-#include "crimson/osd/osd_operations/client_request.h"
-
-namespace crimson::osd {
-
-// when the PG interval changes, we are supposed to interrupt all in-flight
-// ops. but if we just interrupt them upon seeing a different interval when
-// moving to a new continuation, the order in which the ops are interrupted
-// is not deterministic, because they are scheduled by the seastar scheduler.
-// yet we are supposed to replay the ops from the same client targeting the
-// same PG in the exact order in which they were received.
-//
-// the way we address this problem is to set up a blocker which, when an op
-// is issued in a new pg interval, blocks that op until the preceding op is
-// unblocked.
-//
-// here, the ops from the same client are grouped by PG, and then ordered by
-// their id, which is monotonically increasing and unique on a per-PG basis,
-// so we can keep an op waiting in the case explained above.
-class OpSequencer {
-  bool resequencing{false};
-
-  static bool is_unfinished(const ClientRequest& this_op) {
-    // TODO: kill the tombstone; reuse op status tracking.
-    return !this_op.finished;
-  }
-
-  std::uint64_t get_prev_id(const ClientRequest& this_op,
-                            const OSDOperationRegistry& registry) {
-    // an alternative to iterating back to the registry's beginning could be
-    // holding a pointer to next(last_completed).
-    constexpr auto type_idx = static_cast<size_t>(ClientRequest::type);
-    for (auto it = OSDOperationRegistry::op_list::s_iterator_to(this_op);
-         it != std::begin(registry.get_registry<type_idx>());
-         --it) {
-      // we're iterating over the operation registry of all ClientRequests.
-      // this list aggregates every single instance in the system, so we
-      // need to skip operations coming from a different client session
-      // or targeting a different PG.
-      // as this is supposed to happen only on cold paths, the overhead is
-      // something we can live with.
-      auto* maybe_prev_op = std::addressof(static_cast<const ClientRequest&>(*it));
-      if (maybe_prev_op->same_session_and_pg(this_op)) {
-        if (is_unfinished(*maybe_prev_op)) {
-          return maybe_prev_op->get_id();
-        } else {
-          // an early-exited one; skip it
-        }
-      }
-    }
-    // the prev op of this session targeting the same PG as this_op must have
-    // completed and hence already been removed from the list; that's the only
-    // way we could have gotten here.
-    return last_completed_id;
-  }
-
-public:
-  template <typename HandleT,
-            typename FuncT,
-            typename Result = std::invoke_result_t<FuncT>>
-  seastar::futurize_t<Result>
-  start_op(const ClientRequest& op,
-           HandleT& handle,
-           const OSDOperationRegistry& registry,
-           FuncT&& do_op) {
-    ::crimson::get_logger(ceph_subsys_osd).debug(
-      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
-      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
-    auto have_green_light = seastar::make_ready_future<>();
-    if (last_started_id < op.get_id()) {
-      // starting a new op, let's advance the last_started!
-      last_started_id = op.get_id();
-    }
-    if (__builtin_expect(resequencing, false)) {
-      // this implies that there was a reset condition and there may be some
-      // older ops before me, so I have to wait until they are unblocked.
-      //
-      // I should leave the current pipeline stage while waiting for the
-      // blocked ones, so that the following ops can be queued up here. we
-      // cannot let the seastar scheduler determine the order in which these
-      // ops are performed once they are unblocked after the first op of the
-      // same pg interval is scheduled.
-      const auto prev_id = get_prev_id(op, registry);
-      assert(prev_id >= last_unblocked_id);
-      handle.exit();
-      ::crimson::get_logger(ceph_subsys_osd).debug(
-        "OpSequencer::start_op: {} resequencing ({} >= {})",
-        op, prev_id, last_unblocked_id);
-      have_green_light = unblocked.wait([&op, &registry, this] {
-        // wait until the previous op is unblocked
-        const bool unblocking =
-          get_prev_id(op, registry) == last_unblocked_id;
-        if (unblocking) {
-          // stop resequencing if everything is handled, which means there is
-          // no operation after us. the range could be minimized by
-          // snapshotting `last_started` on `maybe_reset()`.
-          // `<=` handles the situation in which `last_started` has finished
-          // out of order.
-          resequencing = !(last_started_id <= op.get_id());
-        }
-        return unblocking;
-      });
-    }
-    return have_green_light.then([&op, do_op=std::move(do_op), this]() mutable {
-      auto result = seastar::futurize_invoke(std::move(do_op));
-      // unblock the next one
-      last_unblocked_id = op.get_id();
-      unblocked.broadcast();
-      return result;
-    });
-  }
-  void finish_op_in_order(ClientRequest& op) {
-    ::crimson::get_logger(ceph_subsys_osd).debug(
-      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
-      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
-    assert(op.get_id() > last_completed_id);
-    last_completed_id = op.get_id();
-    op.finished = true;
-  }
-  void finish_op_out_of_order(ClientRequest& op,
-                              const OSDOperationRegistry& registry) {
-    ::crimson::get_logger(ceph_subsys_osd).debug(
-      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
-      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
-    op.finished = true;
-    // fix `last_unblocked_id`; otherwise we wouldn't be able to leave the
-    // wait loop in `start_op()`, as any concrete value of `last_unblocked_id`
-    // can wake at most one blocked operation (the successor of `op`, if
-    // there is any), and `get_prev_id()` there would never return a value
-    // matching `last_unblocked_id`.
-    if (last_unblocked_id == op.get_id()) {
-      last_unblocked_id = get_prev_id(op, registry);
-    }
-  }
-  void maybe_reset(const ClientRequest& op) {
-    ::crimson::get_logger(ceph_subsys_osd).debug(
-      "OpSequencer::{}: op={}, last_started={}, last_unblocked={}, last_completed={}",
-      __func__, op.get_id(), last_started_id, last_unblocked_id, last_completed_id);
-    const auto op_id = op.get_id();
-    // the pg interval has changed, so we need to re-enqueue the previously
-    // unblocked ops by rewinding the `last_unblocked_id`.
-    if (op_id <= last_unblocked_id) {
-      ::crimson::get_logger(ceph_subsys_osd).debug(
-        "OpSequencer::maybe_reset:{}  {} <= {}, resetting to {}",
-        op, op_id, last_unblocked_id, last_completed_id);
-      last_unblocked_id = last_completed_id;
-      resequencing = true;
-    }
-  }
-  void abort() {
-    ::crimson::get_logger(ceph_subsys_osd).debug(
-      "OpSequencer::{}: last_started={}, last_unblocked={}, last_completed={}",
-      __func__, last_started_id, last_unblocked_id, last_completed_id);
-    // all blocked ops should be canceled, most likely because the osd is no
-    // longer primary.
-    unblocked.broken();
-  }
-private:
-  //          /--- unblocked (in pg pipeline)
-  //         |      /--- blocked
-  //         V      V
-  // |////|.....|.......| <--- last_started
-  //      ^     ^       ^
-  //      |     |       \- prev_op
-  //      |      \--- last_unblocked
-  //      last_completed
-  //
-  // the id of last op which is issued
-  std::uint64_t last_started_id = 0;
-  // the id of last op which is unblocked
-  std::uint64_t last_unblocked_id = 0;
-  // the id of last op which is completed
-  std::uint64_t last_completed_id = 0;
-  seastar::condition_variable unblocked;
-
-  friend fmt::formatter<OpSequencer>;
-};
-
-
-class OpSequencers {
-public:
-  OpSequencer& get(const spg_t& pgid) {
-    return pg_ops.at(pgid);
-  }
-  OpSequencer& operator[](const spg_t& pgid) {
-    // TODO: trim pg_ops if there are too many empty sequencers
-    return pg_ops[pgid];
-  }
-private:
-  std::map<spg_t, OpSequencer> pg_ops;
-};
-} // namespace crimson::osd
-
-template <>
-struct fmt::formatter<crimson::osd::OpSequencer> {
-  // ignore the format string
-  constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
-
-  template <typename FormatContext>
-  auto format(const crimson::osd::OpSequencer& sequencer,
-              FormatContext& ctx)
-  {
-    return fmt::format_to(ctx.out(),
-                          "(last_completed={},last_unblocked={},last_started={})",
-                          sequencer.last_completed_id,
-                          sequencer.last_unblocked_id,
-                          sequencer.last_started_id);
-  }
-};
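
For contrast, the removed header above gated each op on its predecessor with a seastar::condition_variable. A minimal sketch of that wait/broadcast pattern (assuming a seastar build environment; the ids and functions here are illustrative, not the crimson API):

    #include <seastar/core/app-template.hh>
    #include <seastar/core/condition-variable.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/when_all.hh>
    #include <cstdint>
    #include <iostream>

    // Each op waits until its predecessor's id has been recorded as
    // unblocked, then records its own id and wakes any waiters -- the
    // core of the deleted OpSequencer::start_op() loop.
    seastar::condition_variable unblocked;
    std::uint64_t last_unblocked_id = 0;

    seastar::future<> run_op(std::uint64_t prev_id, std::uint64_t my_id) {
      return unblocked.wait([prev_id] { return last_unblocked_id == prev_id; })
        .then([my_id] {
          std::cout << "op " << my_id << " runs\n";
          last_unblocked_id = my_id;
          unblocked.broadcast();
        });
    }

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        // op 2 is scheduled first but still runs after op 1
        return seastar::when_all_succeed(run_op(1, 2), run_op(0, 1))
          .discard_result();
      });
    }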
index bc0c3d943970f5f692aed0ebbb4eda1a2d63acd4..b35dbff1609ea28d59d71854fbab974868f51ee4 100644 (file)
@@ -1,8 +1,6 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
 // vim: ts=8 sw=2 smarttab expandtab
 
-#include <seastar/core/future.hh>
-
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 
@@ -10,7 +8,6 @@
 #include "crimson/osd/pg.h"
 #include "crimson/osd/osd.h"
 #include "common/Formatter.h"
-#include "crimson/osd/osd_operation_sequencer.h"
 #include "crimson/osd/osd_operation_external_tracking.h"
 #include "crimson/osd/osd_operations/client_request.h"
 #include "crimson/osd/osd_connection_priv.h"
@@ -23,12 +20,42 @@ namespace {
 
 namespace crimson::osd {
 
+
+void ClientRequest::Orderer::requeue(
+  ShardServices &shard_services, Ref<PG> pg)
+{
+  for (auto &req: list) {
+    req.handle.exit();
+  }
+  for (auto &req: list) {
+    logger().debug("{}: {} requeueing {}", __func__, *pg, req);
+    std::ignore = req.with_pg_int(shard_services, pg);
+  }
+}
+
+void ClientRequest::Orderer::clear_and_cancel()
+{
+  for (auto i = list.begin(); i != list.end(); ) {
+    logger().debug(
+      "ClientRequest::Orderer::clear_and_cancel: {}",
+      *i);
+    i->complete_request();
+    remove_request(*(i++));
+  }
+}
+
+void ClientRequest::complete_request()
+{
+  track_event<CompletionEvent>();
+  handle.exit();
+  on_complete.set_value();
+}
+
 ClientRequest::ClientRequest(
   OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
   : osd(osd),
     conn(conn),
-    m(m),
-    sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()])
+    m(m)
 {}
 
 ClientRequest::~ClientRequest()
@@ -77,7 +104,7 @@ bool ClientRequest::is_pg_op() const
     [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
 }
 
-seastar::future<seastar::stop_iteration> ClientRequest::with_pg_int(
+seastar::future<> ClientRequest::with_pg_int(
   ShardServices &shard_services, Ref<PG> pgref)
 {
   epoch_t same_interval_since = pgref->get_interval_start_epoch();
@@ -85,79 +112,72 @@ seastar::future<seastar::stop_iteration> ClientRequest::with_pg_int(
   if (m->finish_decode()) {
     m->clear_payload();
   }
+  const auto this_instance_id = instance_id++;
+  OperationRef opref{this};
   return interruptor::with_interruption(
-    [this, &shard_services, pgref]() mutable {
-      return sequencer.start_op(
-       *this, handle, shard_services.registry,
-       interruptor::wrap_function([pgref, this, &shard_services] {
-         PG &pg = *pgref;
-         if (pg.can_discard_op(*m)) {
-           return osd.send_incremental_map(
-             conn, m->get_map_epoch()
-           ).then([this, &shard_services] {
-             sequencer.finish_op_out_of_order(*this, shard_services.registry);
-             return interruptor::now();
-           });
-         }
-         return enter_stage<interruptor>(pp(pg).await_map
-         ).then_interruptible([this, &pg] {
-           return with_blocking_event<
-             PG_OSDMapGate::OSDMapBlocker::BlockingEvent
-             >([this, &pg] (auto&& trigger) {
-               return pg.osdmap_gate.wait_for_map(std::move(trigger),
-                                                  m->get_min_epoch());
-             });
-         }).then_interruptible([this, &pg](auto map) {
-           return enter_stage<interruptor>(pp(pg).wait_for_active);
-         }).then_interruptible([this, &pg]() {
-           return with_blocking_event<
-             PGActivationBlocker::BlockingEvent
-             >([&pg] (auto&& trigger) {
-                return pg.wait_for_active_blocker.wait(std::move(trigger));
-             });
-         }).then_interruptible(
-           [this, &shard_services, pgref=std::move(pgref)]() mutable {
-             if (is_pg_op()) {
-               return process_pg_op(
-                 pgref
-               ).then_interruptible([this, &shard_services] {
-                 sequencer.finish_op_out_of_order(*this, shard_services.registry);
-               });
-             } else {
-               return process_op(
-                 pgref
-               ).then_interruptible([this, &shard_services](const seq_mode_t mode) {
-                 if (mode == seq_mode_t::IN_ORDER) {
-                   sequencer.finish_op_in_order(*this);
-                 } else {
-                   assert(mode == seq_mode_t::OUT_OF_ORDER);
-                   sequencer.finish_op_out_of_order(*this, shard_services.registry);
-                 }
-               });
-             }
-           });
-       })).then_interruptible([] {
-         return seastar::stop_iteration::yes;
+    [this, pgref, this_instance_id]() mutable {
+      PG &pg = *pgref;
+      if (pg.can_discard_op(*m)) {
+       return osd.send_incremental_map(
+         conn, m->get_map_epoch()
+       ).then([this, this_instance_id, pgref] {
+         logger().debug("{}.{}: discarding", *this, this_instance_id);
+         pgref->client_request_orderer.remove_request(*this);
+         complete_request();
+         return interruptor::now();
        });
-    }, [this, pgref](std::exception_ptr eptr) {
-      if (should_abort_request(*this, std::move(eptr))) {
-       sequencer.abort();
-       return seastar::stop_iteration::yes;
-      } else {
-       sequencer.maybe_reset(*this);
-       return seastar::stop_iteration::no;
       }
-    }, pgref);
+      return enter_stage<interruptor>(pp(pg).await_map
+      ).then_interruptible([this, this_instance_id, &pg] {
+       logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
+       return with_blocking_event<
+         PG_OSDMapGate::OSDMapBlocker::BlockingEvent
+         >([this, &pg] (auto&& trigger) {
+           return pg.osdmap_gate.wait_for_map(std::move(trigger),
+                                              m->get_min_epoch());
+         });
+      }).then_interruptible([this, this_instance_id, &pg](auto map) {
+       logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
+       return enter_stage<interruptor>(pp(pg).wait_for_active);
+      }).then_interruptible([this, this_instance_id, &pg]() {
+       logger().debug(
+         "{}.{}: after wait_for_active stage", *this, this_instance_id);
+       return with_blocking_event<
+         PGActivationBlocker::BlockingEvent
+         >([&pg] (auto&& trigger) {
+           return pg.wait_for_active_blocker.wait(std::move(trigger));
+         });
+      }).then_interruptible([this, pgref, this_instance_id]() mutable
+                           -> interruptible_future<> {
+       logger().debug(
+         "{}.{}: after wait_for_active", *this, this_instance_id);
+       if (is_pg_op()) {
+         return process_pg_op(pgref);
+       } else {
+         return process_op(
+           pgref
+         ).then_interruptible([](auto){});
+       }
+      }).then_interruptible([this, this_instance_id, pgref] {
+       logger().debug("{}.{}: after process*", *this, this_instance_id);
+       pgref->client_request_orderer.remove_request(*this);
+       complete_request();
+      });
+    }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
+      // TODO: better debug output
+      logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
+    }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {});
 }
 
 seastar::future<> ClientRequest::with_pg(
   ShardServices &shard_services, Ref<PG> pgref)
 {
-  return seastar::repeat([this, &shard_services, pgref]() mutable {
-    return with_pg_int(shard_services, pgref);
-  }).then([this] {
-    track_event<CompletionEvent>();
-  });
+  pgref->client_request_orderer.add_request(*this);
+  auto ret = on_complete.get_future();
+  std::ignore = with_pg_int(
+    shard_services, std::move(pgref)
+  );
+  return ret;
 }
 
 ClientRequest::interruptible_future<>
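
Note the new shape of with_pg() above: the caller receives a promise-backed future, while with_pg_int() runs detached and may be restarted by the Orderer any number of times; whichever instance reaches completion fulfills the promise once. A reduced sketch of that pattern (assuming seastar; the Op type is illustrative):

    #include <seastar/core/app-template.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/sleep.hh>
    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <tuple>

    // The caller's future is backed by a promise; the work itself runs as
    // a detached instance (cf. std::ignore = with_pg_int(...) in the patch)
    // and fulfills the promise when it finally completes.
    struct Op {
      seastar::promise<> on_complete;

      seastar::future<> start() {
        std::ignore = seastar::sleep(std::chrono::milliseconds(1))
          .then([this] {
            std::cout << "instance finished\n";
            on_complete.set_value();  // wakes the caller's future
          });
        return on_complete.get_future();
      }
    };

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        auto op = std::make_unique<Op>();
        auto fut = op->start();
        // keep the op alive until its future resolves, as the patch does
        // with opref in with_pg_int()'s finally()
        return fut.finally([op = std::move(op)] {});
      });
    }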
index 21e09bc57cc9e7e272791987f4a04d74bf185345..a222ea6a03af04c0565163a746b4adb6f96cc791 100644 (file)
@@ -3,6 +3,11 @@
 
 #pragma once
 
+#include <seastar/core/future.hh>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive_ptr.hpp>
+
 #include "osd/osd_op_util.h"
 #include "crimson/net/Connection.h"
 #include "crimson/osd/object_context.h"
@@ -26,6 +31,8 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
   crimson::net::ConnectionRef conn;
   Ref<MOSDOp> m;
   OpInfo op_info;
+  seastar::promise<> on_complete;
+  unsigned instance_id = 0;
 
 public:
   class PGPipeline : public CommonPGPipeline {
@@ -42,6 +49,34 @@ public:
     friend class LttngBackend;
   };
 
+  using ordering_hook_t = boost::intrusive::list_member_hook<>;
+  ordering_hook_t ordering_hook;
+  class Orderer {
+    using list_t = boost::intrusive::list<
+      ClientRequest,
+      boost::intrusive::member_hook<
+       ClientRequest,
+       typename ClientRequest::ordering_hook_t,
+       &ClientRequest::ordering_hook>
+      >;
+    list_t list;
+
+  public:
+    void add_request(ClientRequest &request) {
+      assert(!request.ordering_hook.is_linked());
+      intrusive_ptr_add_ref(&request);
+      list.push_back(request);
+    }
+    void remove_request(ClientRequest &request) {
+      assert(request.ordering_hook.is_linked());
+      list.erase(list_t::s_iterator_to(request));
+      intrusive_ptr_release(&request);
+    }
+    void requeue(ShardServices &shard_services, Ref<PG> pg);
+    void clear_and_cancel();
+  };
+  void complete_request();
+
   static constexpr OperationTypeCode type = OperationTypeCode::client_request;
 
   ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
@@ -58,7 +93,7 @@ public:
   PipelineHandle &get_handle() { return handle; }
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
-  seastar::future<seastar::stop_iteration> with_pg_int(
+  seastar::future<> with_pg_int(
     ShardServices &shard_services, Ref<PG> pg);
 public:
   bool same_session_and_pg(const ClientRequest& other_op) const;
@@ -89,12 +124,6 @@ private:
   ConnectionPipeline &cp();
   PGPipeline &pp(PG &pg);
 
-  class OpSequencer& sequencer;
-  // a tombstone used currently by OpSequencer. In the future it's supposed
-  // to be replaced with a reusage of OpTracking facilities.
-  bool finished = false;
-  friend class OpSequencer;
-
   template <typename Errorator>
   using interruptible_errorator =
     ::crimson::interruptible::interruptible_errorator<
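
One design note on the Orderer above: add_request() takes an extra intrusive_ptr reference and remove_request() drops it, so a request can never be destroyed while still linked. A tiny sketch of that ownership rule (using boost's stock intrusive_ref_counter rather than crimson's Operation refcounting):

    #include <boost/intrusive_ptr.hpp>
    #include <boost/smart_ptr/intrusive_ref_counter.hpp>
    #include <cassert>

    // the list holds one strong reference per linked request
    struct Request : boost::intrusive_ref_counter<Request> {};

    int main() {
      boost::intrusive_ptr<Request> r(new Request);
      intrusive_ptr_add_ref(r.get());   // taken by add_request()
      assert(r->use_count() == 2);      // caller + orderer
      intrusive_ptr_release(r.get());   // dropped by remove_request()
      assert(r->use_count() == 1);      // caller only
    }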
index f07bc86105ab1e8e3d6a929e9d3e0b4ac816fcaf..39c6d6a2473753eeb8f2f80dbd356c38a1f7a622 100644 (file)
@@ -39,10 +39,11 @@ public:
   seastar::future<> start();
 
   std::tuple<
+    ConnectionPipeline::AwaitActive::BlockingEvent,
     ConnectionPipeline::AwaitMap::BlockingEvent,
-    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
     ConnectionPipeline::GetPG::BlockingEvent,
-    PGMap::PGCreationBlockingEvent
+    PGMap::PGCreationBlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
   > tracking_events;
 
 private:
index 5aded130e55711daae7230c09f19daaef1713d35..910ab6c1903a6d33a726377f6e5823e659173a87 100644 (file)
@@ -1149,12 +1149,20 @@ seastar::future<> PG::stop()
 }
 
 void PG::on_change(ceph::os::Transaction &t) {
-  logger().debug("{}, {}", __func__, *this);
+  logger().debug("{} {}:", *this, __func__);
   for (auto& obc : obc_set_accessing) {
     obc.interrupt(::crimson::common::actingset_changed(is_primary()));
   }
   recovery_backend->on_peering_interval_change(t);
   backend->on_actingset_changed({ is_primary() });
+  wait_for_active_blocker.unblock();
+  if (is_primary()) {
+    logger().debug("{} {}: requeueing", *this, __func__);
+    client_request_orderer.requeue(shard_services, this);
+  } else {
+    logger().debug("{} {}: dropping requests", *this, __func__);
+    client_request_orderer.clear_and_cancel();
+  }
 }
 
 bool PG::can_discard_op(const MOSDOp& m) const {
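
PG::on_change() above is where the Orderer earns its keep: on a new interval the primary requeues every tracked request in arrival order (Orderer::requeue() first makes each request exit its pipeline stage and only then restarts them, so ordering comes from the list rather than the scheduler), while a non-primary cancels them all. A plain-C++ sketch of that two-phase requeue invariant (illustrative names, no seastar):

    #include <cstdio>
    #include <list>

    struct Request { int id; bool in_stage = true; };

    // Phase 1: every request leaves its pipeline stage before any restart.
    // Phase 2: restart in list (arrival) order. Restarting while others
    // still occupy stages could let the scheduler interleave them.
    void requeue(std::list<Request>& reqs) {
      for (auto& r : reqs) r.in_stage = false;
      for (auto& r : reqs) std::printf("restarting request %d\n", r.id);
    }

    int main() {
      std::list<Request> reqs{{1}, {2}, {3}};
      requeue(reqs);  // prints 1, 2, 3 in order
    }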
index b44aac4a952325db4f452cb236fb1415c3c14f7a..f1fa1d32b42871c9c00b583d9d08883483c971c9 100644 (file)
@@ -55,7 +55,6 @@ namespace crimson::os {
 }
 
 namespace crimson::osd {
-class ClientRequest;
 class OpsExecuter;
 
 class PG : public boost::intrusive_ref_counter<
@@ -72,6 +71,8 @@ class PG : public boost::intrusive_ref_counter<
   PGPeeringPipeline peering_request_pg_pipeline;
   RepRequest::PGPipeline replicated_request_pg_pipeline;
 
+  ClientRequest::Orderer client_request_orderer;
+
   spg_t pgid;
   pg_shard_t pg_whoami;
   crimson::os::CollectionRef coll_ref;