Ref<PG> get_pg(spg_t pgid);
seastar::future<> send_beacon();
+ template <typename T, typename... Args>
+ auto start_pg_operation(Args&&... args) {
+ auto op = shard_services.registry.create_operation<T>(
+ std::forward<Args>(args)...);
+ auto &logger = crimson::get_logger(ceph_subsys_osd);
+ logger.debug("{}: starting {}", *op, __func__);
+ auto &opref = *op;
+
+ auto fut = opref.template enter_stage<>(
+ opref.get_connection_pipeline().await_active
+ ).then([this, &opref, &logger] {
+ logger.debug("{}: start_pg_operation in await_active stage", opref);
+ return state.when_active();
+ }).then([&logger, &opref] {
+ logger.debug("{}: start_pg_operation active, entering await_map", opref);
+ return opref.template enter_stage<>(
+ opref.get_connection_pipeline().await_map);
+ }).then([this, &logger, &opref] {
+ logger.debug("{}: start_pg_operation await_map stage", opref);
+ using OSDMapBlockingEvent =
+ OSD_OSDMapGate::OSDMapBlocker::BlockingEvent;
+ return opref.template with_blocking_event<OSDMapBlockingEvent>(
+ [this, &opref](auto &&trigger) {
+ return osdmap_gate.wait_for_map(std::move(trigger),
+ opref.get_epoch());
+ });
+ }).then([&logger, &opref](auto epoch) {
+ logger.debug("{}: got map {}, entering get_pg", opref, epoch);
+ return opref.template enter_stage<>(
+ opref.get_connection_pipeline().get_pg);
+ }).then([this, &logger, &opref] {
+ logger.debug("{}: in get_pg", opref);
+ if constexpr (T::can_create()) {
+ logger.debug("{}: can_create", opref);
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([this, &opref](auto &&trigger) {
+ std::ignore = this; // avoid clang warning
+ return get_or_create_pg(
+ std::move(trigger),
+ opref.get_pgid(), opref.get_epoch(),
+ std::move(opref.get_create_info()));
+ });
+ } else {
+ logger.debug("{}: !can_create", opref);
+ return opref.template with_blocking_event<
+ PGMap::PGCreationBlockingEvent
+ >([this, &opref](auto &&trigger) {
+ std::ignore = this; // avoid clang warning
+ return wait_for_pg(std::move(trigger), opref.get_pgid());
+ });
+ }
+ }).then([this, &logger, &opref](Ref<PG> pgref) {
+ logger.debug("{}: have_pg", opref);
+ return opref.with_pg(shard_services, pgref);
+ }).then([op] { /* Retain refcount on op until completion */ });
+
+ return std::make_pair(std::move(op), std::move(fut));
+ }
+
+
private:
LogClient log_client;
LogChannelRef clog;