crimson/osd/osd_operations: run peering_state related operations in a seastar::thread
author      Xuehan Xu <xxhdx1985126@gmail.com>
            Thu, 22 Dec 2022 09:16:39 +0000 (17:16 +0800)
committer   Xuehan Xu <xxhdx1985126@gmail.com>
            Fri, 6 Jan 2023 07:42:20 +0000 (07:42 +0000)
Handling of some peering events may involve seastar::future::wait(), so we need
to run the peering state machine in a dedicated seastar::thread.

Signed-off-by: Xuehan Xu <xxhdx1985126@gmail.com>
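
Background note: the pattern this commit applies is seastar::async(), which runs its
body in a seastar::thread where futures can be waited on inline instead of being
chained. A minimal standalone sketch of that pattern follows (not crimson code;
handler_that_may_wait is a hypothetical name used only for illustration):

    #include <seastar/core/app-template.hh>
    #include <seastar/core/thread.hh>
    #include <seastar/core/sleep.hh>
    #include <chrono>
    #include <iostream>

    seastar::future<> handler_that_may_wait() {
      // seastar::async() spawns a seastar::thread and returns a future that
      // resolves when the body finishes.
      return seastar::async([] {
        // Inside the seastar::thread, blocking-style waits are allowed:
        // .get() suspends the thread until the future resolves.
        seastar::sleep(std::chrono::milliseconds(10)).get();
        std::cout << "peering-style work done\n";
      });
    }

    int main(int argc, char** argv) {
      seastar::app_template app;
      return app.run(argc, argv, [] {
        return handler_that_may_wait();
      });
    }

This is what lets PG::do_peering_event and the other handlers below perform I/O
(and therefore wait on futures) while still exposing a future-based interface to
their callers.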
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

index 2a82a077cbd34346dc0d91dea1bd602491087b63..219cfcb453fb3d109f8332c05bb73d7868164247 100644 (file)
@@ -89,9 +89,11 @@ seastar::future<> PeeringEvent<T>::with_pg(
       return this->template enter_stage<interruptor>(
        BackfillRecovery::bp(*pg).process);
     }).then_interruptible([this, pg, &shard_services] {
-      pg->do_peering_event(evt, ctx);
-      that()->get_handle().exit();
-      return complete_rctx(shard_services, pg);
+      return pg->do_peering_event(evt, ctx
+      ).then_interruptible([this, pg, &shard_services] {
+       that()->get_handle().exit();
+       return complete_rctx(shard_services, pg);
+      });
     }).then_interruptible([pg, &shard_services]()
                          -> typename T::template interruptible_future<> {
       if (!pg->get_need_up_thru()) {
index 57477105b8f93284d0c26259637c0c45fbb1cddb..2514a0e748145810bdb7a0799f4b70a7dc5589d3 100644 (file)
@@ -63,39 +63,45 @@ seastar::future<> PGAdvanceMap::start()
     pg->peering_request_pg_pipeline.process
   ).then([this] {
     from = pg->get_osdmap_epoch();
+    auto fut = seastar::now();
     if (do_init) {
-      pg->handle_initialize(rctx);
-      pg->handle_activate_map(rctx);
-    }
-    return seastar::do_for_each(
-      boost::make_counting_iterator(*from + 1),
-      boost::make_counting_iterator(to + 1),
-      [this](epoch_t next_epoch) {
-        return shard_services.get_map(next_epoch).then(
-          [this] (cached_map_t&& next_map) {
-            logger().debug("{}: advancing map to {}",
-                           *this, next_map->get_epoch());
-            pg->handle_advance_map(next_map, rctx);
-          });
-      }).then([this] {
-        pg->handle_activate_map(rctx);
-        logger().debug("{}: map activated", *this);
-        if (do_init) {
-          shard_services.pg_created(pg->get_pgid(), pg);
-          logger().info("PGAdvanceMap::start new pg {}", *pg);
-        }
-        return seastar::when_all_succeed(
-          pg->get_need_up_thru()
-         ? shard_services.send_alive(
-           pg->get_same_interval_since())
-         : seastar::now(),
-          shard_services.dispatch_context(
-            pg->get_collection_ref(),
-            std::move(rctx)));
-      }).then_unpack([this] {
-        logger().debug("{}: sending pg temp", *this);
-        return shard_services.send_pg_temp();
+      fut = pg->handle_initialize(rctx
+      ).then([this] {
+       return pg->handle_activate_map(rctx);
       });
+    }
+    return fut.then([this] {
+      return seastar::do_for_each(
+       boost::make_counting_iterator(*from + 1),
+       boost::make_counting_iterator(to + 1),
+       [this](epoch_t next_epoch) {
+         return shard_services.get_map(next_epoch).then(
+           [this] (cached_map_t&& next_map) {
+             logger().debug("{}: advancing map to {}",
+                            *this, next_map->get_epoch());
+             return pg->handle_advance_map(next_map, rctx);
+           });
+       }).then([this] {
+         return pg->handle_activate_map(rctx).then([this] {
+           logger().debug("{}: map activated", *this);
+           if (do_init) {
+             shard_services.pg_created(pg->get_pgid(), pg);
+             logger().info("PGAdvanceMap::start new pg {}", *pg);
+           }
+           return seastar::when_all_succeed(
+             pg->get_need_up_thru()
+             ? shard_services.send_alive(
+               pg->get_same_interval_since())
+             : seastar::now(),
+             shard_services.dispatch_context(
+               pg->get_collection_ref(),
+               std::move(rctx)));
+         });
+       }).then_unpack([this] {
+         logger().debug("{}: sending pg temp", *this);
+         return shard_services.send_pg_temp();
+       });
+    });
   }).then([this, ref=std::move(ref)] {
     logger().debug("{}: complete", *this);
   });
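
Note on the hunk above: once handle_initialize()/handle_activate_map() return
futures, the previously sequential void calls have to become a chain seeded with
seastar::now(), which is the `auto fut = seastar::now(); if (do_init) ...` shape
introduced here. A minimal standalone sketch of that conversion (hypothetical
step_one/step_two names, not crimson code):

    #include <seastar/core/future.hh>

    // Hypothetical stand-ins for handle_initialize()/handle_activate_map().
    seastar::future<> step_one() { return seastar::make_ready_future<>(); }
    seastar::future<> step_two() { return seastar::make_ready_future<>(); }

    seastar::future<> advance(bool do_init) {
      // Seed with a ready future, chain the optional part conditionally,
      // then continue the common part of the pipeline off the seeded future.
      auto fut = seastar::now();
      if (do_init) {
        fut = step_one().then([] { return step_two(); });
      }
      return fut.then([] {
        return step_two();
      });
    }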
index e80782eb51eddd8d7dc7d571719d7988ec44cba6..eb2227554f1d0d5625838b3b1da2881740f0d806 100644 (file)
@@ -494,49 +494,61 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
   });
 }
 
-void PG::do_peering_event(
+PG::interruptible_future<> PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
       peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
+    return interruptor::now();
   } else {
     logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    peering_state.handle_event(
-      evt.get_event(),
-      &rctx);
-    peering_state.write_if_dirty(rctx.transaction);
+    // all peering event handling needs to be run in a dedicated seastar::thread,
+    // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
+    // PG::on_new_interval
+    return interruptor::async([this, &evt, &rctx] {
+      peering_state.handle_event(
+        evt.get_event(),
+        &rctx);
+      peering_state.write_if_dirty(rctx.transaction);
+    });
   }
 }
 
-void PG::handle_advance_map(
+seastar::future<> PG::handle_advance_map(
   cached_map_t next_map, PeeringCtx &rctx)
 {
-  vector<int> newup, newacting;
-  int up_primary, acting_primary;
-  next_map->pg_to_up_acting_osds(
-    pgid.pgid,
-    &newup, &up_primary,
-    &newacting, &acting_primary);
-  peering_state.advance_map(
-    next_map,
-    peering_state.get_osdmap(),
-    newup,
-    up_primary,
-    newacting,
-    acting_primary,
-    rctx);
-  osdmap_gate.got_map(next_map->get_epoch());
+  return seastar::async([this, next_map=std::move(next_map), &rctx] {
+    vector<int> newup, newacting;
+    int up_primary, acting_primary;
+    next_map->pg_to_up_acting_osds(
+      pgid.pgid,
+      &newup, &up_primary,
+      &newacting, &acting_primary);
+    peering_state.advance_map(
+      next_map,
+      peering_state.get_osdmap(),
+      newup,
+      up_primary,
+      newacting,
+      acting_primary,
+      rctx);
+    osdmap_gate.got_map(next_map->get_epoch());
+  });
 }
 
-void PG::handle_activate_map(PeeringCtx &rctx)
+seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
 {
-  peering_state.activate_map(rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.activate_map(rctx);
+  });
 }
 
-void PG::handle_initialize(PeeringCtx &rctx)
+seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
 {
-  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  return seastar::async([this, &rctx] {
+    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
+  });
 }
 
 
index a353634ba19f01914863aa3281a03ef05428a127..11e5a4ae8adbe46460cce52d0d3a7d7e83c51ab4 100644 (file)
@@ -497,12 +497,12 @@ public:
 
   seastar::future<> read_state(crimson::os::FuturizedStore* store);
 
-  void do_peering_event(
+  interruptible_future<> do_peering_event(
     PGPeeringEvent& evt, PeeringCtx &rctx);
 
-  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
-  void handle_activate_map(PeeringCtx &rctx);
-  void handle_initialize(PeeringCtx &rctx);
+  seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+  seastar::future<> handle_activate_map(PeeringCtx &rctx);
+  seastar::future<> handle_initialize(PeeringCtx &rctx);
 
   static hobject_t get_oid(const hobject_t& hobj);
   static RWState::State get_lock_type(const OpInfo &op_info);