From: Xuehan Xu Date: Thu, 22 Dec 2022 09:16:39 +0000 (+0800) Subject: crimson/osd/osd_operations: run peering_state related operations in a seastar::thread X-Git-Tag: v18.1.0~535^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bc23cf816f534411b32e1e1d808982045e40ea47;p=ceph.git crimson/osd/osd_operations: run peering_state related operations in a seastar::thread Some peering events handling may involve seastar::future::wait(), so we need to run the peering state machine in a dedicated seastar::thread Signed-off-by: Xuehan Xu --- diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index 2a82a077cbd3..219cfcb453fb 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -89,9 +89,11 @@ seastar::future<> PeeringEvent::with_pg( return this->template enter_stage( BackfillRecovery::bp(*pg).process); }).then_interruptible([this, pg, &shard_services] { - pg->do_peering_event(evt, ctx); - that()->get_handle().exit(); - return complete_rctx(shard_services, pg); + return pg->do_peering_event(evt, ctx + ).then_interruptible([this, pg, &shard_services] { + that()->get_handle().exit(); + return complete_rctx(shard_services, pg); + }); }).then_interruptible([pg, &shard_services]() -> typename T::template interruptible_future<> { if (!pg->get_need_up_thru()) { diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index 57477105b8f9..2514a0e74814 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -63,39 +63,45 @@ seastar::future<> PGAdvanceMap::start() pg->peering_request_pg_pipeline.process ).then([this] { from = pg->get_osdmap_epoch(); + auto fut = seastar::now(); if (do_init) { - pg->handle_initialize(rctx); - pg->handle_activate_map(rctx); - } - return seastar::do_for_each( - boost::make_counting_iterator(*from + 1), - boost::make_counting_iterator(to + 1), - [this](epoch_t next_epoch) { - return shard_services.get_map(next_epoch).then( - [this] (cached_map_t&& next_map) { - logger().debug("{}: advancing map to {}", - *this, next_map->get_epoch()); - pg->handle_advance_map(next_map, rctx); - }); - }).then([this] { - pg->handle_activate_map(rctx); - logger().debug("{}: map activated", *this); - if (do_init) { - shard_services.pg_created(pg->get_pgid(), pg); - logger().info("PGAdvanceMap::start new pg {}", *pg); - } - return seastar::when_all_succeed( - pg->get_need_up_thru() - ? shard_services.send_alive( - pg->get_same_interval_since()) - : seastar::now(), - shard_services.dispatch_context( - pg->get_collection_ref(), - std::move(rctx))); - }).then_unpack([this] { - logger().debug("{}: sending pg temp", *this); - return shard_services.send_pg_temp(); + fut = pg->handle_initialize(rctx + ).then([this] { + return pg->handle_activate_map(rctx); }); + } + return fut.then([this] { + return seastar::do_for_each( + boost::make_counting_iterator(*from + 1), + boost::make_counting_iterator(to + 1), + [this](epoch_t next_epoch) { + return shard_services.get_map(next_epoch).then( + [this] (cached_map_t&& next_map) { + logger().debug("{}: advancing map to {}", + *this, next_map->get_epoch()); + return pg->handle_advance_map(next_map, rctx); + }); + }).then([this] { + return pg->handle_activate_map(rctx).then([this] { + logger().debug("{}: map activated", *this); + if (do_init) { + shard_services.pg_created(pg->get_pgid(), pg); + logger().info("PGAdvanceMap::start new pg {}", *pg); + } + return seastar::when_all_succeed( + pg->get_need_up_thru() + ? shard_services.send_alive( + pg->get_same_interval_since()) + : seastar::now(), + shard_services.dispatch_context( + pg->get_collection_ref(), + std::move(rctx))); + }); + }).then_unpack([this] { + logger().debug("{}: sending pg temp", *this); + return shard_services.send_pg_temp(); + }); + }); }).then([this, ref=std::move(ref)] { logger().debug("{}: complete", *this); }); diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index e80782eb51ed..eb2227554f1d 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -494,49 +494,61 @@ seastar::future<> PG::read_state(crimson::os::FuturizedStore* store) }); } -void PG::do_peering_event( +PG::interruptible_future<> PG::do_peering_event( PGPeeringEvent& evt, PeeringCtx &rctx) { if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) || peering_state.pg_has_reset_since(evt.get_epoch_sent())) { logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc()); + return interruptor::now(); } else { logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid); - peering_state.handle_event( - evt.get_event(), - &rctx); - peering_state.write_if_dirty(rctx.transaction); + // all peering event handling needs to be run in a dedicated seastar::thread, + // so that event processing can involve I/O reqs freely, for example: PG::on_removal, + // PG::on_new_interval + return interruptor::async([this, &evt, &rctx] { + peering_state.handle_event( + evt.get_event(), + &rctx); + peering_state.write_if_dirty(rctx.transaction); + }); } } -void PG::handle_advance_map( +seastar::future<> PG::handle_advance_map( cached_map_t next_map, PeeringCtx &rctx) { - vector newup, newacting; - int up_primary, acting_primary; - next_map->pg_to_up_acting_osds( - pgid.pgid, - &newup, &up_primary, - &newacting, &acting_primary); - peering_state.advance_map( - next_map, - peering_state.get_osdmap(), - newup, - up_primary, - newacting, - acting_primary, - rctx); - osdmap_gate.got_map(next_map->get_epoch()); + return seastar::async([this, next_map=std::move(next_map), &rctx] { + vector newup, newacting; + int up_primary, acting_primary; + next_map->pg_to_up_acting_osds( + pgid.pgid, + &newup, &up_primary, + &newacting, &acting_primary); + peering_state.advance_map( + next_map, + peering_state.get_osdmap(), + newup, + up_primary, + newacting, + acting_primary, + rctx); + osdmap_gate.got_map(next_map->get_epoch()); + }); } -void PG::handle_activate_map(PeeringCtx &rctx) +seastar::future<> PG::handle_activate_map(PeeringCtx &rctx) { - peering_state.activate_map(rctx); + return seastar::async([this, &rctx] { + peering_state.activate_map(rctx); + }); } -void PG::handle_initialize(PeeringCtx &rctx) +seastar::future<> PG::handle_initialize(PeeringCtx &rctx) { - peering_state.handle_event(PeeringState::Initialize{}, &rctx); + return seastar::async([this, &rctx] { + peering_state.handle_event(PeeringState::Initialize{}, &rctx); + }); } diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index a353634ba19f..11e5a4ae8adb 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -497,12 +497,12 @@ public: seastar::future<> read_state(crimson::os::FuturizedStore* store); - void do_peering_event( + interruptible_future<> do_peering_event( PGPeeringEvent& evt, PeeringCtx &rctx); - void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx); - void handle_activate_map(PeeringCtx &rctx); - void handle_initialize(PeeringCtx &rctx); + seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx); + seastar::future<> handle_activate_map(PeeringCtx &rctx); + seastar::future<> handle_initialize(PeeringCtx &rctx); static hobject_t get_oid(const hobject_t& hobj); static RWState::State get_lock_type(const OpInfo &op_info);