From: Yingxin Cheng Date: Thu, 14 Sep 2023 09:02:03 +0000 (+0800) Subject: crimson/osd/pg_shard_manager: cleanups around the remote pg submission X-Git-Tag: v19.0.0~141^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=08c9fbca19b62b0a3d0dc3892e6191cdeae73483;p=ceph.git crimson/osd/pg_shard_manager: cleanups around the remote pg submission Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index fb5e93392caf..71a9cf1a9c80 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -153,11 +153,11 @@ public: core_id_t core, typename T::IRef &&op, F &&f) { + ceph_assert(op->use_count() == 1); if (seastar::this_shard_id() == core) { auto &target_shard_services = shard_services.local(); return std::invoke( std::move(f), - target_shard_services.local_state, target_shard_services, std::move(op)); } @@ -183,80 +183,54 @@ public: /// Runs opref on the appropriate core, creating the pg as necessary. 
template <typename T> seastar::future<> run_with_pg_maybe_create( - typename T::IRef op + typename T::IRef op, + ShardServices &target_shard_services ) { - ceph_assert(op->use_count() == 1); - auto &logger = crimson::get_logger(ceph_subsys_osd); static_assert(T::can_create()); - logger.debug("{}: can_create", *op); - - return get_pg_to_shard_mapping().maybe_create_pg( - op->get_pgid() - ).then([this, op = std::move(op)](auto core) mutable { - return this->template with_remote_shard_state_and_op<T>( - core, std::move(op), - [](ShardServices &shard_services, - typename T::IRef op) { - auto &logger = crimson::get_logger(ceph_subsys_osd); - auto &opref = *op; - return opref.template with_blocking_event< - PGMap::PGCreationBlockingEvent - >([&shard_services, &opref]( - auto &&trigger) { - return shard_services.get_or_create_pg( - std::move(trigger), - opref.get_pgid(), - std::move(opref.get_create_info()) - ); - }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) { - logger.debug("{}: have_pg", opref); - return opref.with_pg(shard_services, pgref); - }).handle_error( - crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { - logger.debug("{}: pg creation canceled, dropping", opref); - return seastar::now(); - }) - ).then([op=std::move(op)] {}); - }); - }); + auto &logger = crimson::get_logger(ceph_subsys_osd); + auto &opref = *op; + return opref.template with_blocking_event< + PGMap::PGCreationBlockingEvent + >([&target_shard_services, &opref](auto &&trigger) { + return target_shard_services.get_or_create_pg( + std::move(trigger), + opref.get_pgid(), + std::move(opref.get_create_info()) + ); + }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) { + logger.debug("{}: have_pg", opref); + return opref.with_pg(target_shard_services, pgref); + }).handle_error( + crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { + logger.debug("{}: pg creation canceled, dropping", opref); + return seastar::now(); + }) + ).then([op=std::move(op)] {}); } /// Runs opref
on the appropriate core, waiting for pg as necessary template <typename T> seastar::future<> run_with_pg_maybe_wait( - typename T::IRef op + typename T::IRef op, + ShardServices &target_shard_services ) { - ceph_assert(op->use_count() == 1); - auto &logger = crimson::get_logger(ceph_subsys_osd); static_assert(!T::can_create()); - logger.debug("{}: !can_create", *op); - - return get_pg_to_shard_mapping().maybe_create_pg( - op->get_pgid() - ).then([this, op = std::move(op)](auto core) mutable { - return this->template with_remote_shard_state_and_op<T>( - core, std::move(op), - [](ShardServices &shard_services, - typename T::IRef op) { - auto &logger = crimson::get_logger(ceph_subsys_osd); - auto &opref = *op; - return opref.template with_blocking_event< - PGMap::PGCreationBlockingEvent - >([&shard_services, &opref]( - auto &&trigger) { - return shard_services.wait_for_pg( - std::move(trigger), opref.get_pgid()); - }).safe_then([&logger, &shard_services, &opref](Ref<PG> pgref) { - logger.debug("{}: have_pg", opref); - return opref.with_pg(shard_services, pgref); - }).handle_error( - crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { - logger.debug("{}: pg creation canceled, dropping", opref); - return seastar::now(); - }) - ).then([op=std::move(op)] {}); - }); - }); + auto &logger = crimson::get_logger(ceph_subsys_osd); + auto &opref = *op; + return opref.template with_blocking_event< + PGMap::PGCreationBlockingEvent + >([&target_shard_services, &opref](auto &&trigger) { + return target_shard_services.wait_for_pg( + std::move(trigger), opref.get_pgid()); + }).safe_then([&logger, &target_shard_services, &opref](Ref<PG> pgref) { + logger.debug("{}: have_pg", opref); + return opref.with_pg(target_shard_services, pgref); + }).handle_error( + crimson::ct_error::ecanceled::handle([&logger, &opref](auto) { + logger.debug("{}: pg creation canceled, dropping", opref); + return seastar::now(); + }) + ).then([op=std::move(op)] {}); } seastar::future<> load_pgs(crimson::os::FuturizedStore&
store); @@ -364,16 +338,23 @@ public: logger.debug("{}: got map {}, entering get_pg", opref, epoch); return opref.template enter_stage<>( opref.get_connection_pipeline().get_pg); - }).then([this, &logger, &opref, op=std::move(op)]() mutable { - logger.debug("{}: in get_pg core {}", opref, seastar::this_shard_id()); - logger.debug("{}: in get_pg", opref); - if constexpr (T::can_create()) { - logger.debug("{}: can_create", opref); - return run_with_pg_maybe_create<T>(std::move(op)); - } else { - logger.debug("{}: !can_create", opref); - return run_with_pg_maybe_wait<T>(std::move(op)); - } + }).then([this, &opref] { + return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid()); + }).then([this, &logger, op=std::move(op)](auto core) mutable { + logger.debug("{}: can_create={}, target-core={}", + *op, T::can_create(), core); + return this->template with_remote_shard_state_and_op<T>( + core, std::move(op), + [this](ShardServices &target_shard_services, + typename T::IRef op) { + if constexpr (T::can_create()) { + return this->template run_with_pg_maybe_create<T>( + std::move(op), target_shard_services); + } else { + return this->template run_with_pg_maybe_wait<T>( + std::move(op), target_shard_services); + } + }); }); return std::make_pair(id, std::move(fut)); }