git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/client_request: use fresh tracking_events/handle instances on requeue 48057/head
author: Samuel Just <sjust@redhat.com>
Mon, 12 Sep 2022 19:17:16 +0000 (19:17 +0000)
committer: Samuel Just <sjust@redhat.com>
Mon, 19 Sep 2022 21:04:14 +0000 (21:04 +0000)
See instance_handle_t explanation in client_request.h

Fixes: https://tracker.ceph.com/issues/57494
Fixes: https://tracker.ceph.com/issues/57495
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/common/utility.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/client_request.h
src/crimson/osd/osdmap_gate.h
src/crimson/osd/pg_activation_blocker.h

index 6fc50bb1c1a10cd1cc7a1f8e5e78e2d4a0053b38..86b30815585c229f2e0bf900726b04ca6407f9f2 100644 (file)
@@ -18,3 +18,21 @@ void assert_moveable(const T& t) {
     static_assert(_impl::always_false<T>::value, "unable to move-out from T");
 }
 
+namespace internal {
+
+template <typename Obj, typename Method, typename ArgTuple, size_t... I>
+static auto _apply_method_to_tuple(
+  Obj &obj, Method method, ArgTuple &&tuple,
+  std::index_sequence<I...>) {
+  return (obj.*method)(std::get<I>(std::forward<ArgTuple>(tuple))...);
+}
+
+}
+
+template <typename Obj, typename Method, typename ArgTuple>
+auto apply_method_to_tuple(Obj &obj, Method method, ArgTuple &&tuple) {
+  constexpr auto tuple_size = std::tuple_size_v<ArgTuple>;
+  return internal::_apply_method_to_tuple(
+    obj, method, std::forward<ArgTuple>(tuple),
+    std::make_index_sequence<tuple_size>());
+}
index db531f7bcb242f01680f1bd9da0a44a2138968a8..fca707f985125b1ce34f9a4bc6c1381a3c2280e9 100644 (file)
@@ -24,11 +24,9 @@ namespace crimson::osd {
 void ClientRequest::Orderer::requeue(
   ShardServices &shard_services, Ref<PG> pg)
 {
-  for (auto &req: list) {
-    req.handle.exit();
-  }
   for (auto &req: list) {
     logger().debug("{}: {} requeueing {}", __func__, *pg, req);
+    req.reset_instance_handle();
     std::ignore = req.with_pg_int(shard_services, pg);
   }
 }
@@ -47,7 +45,6 @@ void ClientRequest::Orderer::clear_and_cancel()
 void ClientRequest::complete_request()
 {
   track_event<CompletionEvent>();
-  handle.exit();
   on_complete.set_value();
 }
 
@@ -55,7 +52,8 @@ ClientRequest::ClientRequest(
   OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
   : osd(osd),
     conn(std::move(conn)),
-    m(std::move(m))
+    m(std::move(m)),
+    instance_handle(seastar::make_lw_shared<instance_handle_t>())
 {}
 
 ClientRequest::~ClientRequest()
@@ -114,8 +112,10 @@ seastar::future<> ClientRequest::with_pg_int(
   }
   const auto this_instance_id = instance_id++;
   OperationRef opref{this};
+  auto instance_handle = get_instance_handle();
+  auto &ihref = *instance_handle;
   return interruptor::with_interruption(
-    [this, pgref, this_instance_id]() mutable {
+    [this, pgref, this_instance_id, &ihref]() mutable {
       PG &pg = *pgref;
       if (pg.can_discard_op(*m)) {
        return osd.send_incremental_map(
@@ -127,28 +127,23 @@ seastar::future<> ClientRequest::with_pg_int(
          return interruptor::now();
        });
       }
-      return enter_stage<interruptor>(pp(pg).await_map
-      ).then_interruptible([this, this_instance_id, &pg] {
+      return ihref.enter_stage<interruptor>(pp(pg).await_map, *this
+      ).then_interruptible([this, this_instance_id, &pg, &ihref] {
        logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
-       return with_blocking_event<
-         PG_OSDMapGate::OSDMapBlocker::BlockingEvent
-         >([this, &pg] (auto&& trigger) {
-           return pg.osdmap_gate.wait_for_map(
-             std::move(trigger),
-             m->get_min_epoch());
-         });
-      }).then_interruptible([this, this_instance_id, &pg](auto map) {
+       return ihref.enter_blocker(
+         *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
+         m->get_min_epoch(), nullptr);
+      }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
        logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
-       return enter_stage<interruptor>(pp(pg).wait_for_active);
-      }).then_interruptible([this, this_instance_id, &pg]() {
+       return ihref.enter_stage<interruptor>(pp(pg).wait_for_active, *this);
+      }).then_interruptible([this, this_instance_id, &pg, &ihref]() {
        logger().debug(
          "{}.{}: after wait_for_active stage", *this, this_instance_id);
-       return with_blocking_event<
-         PGActivationBlocker::BlockingEvent
-         >([&pg] (auto&& trigger) {
-           return pg.wait_for_active_blocker.wait(std::move(trigger));
-         });
-      }).then_interruptible([this, pgref, this_instance_id]() mutable
+       return ihref.enter_blocker(
+         *this,
+         pg.wait_for_active_blocker,
+         &decltype(pg.wait_for_active_blocker)::wait);
+      }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable
                            -> interruptible_future<> {
        logger().debug(
          "{}.{}: after wait_for_active", *this, this_instance_id);
@@ -156,6 +151,7 @@ seastar::future<> ClientRequest::with_pg_int(
          return process_pg_op(pgref);
        } else {
          return process_op(
+           ihref,
            pgref
          ).then_interruptible([](auto){});
        }
@@ -167,7 +163,11 @@ seastar::future<> ClientRequest::with_pg_int(
     }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
       // TODO: better debug output
       logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
-    }, pgref).finally([opref=std::move(opref), pgref=std::move(pgref)] {});
+    }, pgref).finally(
+      [opref=std::move(opref), pgref=std::move(pgref),
+       instance_handle=std::move(instance_handle), &ihref] {
+      ihref.handle.exit();
+    });
 }
 
 seastar::future<> ClientRequest::with_pg(
@@ -193,16 +193,17 @@ ClientRequest::process_pg_op(
 }
 
 ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
-ClientRequest::process_op(Ref<PG> &pg)
+ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
 {
-  return enter_stage<interruptor>(
-    pp(*pg).recover_missing
+  return ihref.enter_stage<interruptor>(
+    pp(*pg).recover_missing,
+    *this
   ).then_interruptible(
     [this, pg]() mutable {
     return do_recover_missing(pg, m->get_hobj());
-  }).then_interruptible([this, pg]() mutable {
+  }).then_interruptible([this, pg, &ihref]() mutable {
     return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
-      [this, pg](bool completed, int ret) mutable
+      [this, pg, &ihref](bool completed, int ret) mutable
       -> PG::load_obc_iertr::future<seq_mode_t> {
       if (completed) {
         auto reply = crimson::make_message<MOSDOpReply>(
@@ -212,29 +213,35 @@ ClientRequest::process_op(Ref<PG> &pg)
           return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
         });
       } else {
-        return enter_stage<interruptor>(pp(*pg).get_obc).then_interruptible(
-          [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
+        return ihref.enter_stage<interruptor>(pp(*pg).get_obc, *this
+       ).then_interruptible(
+          [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
           logger().debug("{}: got obc lock", *this);
           op_info.set_from_op(&*m, *pg->get_osdmap());
           // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
           // `future<void>`.
-          return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
-            return pg->with_locked_obc(m->get_hobj(), op_info,
-                                       [this, pg, &mode](auto obc) mutable {
-              return enter_stage<interruptor>(pp(*pg).process).then_interruptible(
-              [this, pg, obc, &mode]() mutable {
-                return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
-                  mode = _mode;
-                  return seastar::now();
-                });
-              });
-            }).safe_then_interruptible([&mode] {
-              return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
-            });
-          });
-        });
+          return seastar::do_with(
+           seq_mode_t{},
+           [this, &pg, &ihref](seq_mode_t& mode) {
+             return pg->with_locked_obc(
+               m->get_hobj(), op_info,
+               [this, pg, &mode, &ihref](auto obc) mutable {
+                 return ihref.enter_stage<interruptor>(pp(*pg).process, *this
+                 ).then_interruptible(
+                   [this, pg, obc, &mode, &ihref]() mutable {
+                     return do_process(ihref, pg, obc
+                     ).then_interruptible([&mode] (seq_mode_t _mode) {
+                       mode = _mode;
+                       return seastar::now();
+                     });
+                   });
+               }).safe_then_interruptible([&mode] {
+                 return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
+               });
+           });
+         });
       }
-    });
+      });
   }).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) {
     return seastar::make_ready_future<seq_mode_t>(mode);
   }, PG::load_obc_ertr::all_same_way([](auto &code) {
@@ -258,7 +265,9 @@ auto ClientRequest::reply_op_error(Ref<PG>& pg, int err)
 }
 
 ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
-ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
+ClientRequest::do_process(
+  instance_handle_t &ihref,
+  Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
 {
   if (!pg->is_primary()) {
     // primary can handle both normal ops and balanced reads
@@ -291,25 +300,26 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
   }
 
   return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
-    [this, pg](auto submitted, auto all_completed) mutable {
-    return submitted.then_interruptible([this, pg] {
-      return enter_stage<interruptor>(pp(*pg).wait_repop);
+    [this, pg, &ihref](auto submitted, auto all_completed) mutable {
+    return submitted.then_interruptible([this, pg, &ihref] {
+      return ihref.enter_stage<interruptor>(pp(*pg).wait_repop, *this);
     }).then_interruptible(
-      [this, pg, all_completed=std::move(all_completed)]() mutable {
+      [this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
       return all_completed.safe_then_interruptible(
-        [this, pg](MURef<MOSDOpReply> reply) {
-        return enter_stage<interruptor>(pp(*pg).send_reply).then_interruptible(
+        [this, pg, &ihref](MURef<MOSDOpReply> reply) {
+       return ihref.enter_stage<interruptor>(pp(*pg).send_reply, *this
+       ).then_interruptible(
           [this, reply=std::move(reply)]() mutable {
           return conn->send(std::move(reply)).then([] {
             return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
           });
         });
-      }, crimson::ct_error::eagain::handle([this, pg]() mutable {
-        return process_op(pg);
+      }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+        return process_op(ihref, pg);
       }));
     });
-  }, crimson::ct_error::eagain::handle([this, pg]() mutable {
-    return process_op(pg);
+  }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
+    return process_op(ihref, pg);
   }));
 }
 
index 1d8b521afbe9d7c0b635bd0eaea849815fb94a19..47143ac7ec0005fda25e77cf073b89f8162da49e 100644 (file)
@@ -18,6 +18,7 @@
 #include "crimson/osd/pg_activation_blocker.h"
 #include "crimson/osd/pg_map.h"
 #include "crimson/common/type_helpers.h"
+#include "crimson/common/utility.h"
 #include "messages/MOSDOp.h"
 
 namespace crimson::osd {
@@ -30,7 +31,6 @@ class ClientRequest final : public PhasedOperationT<ClientRequest>,
   OSD &osd;
   const crimson::net::ConnectionRef conn;
   // must be after conn due to ConnectionPipeline's life-time
-  PipelineHandle handle;
   Ref<MOSDOp> m;
   OpInfo op_info;
   seastar::promise<> on_complete;
@@ -52,6 +52,102 @@ public:
     friend class HistoricBackend;
   };
 
+  /**
+   * instance_handle_t
+   *
+   * Client request is, at present, the only Operation which can be requeued.
+   * This is, mostly, fine.  However, reusing the PipelineHandle or
+   * BlockingEvent structures before proving that the prior instance has stopped
+   * can create hangs or crashes due to violations of the BlockerT and
+   * PipelineHandle invariants.
+   *
+   * To solve this, we create an instance_handle_t which contains the events
+   * for the portion of execution that can be rerun as well as the
+   * PipelineHandle.  ClientRequest::with_pg_int grabs a reference to the current
+   * instance_handle_t and releases its PipelineHandle in the finally block.
+   * On requeue, we create a new instance_handle_t with a fresh PipelineHandle
+   * and events tuple and use it for the next invocation of
+   * with_pg_int.
+   */
+  std::tuple<
+    StartEvent,
+    ConnectionPipeline::AwaitActive::BlockingEvent,
+    ConnectionPipeline::AwaitMap::BlockingEvent,
+    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
+    ConnectionPipeline::GetPG::BlockingEvent,
+    PGMap::PGCreationBlockingEvent,
+    CompletionEvent
+  > tracking_events;
+
+  class instance_handle_t
+    : public seastar::enable_lw_shared_from_this<instance_handle_t> {
+  public:
+    using ref_t = seastar::lw_shared_ptr<instance_handle_t>;
+    PipelineHandle handle;
+
+    std::tuple<
+      PGPipeline::AwaitMap::BlockingEvent,
+      PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
+      PGPipeline::WaitForActive::BlockingEvent,
+      PGActivationBlocker::BlockingEvent,
+      PGPipeline::RecoverMissing::BlockingEvent,
+      PGPipeline::GetOBC::BlockingEvent,
+      PGPipeline::Process::BlockingEvent,
+      PGPipeline::WaitRepop::BlockingEvent,
+      PGPipeline::SendReply::BlockingEvent,
+      CompletionEvent
+      > pg_tracking_events;
+
+    template <typename BlockingEventT, typename InterruptorT=void, typename F>
+    auto with_blocking_event(F &&f, ClientRequest &op) {
+      auto ret = std::forward<F>(f)(
+       typename BlockingEventT::template Trigger<ClientRequest>{
+         std::get<BlockingEventT>(pg_tracking_events), op
+       });
+      if constexpr (std::is_same_v<InterruptorT, void>) {
+       return ret;
+      } else {
+       using ret_t = decltype(ret);
+       return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)};
+      }
+    }
+
+    template <typename InterruptorT=void, typename StageT>
+    auto enter_stage(StageT &stage, ClientRequest &op) {
+      return this->template with_blocking_event<
+       typename StageT::BlockingEvent,
+       InterruptorT>(
+         [&stage, this](auto &&trigger) {
+           return handle.template enter<ClientRequest>(
+             stage, std::move(trigger));
+         }, op);
+    }
+
+    template <
+      typename InterruptorT=void, typename BlockingObj, typename Method,
+      typename... Args>
+    auto enter_blocker(
+      ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) {
+      return this->template with_blocking_event<
+       typename BlockingObj::Blocker::BlockingEvent,
+       InterruptorT>(
+         [&obj, method,
+          args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable {
+           return apply_method_to_tuple(
+             obj, method,
+             std::tuple_cat(
+               std::forward_as_tuple(std::move(trigger)),
+               std::move(args))
+           );
+         }, op);
+    }
+  };
+  instance_handle_t::ref_t instance_handle;
+  void reset_instance_handle() {
+    instance_handle = seastar::make_lw_shared<instance_handle_t>();
+  }
+  auto get_instance_handle() { return instance_handle; }
+
   using ordering_hook_t = boost::intrusive::list_member_hook<>;
   ordering_hook_t ordering_hook;
   class Orderer {
@@ -93,7 +189,7 @@ public:
     return m->get_spg();
   }
   ConnectionPipeline &get_connection_pipeline();
-  PipelineHandle &get_handle() { return handle; }
+  PipelineHandle &get_handle() { return instance_handle->handle; }
   epoch_t get_epoch() const { return m->get_min_epoch(); }
 
   seastar::future<> with_pg_int(
@@ -114,6 +210,7 @@ private:
   };
 
   interruptible_future<seq_mode_t> do_process(
+    instance_handle_t &ihref,
     Ref<PG>& pg,
     crimson::osd::ObjectContextRef obc);
   ::crimson::interruptible::interruptible_future<
@@ -121,7 +218,8 @@ private:
     Ref<PG> &pg);
   ::crimson::interruptible::interruptible_future<
     ::crimson::osd::IOInterruptCondition, seq_mode_t> process_op(
-    Ref<PG> &pg);
+      instance_handle_t &ihref,
+      Ref<PG> &pg);
   bool is_pg_op() const;
 
   ConnectionPipeline &cp();
@@ -136,24 +234,6 @@ private:
   bool is_misdirected(const PG& pg) const;
 
 public:
-  std::tuple<
-    StartEvent,
-    ConnectionPipeline::AwaitActive::BlockingEvent,
-    ConnectionPipeline::AwaitMap::BlockingEvent,
-    OSD_OSDMapGate::OSDMapBlocker::BlockingEvent,
-    ConnectionPipeline::GetPG::BlockingEvent,
-    PGMap::PGCreationBlockingEvent,
-    PGPipeline::AwaitMap::BlockingEvent,
-    PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
-    PGPipeline::WaitForActive::BlockingEvent,
-    PGActivationBlocker::BlockingEvent,
-    PGPipeline::RecoverMissing::BlockingEvent,
-    PGPipeline::GetOBC::BlockingEvent,
-    PGPipeline::Process::BlockingEvent,
-    PGPipeline::WaitRepop::BlockingEvent,
-    PGPipeline::SendReply::BlockingEvent,
-    CompletionEvent
-  > tracking_events;
 
   friend class LttngBackend;
   friend class HistoricBackend;
index 9a7d542db3e3954ae816337ddc9ae3fdd8c6ddda..d76c4b82f37c67c6927c98268e9cd81a0f41d179 100644 (file)
@@ -45,6 +45,7 @@ public:
 
     void dump_detail(Formatter *f) const final;
   };
+  using Blocker = OSDMapBlocker;
 
 private:
   // order the promises in ascending order of the waited osdmap epoch,
index fd07d9b41410b81aebd72e197444af4f972f5537..fff8219d11354552ade0ac1c19ed2398f5aa11e8 100644 (file)
@@ -24,6 +24,7 @@ protected:
 
 public:
   static constexpr const char *type_name = "PGActivationBlocker";
+  using Blocker = PGActivationBlocker;
 
   PGActivationBlocker(PG *pg) : pg(pg) {}
   void unblock();