* obc load logic is moved to a seperate class for better reusability.
* with_existing_* logic is removed. (not being used)
Signed-off-by: Matan Breizman <mbreizma@redhat.com>
shard_services.cc
pg_shard_manager.cc
object_context.cc
+ object_context_loader.cc
ops_executer.cc
osd_operation.cc
osd_operations/client_request.cc
--- /dev/null
+#include "crimson/osd/object_context_loader.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+using crimson::common::local_conf;
+
+ template<RWState::State State>
+ ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc(ObjectContextRef obc,
+ bool existed,
+ with_obc_func_t&& func)
+ {
+ logger().debug("{} {}", __func__, obc->get_oid());
+ assert(obc->is_head());
+ obc->append_to(obc_set_accessing);
+ return obc->with_lock<State, IOInterruptCondition>(
+ [existed=existed, obc=obc, func=std::move(func), this] {
+ return get_or_load_obc<State>(obc, existed)
+ .safe_then_interruptible(
+ [func = std::move(func)](auto obc) {
+ return std::move(func)(std::move(obc));
+ });
+ }).finally([this, obc=std::move(obc)] {
+ logger().debug("with_head_obc: released {}", obc->get_oid());
+ obc->remove_from(obc_set_accessing);
+ });
+ }
+
+ template<RWState::State State>
+ ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_clone_obc(hobject_t oid,
+ with_obc_func_t&& func)
+ {
+ assert(!oid.is_head());
+ return with_head_obc<RWState::RWREAD>(oid.get_head(),
+ [oid, func=std::move(func), this](auto head)
+ -> load_obc_iertr::future<> {
+ if (!head->obs.exists) {
+ logger().error("with_clone_obc: {} head doesn't exist",
+ head->obs.oi.soid);
+ return load_obc_iertr::future<>{
+ crimson::ct_error::enoent::make()
+ };
+ }
+ auto coid = resolve_oid(head->get_ro_ss(), oid);
+ if (!coid) {
+ logger().error("with_clone_obc: {} clone not found", coid);
+ return load_obc_iertr::future<>{
+ crimson::ct_error::enoent::make()
+ };
+ }
+ auto [clone, existed] = shard_services.get_cached_obc(*coid);
+ return clone->template with_lock<State, IOInterruptCondition>(
+ [existed=existed, head=std::move(head), clone=std::move(clone),
+ func=std::move(func), this]() -> load_obc_iertr::future<> {
+ auto loaded = get_or_load_obc<State>(clone, existed);
+ clone->head = head;
+ return loaded.safe_then_interruptible(
+ [func = std::move(func)](auto clone) {
+ return std::move(func)(std::move(clone));
+ });
+ });
+ });
+ }
+
+ template<RWState::State State>
+ ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc(hobject_t oid,
+ with_obc_func_t&& func)
+ {
+ auto [obc, existed] =
+ shard_services.get_cached_obc(std::move(oid));
+ return with_head_obc<State>(std::move(obc),
+ existed,
+ std::move(func));
+ }
+
+ ObjectContextLoader::load_obc_iertr::future<ObjectContextRef>
+ ObjectContextLoader::load_obc(ObjectContextRef obc)
+ {
+ return backend->load_metadata(obc->get_oid())
+ .safe_then_interruptible(
+ [obc=std::move(obc)](auto md)
+ -> load_obc_ertr::future<ObjectContextRef> {
+ const hobject_t& oid = md->os.oi.soid;
+ logger().debug(
+ "load_obc: loaded obs {} for {}", md->os.oi, oid);
+ if (oid.is_head()) {
+ if (!md->ssc) {
+ logger().error(
+ "load_obc: oid {} missing snapsetcontext", oid);
+ return crimson::ct_error::object_corrupted::make();
+ }
+ obc->set_head_state(std::move(md->os),
+ std::move(md->ssc));
+ } else {
+ obc->set_clone_state(std::move(md->os));
+ }
+ logger().debug(
+ "load_obc: returning obc {} for {}",
+ obc->obs.oi, obc->obs.oi.soid);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ });
+ }
+
+ template<RWState::State State>
+ ObjectContextLoader::load_obc_iertr::future<ObjectContextRef>
+ ObjectContextLoader::get_or_load_obc(ObjectContextRef obc,
+ bool existed)
+ {
+ auto loaded =
+ load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
+ if (existed) {
+ logger().debug("{}: found {} in cache",
+ __func__, obc->get_oid());
+ } else {
+ logger().debug("{}: cache miss on {}",
+ __func__, obc->get_oid());
+ loaded =
+ obc->template with_promoted_lock<State, IOInterruptCondition>(
+ [obc, this] {
+ return load_obc(obc);
+ });
+ }
+ return loaded;
+ }
+
+ ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::reload_obc(ObjectContext& obc) const
+ {
+ assert(obc.is_head());
+ return backend->load_metadata(obc.get_oid())
+ .safe_then_interruptible<false>(
+ [&obc](auto md)-> load_obc_ertr::future<> {
+ logger().debug(
+ "{}: reloaded obs {} for {}",
+ __func__,
+ md->os.oi,
+ obc.get_oid());
+ if (!md->ssc) {
+ logger().error(
+ "{}: oid {} missing snapsetcontext",
+ __func__,
+ obc.get_oid());
+ return crimson::ct_error::object_corrupted::make();
+ }
+ obc.set_head_state(std::move(md->os), std::move(md->ssc));
+ return load_obc_ertr::now();
+ });
+ }
+
+ // explicitly instantiate the used instantiations
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc<RWState::RWNONE>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc<RWState::RWREAD>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc<RWState::RWWRITE>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_head_obc<RWState::RWEXCL>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_clone_obc<RWState::RWNONE>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_clone_obc<RWState::RWREAD>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_clone_obc<RWState::RWWRITE>(hobject_t,
+ with_obc_func_t&&);
+
+ template ObjectContextLoader::load_obc_iertr::future<>
+ ObjectContextLoader::with_clone_obc<RWState::RWEXCL>(hobject_t,
+ with_obc_func_t&&);
+}
--- /dev/null
+#pragma once
+
+#include <seastar/core/future.hh>
+#include "crimson/common/errorator.h"
+#include "crimson/osd/object_context.h"
+#include "crimson/osd/shard_services.h"
+#include "crimson/osd/pg_backend.h"
+
+namespace crimson::osd {
+class ObjectContextLoader {
+public:
+ using obc_accessing_list_t = boost::intrusive::list<
+ ObjectContext,
+ ObjectContext::obc_accessing_option_t>;
+
+ ObjectContextLoader(
+ ShardServices& _shard_services,
+ PGBackend* _backend)
+ : shard_services{_shard_services},
+ backend{_backend}
+ {}
+
+ using load_obc_ertr = crimson::errorator<
+ crimson::ct_error::enoent,
+ crimson::ct_error::object_corrupted>;
+ using load_obc_iertr =
+ ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ load_obc_ertr>;
+
+ using with_obc_func_t =
+ std::function<load_obc_iertr::future<> (ObjectContextRef)>;
+
+ template<RWState::State State>
+ load_obc_iertr::future<> with_head_obc(hobject_t oid,
+ with_obc_func_t&& func);
+
+ template<RWState::State State>
+ load_obc_iertr::future<> with_clone_obc(hobject_t oid,
+ with_obc_func_t&& func);
+
+ template<RWState::State State>
+ load_obc_iertr::future<> with_head_obc(ObjectContextRef obc,
+ bool existed,
+ with_obc_func_t&& func);
+
+ template<RWState::State State>
+ load_obc_iertr::future<ObjectContextRef>
+ get_or_load_obc(ObjectContextRef obc,
+ bool existed);
+
+ load_obc_iertr::future<ObjectContextRef>
+ load_obc(ObjectContextRef obc);
+
+ load_obc_iertr::future<> reload_obc(ObjectContext& obc) const;
+
+private:
+ ShardServices &shard_services;
+ PGBackend* backend;
+ obc_accessing_list_t obc_set_accessing;
+};
+}
osdmap,
this,
this),
+ obc_loader{
+ shard_services,
+ backend.get()},
wait_for_active_blocker(this)
{
peering_state.set_backend_predicates(
{
assert(ox);
auto rollbacker = ox->create_rollbacker([this] (auto& obc) {
- return reload_obc(obc).handle_error_interruptible(
+ return obc_loader.reload_obc(obc).handle_error_interruptible(
load_obc_ertr::assert_all{"can't live with object state messed up"});
});
auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func));
}
}
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(ObjectContextRef obc, bool existed, with_obc_func_t&& func)
-{
- logger().debug("{} {}", __func__, obc->get_oid());
- assert(obc->is_head());
- obc->append_to(obc_set_accessing);
- return obc->with_lock<State, IOInterruptCondition>(
- [existed=existed, obc=obc, func=std::move(func), this] {
- return get_or_load_obc<State>(obc, existed).safe_then_interruptible(
- [func = std::move(func)](auto obc) {
- return std::move(func)(std::move(obc));
- });
- }).finally([this, pgref=boost::intrusive_ptr<PG>{this}, obc=std::move(obc)] {
- logger().debug("with_head_obc: released {}", obc->get_oid());
- obc->remove_from(obc_set_accessing);
- });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
-{
- auto [obc, existed] =
- shard_services.get_cached_obc(std::move(oid));
- return with_head_obc<State>(std::move(obc), existed, std::move(func));
-}
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_head_obc(ObjectContextRef obc, with_obc_func_t&& func)
-{
- constexpr bool existed = true;
- return with_head_obc<State>(
- std::move(obc), existed, std::move(func)
- ).handle_error_interruptible(load_obc_ertr::assert_all{"can't happen"});
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<>
-PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
-{
- assert(!oid.is_head());
- return with_head_obc<RWState::RWREAD>(oid.get_head(),
- [oid, func=std::move(func), this](auto head) -> load_obc_iertr::future<> {
- if (!head->obs.exists) {
- logger().error("with_clone_obc: {} head doesn't exist", head->obs.oi.soid);
- return load_obc_iertr::future<>{crimson::ct_error::object_corrupted::make()};
- }
- auto coid = resolve_oid(head->get_ro_ss(), oid);
- if (!coid) {
- logger().error("with_clone_obc: {} clone not found", oid);
- return load_obc_iertr::future<>{crimson::ct_error::enoent::make()};
- }
- auto [clone, existed] = shard_services.get_cached_obc(*coid);
- return clone->template with_lock<State, IOInterruptCondition>(
- [existed=existed, head=std::move(head), clone=std::move(clone),
- func=std::move(func), this]() -> load_obc_iertr::future<> {
- auto loaded = get_or_load_obc<State>(clone, existed);
- clone->head = head;
- return loaded.safe_then_interruptible([func = std::move(func)](auto clone) {
- return std::move(func)(std::move(clone));
- });
- });
- });
-}
-
-// explicitly instantiate the used instantiations
-template PG::load_obc_iertr::future<>
-PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);
-
-template<RWState::State State>
-PG::interruptible_future<>
-PG::with_existing_clone_obc(ObjectContextRef clone, with_obc_func_t&& func)
-{
- assert(clone);
- assert(clone->get_head_obc());
- assert(!clone->get_oid().is_head());
- return with_existing_head_obc<RWState::RWREAD>(clone->get_head_obc(),
- [clone=std::move(clone), func=std::move(func)] ([[maybe_unused]] auto head) {
- assert(head == clone->get_head_obc());
- return clone->template with_lock<State>(
- [clone=std::move(clone), func=std::move(func)] {
- return std::move(func)(std::move(clone));
- });
- });
-}
-
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::load_obc(ObjectContextRef obc)
-{
- return backend->load_metadata(obc->get_oid()).safe_then_interruptible(
- [obc=std::move(obc)](auto md)
- -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
- const hobject_t& oid = md->os.oi.soid;
- logger().debug(
- "load_obc: loaded obs {} for {}", md->os.oi, oid);
- if (oid.is_head()) {
- if (!md->ssc) {
- logger().error(
- "load_obc: oid {} missing snapsetcontext", oid);
- return crimson::ct_error::object_corrupted::make();
- }
- obc->set_head_state(std::move(md->os), std::move(md->ssc));
- } else {
- obc->set_clone_state(std::move(md->os));
- }
- logger().debug(
- "load_obc: returning obc {} for {}",
- obc->obs.oi, obc->obs.oi.soid);
- return load_obc_ertr::make_ready_future<
- crimson::osd::ObjectContextRef>(obc);
- });
-}
-
-template<RWState::State State>
-PG::load_obc_iertr::future<crimson::osd::ObjectContextRef>
-PG::get_or_load_obc(
- crimson::osd::ObjectContextRef obc,
- bool existed)
-{
- auto loaded = load_obc_iertr::make_ready_future<ObjectContextRef>(obc);
- if (existed) {
- logger().debug("{}: found {} in cache", __func__, obc->get_oid());
- } else {
- logger().debug("{}: cache miss on {}", __func__, obc->get_oid());
- loaded = obc->template with_promoted_lock<State, IOInterruptCondition>(
- [obc, this] {
- return load_obc(obc);
- });
- }
- return loaded;
-}
-
-PG::load_obc_iertr::future<>
-PG::reload_obc(crimson::osd::ObjectContext& obc) const
-{
- assert(obc.is_head());
- return backend->load_metadata(obc.get_oid()).safe_then_interruptible<false>([&obc](auto md)
- -> load_obc_ertr::future<> {
- logger().debug(
- "{}: reloaded obs {} for {}",
- __func__,
- md->os.oi,
- obc.get_oid());
- if (!md->ssc) {
- logger().error(
- "{}: oid {} missing snapsetcontext",
- __func__,
- obc.get_oid());
- return crimson::ct_error::object_corrupted::make();
- }
- obc.set_head_state(std::move(md->os), std::move(md->ssc));
- return load_obc_ertr::now();
- });
-}
-
PG::load_obc_iertr::future<>
PG::with_locked_obc(const hobject_t &hobj,
const OpInfo &op_info,
switch (get_lock_type(op_info)) {
case RWState::RWREAD:
if (oid.is_head()) {
- return with_head_obc<RWState::RWREAD>(oid, std::move(f));
+ return obc_loader.with_head_obc<RWState::RWREAD>(oid, std::move(f));
} else {
- return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
+ return obc_loader.with_clone_obc<RWState::RWREAD>(oid, std::move(f));
}
case RWState::RWWRITE:
if (oid.is_head()) {
- return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
+ return obc_loader.with_head_obc<RWState::RWWRITE>(oid, std::move(f));
} else {
- return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
+ return obc_loader.with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
}
case RWState::RWEXCL:
if (oid.is_head()) {
- return with_head_obc<RWState::RWEXCL>(oid, std::move(f));
+ return obc_loader.with_head_obc<RWState::RWEXCL>(oid, std::move(f));
} else {
- return with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
+ return obc_loader.with_clone_obc<RWState::RWEXCL>(oid, std::move(f));
}
default:
ceph_abort();
};
}
-template <RWState::State State>
-PG::interruptible_future<>
-PG::with_locked_obc(ObjectContextRef obc, with_obc_func_t &&f)
-{
- // TODO: a question from rebase: do we really need such checks when
- // the interruptible stuff is being used?
- if (__builtin_expect(stopping, false)) {
- throw crimson::common::system_shutdown_exception();
- }
- if (obc->is_head()) {
- return with_existing_head_obc<State>(obc, std::move(f));
- } else {
- return with_existing_clone_obc<State>(obc, std::move(f));
- }
-}
-
-// explicitly instantiate the used instantiations
-template PG::interruptible_future<>
-PG::with_locked_obc<RWState::RWEXCL>(ObjectContextRef, with_obc_func_t&&);
-
PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
if (__builtin_expect(stopping, false)) {
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/pg_recovery_listener.h"
#include "crimson/osd/recovery_backend.h"
+#include "crimson/osd/object_context_loader.h"
class MQuery;
class OSDMap;
using interruptor = ::crimson::interruptible::interruptor<
::crimson::osd::IOInterruptCondition>;
- template<RWState::State State>
- load_obc_iertr::future<crimson::osd::ObjectContextRef>
- get_or_load_obc(
- crimson::osd::ObjectContextRef head_obc, bool existed);
-
- load_obc_iertr::future<crimson::osd::ObjectContextRef>
- load_obc(ObjectContextRef obc);
-
- load_obc_iertr::future<>
- reload_obc(crimson::osd::ObjectContext& obc) const;
-
public:
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef)>;
ObjectContext::obc_accessing_option_t>;
obc_accessing_list_t obc_set_accessing;
- template<RWState::State State>
- load_obc_iertr::future<> with_head_obc(hobject_t oid, with_obc_func_t&& func);
-
- template<RWState::State State>
- interruptible_future<> with_locked_obc(
- ObjectContextRef obc,
- with_obc_func_t&& f);
load_obc_iertr::future<> with_locked_obc(
const hobject_t &hobj,
const OpInfo &op_info,
eversion_t &version);
private:
- template<RWState::State State>
- load_obc_iertr::future<> with_head_obc(
- ObjectContextRef obc,
- bool existed,
- with_obc_func_t&& func);
- template<RWState::State State>
- interruptible_future<> with_existing_head_obc(
- ObjectContextRef head,
- with_obc_func_t&& func);
-
- template<RWState::State State>
- load_obc_iertr::future<> with_clone_obc(hobject_t oid, with_obc_func_t&& func);
- template<RWState::State State>
- interruptible_future<> with_existing_clone_obc(
- ObjectContextRef clone, with_obc_func_t&& func);
-
- load_obc_iertr::future<ObjectContextRef> get_locked_obc(
- Operation *op,
- const hobject_t &oid,
- RWState::State type);
-
void fill_op_params_bump_pg_version(
osd_op_params_t& osd_op_p,
const bool user_modify);
eversion_t projected_last_update;
public:
+ ObjectContextLoader obc_loader;
+
// PeeringListener
void publish_stats_to_osd() final;
void clear_publish_stats() final;
// start tracking the recovery of soid
return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
logger().debug("recover_object: loading obc: {}", soid);
- return pg.with_head_obc<RWState::RWREAD>(soid,
+ return pg.obc_loader.with_head_obc<RWState::RWREAD>(soid,
[this, soid, need](auto obc) {
logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
auto& recovery_waiter = get_recovering(soid);
auto prepare_waiter = interruptor::make_interruptible(
seastar::make_ready_future<>());
if (pi.recovery_progress.first) {
- prepare_waiter = pg.with_head_obc<RWState::RWNONE>(
+ prepare_waiter = pg.obc_loader.with_head_obc<RWState::RWNONE>(
pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
pi.obc = obc;
recovery_waiter.obc = obc;