From ffb66d4ad052e46fa20bcc9912a70386559a36b4 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 9 Oct 2019 15:07:55 -0700 Subject: [PATCH] crimson/osd: use ObjectContext and take obc locks Signed-off-by: Samuel Just --- src/crimson/osd/CMakeLists.txt | 1 + src/crimson/osd/ops_executer.cc | 7 +- src/crimson/osd/ops_executer.h | 20 +- .../osd/osd_operations/client_request.cc | 78 ++++- .../osd/osd_operations/client_request.h | 16 + src/crimson/osd/pg.cc | 277 +++++++++++++++--- src/crimson/osd/pg.h | 55 +++- src/crimson/osd/pg_backend.cc | 152 +++------- src/crimson/osd/pg_backend.h | 17 +- 9 files changed, 426 insertions(+), 197 deletions(-) diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index e976b9a7d76..ef4fc716743 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -23,6 +23,7 @@ add_executable(crimson-osd objclass.cc ${PROJECT_SOURCE_DIR}/src/objclass/class_api.cc ${PROJECT_SOURCE_DIR}/src/osd/ClassHandler.cc + ${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc ${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc ${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc ${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 7124f49d549..8d9db0d58e7 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -63,7 +63,7 @@ OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op) } const auto flags = method->get_flags(); - if (!os->exists && (flags & CLS_METHOD_WR) == 0) { + if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) { return crimson::ct_error::enoent::make(); } @@ -357,7 +357,10 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op) // TODO: dispatch via call table? // TODO: we might want to find a way to unify both input and output // of each op. - logger().debug("handling op {}", ceph_osd_op_name(osd_op.op.op)); + logger().debug( + "handling op {} on object {}", + ceph_osd_op_name(osd_op.op.op), + get_target()); switch (const ceph_osd_op& op = osd_op.op; op.op) { case CEPH_OSD_OP_SYNC_READ: [[fallthrough]]; diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 66ab7ad8b66..5641a211c89 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -72,7 +72,7 @@ private: virtual ~effect_t() = default; }; - PGBackend::cached_os_t os; + ObjectContextRef obc; PG& pg; PGBackend& backend; Ref msg; @@ -91,10 +91,14 @@ private: call_errorator::future<> do_op_call(class OSDOp& osd_op); + hobject_t &get_target() const { + return obc->obs.oi.soid; + } + template auto do_const_op(Func&& f) { // TODO: pass backend as read-only - return std::forward(f)(backend, std::as_const(*os)); + return std::forward(f)(backend, std::as_const(obc->obs)); } template @@ -107,7 +111,7 @@ private: template auto do_write_op(Func&& f) { ++num_write; - return std::forward(f)(backend, *os, txn); + return std::forward(f)(backend, obc->obs, txn); } // PG operations are being provided with pg instead of os. @@ -122,14 +126,14 @@ private: } public: - OpsExecuter(PGBackend::cached_os_t os, PG& pg, Ref msg) - : os(std::move(os)), + OpsExecuter(ObjectContextRef obc, PG& pg, Ref msg) + : obc(std::move(obc)), pg(pg), backend(pg.get_backend()), msg(std::move(msg)) { } OpsExecuter(PG& pg, Ref msg) - : OpsExecuter{PGBackend::cached_os_t{}, pg, std::move(msg)} + : OpsExecuter{ObjectContextRef(), pg, std::move(msg)} {} osd_op_errorator::future<> execute_osd_op(class OSDOp& osd_op); @@ -175,9 +179,9 @@ auto OpsExecuter::with_effect( template OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && { if (__builtin_expect(op_effects.empty(), true)) { - return std::forward(f)(std::move(txn), std::move(os)); + return std::forward(f)(std::move(txn), std::move(obc)); } - return std::forward(f)(std::move(txn), std::move(os)).safe_then([this] { + return std::forward(f)(std::move(txn), std::move(obc)).safe_then([this] { // let's do the cleaning of `op_effects` in destructor return crimson::do_for_each(op_effects, [] (auto& op_effect) { return op_effect->execute(); diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 7a84afbb0de..320de146947 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -4,6 +4,7 @@ #include #include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" #include "crimson/osd/pg.h" #include "crimson/osd/osd.h" @@ -43,11 +44,18 @@ ClientRequest::PGPipeline &ClientRequest::pp(PG &pg) return pg.client_request_pg_pipeline; } +bool ClientRequest::is_pg_op() const +{ + return std::any_of( + begin(m->ops), end(m->ops), + [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); +} + seastar::future<> ClientRequest::start() { logger().debug("{}: start", *this); - IRef ref = this; + IRef opref = this; return with_blocking_future(handle.enter(cp().await_map)) .then([this]() { return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch())); @@ -55,21 +63,67 @@ seastar::future<> ClientRequest::start() return with_blocking_future(handle.enter(cp().get_pg)); }).then([this] { return with_blocking_future(osd.wait_for_pg(m->get_spg())); - }).then([this, ref=std::move(ref)](Ref pg) { + }).then([this, opref=std::move(opref)](Ref pgref) { return seastar::do_with( - std::move(pg), std::move(ref), [this](auto pg, auto op) { - return with_blocking_future( - handle.enter(pp(*pg).await_map) - ).then([this, pg] { + std::move(pgref), std::move(opref), [this](auto pgref, auto opref) { + PG &pg = *pgref; return with_blocking_future( - pg->osdmap_gate.wait_for_map(m->get_map_epoch())); - }).then([this, pg] (auto) { - return with_blocking_future(handle.enter(pp(*pg).process)); - }).then([this, pg] { - return pg->handle_op(conn.get(), std::move(m)); + handle.enter(pp(pg).await_map) + ).then([this, &pg]() mutable { + return with_blocking_future( + pg.osdmap_gate.wait_for_map(m->get_map_epoch())); + }).then([this, &pg](auto map) mutable { + return with_blocking_future( + handle.enter(pp(pg).wait_for_active)); + }).then([this, &pg]() mutable { + return pg.wait_for_active(); + }).then([this, &pg]() mutable { + if (m->finish_decode()) { + m->clear_payload(); + } + if (is_pg_op()) { + return process_pg_op(pg); + } else { + return process_op(pg); + } + }); }); - }); }); } +seastar::future<> ClientRequest::process_pg_op( + PG &pg) +{ + return pg.do_pg_ops(m) + .then([this](Ref reply) { + return conn->send(reply); + }); +} + +seastar::future<> ClientRequest::process_op( + PG &pg) +{ + return with_blocking_future( + handle.enter(pp(pg).get_obc) + ).then([this, &pg]() { + op_info.set_from_op(&*m, *pg.get_osdmap()); + return pg.with_locked_obc( + m, + op_info, + this, + [this, &pg](auto obc) { + return with_blocking_future(handle.enter(pp(pg).process) + ).then([this, &pg, obc]() { + return pg.do_osd_ops(m, obc); + }).then([this](Ref reply) { + return conn->send(reply); + }); + }); + }).safe_then([] { + return seastar::now(); + }, PG::load_obc_ertr::all_same_way([](auto &code) { + logger().error("ClientRequest saw error code {}", code); + return seastar::now(); + })); +} } diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 8940194f62f..eb81e0c3b94 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -3,6 +3,7 @@ #pragma once +#include "osd/osd_op_util.h" #include "crimson/net/Connection.h" #include "crimson/osd/osd_operation.h" #include "crimson/common/type_helpers.h" @@ -17,6 +18,7 @@ class ClientRequest final : public OperationT { OSD &osd; crimson::net::ConnectionRef conn; Ref m; + OpInfo op_info; OrderedPipelinePhase::Handle handle; public: @@ -33,6 +35,12 @@ public: OrderedPipelinePhase await_map = { "ClientRequest::PGPipeline::await_map" }; + OrderedPipelinePhase wait_for_active = { + "ClientRequest::PGPipeline::wait_for_active" + }; + OrderedPipelinePhase get_obc = { + "ClientRequest::PGPipeline::get_obc" + }; OrderedPipelinePhase process = { "ClientRequest::PGPipeline::process" }; @@ -45,9 +53,17 @@ public: void print(std::ostream &) const final; void dump_detail(Formatter *f) const final; + +public: seastar::future<> start(); private: + seastar::future<> process_pg_op( + PG &pg); + seastar::future<> process_op( + PG &pg); + bool is_pg_op() const; + ConnectionPipeline &cp(); PGPipeline &pp(PG &pg); }; diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index ed0f40b3505..9db75b01053 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -412,14 +412,14 @@ seastar::future<> PG::wait_for_active() } } -seastar::future<> PG::submit_transaction(boost::local_shared_ptr&& os, +seastar::future<> PG::submit_transaction(ObjectContextRef&& obc, ceph::os::Transaction&& txn, const MOSDOp& req) { epoch_t map_epoch = get_osdmap_epoch(); eversion_t at_version{map_epoch, projected_last_update.version + 1}; return backend->mutate_object(peering_state.get_acting_recovery_backfill(), - std::move(os), + std::move(obc), std::move(txn), req, peering_state.get_last_peering_reset(), @@ -433,56 +433,80 @@ seastar::future<> PG::submit_transaction(boost::local_shared_ptr&& }); } -seastar::future> PG::do_osd_ops(Ref m) +seastar::future> PG::do_osd_ops( + Ref m, + ObjectContextRef obc) { using osd_op_errorator = OpsExecuter::osd_op_errorator; const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head() : m->get_hobj(); - return backend->get_object_state(oid).safe_then([this, m](auto os) mutable { - auto ox = - std::make_unique(std::move(os), *this/* as const& */, m); - return crimson::do_for_each(m->ops, [this, ox = ox.get()](OSDOp& osd_op) { - logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op)); - return ox->execute_osd_op(osd_op); - }).safe_then([this, m, ox = std::move(ox)] { - logger().debug("all operations have been executed successfully"); - return std::move(*ox).submit_changes([this, m] (auto&& txn, auto&& os) -> osd_op_errorator::future<> { + auto ox = + std::make_unique(obc, *this/* as const& */, m); + return crimson::do_for_each( + m->ops, [this, obc, m, ox = ox.get()](OSDOp& osd_op) { + logger().debug( + "do_osd_ops: {} - object {} - handling op {}", + *m, + obc->obs.oi.soid, + ceph_osd_op_name(osd_op.op.op)); + return ox->execute_osd_op(osd_op); + }).safe_then([this, obc, m, ox = std::move(ox)] { + logger().debug( + "do_osd_ops: {} - object {} all operations successful", + *m, + obc->obs.oi.soid); + return std::move(*ox).submit_changes( + [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> { // XXX: the entire lambda could be scheduled conditionally. ::if_then()? if (txn.empty()) { - logger().debug("txn is empty, bypassing mutate"); + logger().debug( + "do_osd_ops: {} - object {} txn is empty, bypassing mutate", + *m, + obc->obs.oi.soid); return osd_op_errorator::now(); } else { - return submit_transaction(std::move(os), std::move(txn), *m); + logger().debug( + "do_osd_ops: {} - object {} submitting txn", + *m, + obc->obs.oi.soid); + return submit_transaction(std::move(obc), std::move(txn), *m); } }); - }); - }).safe_then([m,this] { + }).safe_then([m, obc, this] { auto reply = make_message(m.get(), 0, get_osdmap_epoch(), 0, false); reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK); + logger().debug( + "do_osd_ops: {} - object {} sending reply", + *m, + obc->obs.oi.soid); return seastar::make_ready_future>(std::move(reply)); }, OpsExecuter::osd_op_errorator::all_same_way([=,&oid] (const std::error_code& e) { assert(e.value() > 0); - logger().debug("got statical error code while handling object {}: {} ({})", - oid, e.value(), e.message()); - return backend->evict_object_state(oid).then([=] { - auto reply = make_message( - m.get(), -e.value(), get_osdmap_epoch(), 0, false); - reply->set_enoent_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - return seastar::make_ready_future>(std::move(reply)); - }); + logger().debug( + "do_osd_ops: {} - object {} got error code {}, {}", + *m, + obc->obs.oi.soid, + e.value(), + e.message()); + auto reply = make_message( + m.get(), -e.value(), get_osdmap_epoch(), 0, false); + reply->set_enoent_reply_versions(peering_state.get_info().last_update, + peering_state.get_info().last_user_version); + return seastar::make_ready_future>(std::move(reply)); })).handle_exception_type([=,&oid](const crimson::osd::error& e) { // we need this handler because throwing path which aren't errorated yet. - logger().debug("got ceph::osd::error while handling object {}: {} ({})", - oid, e.code(), e.what()); - return backend->evict_object_state(oid).then([=] { - auto reply = make_message( - m.get(), -e.code().value(), get_osdmap_epoch(), 0, false); - reply->set_enoent_reply_versions(peering_state.get_info().last_update, - peering_state.get_info().last_user_version); - return seastar::make_ready_future>(std::move(reply)); - }); + logger().debug( + "do_osd_ops: {} - object {} got unhandled exception {} ({})", + *m, + obc->obs.oi.soid, + e.code(), + e.what()); + auto reply = make_message( + m.get(), -e.code().value(), get_osdmap_epoch(), 0, false); + reply->set_enoent_reply_versions(peering_state.get_info().last_update, + peering_state.get_info().last_user_version); + return seastar::make_ready_future>(std::move(reply)); }); } @@ -506,22 +530,183 @@ seastar::future> PG::do_pg_ops(Ref m) }); } -seastar::future<> PG::handle_op(crimson::net::Connection* conn, - Ref m) +std::pair PG::get_oid_and_lock( + const MOSDOp &m, + const OpInfo &op_info) { - return wait_for_active().then([conn, m, this] { - if (m->finish_decode()) { - m->clear_payload(); + auto oid = m.get_snapid() == CEPH_SNAPDIR ? + m.get_hobj().get_head() : m.get_hobj(); + + RWState::State lock_type = RWState::RWNONE; + if (op_info.rwordered() && op_info.may_read()) { + lock_type = RWState::RWState::RWEXCL; + } else if (op_info.rwordered()) { + lock_type = RWState::RWState::RWWRITE; + } else { + ceph_assert(op_info.may_read()); + lock_type = RWState::RWState::RWREAD; + } + return std::make_pair(oid, lock_type); +} + +std::optional PG::resolve_oid( + const SnapSet &ss, + const hobject_t &oid) +{ + if (oid.snap > ss.seq) { + return oid.get_head(); + } else { + // which clone would it be? + auto clone = std::upper_bound( + begin(ss.clones), end(ss.clones), + oid.snap); + if (clone == end(ss.clones)) { + // Doesn't exist, > last clone, < ss.seq + return std::nullopt; } - if (std::any_of(begin(m->ops), end(m->ops), - [](auto& op) { return ceph_osd_op_type_pg(op.op.op); })) { - return do_pg_ops(m); + auto citer = ss.clone_snaps.find(*clone); + // TODO: how do we want to handle this kind of logic error? + ceph_assert(citer != ss.clone_snaps.end()); + + if (std::find( + citer->second.begin(), + citer->second.end(), + *clone) == citer->second.end()) { + return std::nullopt; } else { - return do_osd_ops(m); + auto soid = oid; + soid.snap = *clone; + return std::optional(soid); } - }).then([conn](Ref reply) { - return conn->send(reply); - }); + } +} + +PG::load_obc_ertr::future< + std::pair> +PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head) +{ + ceph_assert(!oid.is_head()); + using ObjectContextRef = crimson::osd::ObjectContextRef; + auto coid = resolve_oid(head->get_ro_ss(), oid); + if (!coid) { + return load_obc_ertr::make_ready_future< + std::pair>( + std::make_pair(ObjectContextRef(), true) + ); + } + auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid); + if (existed) { + return load_obc_ertr::make_ready_future< + std::pair>( + std::make_pair(obc, true) + ); + } else { + bool got = obc->maybe_get_excl(); + ceph_assert(got); + return backend->load_metadata(*coid).safe_then( + [oid, obc=std::move(obc), head, this](auto &&md) mutable { + obc->set_clone_state(std::move(md->os), std::move(head)); + return load_obc_ertr::make_ready_future< + std::pair>( + std::make_pair(obc, false) + ); + }); + } +} + +PG::load_obc_ertr::future< + std::pair> +PG::get_or_load_head_obc(hobject_t oid) +{ + ceph_assert(oid.is_head()); + auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid); + if (existed) { + logger().debug( + "{}: found {} in cache", + __func__, + oid); + return load_obc_ertr::make_ready_future< + std::pair>( + std::make_pair(std::move(obc), true) + ); + } else { + logger().debug( + "{}: cache miss on {}", + __func__, + oid); + bool got = obc->maybe_get_excl(); + ceph_assert(got); + return backend->load_metadata(oid).safe_then( + [oid, obc=std::move(obc), this](auto md) -> + load_obc_ertr::future< + std::pair> + { + logger().debug( + "{}: loaded obs {} for {}", + __func__, + md->os.oi, + oid); + if (!md->ss) { + logger().error( + "{}: oid {} missing snapset", + __func__, + oid); + return crimson::ct_error::object_corrupted::make(); + } + obc->set_head_state(std::move(md->os), std::move(*(md->ss))); + logger().debug( + "{}: returning obc {} for {}", + __func__, + obc->obs.oi, + obc->obs.oi.soid); + return load_obc_ertr::make_ready_future< + std::pair>( + std::make_pair(obc, false) + ); + }); + } +} + +PG::load_obc_ertr::future +PG::get_locked_obc( + Operation *op, const hobject_t &oid, RWState::State type) +{ + return get_or_load_head_obc(oid.get_head()).safe_then( + [this, op, oid, type](auto p) -> load_obc_ertr::future{ + auto &[head_obc, head_existed] = p; + if (oid.is_head()) { + if (head_existed) { + return head_obc->get_lock_type(op, type).then([head_obc] { + ceph_assert(head_obc->loaded); + return load_obc_ertr::make_ready_future(head_obc); + }); + } else { + head_obc->degrade_excl_to(type); + return load_obc_ertr::make_ready_future(head_obc); + } + } else { + return head_obc->get_lock_type(op, RWState::RWREAD).then( + [this, head_obc, op, oid, type] { + ceph_assert(head_obc->loaded); + return get_or_load_clone_obc(oid, head_obc); + }).safe_then([this, head_obc, op, oid, type](auto p) { + auto &[obc, existed] = p; + if (existed) { + return load_obc_ertr::future<>( + obc->get_lock_type(op, type)).safe_then([obc] { + ceph_assert(obc->loaded); + return load_obc_ertr::make_ready_future(obc); + }); + } else { + obc->degrade_excl_to(type); + return load_obc_ertr::make_ready_future(obc); + } + }).safe_then([head_obc](auto obc) { + head_obc->put_lock_type(RWState::RWREAD); + return load_obc_ertr::make_ready_future(obc); + }); + } + }); } seastar::future<> PG::handle_rep_op(Ref req) diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 4c5cd11b542..fd6c90dbadf 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -15,7 +15,7 @@ #include "crimson/net/Fwd.h" #include "os/Transaction.h" #include "osd/osd_types.h" -#include "osd/osd_internal_types.h" +#include "crimson/osd/object_context.h" #include "osd/PeeringState.h" #include "crimson/common/type_helpers.h" @@ -435,8 +435,46 @@ public: void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx); void handle_activate_map(PeeringCtx &rctx); void handle_initialize(PeeringCtx &rctx); - seastar::future<> handle_op(crimson::net::Connection* conn, - Ref m); + + static std::pair get_oid_and_lock( + const MOSDOp &m, + const OpInfo &op_info); + static std::optional resolve_oid( + const SnapSet &snapset, + const hobject_t &oid); + + using load_obc_ertr = crimson::errorator< + crimson::ct_error::object_corrupted>; + load_obc_ertr::future< + std::pair> + get_or_load_clone_obc( + hobject_t oid, crimson::osd::ObjectContextRef head_obc); + + load_obc_ertr::future< + std::pair> + get_or_load_head_obc(hobject_t oid); + + load_obc_ertr::future get_locked_obc( + Operation *op, + const hobject_t &oid, + RWState::State type); +public: + template + auto with_locked_obc( + Ref &m, + const OpInfo &op_info, + Operation *op, + F &&f) { + auto [oid, type] = get_oid_and_lock(*m, op_info); + return get_locked_obc(op, oid, type) + .safe_then([this, f=std::forward(f), type](auto obc) { + return f(obc).finally([this, obc, type] { + obc->put_lock_type(type); + return load_obc_ertr::now(); + }); + }); + } + seastar::future<> handle_rep_op(Ref m); void handle_rep_op_reply(crimson::net::Connection* conn, const MOSDRepOpReply& m); @@ -447,7 +485,9 @@ private: void do_peering_event( const boost::statechart::event_base &evt, PeeringCtx &rctx); - seastar::future> do_osd_ops(Ref m); + seastar::future> do_osd_ops( + Ref m, + ObjectContextRef obc); seastar::future> do_pg_ops(Ref m); seastar::future<> do_osd_op( ObjectState& os, @@ -456,7 +496,7 @@ private: seastar::future do_pgnls(ceph::bufferlist& indata, const std::string& nspace, uint64_t limit); - seastar::future<> submit_transaction(boost::local_shared_ptr&& os, + seastar::future<> submit_transaction(ObjectContextRef&& obc, ceph::os::Transaction&& txn, const MOSDOp& req); @@ -465,6 +505,11 @@ private: ShardServices &shard_services; cached_map_t osdmap; + +public: + cached_map_t get_osdmap() { return osdmap; } + +private: std::unique_ptr backend; PeeringState peering_state; diff --git a/src/crimson/osd/pg_backend.cc b/src/crimson/osd/pg_backend.cc index a0a59b466dd..415c2e7d0f9 100644 --- a/src/crimson/osd/pg_backend.cc +++ b/src/crimson/osd/pg_backend.cc @@ -13,6 +13,7 @@ #include "messages/MOSDOp.h" #include "os/Transaction.h" +#include "common/Clock.h" #include "crimson/os/cyanstore/cyan_object.h" #include "crimson/os/futurized_collection.h" @@ -60,128 +61,63 @@ PGBackend::PGBackend(shard_id_t shard, store{store} {} -PGBackend::get_os_errorator::future -PGBackend::get_object_state(const hobject_t& oid) -{ - // want the head? - if (oid.snap == CEPH_NOSNAP) { - logger().trace("find_object: {}@HEAD", oid); - return _load_os(oid); - } else { - // we want a snap - return _load_ss(oid).safe_then( - [oid,this](cached_ss_t ss) -> get_os_errorator::future { - // head? - if (oid.snap > ss->seq) { - return _load_os(oid.get_head()); - } else { - // which clone would it be? - auto clone = std::upper_bound(begin(ss->clones), end(ss->clones), - oid.snap); - if (clone == end(ss->clones)) { - return crimson::ct_error::enoent::make(); - } - // clone - auto soid = oid; - soid.snap = *clone; - return _load_ss(soid).safe_then( - [soid,this](cached_ss_t ss) -> get_os_errorator::future { - auto clone_snap = ss->clone_snaps.find(soid.snap); - assert(clone_snap != end(ss->clone_snaps)); - if (clone_snap->second.empty()) { - logger().trace("find_object: {}@[] -- DNE", soid); - return crimson::ct_error::enoent::make(); - } - auto first = clone_snap->second.back(); - auto last = clone_snap->second.front(); - if (first > soid.snap) { - logger().trace("find_object: {}@[{},{}] -- DNE", - soid, first, last); - return crimson::ct_error::enoent::make(); - } - logger().trace("find_object: {}@[{},{}] -- HIT", - soid, first, last); - return _load_os(soid); - }); - } - }); - } -} - -PGBackend::load_metadata_ertr::future +PGBackend::load_metadata_ertr::future PGBackend::load_metadata(const hobject_t& oid) { return store->get_attrs( coll, ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then( - [oid, this](auto &&attrs) -> load_metadata_ertr::future{ - loaded_object_md_t ret; + [oid, this](auto &&attrs) -> load_metadata_ertr::future{ + loaded_object_md_t::ref ret(new loaded_object_md_t()); if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) { bufferlist bl; - bl.push_back(oiiter->second); - ret.os = ObjectState( + bl.push_back(std::move(oiiter->second)); + ret->os = ObjectState( object_info_t(bl), true); } else { + logger().error( + "load_metadata: object {} present but missing object info", + oid); return crimson::ct_error::object_corrupted::make(); } if (oid.is_head()) { if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) { bufferlist bl; - bl.push_back(ssiter->second); - ret.ss = SnapSet(bl); + bl.push_back(std::move(ssiter->second)); + ret->ss = SnapSet(bl); } else { - return crimson::ct_error::object_corrupted::make(); + /* TODO: add support for writing out snapsets + logger().error( + "load_metadata: object {} present but missing snapset", + oid); + //return crimson::ct_error::object_corrupted::make(); + */ + ret->ss = SnapSet(); } } - return load_metadata_ertr::make_ready_future( + return load_metadata_ertr::make_ready_future( std::move(ret)); }, crimson::ct_error::enoent::handle([oid, this] { - return load_metadata_ertr::make_ready_future( - loaded_object_md_t{ - ObjectState(), - std::nullopt + logger().debug( + "load_metadata: object {} doesn't exist, returning empty metadata", + oid); + return load_metadata_ertr::make_ready_future( + new loaded_object_md_t{ + ObjectState( + object_info_t(oid), + false), + oid.is_head() ? std::optional(SnapSet()) : std::nullopt }); })); } -PGBackend::get_os_errorator::future -PGBackend::_load_os(const hobject_t& oid) -{ - if (auto found = os_cache.find(oid); found) { - return get_os_errorator::make_ready_future(std::move(found)); - } - return load_metadata(oid).safe_then([oid, this](auto &&md) { - return get_os_errorator::make_ready_future( - os_cache.insert( - oid, - std::make_unique(std::move(md.os)))); - }); -} - -PGBackend::get_os_errorator::future -PGBackend::_load_ss(const hobject_t& oid) -{ - if (auto found = ss_cache.find(oid); found) { - return get_os_errorator::make_ready_future(std::move(found)); - } - return load_metadata(oid).safe_then([oid, this](auto &&md) { - if (!md.ss) { - return get_os_errorator::make_ready_future( - std::make_unique()); - } else { - return get_os_errorator::make_ready_future( - ss_cache.insert(oid, std::make_unique(std::move(*(md.ss))))); - } - }); -} - seastar::future PGBackend::mutate_object( std::set pg_shards, - cached_os_t&& os, + crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, const MOSDOp& m, epoch_t min_epoch, @@ -189,36 +125,30 @@ PGBackend::mutate_object( eversion_t ver) { logger().trace("mutate_object: num_ops={}", txn.get_num_ops()); - if (os->exists) { + if (obc->obs.exists) { #if 0 - os.oi.version = ctx->at_version; - os.oi.prior_version = ctx->obs->oi.version; + obc->obs.oi.version = ctx->at_version; + obc->obs.oi.prior_version = ctx->obs->oi.version; #endif - os->oi.last_reqid = m.get_reqid(); - os->oi.mtime = m.get_mtime(); - os->oi.local_mtime = ceph_clock_now(); + obc->obs.oi.last_reqid = m.get_reqid(); + obc->obs.oi.mtime = m.get_mtime(); + obc->obs.oi.local_mtime = ceph_clock_now(); // object_info_t { ceph::bufferlist osv; - encode(os->oi, osv, 0); + encode(obc->obs.oi, osv, 0); // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); - txn.setattr(coll->get_cid(), ghobject_t{os->oi.soid}, OI_ATTR, osv); + txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv); } } else { // reset cached ObjectState without enforcing eviction - os->oi = object_info_t(os->oi.soid); + obc->obs.oi = object_info_t(obc->obs.oi.soid); } - return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn), - m.get_reqid(), min_epoch, map_epoch, ver); -} - -seastar::future<> -PGBackend::evict_object_state(const hobject_t& oid) -{ - os_cache.erase(oid); - return seastar::now(); + return _submit_transaction( + std::move(pg_shards), obc->obs.oi.soid, std::move(txn), + m.get_reqid(), min_epoch, map_epoch, ver); } static inline bool _read_verify_data( diff --git a/src/crimson/osd/pg_backend.h b/src/crimson/osd/pg_backend.h index b2ba4fd2c7b..d72997eff66 100644 --- a/src/crimson/osd/pg_backend.h +++ b/src/crimson/osd/pg_backend.h @@ -43,10 +43,6 @@ public: crimson::os::CollectionRef coll, crimson::osd::ShardServices& shard_services, const ec_profile_t& ec_profile); - using cached_os_t = boost::local_shared_ptr; - using get_os_errorator = crimson::errorator; - get_os_errorator::future get_object_state(const hobject_t& oid); - seastar::future<> evict_object_state(const hobject_t& oid); using read_errorator = ll_read_errorator::extend< crimson::ct_error::input_output_error, @@ -58,6 +54,7 @@ public: size_t truncate_size, uint32_t truncate_seq, uint32_t flags); + using stat_errorator = crimson::errorator; stat_errorator::future<> stat( const ObjectState& os, @@ -80,7 +77,7 @@ public: ceph::os::Transaction& trans); seastar::future mutate_object( std::set pg_shards, - cached_os_t&& os, + crimson::osd::ObjectContextRef &&obc, ceph::os::Transaction&& txn, const MOSDOp& m, epoch_t min_epoch, @@ -129,18 +126,12 @@ public: struct loaded_object_md_t { ObjectState os; std::optional ss; + using ref = std::unique_ptr; }; - load_metadata_ertr::future load_metadata( + load_metadata_ertr::future load_metadata( const hobject_t &oid); private: - using cached_ss_t = boost::local_shared_ptr; - SharedLRU ss_cache; - get_os_errorator::future _load_ss(const hobject_t& oid); - - SharedLRU os_cache; - get_os_errorator::future _load_os(const hobject_t& oid); - virtual ll_read_errorator::future _read( const hobject_t& hoid, size_t offset, -- 2.39.5