objclass.cc
${PROJECT_SOURCE_DIR}/src/objclass/class_api.cc
${PROJECT_SOURCE_DIR}/src/osd/ClassHandler.cc
+ ${PROJECT_SOURCE_DIR}/src/osd/osd_op_util.cc
${PROJECT_SOURCE_DIR}/src/osd/PeeringState.cc
${PROJECT_SOURCE_DIR}/src/osd/PGPeeringEvent.cc
${PROJECT_SOURCE_DIR}/src/osd/PGStateUtils.cc
}
const auto flags = method->get_flags();
- if (!os->exists && (flags & CLS_METHOD_WR) == 0) {
+ if (!obc->obs.exists && (flags & CLS_METHOD_WR) == 0) {
return crimson::ct_error::enoent::make();
}
// TODO: dispatch via call table?
// TODO: we might want to find a way to unify both input and output
// of each op.
- logger().debug("handling op {}", ceph_osd_op_name(osd_op.op.op));
+ logger().debug(
+ "handling op {} on object {}",
+ ceph_osd_op_name(osd_op.op.op),
+ get_target());
switch (const ceph_osd_op& op = osd_op.op; op.op) {
case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];
virtual ~effect_t() = default;
};
- PGBackend::cached_os_t os;
+ ObjectContextRef obc;
PG& pg;
PGBackend& backend;
Ref<MOSDOp> msg;
call_errorator::future<> do_op_call(class OSDOp& osd_op);
+ hobject_t &get_target() const {
+ return obc->obs.oi.soid;
+ }
+
template <class Func>
auto do_const_op(Func&& f) {
// TODO: pass backend as read-only
- return std::forward<Func>(f)(backend, std::as_const(*os));
+ return std::forward<Func>(f)(backend, std::as_const(obc->obs));
}
template <class Func>
template <class Func>
auto do_write_op(Func&& f) {
++num_write;
- return std::forward<Func>(f)(backend, *os, txn);
+ return std::forward<Func>(f)(backend, obc->obs, txn);
}
// PG operations are being provided with pg instead of os.
}
public:
- OpsExecuter(PGBackend::cached_os_t os, PG& pg, Ref<MOSDOp> msg)
- : os(std::move(os)),
+ OpsExecuter(ObjectContextRef obc, PG& pg, Ref<MOSDOp> msg)
+ : obc(std::move(obc)),
pg(pg),
backend(pg.get_backend()),
msg(std::move(msg)) {
}
OpsExecuter(PG& pg, Ref<MOSDOp> msg)
- : OpsExecuter{PGBackend::cached_os_t{}, pg, std::move(msg)}
+ : OpsExecuter{ObjectContextRef(), pg, std::move(msg)}
{}
osd_op_errorator::future<> execute_osd_op(class OSDOp& osd_op);
template <typename Func>
OpsExecuter::osd_op_errorator::future<> OpsExecuter::submit_changes(Func&& f) && {
if (__builtin_expect(op_effects.empty(), true)) {
- return std::forward<Func>(f)(std::move(txn), std::move(os));
+ return std::forward<Func>(f)(std::move(txn), std::move(obc));
}
- return std::forward<Func>(f)(std::move(txn), std::move(os)).safe_then([this] {
+ return std::forward<Func>(f)(std::move(txn), std::move(obc)).safe_then([this] {
// let's do the cleaning of `op_effects` in destructor
return crimson::do_for_each(op_effects, [] (auto& op_effect) {
return op_effect->execute();
#include <seastar/core/future.hh>
#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
return pg.client_request_pg_pipeline;
}
+bool ClientRequest::is_pg_op() const
+{
+ return std::any_of(
+ begin(m->ops), end(m->ops),
+ [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
+}
+
seastar::future<> ClientRequest::start()
{
logger().debug("{}: start", *this);
- IRef ref = this;
+ IRef opref = this;
return with_blocking_future(handle.enter(cp().await_map))
.then([this]() {
return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
return with_blocking_future(handle.enter(cp().get_pg));
}).then([this] {
return with_blocking_future(osd.wait_for_pg(m->get_spg()));
- }).then([this, ref=std::move(ref)](Ref<PG> pg) {
+ }).then([this, opref=std::move(opref)](Ref<PG> pgref) {
return seastar::do_with(
- std::move(pg), std::move(ref), [this](auto pg, auto op) {
- return with_blocking_future(
- handle.enter(pp(*pg).await_map)
- ).then([this, pg] {
+ std::move(pgref), std::move(opref), [this](auto pgref, auto opref) {
+ PG &pg = *pgref;
return with_blocking_future(
- pg->osdmap_gate.wait_for_map(m->get_map_epoch()));
- }).then([this, pg] (auto) {
- return with_blocking_future(handle.enter(pp(*pg).process));
- }).then([this, pg] {
- return pg->handle_op(conn.get(), std::move(m));
+ handle.enter(pp(pg).await_map)
+ ).then([this, &pg]() mutable {
+ return with_blocking_future(
+ pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
+ }).then([this, &pg](auto map) mutable {
+ return with_blocking_future(
+ handle.enter(pp(pg).wait_for_active));
+ }).then([this, &pg]() mutable {
+ return pg.wait_for_active();
+ }).then([this, &pg]() mutable {
+ if (m->finish_decode()) {
+ m->clear_payload();
+ }
+ if (is_pg_op()) {
+ return process_pg_op(pg);
+ } else {
+ return process_op(pg);
+ }
+ });
});
- });
});
}
+seastar::future<> ClientRequest::process_pg_op(
+ PG &pg)
+{
+ return pg.do_pg_ops(m)
+ .then([this](Ref<MOSDOpReply> reply) {
+ return conn->send(reply);
+ });
+}
+
+seastar::future<> ClientRequest::process_op(
+ PG &pg)
+{
+ return with_blocking_future(
+ handle.enter(pp(pg).get_obc)
+ ).then([this, &pg]() {
+ op_info.set_from_op(&*m, *pg.get_osdmap());
+ return pg.with_locked_obc(
+ m,
+ op_info,
+ this,
+ [this, &pg](auto obc) {
+ return with_blocking_future(handle.enter(pp(pg).process)
+ ).then([this, &pg, obc]() {
+ return pg.do_osd_ops(m, obc);
+ }).then([this](Ref<MOSDOpReply> reply) {
+ return conn->send(reply);
+ });
+ });
+ }).safe_then([] {
+ return seastar::now();
+ }, PG::load_obc_ertr::all_same_way([](auto &code) {
+ logger().error("ClientRequest saw error code {}", code);
+ return seastar::now();
+ }));
+}
}
#pragma once
+#include "osd/osd_op_util.h"
#include "crimson/net/Connection.h"
#include "crimson/osd/osd_operation.h"
#include "crimson/common/type_helpers.h"
OSD &osd;
crimson::net::ConnectionRef conn;
Ref<MOSDOp> m;
+ OpInfo op_info;
OrderedPipelinePhase::Handle handle;
public:
OrderedPipelinePhase await_map = {
"ClientRequest::PGPipeline::await_map"
};
+ OrderedPipelinePhase wait_for_active = {
+ "ClientRequest::PGPipeline::wait_for_active"
+ };
+ OrderedPipelinePhase get_obc = {
+ "ClientRequest::PGPipeline::get_obc"
+ };
OrderedPipelinePhase process = {
"ClientRequest::PGPipeline::process"
};
void print(std::ostream &) const final;
void dump_detail(Formatter *f) const final;
+
+public:
seastar::future<> start();
private:
+ seastar::future<> process_pg_op(
+ PG &pg);
+ seastar::future<> process_op(
+ PG &pg);
+ bool is_pg_op() const;
+
ConnectionPipeline &cp();
PGPipeline &pp(PG &pg);
};
}
}
-seastar::future<> PG::submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
const MOSDOp& req)
{
epoch_t map_epoch = get_osdmap_epoch();
eversion_t at_version{map_epoch, projected_last_update.version + 1};
return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
- std::move(os),
+ std::move(obc),
std::move(txn),
req,
peering_state.get_last_peering_reset(),
});
}
-seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(Ref<MOSDOp> m)
+seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
+ Ref<MOSDOp> m,
+ ObjectContextRef obc)
{
using osd_op_errorator = OpsExecuter::osd_op_errorator;
const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
: m->get_hobj();
- return backend->get_object_state(oid).safe_then([this, m](auto os) mutable {
- auto ox =
- std::make_unique<OpsExecuter>(std::move(os), *this/* as const& */, m);
- return crimson::do_for_each(m->ops, [this, ox = ox.get()](OSDOp& osd_op) {
- logger().debug("will be handling op {}", ceph_osd_op_name(osd_op.op.op));
- return ox->execute_osd_op(osd_op);
- }).safe_then([this, m, ox = std::move(ox)] {
- logger().debug("all operations have been executed successfully");
- return std::move(*ox).submit_changes([this, m] (auto&& txn, auto&& os) -> osd_op_errorator::future<> {
+ auto ox =
+ std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
+ return crimson::do_for_each(
+ m->ops, [this, obc, m, ox = ox.get()](OSDOp& osd_op) {
+ logger().debug(
+ "do_osd_ops: {} - object {} - handling op {}",
+ *m,
+ obc->obs.oi.soid,
+ ceph_osd_op_name(osd_op.op.op));
+ return ox->execute_osd_op(osd_op);
+ }).safe_then([this, obc, m, ox = std::move(ox)] {
+ logger().debug(
+ "do_osd_ops: {} - object {} all operations successful",
+ *m,
+ obc->obs.oi.soid);
+ return std::move(*ox).submit_changes(
+ [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
// XXX: the entire lambda could be scheduled conditionally. ::if_then()?
if (txn.empty()) {
- logger().debug("txn is empty, bypassing mutate");
+ logger().debug(
+ "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
+ *m,
+ obc->obs.oi.soid);
return osd_op_errorator::now();
} else {
- return submit_transaction(std::move(os), std::move(txn), *m);
+ logger().debug(
+ "do_osd_ops: {} - object {} submitting txn",
+ *m,
+ obc->obs.oi.soid);
+ return submit_transaction(std::move(obc), std::move(txn), *m);
}
});
- });
- }).safe_then([m,this] {
+ }).safe_then([m, obc, this] {
auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
+ logger().debug(
+ "do_osd_ops: {} - object {} sending reply",
+ *m,
+ obc->obs.oi.soid);
return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
}, OpsExecuter::osd_op_errorator::all_same_way([=,&oid] (const std::error_code& e) {
assert(e.value() > 0);
- logger().debug("got statical error code while handling object {}: {} ({})",
- oid, e.value(), e.message());
- return backend->evict_object_state(oid).then([=] {
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- });
+ logger().debug(
+ "do_osd_ops: {} - object {} got error code {}, {}",
+ *m,
+ obc->obs.oi.soid,
+ e.value(),
+ e.message());
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
})).handle_exception_type([=,&oid](const crimson::osd::error& e) {
// we need this handler because throwing path which aren't errorated yet.
- logger().debug("got ceph::osd::error while handling object {}: {} ({})",
- oid, e.code(), e.what());
- return backend->evict_object_state(oid).then([=] {
- auto reply = make_message<MOSDOpReply>(
- m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
- reply->set_enoent_reply_versions(peering_state.get_info().last_update,
- peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- });
+ logger().debug(
+ "do_osd_ops: {} - object {} got unhandled exception {} ({})",
+ *m,
+ obc->obs.oi.soid,
+ e.code(),
+ e.what());
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
+ reply->set_enoent_reply_versions(peering_state.get_info().last_update,
+ peering_state.get_info().last_user_version);
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
});
}
});
}
-seastar::future<> PG::handle_op(crimson::net::Connection* conn,
- Ref<MOSDOp> m)
+std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
+ const MOSDOp &m,
+ const OpInfo &op_info)
{
- return wait_for_active().then([conn, m, this] {
- if (m->finish_decode()) {
- m->clear_payload();
+ auto oid = m.get_snapid() == CEPH_SNAPDIR ?
+ m.get_hobj().get_head() : m.get_hobj();
+
+ RWState::State lock_type = RWState::RWNONE;
+ if (op_info.rwordered() && op_info.may_read()) {
+ lock_type = RWState::RWState::RWEXCL;
+ } else if (op_info.rwordered()) {
+ lock_type = RWState::RWState::RWWRITE;
+ } else {
+ ceph_assert(op_info.may_read());
+ lock_type = RWState::RWState::RWREAD;
+ }
+ return std::make_pair(oid, lock_type);
+}
+
+std::optional<hobject_t> PG::resolve_oid(
+ const SnapSet &ss,
+ const hobject_t &oid)
+{
+ if (oid.snap > ss.seq) {
+ return oid.get_head();
+ } else {
+ // which clone would it be?
+ auto clone = std::upper_bound(
+ begin(ss.clones), end(ss.clones),
+ oid.snap);
+ if (clone == end(ss.clones)) {
+ // Doesn't exist, > last clone, < ss.seq
+ return std::nullopt;
}
- if (std::any_of(begin(m->ops), end(m->ops),
- [](auto& op) { return ceph_osd_op_type_pg(op.op.op); })) {
- return do_pg_ops(m);
+ auto citer = ss.clone_snaps.find(*clone);
+ // TODO: how do we want to handle this kind of logic error?
+ ceph_assert(citer != ss.clone_snaps.end());
+
+ if (std::find(
+ citer->second.begin(),
+ citer->second.end(),
+ *clone) == citer->second.end()) {
+ return std::nullopt;
} else {
- return do_osd_ops(m);
+ auto soid = oid;
+ soid.snap = *clone;
+ return std::optional<hobject_t>(soid);
}
- }).then([conn](Ref<MOSDOpReply> reply) {
- return conn->send(reply);
- });
+ }
+}
+
+PG::load_obc_ertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
+{
+ ceph_assert(!oid.is_head());
+ using ObjectContextRef = crimson::osd::ObjectContextRef;
+ auto coid = resolve_oid(head->get_ro_ss(), oid);
+ if (!coid) {
+ return load_obc_ertr::make_ready_future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>(
+ std::make_pair(ObjectContextRef(), true)
+ );
+ }
+ auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
+ if (existed) {
+ return load_obc_ertr::make_ready_future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>(
+ std::make_pair(obc, true)
+ );
+ } else {
+ bool got = obc->maybe_get_excl();
+ ceph_assert(got);
+ return backend->load_metadata(*coid).safe_then(
+ [oid, obc=std::move(obc), head, this](auto &&md) mutable {
+ obc->set_clone_state(std::move(md->os), std::move(head));
+ return load_obc_ertr::make_ready_future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>(
+ std::make_pair(obc, false)
+ );
+ });
+ }
+}
+
+PG::load_obc_ertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+PG::get_or_load_head_obc(hobject_t oid)
+{
+ ceph_assert(oid.is_head());
+ auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
+ if (existed) {
+ logger().debug(
+ "{}: found {} in cache",
+ __func__,
+ oid);
+ return load_obc_ertr::make_ready_future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>(
+ std::make_pair(std::move(obc), true)
+ );
+ } else {
+ logger().debug(
+ "{}: cache miss on {}",
+ __func__,
+ oid);
+ bool got = obc->maybe_get_excl();
+ ceph_assert(got);
+ return backend->load_metadata(oid).safe_then(
+ [oid, obc=std::move(obc), this](auto md) ->
+ load_obc_ertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+ {
+ logger().debug(
+ "{}: loaded obs {} for {}",
+ __func__,
+ md->os.oi,
+ oid);
+ if (!md->ss) {
+ logger().error(
+ "{}: oid {} missing snapset",
+ __func__,
+ oid);
+ return crimson::ct_error::object_corrupted::make();
+ }
+ obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
+ logger().debug(
+ "{}: returning obc {} for {}",
+ __func__,
+ obc->obs.oi,
+ obc->obs.oi.soid);
+ return load_obc_ertr::make_ready_future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>(
+ std::make_pair(obc, false)
+ );
+ });
+ }
+}
+
+PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
+PG::get_locked_obc(
+ Operation *op, const hobject_t &oid, RWState::State type)
+{
+ return get_or_load_head_obc(oid.get_head()).safe_then(
+ [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
+ auto &[head_obc, head_existed] = p;
+ if (oid.is_head()) {
+ if (head_existed) {
+ return head_obc->get_lock_type(op, type).then([head_obc] {
+ ceph_assert(head_obc->loaded);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
+ });
+ } else {
+ head_obc->degrade_excl_to(type);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
+ }
+ } else {
+ return head_obc->get_lock_type(op, RWState::RWREAD).then(
+ [this, head_obc, op, oid, type] {
+ ceph_assert(head_obc->loaded);
+ return get_or_load_clone_obc(oid, head_obc);
+ }).safe_then([this, head_obc, op, oid, type](auto p) {
+ auto &[obc, existed] = p;
+ if (existed) {
+ return load_obc_ertr::future<>(
+ obc->get_lock_type(op, type)).safe_then([obc] {
+ ceph_assert(obc->loaded);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ });
+ } else {
+ obc->degrade_excl_to(type);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ }
+ }).safe_then([head_obc](auto obc) {
+ head_obc->put_lock_type(RWState::RWREAD);
+ return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
+ });
+ }
+ });
}
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
#include "crimson/net/Fwd.h"
#include "os/Transaction.h"
#include "osd/osd_types.h"
-#include "osd/osd_internal_types.h"
+#include "crimson/osd/object_context.h"
#include "osd/PeeringState.h"
#include "crimson/common/type_helpers.h"
void handle_advance_map(cached_map_t next_map, PeeringCtx &rctx);
void handle_activate_map(PeeringCtx &rctx);
void handle_initialize(PeeringCtx &rctx);
- seastar::future<> handle_op(crimson::net::Connection* conn,
- Ref<MOSDOp> m);
+
+ static std::pair<hobject_t, RWState::State> get_oid_and_lock(
+ const MOSDOp &m,
+ const OpInfo &op_info);
+ static std::optional<hobject_t> resolve_oid(
+ const SnapSet &snapset,
+ const hobject_t &oid);
+
+ using load_obc_ertr = crimson::errorator<
+ crimson::ct_error::object_corrupted>;
+ load_obc_ertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+ get_or_load_clone_obc(
+ hobject_t oid, crimson::osd::ObjectContextRef head_obc);
+
+ load_obc_ertr::future<
+ std::pair<crimson::osd::ObjectContextRef, bool>>
+ get_or_load_head_obc(hobject_t oid);
+
+ load_obc_ertr::future<ObjectContextRef> get_locked_obc(
+ Operation *op,
+ const hobject_t &oid,
+ RWState::State type);
+public:
+ template <typename F>
+ auto with_locked_obc(
+ Ref<MOSDOp> &m,
+ const OpInfo &op_info,
+ Operation *op,
+ F &&f) {
+ auto [oid, type] = get_oid_and_lock(*m, op_info);
+ return get_locked_obc(op, oid, type)
+ .safe_then([this, f=std::forward<F>(f), type](auto obc) {
+ return f(obc).finally([this, obc, type] {
+ obc->put_lock_type(type);
+ return load_obc_ertr::now();
+ });
+ });
+ }
+
seastar::future<> handle_rep_op(Ref<MOSDRepOp> m);
void handle_rep_op_reply(crimson::net::Connection* conn,
const MOSDRepOpReply& m);
void do_peering_event(
const boost::statechart::event_base &evt,
PeeringCtx &rctx);
- seastar::future<Ref<MOSDOpReply>> do_osd_ops(Ref<MOSDOp> m);
+ seastar::future<Ref<MOSDOpReply>> do_osd_ops(
+ Ref<MOSDOp> m,
+ ObjectContextRef obc);
seastar::future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
seastar::future<> do_osd_op(
ObjectState& os,
seastar::future<ceph::bufferlist> do_pgnls(ceph::bufferlist& indata,
const std::string& nspace,
uint64_t limit);
- seastar::future<> submit_transaction(boost::local_shared_ptr<ObjectState>&& os,
+ seastar::future<> submit_transaction(ObjectContextRef&& obc,
ceph::os::Transaction&& txn,
const MOSDOp& req);
ShardServices &shard_services;
cached_map_t osdmap;
+
+public:
+ cached_map_t get_osdmap() { return osdmap; }
+
+private:
std::unique_ptr<PGBackend> backend;
PeeringState peering_state;
#include "messages/MOSDOp.h"
#include "os/Transaction.h"
+#include "common/Clock.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_collection.h"
store{store}
{}
-PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
-PGBackend::get_object_state(const hobject_t& oid)
-{
- // want the head?
- if (oid.snap == CEPH_NOSNAP) {
- logger().trace("find_object: {}@HEAD", oid);
- return _load_os(oid);
- } else {
- // we want a snap
- return _load_ss(oid).safe_then(
- [oid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
- // head?
- if (oid.snap > ss->seq) {
- return _load_os(oid.get_head());
- } else {
- // which clone would it be?
- auto clone = std::upper_bound(begin(ss->clones), end(ss->clones),
- oid.snap);
- if (clone == end(ss->clones)) {
- return crimson::ct_error::enoent::make();
- }
- // clone
- auto soid = oid;
- soid.snap = *clone;
- return _load_ss(soid).safe_then(
- [soid,this](cached_ss_t ss) -> get_os_errorator::future<cached_os_t> {
- auto clone_snap = ss->clone_snaps.find(soid.snap);
- assert(clone_snap != end(ss->clone_snaps));
- if (clone_snap->second.empty()) {
- logger().trace("find_object: {}@[] -- DNE", soid);
- return crimson::ct_error::enoent::make();
- }
- auto first = clone_snap->second.back();
- auto last = clone_snap->second.front();
- if (first > soid.snap) {
- logger().trace("find_object: {}@[{},{}] -- DNE",
- soid, first, last);
- return crimson::ct_error::enoent::make();
- }
- logger().trace("find_object: {}@[{},{}] -- HIT",
- soid, first, last);
- return _load_os(soid);
- });
- }
- });
- }
-}
-
-PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t>
+PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
PGBackend::load_metadata(const hobject_t& oid)
{
return store->get_attrs(
coll,
ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
- [oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t>{
- loaded_object_md_t ret;
+ [oid, this](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
+ loaded_object_md_t::ref ret(new loaded_object_md_t());
if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
bufferlist bl;
- bl.push_back(oiiter->second);
- ret.os = ObjectState(
+ bl.push_back(std::move(oiiter->second));
+ ret->os = ObjectState(
object_info_t(bl),
true);
} else {
+ logger().error(
+ "load_metadata: object {} present but missing object info",
+ oid);
return crimson::ct_error::object_corrupted::make();
}
if (oid.is_head()) {
if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
bufferlist bl;
- bl.push_back(ssiter->second);
- ret.ss = SnapSet(bl);
+ bl.push_back(std::move(ssiter->second));
+ ret->ss = SnapSet(bl);
} else {
- return crimson::ct_error::object_corrupted::make();
+ /* TODO: add support for writing out snapsets
+ logger().error(
+ "load_metadata: object {} present but missing snapset",
+ oid);
+ //return crimson::ct_error::object_corrupted::make();
+ */
+ ret->ss = SnapSet();
}
}
- return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
+ return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
std::move(ret));
}, crimson::ct_error::enoent::handle([oid, this] {
- return load_metadata_ertr::make_ready_future<loaded_object_md_t>(
- loaded_object_md_t{
- ObjectState(),
- std::nullopt
+ logger().debug(
+ "load_metadata: object {} doesn't exist, returning empty metadata",
+ oid);
+ return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
+ new loaded_object_md_t{
+ ObjectState(
+ object_info_t(oid),
+ false),
+ oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
});
}));
}
-PGBackend::get_os_errorator::future<PGBackend::cached_os_t>
-PGBackend::_load_os(const hobject_t& oid)
-{
- if (auto found = os_cache.find(oid); found) {
- return get_os_errorator::make_ready_future<cached_os_t>(std::move(found));
- }
- return load_metadata(oid).safe_then([oid, this](auto &&md) {
- return get_os_errorator::make_ready_future<cached_os_t>(
- os_cache.insert(
- oid,
- std::make_unique<ObjectState>(std::move(md.os))));
- });
-}
-
-PGBackend::get_os_errorator::future<PGBackend::cached_ss_t>
-PGBackend::_load_ss(const hobject_t& oid)
-{
- if (auto found = ss_cache.find(oid); found) {
- return get_os_errorator::make_ready_future<cached_ss_t>(std::move(found));
- }
- return load_metadata(oid).safe_then([oid, this](auto &&md) {
- if (!md.ss) {
- return get_os_errorator::make_ready_future<cached_ss_t>(
- std::make_unique<SnapSet>());
- } else {
- return get_os_errorator::make_ready_future<cached_ss_t>(
- ss_cache.insert(oid, std::make_unique<SnapSet>(std::move(*(md.ss)))));
- }
- });
-}
-
seastar::future<crimson::osd::acked_peers_t>
PGBackend::mutate_object(
std::set<pg_shard_t> pg_shards,
- cached_os_t&& os,
+ crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
const MOSDOp& m,
epoch_t min_epoch,
eversion_t ver)
{
logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
- if (os->exists) {
+ if (obc->obs.exists) {
#if 0
- os.oi.version = ctx->at_version;
- os.oi.prior_version = ctx->obs->oi.version;
+ obc->obs.oi.version = ctx->at_version;
+ obc->obs.oi.prior_version = ctx->obs->oi.version;
#endif
- os->oi.last_reqid = m.get_reqid();
- os->oi.mtime = m.get_mtime();
- os->oi.local_mtime = ceph_clock_now();
+ obc->obs.oi.last_reqid = m.get_reqid();
+ obc->obs.oi.mtime = m.get_mtime();
+ obc->obs.oi.local_mtime = ceph_clock_now();
// object_info_t
{
ceph::bufferlist osv;
- encode(os->oi, osv, 0);
+ encode(obc->obs.oi, osv, 0);
// TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
- txn.setattr(coll->get_cid(), ghobject_t{os->oi.soid}, OI_ATTR, osv);
+ txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
}
} else {
// reset cached ObjectState without enforcing eviction
- os->oi = object_info_t(os->oi.soid);
+ obc->obs.oi = object_info_t(obc->obs.oi.soid);
}
- return _submit_transaction(std::move(pg_shards), os->oi.soid, std::move(txn),
- m.get_reqid(), min_epoch, map_epoch, ver);
-}
-
-seastar::future<>
-PGBackend::evict_object_state(const hobject_t& oid)
-{
- os_cache.erase(oid);
- return seastar::now();
+ return _submit_transaction(
+ std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
+ m.get_reqid(), min_epoch, map_epoch, ver);
}
static inline bool _read_verify_data(
crimson::os::CollectionRef coll,
crimson::osd::ShardServices& shard_services,
const ec_profile_t& ec_profile);
- using cached_os_t = boost::local_shared_ptr<ObjectState>;
- using get_os_errorator = crimson::errorator<crimson::ct_error::enoent>;
- get_os_errorator::future<cached_os_t> get_object_state(const hobject_t& oid);
- seastar::future<> evict_object_state(const hobject_t& oid);
using read_errorator = ll_read_errorator::extend<
crimson::ct_error::input_output_error,
size_t truncate_size,
uint32_t truncate_seq,
uint32_t flags);
+
using stat_errorator = crimson::errorator<crimson::ct_error::enoent>;
stat_errorator::future<> stat(
const ObjectState& os,
ceph::os::Transaction& trans);
seastar::future<crimson::osd::acked_peers_t> mutate_object(
std::set<pg_shard_t> pg_shards,
- cached_os_t&& os,
+ crimson::osd::ObjectContextRef &&obc,
ceph::os::Transaction&& txn,
const MOSDOp& m,
epoch_t min_epoch,
struct loaded_object_md_t {
ObjectState os;
std::optional<SnapSet> ss;
+ using ref = std::unique_ptr<loaded_object_md_t>;
};
- load_metadata_ertr::future<loaded_object_md_t> load_metadata(
+ load_metadata_ertr::future<loaded_object_md_t::ref> load_metadata(
const hobject_t &oid);
private:
- using cached_ss_t = boost::local_shared_ptr<SnapSet>;
- SharedLRU<hobject_t, SnapSet> ss_cache;
- get_os_errorator::future<cached_ss_t> _load_ss(const hobject_t& oid);
-
- SharedLRU<hobject_t, ObjectState> os_cache;
- get_os_errorator::future<cached_os_t> _load_os(const hobject_t& oid);
-
virtual ll_read_errorator::future<ceph::bufferlist> _read(
const hobject_t& hoid,
size_t offset,