crimson/osd/pg: limit usage of async for peering state machine
author     Samuel Just <sjust@redhat.com>
           Thu, 27 Jun 2024 20:41:23 +0000 (20:41 +0000)
committer  Matan Breizman <mbreizma@redhat.com>
           Thu, 25 Jul 2024 07:54:19 +0000 (10:54 +0300)
See comment and https://tracker.ceph.com/issues/66708.

Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit 0f32fc6080075e937e4cba752fd14bf3cc3be064)
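
For context on why one interruptor::async block remains: PeeringListener::do_delete_work has to return a plain value (the object to resume from on the next call), so crimson's implementation consumes futures with get(), which is only legal on a seastar::thread stack. Below is a minimal sketch of that pattern in plain Seastar, using hypothetical names (list_objects_step, do_delete_work_sketch) rather than crimson's actual interruptor machinery:

```cpp
#include <chrono>
#include <iostream>

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>

// Hypothetical stand-in for the asynchronous object listing that
// do_delete_work depends on.
seastar::future<> list_objects_step() {
  return seastar::sleep(std::chrono::milliseconds(1));
}

// A synchronous-looking interface: it returns a plain value rather than a
// future, so it may only run inside a seastar::thread -- the context that
// seastar::async (and, in crimson, interruptor::async) provides.
int do_delete_work_sketch() {
  list_objects_step().get();  // blocking-style wait; legal only in a thread context
  return 42;                  // stand-in for "the object to start with next time"
}

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    // Mirrors the shape of the commit: a single async block wrapping the whole
    // event-handling call, instead of async scattered through the handlers.
    return seastar::async([] {
      std::cout << do_delete_work_sketch() << "\n";
    });
  });
}
```

The commit confines that thread context to one wrapper in PeeringEvent::with_pg and removes it from the map-advance path entirely.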

src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h

diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc
index 6c1afd3d3360f888a482fb80e613bad072315a7a..2190f5fa2d89a986478744cedaf7c0a968cf43d4 100644 (file)
--- a/src/crimson/osd/osd_operations/peering_event.cc
+++ b/src/crimson/osd/osd_operations/peering_event.cc
@@ -89,8 +89,22 @@ seastar::future<> PeeringEvent<T>::with_pg(
     }).then_interruptible([this, pg](auto) {
       return this->template enter_stage<interruptor>(peering_pp(*pg).process);
     }).then_interruptible([this, pg, &shard_services] {
-      return pg->do_peering_event(evt, ctx
-      ).then_interruptible([this] {
+      /* The DeleteSome event invokes PeeringListener::do_delete_work, which
+       * needs to return (without a future) the object to start with on the next
+       * call.  As a consequence, crimson's do_delete_work implementation needs
+       * to use get() for the object listing.  To support that, we wrap
+       * PG::do_peering_event with interruptor::async here.
+       *
+       * Otherwise, it's not ok to yield during a peering event handler. Doing
+       * so allows other continuations to observe PeeringState in the middle
+       * of, for instance, a map advance.  The interface *does not* support such
+       * usage.  DeleteSome happens not to trigger that problem, so it's ok for
+       * now, but we'll want to remove that usage as well.
+       * https://tracker.ceph.com/issues/66708
+       */
+      return interruptor::async([this, pg] {
+       pg->do_peering_event(evt, ctx);
+      }).then_interruptible([this] {
        return that()->get_handle().complete();
       }).then_interruptible([this, pg, &shard_services] {
        return complete_rctx(shard_services, pg);
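
The comment above warns that yielding inside a peering event handler lets other continuations observe PeeringState mid-update (for example, in the middle of a map advance). Below is a minimal sketch of that hazard in plain Seastar, with a hypothetical State struct standing in for PeeringState:

```cpp
#include <cassert>
#include <chrono>

#include <seastar/core/app-template.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>

// Hypothetical shared state standing in for PeeringState.
struct State {
  int epoch = 0;
  bool mid_advance = false;
};

int main(int argc, char** argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    return seastar::async([] {
      State st;

      // An "advance" that yields halfway through its update.
      auto advance = seastar::async([&st] {
        st.mid_advance = true;
        seastar::sleep(std::chrono::milliseconds(1)).get();  // yield point
        st.epoch += 1;
        st.mid_advance = false;
      });

      // Another continuation on the same reactor runs during that yield and
      // observes the half-finished update.  The sleeps are sized so the
      // observer wakes while the advance is still parked.
      auto observer = seastar::sleep(std::chrono::microseconds(100)).then([&st] {
        assert(st.mid_advance);  // visible only because the advance yielded
      });

      observer.get();
      advance.get();
    });
  });
}
```
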
diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc
index 832794a56003fd50ce76a545977746ed5f81e2b9..c6c6b5cdabf3f347aaafdf3f564e910d06b7a810 100644 (file)
--- a/src/crimson/osd/osd_operations/pg_advance_map.cc
+++ b/src/crimson/osd/osd_operations/pg_advance_map.cc
@@ -80,48 +80,43 @@ seastar::future<> PGAdvanceMap::start()
      * See: https://tracker.ceph.com/issues/61744
      */
     from = pg->get_osdmap_epoch();
-    auto fut = seastar::now();
     if (do_init) {
-      fut = pg->handle_initialize(rctx
-      ).then([this] {
-       return pg->handle_activate_map(rctx);
-      });
+      pg->handle_initialize(rctx);
+      pg->handle_activate_map(rctx);
     }
-    return fut.then([this] {
-      ceph_assert(std::cmp_less_equal(*from, to));
-      return seastar::do_for_each(
-       boost::make_counting_iterator(*from + 1),
-       boost::make_counting_iterator(to + 1),
-       [this](epoch_t next_epoch) {
-         logger().debug("{}: start: getting map {}",
-                        *this, next_epoch);
-         return shard_services.get_map(next_epoch).then(
-           [this] (cached_map_t&& next_map) {
-             logger().debug("{}: advancing map to {}",
-                            *this, next_map->get_epoch());
-             return pg->handle_advance_map(next_map, rctx);
-           });
-       }).then([this] {
-         return pg->handle_activate_map(rctx).then([this] {
-           logger().debug("{}: map activated", *this);
-           if (do_init) {
-             shard_services.pg_created(pg->get_pgid(), pg);
-             logger().info("PGAdvanceMap::start new pg {}", *pg);
-           }
-           return seastar::when_all_succeed(
-             pg->get_need_up_thru()
-             ? shard_services.send_alive(
-               pg->get_same_interval_since())
-             : seastar::now(),
-             shard_services.dispatch_context(
-               pg->get_collection_ref(),
-               std::move(rctx)));
+    ceph_assert(std::cmp_less_equal(*from, to));
+    return seastar::do_for_each(
+      boost::make_counting_iterator(*from + 1),
+      boost::make_counting_iterator(to + 1),
+      [this](epoch_t next_epoch) {
+       logger().debug("{}: start: getting map {}",
+                      *this, next_epoch);
+       return shard_services.get_map(next_epoch).then(
+         [this] (cached_map_t&& next_map) {
+           logger().debug("{}: advancing map to {}",
+                          *this, next_map->get_epoch());
+           pg->handle_advance_map(next_map, rctx);
+           return seastar::now();
          });
-       }).then_unpack([this] {
-         logger().debug("{}: sending pg temp", *this);
-         return shard_services.send_pg_temp();
-       });
-    });
+      }).then([this] {
+       pg->handle_activate_map(rctx);
+       logger().debug("{}: map activated", *this);
+       if (do_init) {
+         shard_services.pg_created(pg->get_pgid(), pg);
+         logger().info("PGAdvanceMap::start new pg {}", *pg);
+       }
+       return seastar::when_all_succeed(
+         pg->get_need_up_thru()
+         ? shard_services.send_alive(
+           pg->get_same_interval_since())
+         : seastar::now(),
+         shard_services.dispatch_context(
+           pg->get_collection_ref(),
+           std::move(rctx)));
+      }).then_unpack([this] {
+       logger().debug("{}: sending pg temp", *this);
+       return shard_services.send_pg_temp();
+      });
   }).then([this] {
     logger().debug("{}: complete", *this);
     return handle.complete();
diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc
index ad777c49d60118b66973abd10116d61196173294..02f4a67433b2f6e9af4dd613c2f9f56ee38c3ef8 100644 (file)
--- a/src/crimson/osd/pg.cc
+++ b/src/crimson/osd/pg.cc
@@ -704,61 +704,49 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore::Shard* store)
   });
 }
 
-PG::interruptible_future<> PG::do_peering_event(
+void PG::do_peering_event(
   PGPeeringEvent& evt, PeeringCtx &rctx)
 {
   if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
       peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
     logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
-    return interruptor::now();
   } else {
     logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
-    // all peering event handling needs to be run in a dedicated seastar::thread,
-    // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
-    // PG::on_new_interval
-    return interruptor::async([this, &evt, &rctx] {
-      peering_state.handle_event(
-        evt.get_event(),
-        &rctx);
-      peering_state.write_if_dirty(rctx.transaction);
-    });
+    peering_state.handle_event(
+      evt.get_event(),
+      &rctx);
+    peering_state.write_if_dirty(rctx.transaction);
   }
 }
 
-seastar::future<> PG::handle_advance_map(
+void PG::handle_advance_map(
   cached_map_t next_map, PeeringCtx &rctx)
 {
-  return seastar::async([this, next_map=std::move(next_map), &rctx] {
-    vector<int> newup, newacting;
-    int up_primary, acting_primary;
-    next_map->pg_to_up_acting_osds(
-      pgid.pgid,
-      &newup, &up_primary,
-      &newacting, &acting_primary);
-    peering_state.advance_map(
-      next_map,
-      peering_state.get_osdmap(),
-      newup,
-      up_primary,
-      newacting,
-      acting_primary,
-      rctx);
-    osdmap_gate.got_map(next_map->get_epoch());
-  });
-}
-
-seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
+  vector<int> newup, newacting;
+  int up_primary, acting_primary;
+  next_map->pg_to_up_acting_osds(
+    pgid.pgid,
+    &newup, &up_primary,
+    &newacting, &acting_primary);
+  peering_state.advance_map(
+    next_map,
+    peering_state.get_osdmap(),
+    newup,
+    up_primary,
+    newacting,
+    acting_primary,
+    rctx);
+  osdmap_gate.got_map(next_map->get_epoch());
+}
+
+void PG::handle_activate_map(PeeringCtx &rctx)
 {
-  return seastar::async([this, &rctx] {
-    peering_state.activate_map(rctx);
-  });
+  peering_state.activate_map(rctx);
 }
 
-seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
+void PG::handle_initialize(PeeringCtx &rctx)
 {
-  return seastar::async([this, &rctx] {
-    peering_state.handle_event(PeeringState::Initialize{}, &rctx);
-  });
+  peering_state.handle_event(PeeringState::Initialize{}, &rctx);
 }
 
 void PG::init_collection_pool_opts()
diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h
index 92d05e06e6d1c85a0d01714e4c2811fce0d6998f..571bae3a7ee016659e90eceeff64654fe5082172 100644 (file)
--- a/src/crimson/osd/pg.h
+++ b/src/crimson/osd/pg.h
@@ -481,12 +481,11 @@ public:
 
   seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
 
-  interruptible_future<> do_peering_event(
-    PGPeeringEvent& evt, PeeringCtx &rctx);
+  void do_peering_event(PGPeeringEvent& evt, PeeringCtx &rctx);
 
-  seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
-  seastar::future<> handle_activate_map(PeeringCtx &rctx);
-  seastar::future<> handle_initialize(PeeringCtx &rctx);
+  void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+  void handle_activate_map(PeeringCtx &rctx);
+  void handle_initialize(PeeringCtx &rctx);
 
   static hobject_t get_oid(const hobject_t& hobj);
   static RWState::State get_lock_type(const OpInfo &op_info);