git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: add I/O sequencer to preserve client_requests' order across PG interval...
author Xuehan Xu <xxhdx1985126@gmail.com>
Sun, 7 Feb 2021 09:19:12 +0000 (17:19 +0800)
committer Xuehan Xu <xxhdx1985126@gmail.com>
Sun, 21 Feb 2021 11:15:14 +0000 (19:15 +0800)
Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
src/crimson/osd/osd_connection_priv.h
src/crimson/osd/osd_operation.h
src/crimson/osd/osd_operation_sequencer.h [new file with mode: 0644]
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/pg.h

index a265bb43268cfbed8522c5e7d7009d804dfb7523..018b16eebdb6bd098117c63bcf5de6f85807cbd4 100644 (file)
@@ -12,6 +12,7 @@
 namespace crimson::osd {
 
 struct OSDConnectionPriv : public crimson::net::Connection::user_private_t {
+  OperationRepeatSequencer<ClientRequest> opSequencer;
   ClientRequest::ConnectionPipeline client_request_conn_pipeline;
   RemotePeeringEvent::ConnectionPipeline peering_request_conn_pipeline;
   RepRequest::ConnectionPipeline replicated_request_conn_pipeline;
index 85f5be2edf031a60362a9a0506c63fc5f5f9487b..455d55414249536271a80c828a31954e2d96a03d 100644 (file)
@@ -3,7 +3,7 @@
 
 #pragma once
 
-#include "crimson/common/operation.h"
+#include "crimson/osd/osd_operation_sequencer.h"
 #include "crimson/osd/scheduler/scheduler.h"
 
 namespace crimson::osd {
@@ -53,7 +53,13 @@ public:
   virtual ~OperationT() = default;
 
 private:
+  epoch_t interval_start_epoch = 0;
+  using ops_seq_iter_t =
+    typename OperationRepeatSequencer<T>::ops_sequence_t::iterator;
+  std::optional<ops_seq_iter_t> pos;
   virtual void dump_detail(ceph::Formatter *f) const = 0;
+  template <typename>
+  friend class OperationRepeatSequencer;
 };
 
 /**
diff --git a/src/crimson/osd/osd_operation_sequencer.h b/src/crimson/osd/osd_operation_sequencer.h
new file mode 100644 (file)
index 0000000..3ce9d82
--- /dev/null
@@ -0,0 +1,80 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include <map>
+
+#include "crimson/common/operation.h"
+#include "osd/osd_types.h"
+
+namespace crimson::osd {
+
+template <typename>
+struct OperationComparator;
+
+template <typename T>
+class OperationRepeatSequencer {
+public:
+  using OpRef = boost::intrusive_ptr<T>;
+  using ops_sequence_t = std::map<OpRef, seastar::promise<>>;
+
+  template <typename Func, typename HandleT, typename Result = std::invoke_result_t<Func>>
+  seastar::futurize_t<Result> start_op(
+      HandleT& handle,
+      epoch_t same_interval_since,
+      OpRef& op,
+      const spg_t& pgid,
+      Func&& func) {
+    auto& ops = pg_ops[pgid];
+    if (!op->pos) {
+      [[maybe_unused]] auto [it, inserted] = ops.emplace(op, seastar::promise<>());
+      assert(inserted);
+      op->pos = it;
+    }
+
+    auto curr_op_pos = *(op->pos);
+    const bool first = (curr_op_pos == ops.begin());
+    auto prev_op_pos = first ? curr_op_pos : std::prev(curr_op_pos);
+    auto prev_ops_drained = seastar::now();
+    if (epoch_t prev_interval = prev_op_pos->first->interval_start_epoch;
+        !first && same_interval_since > prev_interval) {
+      // need to wait for previous operations,
+      // release the current pipeline stage
+      handle.exit();
+      auto& [prev_op, prev_op_complete] = *prev_op_pos;
+      ::crimson::get_logger(ceph_subsys_osd).debug(
+          "{}, same_interval_since: {}, previous op: {}, last_interval_start: {}",
+          *op, same_interval_since, prev_op, prev_interval);
+      prev_ops_drained = prev_op_complete.get_future();
+    } else {
+      assert(same_interval_since == prev_interval || first);
+    }
+    return prev_ops_drained.then(
+      [op, same_interval_since, func=std::forward<Func>(func)]() mutable {
+      op->interval_start_epoch = same_interval_since;
+      auto fut = seastar::futurize_invoke(func);
+      auto curr_op_pos = *(op->pos);
+      curr_op_pos->second.set_value();
+      curr_op_pos->second = seastar::promise<>();
+      return fut;
+    });
+  }
+
+  void finish_op(OpRef& op, const spg_t& pgid) {
+    assert(op->pos);
+    pg_ops.at(pgid).erase(*(op->pos));
+  }
+private:
+  std::map<spg_t, std::map<OpRef, seastar::promise<>, OperationComparator<T>>> pg_ops;
+};
+template <typename T>
+struct OperationComparator {
+  bool operator()(
+    const typename OperationRepeatSequencer<T>::OpRef& left,
+    const typename OperationRepeatSequencer<T>::OpRef& right) const {
+    return left->get_id() < right->get_id();
+  }
+};
+
+} // namespace crimson::osd
index 357ad513f7694125fd83b2151f63bfcebf7f68fd..899a065fc69b72adfbc75bd3beb475635d3e58ee 100644 (file)
@@ -23,9 +23,14 @@ namespace crimson::osd {
 
 ClientRequest::ClientRequest(
   OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
-  : osd(osd), conn(conn), m(m)
+  : osd(osd), conn(conn), m(m), ors(get_osd_priv(conn.get()).opSequencer)
 {}
 
+ClientRequest::~ClientRequest()
+{
+  logger().debug("{}: destroying", *this);
+}
+
 void ClientRequest::print(std::ostream &lhs) const
 {
   lhs << *m;
@@ -60,6 +65,7 @@ seastar::future<> ClientRequest::start()
   return crimson::common::handle_system_shutdown(
     [this, opref=std::move(opref)]() mutable {
     return seastar::repeat([this, opref]() mutable {
+      logger().debug("{}: in repeat", *this);
       return with_blocking_future(handle.enter(cp().await_map))
       .then([this]() {
        return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
@@ -67,33 +73,42 @@ seastar::future<> ClientRequest::start()
        return with_blocking_future(handle.enter(cp().get_pg));
       }).then([this] {
        return with_blocking_future(osd.wait_for_pg(m->get_spg()));
-      }).then([this, opref](Ref<PG> pgref) {
-       PG &pg = *pgref;
-       if (pg.can_discard_op(*m)) {
-         return osd.send_incremental_map(conn, m->get_map_epoch());
-       }
-       return with_blocking_future(
-         handle.enter(pp(pg).await_map)
-       ).then([this, &pg]() mutable {
-         return with_blocking_future(
-           pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
-       }).then([this, &pg](auto map) mutable {
-         return with_blocking_future(
-           handle.enter(pp(pg).wait_for_active));
-       }).then([this, &pg]() mutable {
-         return with_blocking_future(pg.wait_for_active_blocker.wait());
-       }).then([this, pgref=std::move(pgref)]() mutable {
-         if (m->finish_decode()) {
-           m->clear_payload();
-         }
-         if (is_pg_op()) {
-           return process_pg_op(pgref);
-         } else {
-           return process_op(pgref);
+      }).then([this, opref](Ref<PG> pgref) mutable {
+       epoch_t same_interval_since = pgref->get_interval_start_epoch();
+       logger().debug("{} same_interval_since: {}", *this, same_interval_since);
+       return ors.start_op(
+         handle, same_interval_since, opref, pgref->get_pgid(),
+         [this, opref, pgref] {
+         PG &pg = *pgref;
+         if (pg.can_discard_op(*m)) {
+           logger().debug("{} op discarded, {}, same_primary_since: {}",
+               *this, pg, pg.get_info().history.same_primary_since);
+           return osd.send_incremental_map(conn, m->get_map_epoch());
          }
+         return with_blocking_future(
+           handle.enter(pp(pg).await_map)
+         ).then([this, &pg]() mutable {
+           return with_blocking_future(
+             pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
+         }).then([this, &pg](auto map) mutable {
+           return with_blocking_future(
+             handle.enter(pp(pg).wait_for_active));
+         }).then([this, &pg]() mutable {
+           return with_blocking_future(pg.wait_for_active_blocker.wait());
+         }).then([this, pgref=std::move(pgref)]() mutable {
+           if (m->finish_decode()) {
+             m->clear_payload();
+           }
+           if (is_pg_op()) {
+             return process_pg_op(pgref);
+           } else {
+             return process_op(pgref);
+           }
+         });
+       }).then([this, opref, pgref]() mutable {
+         ors.finish_op(opref, pgref->get_pgid());
+         return seastar::stop_iteration::yes;
        });
-      }).then([] {
-       return seastar::stop_iteration::yes;
       }).handle_exception_type([](crimson::common::actingset_changed& e) {
        if (e.is_primary()) {
          logger().debug("operation restart, acting set changed");
index b721958508346a51952c0f9dc9b123e947ffe673..33bb35ee38e41547fc2db9a4a9e5e27502a0a9d9 100644 (file)
@@ -53,6 +53,7 @@ public:
   static constexpr OperationTypeCode type = OperationTypeCode::client_request;
 
   ClientRequest(OSD &osd, crimson::net::ConnectionRef, Ref<MOSDOp> &&m);
+  ~ClientRequest();
 
   void print(std::ostream &) const final;
   void dump_detail(Formatter *f) const final;
@@ -73,6 +74,7 @@ private:
   ConnectionPipeline &cp();
   PGPipeline &pp(PG &pg);
 
+  OperationRepeatSequencer<ClientRequest>& ors;
 private:
   bool is_misdirected(const PG& pg) const;
 };
index 6a9ab301fe58abb746c55172d0f2068ea3c4de61..ac220f841731fc6c4833bfa623846658c00a8d9a 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #pragma once
 
@@ -618,6 +618,9 @@ public:
   const map<pg_shard_t, pg_missing_t> &get_shard_missing() const {
     return peering_state.get_peer_missing();
   }
+  epoch_t get_interval_start_epoch() const {
+    return get_info().history.same_interval_since;
+  }
   const pg_missing_const_i* get_shard_missing(pg_shard_t shard) const {
     if (shard == pg_whoami)
       return &get_local_missing();