}).then_interruptible([this, pg](auto) {
return this->template enter_stage<interruptor>(peering_pp(*pg).process);
}).then_interruptible([this, pg, &shard_services] {
- return pg->do_peering_event(evt, ctx
- ).then_interruptible([this] {
+ /* The DeleteSome event invokes PeeringListener::do_delete_work, which
+ * needs to return (without a future) the object to start with on the next
+ * call. As a consequence, crimson's do_delete_work implementation needs
+ * to use get() for the object listing. To support that, we wrap
+ * PG::do_peering_event with interruptor::async here: the wrapped body runs
+ * in a seastar::thread, where blocking on a future with get() is legal.
+ *
+ * Otherwise, it's not ok to yield during a peering event handler. Doing so
+ * allows other continuations to observe PeeringState in the middle
+ * of, for instance, a map advance. The interface *does not* support such
+ * usage. DeleteSome happens not to trigger that problem, so it's ok for
+ * now, but we'll want to remove this blocking usage as well.
+ * https://tracker.ceph.com/issues/66708
+ */
+ return interruptor::async([this, pg] {
+ pg->do_peering_event(evt, ctx);
+ }).then_interruptible([this] {
return that()->get_handle().complete();
}).then_interruptible([this, pg, &shard_services] {
return complete_rctx(shard_services, pg);
* See: https://tracker.ceph.com/issues/61744
*/
from = pg->get_osdmap_epoch();
- auto fut = seastar::now();
if (do_init) {
- fut = pg->handle_initialize(rctx
- ).then([this] {
- return pg->handle_activate_map(rctx);
- });
+ pg->handle_initialize(rctx);
+ pg->handle_activate_map(rctx);
}
- return fut.then([this] {
- ceph_assert(std::cmp_less_equal(*from, to));
- return seastar::do_for_each(
- boost::make_counting_iterator(*from + 1),
- boost::make_counting_iterator(to + 1),
- [this](epoch_t next_epoch) {
- logger().debug("{}: start: getting map {}",
- *this, next_epoch);
- return shard_services.get_map(next_epoch).then(
- [this] (cached_map_t&& next_map) {
- logger().debug("{}: advancing map to {}",
- *this, next_map->get_epoch());
- return pg->handle_advance_map(next_map, rctx);
- });
- }).then([this] {
- return pg->handle_activate_map(rctx).then([this] {
- logger().debug("{}: map activated", *this);
- if (do_init) {
- shard_services.pg_created(pg->get_pgid(), pg);
- logger().info("PGAdvanceMap::start new pg {}", *pg);
- }
- return seastar::when_all_succeed(
- pg->get_need_up_thru()
- ? shard_services.send_alive(
- pg->get_same_interval_since())
- : seastar::now(),
- shard_services.dispatch_context(
- pg->get_collection_ref(),
- std::move(rctx)));
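+ // Walk the PG forward one epoch at a time; 'from' was set above and must
+ // not be ahead of the target epoch 'to'.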
+ ceph_assert(std::cmp_less_equal(*from, to));
+ return seastar::do_for_each(
+ boost::make_counting_iterator(*from + 1),
+ boost::make_counting_iterator(to + 1),
+ [this](epoch_t next_epoch) {
+ logger().debug("{}: start: getting map {}",
+ *this, next_epoch);
+ return shard_services.get_map(next_epoch).then(
+ [this] (cached_map_t&& next_map) {
+ logger().debug("{}: advancing map to {}",
+ *this, next_map->get_epoch());
+ pg->handle_advance_map(next_map, rctx);
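+ // handle_advance_map() is now synchronous; return a ready future to
+ // satisfy do_for_each.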
+ return seastar::now();
});
- }).then_unpack([this] {
- logger().debug("{}: sending pg temp", *this);
- return shard_services.send_pg_temp();
- });
- });
+ }).then([this] {
+ pg->handle_activate_map(rctx);
+ logger().debug("{}: map activated", *this);
+ if (do_init) {
+ shard_services.pg_created(pg->get_pgid(), pg);
+ logger().info("PGAdvanceMap::start new pg {}", *pg);
+ }
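+ // In parallel: ask the mons to bump up_thru if this interval needs it,
+ // and dispatch the messages/transaction accumulated in rctx.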
+ return seastar::when_all_succeed(
+ pg->get_need_up_thru()
+ ? shard_services.send_alive(
+ pg->get_same_interval_since())
+ : seastar::now(),
+ shard_services.dispatch_context(
+ pg->get_collection_ref(),
+ std::move(rctx)));
+ }).then_unpack([this] {
+ logger().debug("{}: sending pg temp", *this);
+ return shard_services.send_pg_temp();
+ });
}).then([this] {
logger().debug("{}: complete", *this);
return handle.complete();
});
}
-PG::interruptible_future<> PG::do_peering_event(
+void PG::do_peering_event(
PGPeeringEvent& evt, PeeringCtx &rctx)
{
if (peering_state.pg_has_reset_since(evt.get_epoch_requested()) ||
peering_state.pg_has_reset_since(evt.get_epoch_sent())) {
logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
- return interruptor::now();
} else {
logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
- // all peering event handling needs to be run in a dedicated seastar::thread,
- // so that event processing can involve I/O reqs freely, for example: PG::on_removal,
- // PG::on_new_interval
- return interruptor::async([this, &evt, &rctx] {
- peering_state.handle_event(
- evt.get_event(),
- &rctx);
- peering_state.write_if_dirty(rctx.transaction);
- });
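+ // The handler now runs synchronously on the caller's context; callers
+ // whose events may block (e.g. DeleteSome) must invoke this from inside
+ // interruptor::async.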
+ peering_state.handle_event(
+ evt.get_event(),
+ &rctx);
+ peering_state.write_if_dirty(rctx.transaction);
}
}
-seastar::future<> PG::handle_advance_map(
+void PG::handle_advance_map(
cached_map_t next_map, PeeringCtx &rctx)
{
- return seastar::async([this, next_map=std::move(next_map), &rctx] {
- vector<int> newup, newacting;
- int up_primary, acting_primary;
- next_map->pg_to_up_acting_osds(
- pgid.pgid,
- &newup, &up_primary,
- &newacting, &acting_primary);
- peering_state.advance_map(
- next_map,
- peering_state.get_osdmap(),
- newup,
- up_primary,
- newacting,
- acting_primary,
- rctx);
- osdmap_gate.got_map(next_map->get_epoch());
- });
-}
-
-seastar::future<> PG::handle_activate_map(PeeringCtx &rctx)
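+ // Compute this PG's up/acting sets under next_map, then let PeeringState
+ // advance from the current map to next_map and release any ops gated on
+ // the new epoch.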
+ vector<int> newup, newacting;
+ int up_primary, acting_primary;
+ next_map->pg_to_up_acting_osds(
+ pgid.pgid,
+ &newup, &up_primary,
+ &newacting, &acting_primary);
+ peering_state.advance_map(
+ next_map,
+ peering_state.get_osdmap(),
+ newup,
+ up_primary,
+ newacting,
+ acting_primary,
+ rctx);
+ osdmap_gate.got_map(next_map->get_epoch());
+}
+
+void PG::handle_activate_map(PeeringCtx &rctx)
{
- return seastar::async([this, &rctx] {
- peering_state.activate_map(rctx);
- });
+ peering_state.activate_map(rctx);
}
-seastar::future<> PG::handle_initialize(PeeringCtx &rctx)
+void PG::handle_initialize(PeeringCtx &rctx)
{
- return seastar::async([this, &rctx] {
- peering_state.handle_event(PeeringState::Initialize{}, &rctx);
- });
+ peering_state.handle_event(PeeringState::Initialize{}, &rctx);
}
void PG::init_collection_pool_opts()
seastar::future<> read_state(crimson::os::FuturizedStore::Shard* store);
- interruptible_future<> do_peering_event(
- PGPeeringEvent& evt, PeeringCtx &rctx);
+ void do_peering_event(PGPeeringEvent& evt, PeeringCtx &rctx);
- seastar::future<> handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
- seastar::future<> handle_activate_map(PeeringCtx &rctx);
- seastar::future<> handle_initialize(PeeringCtx &rctx);
+ void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
+ void handle_activate_map(PeeringCtx &rctx);
+ void handle_initialize(PeeringCtx &rctx);
static hobject_t get_oid(const hobject_t& hobj);
static RWState::State get_lock_type(const OpInfo &op_info);