From a19e29c94ae28620257eaf6a03fd09ab37e0bee0 Mon Sep 17 00:00:00 2001 From: Matan Breizman Date: Mon, 31 Oct 2022 09:12:38 +0000 Subject: [PATCH] crimson/osd: Introduce ObjectContextLoader * obc load logic is moved to a seperate class for better reusability. * with_existing_* logic is removed. (not being used) Signed-off-by: Matan Breizman --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/object_context_loader.cc | 190 +++++++++++++++++ src/crimson/osd/object_context_loader.h | 62 ++++++ src/crimson/osd/pg.cc | 194 +----------------- src/crimson/osd/pg.h | 42 +--- .../osd/replicated_recovery_backend.cc | 4 +- 6 files changed, 268 insertions(+), 225 deletions(-) create mode 100644 src/crimson/osd/object_context_loader.cc create mode 100644 src/crimson/osd/object_context_loader.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 52cc33e3f954b..f899b064ad636 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -12,6 +12,7 @@ add_executable(crimson-osd shard_services.cc pg_shard_manager.cc object_context.cc + object_context_loader.cc ops_executer.cc osd_operation.cc osd_operations/client_request.cc diff --git a/src/crimson/osd/object_context_loader.cc b/src/crimson/osd/object_context_loader.cc new file mode 100644 index 0000000000000..246c20fcfe518 --- /dev/null +++ b/src/crimson/osd/object_context_loader.cc @@ -0,0 +1,190 @@ +#include "crimson/osd/object_context_loader.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +using crimson::common::local_conf; + + template + ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(ObjectContextRef obc, + bool existed, + with_obc_func_t&& func) + { + logger().debug("{} {}", __func__, obc->get_oid()); + assert(obc->is_head()); + obc->append_to(obc_set_accessing); + return obc->with_lock( + [existed=existed, obc=obc, func=std::move(func), this] { + return get_or_load_obc(obc, existed) + .safe_then_interruptible( + [func = std::move(func)](auto obc) { + return std::move(func)(std::move(obc)); + }); + }).finally([this, obc=std::move(obc)] { + logger().debug("with_head_obc: released {}", obc->get_oid()); + obc->remove_from(obc_set_accessing); + }); + } + + template + ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_clone_obc(hobject_t oid, + with_obc_func_t&& func) + { + assert(!oid.is_head()); + return with_head_obc(oid.get_head(), + [oid, func=std::move(func), this](auto head) + -> load_obc_iertr::future<> { + if (!head->obs.exists) { + logger().error("with_clone_obc: {} head doesn't exist", + head->obs.oi.soid); + return load_obc_iertr::future<>{ + crimson::ct_error::enoent::make() + }; + } + auto coid = resolve_oid(head->get_ro_ss(), oid); + if (!coid) { + logger().error("with_clone_obc: {} clone not found", coid); + return load_obc_iertr::future<>{ + crimson::ct_error::enoent::make() + }; + } + auto [clone, existed] = shard_services.get_cached_obc(*coid); + return clone->template with_lock( + [existed=existed, head=std::move(head), clone=std::move(clone), + func=std::move(func), this]() -> load_obc_iertr::future<> { + auto loaded = get_or_load_obc(clone, existed); + clone->head = head; + return loaded.safe_then_interruptible( + [func = std::move(func)](auto clone) { + return std::move(func)(std::move(clone)); + }); + }); + }); + } + + template + ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(hobject_t oid, + with_obc_func_t&& func) + { + auto [obc, existed] = + shard_services.get_cached_obc(std::move(oid)); + return with_head_obc(std::move(obc), + existed, + std::move(func)); + } + + ObjectContextLoader::load_obc_iertr::future + ObjectContextLoader::load_obc(ObjectContextRef obc) + { + return backend->load_metadata(obc->get_oid()) + .safe_then_interruptible( + [obc=std::move(obc)](auto md) + -> load_obc_ertr::future { + const hobject_t& oid = md->os.oi.soid; + logger().debug( + "load_obc: loaded obs {} for {}", md->os.oi, oid); + if (oid.is_head()) { + if (!md->ssc) { + logger().error( + "load_obc: oid {} missing snapsetcontext", oid); + return crimson::ct_error::object_corrupted::make(); + } + obc->set_head_state(std::move(md->os), + std::move(md->ssc)); + } else { + obc->set_clone_state(std::move(md->os)); + } + logger().debug( + "load_obc: returning obc {} for {}", + obc->obs.oi, obc->obs.oi.soid); + return load_obc_ertr::make_ready_future(obc); + }); + } + + template + ObjectContextLoader::load_obc_iertr::future + ObjectContextLoader::get_or_load_obc(ObjectContextRef obc, + bool existed) + { + auto loaded = + load_obc_iertr::make_ready_future(obc); + if (existed) { + logger().debug("{}: found {} in cache", + __func__, obc->get_oid()); + } else { + logger().debug("{}: cache miss on {}", + __func__, obc->get_oid()); + loaded = + obc->template with_promoted_lock( + [obc, this] { + return load_obc(obc); + }); + } + return loaded; + } + + ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::reload_obc(ObjectContext& obc) const + { + assert(obc.is_head()); + return backend->load_metadata(obc.get_oid()) + .safe_then_interruptible( + [&obc](auto md)-> load_obc_ertr::future<> { + logger().debug( + "{}: reloaded obs {} for {}", + __func__, + md->os.oi, + obc.get_oid()); + if (!md->ssc) { + logger().error( + "{}: oid {} missing snapsetcontext", + __func__, + obc.get_oid()); + return crimson::ct_error::object_corrupted::make(); + } + obc.set_head_state(std::move(md->os), std::move(md->ssc)); + return load_obc_ertr::now(); + }); + } + + // explicitly instantiate the used instantiations + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_head_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_clone_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_clone_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_clone_obc(hobject_t, + with_obc_func_t&&); + + template ObjectContextLoader::load_obc_iertr::future<> + ObjectContextLoader::with_clone_obc(hobject_t, + with_obc_func_t&&); +} diff --git a/src/crimson/osd/object_context_loader.h b/src/crimson/osd/object_context_loader.h new file mode 100644 index 0000000000000..7771da3cde405 --- /dev/null +++ b/src/crimson/osd/object_context_loader.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include "crimson/common/errorator.h" +#include "crimson/osd/object_context.h" +#include "crimson/osd/shard_services.h" +#include "crimson/osd/pg_backend.h" + +namespace crimson::osd { +class ObjectContextLoader { +public: + using obc_accessing_list_t = boost::intrusive::list< + ObjectContext, + ObjectContext::obc_accessing_option_t>; + + ObjectContextLoader( + ShardServices& _shard_services, + PGBackend* _backend) + : shard_services{_shard_services}, + backend{_backend} + {} + + using load_obc_ertr = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::object_corrupted>; + using load_obc_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + load_obc_ertr>; + + using with_obc_func_t = + std::function (ObjectContextRef)>; + + template + load_obc_iertr::future<> with_head_obc(hobject_t oid, + with_obc_func_t&& func); + + template + load_obc_iertr::future<> with_clone_obc(hobject_t oid, + with_obc_func_t&& func); + + template + load_obc_iertr::future<> with_head_obc(ObjectContextRef obc, + bool existed, + with_obc_func_t&& func); + + template + load_obc_iertr::future + get_or_load_obc(ObjectContextRef obc, + bool existed); + + load_obc_iertr::future + load_obc(ObjectContextRef obc); + + load_obc_iertr::future<> reload_obc(ObjectContext& obc) const; + +private: + ShardServices &shard_services; + PGBackend* backend; + obc_accessing_list_t obc_set_accessing; +}; +} diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 6a386bcacdc7c..b8420ef3f4e01 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -133,6 +133,9 @@ PG::PG( osdmap, this, this), + obc_loader{ + shard_services, + backend.get()}, wait_for_active_blocker(this) { peering_state.set_backend_predicates( @@ -632,7 +635,7 @@ PG::do_osd_ops_execute( { assert(ox); auto rollbacker = ox->create_rollbacker([this] (auto& obc) { - return reload_obc(obc).handle_error_interruptible( + return obc_loader.reload_obc(obc).handle_error_interruptible( load_obc_ertr::assert_all{"can't live with object state messed up"}); }); auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); @@ -1030,163 +1033,6 @@ std::optional PG::resolve_oid( } } -template -PG::load_obc_iertr::future<> -PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func) -{ - logger().debug("{} {}", __func__, obc->get_oid()); - assert(obc->is_head()); - obc->append_to(obc_set_accessing); - return obc->with_lock( - [existed=existed, obc=obc, func=std::move(func), this] { - return get_or_load_obc(obc, existed).safe_then_interruptible( - [func = std::move(func)](auto obc) { - return std::move(func)(std::move(obc)); - }); - }).finally([this, pgref=boost::intrusive_ptr{this}, obc=std::move(obc)] { - logger().debug("with_head_obc: released {}", obc->get_oid()); - obc->remove_from(obc_set_accessing); - }); -} - -template -PG::load_obc_iertr::future<> -PG::with_head_obc(hobject_t oid, with_obc_func_t&& func) -{ - auto [obc, existed] = - shard_services.get_cached_obc(std::move(oid)); - return with_head_obc(std::move(obc), existed, std::move(func)); -} - -template -PG::interruptible_future<> -PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func) -{ - constexpr bool existed = true; - return with_head_obc( - std::move(obc), existed, std::move(func) - ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"}); -} - -template -PG::load_obc_iertr::future<> -PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func) -{ - assert(!oid.is_head()); - return with_head_obc(oid.get_head(), - [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> { - if (!head->obs.exists) { - logger().error("with_clone_obc: {} head doesn't exist", head->obs.oi.soid); - return load_obc_iertr::future<>{crimson::ct_error::object_corrupted::make()}; - } - auto coid = resolve_oid(head->get_ro_ss(), oid); - if (!coid) { - logger().error("with_clone_obc: {} clone not found", oid); - return load_obc_iertr::future<>{crimson::ct_error::enoent::make()}; - } - auto [clone, existed] = shard_services.get_cached_obc(*coid); - return clone->template with_lock( - [existed=existed, head=std::move(head), clone=std::move(clone), - func=std::move(func), this]() -> load_obc_iertr::future<> { - auto loaded = get_or_load_obc(clone, existed); - clone->head = head; - return loaded.safe_then_interruptible([func = std::move(func)](auto clone) { - return std::move(func)(std::move(clone)); - }); - }); - }); -} - -// explicitly instantiate the used instantiations -template PG::load_obc_iertr::future<> -PG::with_head_obc(hobject_t, with_obc_func_t&&); - -template -PG::interruptible_future<> -PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func) -{ - assert(clone); - assert(clone->get_head_obc()); - assert(!clone->get_oid().is_head()); - return with_existing_head_obc(clone->get_head_obc(), - [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) { - assert(head == clone->get_head_obc()); - return clone->template with_lock( - [clone=std::move(clone), func=std::move(func)] { - return std::move(func)(std::move(clone)); - }); - }); -} - -PG::load_obc_iertr::future -PG::load_obc(ObjectContextRef obc) -{ - return backend->load_metadata(obc->get_oid()).safe_then_interruptible( - [obc=std::move(obc)](auto md) - -> load_obc_ertr::future { - const hobject_t& oid = md->os.oi.soid; - logger().debug( - "load_obc: loaded obs {} for {}", md->os.oi, oid); - if (oid.is_head()) { - if (!md->ssc) { - logger().error( - "load_obc: oid {} missing snapsetcontext", oid); - return crimson::ct_error::object_corrupted::make(); - } - obc->set_head_state(std::move(md->os), std::move(md->ssc)); - } else { - obc->set_clone_state(std::move(md->os)); - } - logger().debug( - "load_obc: returning obc {} for {}", - obc->obs.oi, obc->obs.oi.soid); - return load_obc_ertr::make_ready_future< - crimson::osd::ObjectContextRef>(obc); - }); -} - -template -PG::load_obc_iertr::future -PG::get_or_load_obc( - crimson::osd::ObjectContextRef obc, - bool existed) -{ - auto loaded = load_obc_iertr::make_ready_future(obc); - if (existed) { - logger().debug("{}: found {} in cache", __func__, obc->get_oid()); - } else { - logger().debug("{}: cache miss on {}", __func__, obc->get_oid()); - loaded = obc->template with_promoted_lock( - [obc, this] { - return load_obc(obc); - }); - } - return loaded; -} - -PG::load_obc_iertr::future<> -PG::reload_obc(crimson::osd::ObjectContext& obc) const -{ - assert(obc.is_head()); - return backend->load_metadata(obc.get_oid()).safe_then_interruptible([&obc](auto md) - -> load_obc_ertr::future<> { - logger().debug( - "{}: reloaded obs {} for {}", - __func__, - md->os.oi, - obc.get_oid()); - if (!md->ssc) { - logger().error( - "{}: oid {} missing snapsetcontext", - __func__, - obc.get_oid()); - return crimson::ct_error::object_corrupted::make(); - } - obc.set_head_state(std::move(md->os), std::move(md->ssc)); - return load_obc_ertr::now(); - }); -} - PG::load_obc_iertr::future<> PG::with_locked_obc(const hobject_t &hobj, const OpInfo &op_info, @@ -1199,47 +1045,27 @@ PG::with_locked_obc(const hobject_t &hobj, switch (get_lock_type(op_info)) { case RWState::RWREAD: if (oid.is_head()) { - return with_head_obc(oid, std::move(f)); + return obc_loader.with_head_obc(oid, std::move(f)); } else { - return with_clone_obc(oid, std::move(f)); + return obc_loader.with_clone_obc(oid, std::move(f)); } case RWState::RWWRITE: if (oid.is_head()) { - return with_head_obc(oid, std::move(f)); + return obc_loader.with_head_obc(oid, std::move(f)); } else { - return with_clone_obc(oid, std::move(f)); + return obc_loader.with_clone_obc(oid, std::move(f)); } case RWState::RWEXCL: if (oid.is_head()) { - return with_head_obc(oid, std::move(f)); + return obc_loader.with_head_obc(oid, std::move(f)); } else { - return with_clone_obc(oid, std::move(f)); + return obc_loader.with_clone_obc(oid, std::move(f)); } default: ceph_abort(); }; } -template -PG::interruptible_future<> -PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f) -{ - // TODO: a question from rebase: do we really need such checks when - // the interruptible stuff is being used? - if (__builtin_expect(stopping, false)) { - throw crimson::common::system_shutdown_exception(); - } - if (obc->is_head()) { - return with_existing_head_obc(obc, std::move(f)); - } else { - return with_existing_clone_obc(obc, std::move(f)); - } -} - -// explicitly instantiate the used instantiations -template PG::interruptible_future<> -PG::with_locked_obc(ObjectContextRef, with_obc_func_t&&); - PG::interruptible_future<> PG::handle_rep_op(Ref req) { if (__builtin_expect(stopping, false)) { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 2bd230c285b42..000844aa271ad 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -36,6 +36,7 @@ #include "crimson/osd/pg_recovery.h" #include "crimson/osd/pg_recovery_listener.h" #include "crimson/osd/recovery_backend.h" +#include "crimson/osd/object_context_loader.h" class MQuery; class OSDMap; @@ -519,17 +520,6 @@ public: using interruptor = ::crimson::interruptible::interruptor< ::crimson::osd::IOInterruptCondition>; - template - load_obc_iertr::future - get_or_load_obc( - crimson::osd::ObjectContextRef head_obc, bool existed); - - load_obc_iertr::future - load_obc(ObjectContextRef obc); - - load_obc_iertr::future<> - reload_obc(crimson::osd::ObjectContext& obc) const; - public: using with_obc_func_t = std::function (ObjectContextRef)>; @@ -539,13 +529,6 @@ public: ObjectContext::obc_accessing_option_t>; obc_accessing_list_t obc_set_accessing; - template - load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func); - - template - interruptible_future<> with_locked_obc( - ObjectContextRef obc, - with_obc_func_t&& f); load_obc_iertr::future<> with_locked_obc( const hobject_t &hobj, const OpInfo &op_info, @@ -569,27 +552,6 @@ public: eversion_t &version); private: - template - load_obc_iertr::future<> with_head_obc( - ObjectContextRef obc, - bool existed, - with_obc_func_t&& func); - template - interruptible_future<> with_existing_head_obc( - ObjectContextRef head, - with_obc_func_t&& func); - - template - load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func); - template - interruptible_future<> with_existing_clone_obc( - ObjectContextRef clone, with_obc_func_t&& func); - - load_obc_iertr::future get_locked_obc( - Operation *op, - const hobject_t &oid, - RWState::State type); - void fill_op_params_bump_pg_version( osd_op_params_t& osd_op_p, const bool user_modify); @@ -661,6 +623,8 @@ private: eversion_t projected_last_update; public: + ObjectContextLoader obc_loader; + // PeeringListener void publish_stats_to_osd() final; void clear_publish_stats() final; diff --git a/src/crimson/osd/replicated_recovery_backend.cc b/src/crimson/osd/replicated_recovery_backend.cc index 95a649d4182e5..6297ca87ba4ad 100644 --- a/src/crimson/osd/replicated_recovery_backend.cc +++ b/src/crimson/osd/replicated_recovery_backend.cc @@ -33,7 +33,7 @@ ReplicatedRecoveryBackend::recover_object( // start tracking the recovery of soid return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] { logger().debug("recover_object: loading obc: {}", soid); - return pg.with_head_obc(soid, + return pg.obc_loader.with_head_obc(soid, [this, soid, need](auto obc) { logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid); auto& recovery_waiter = get_recovering(soid); @@ -678,7 +678,7 @@ ReplicatedRecoveryBackend::_handle_pull_response( auto prepare_waiter = interruptor::make_interruptible( seastar::make_ready_future<>()); if (pi.recovery_progress.first) { - prepare_waiter = pg.with_head_obc( + prepare_waiter = pg.obc_loader.with_head_obc( pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) { pi.obc = obc; recovery_waiter.obc = obc; -- 2.39.5