int cls_get_snapset_seq(cls_method_context_t hctx, uint64_t *snap_seq)
{
+ auto* ox = reinterpret_cast<crimson::osd::OpsExecuter*>(hctx);
+ auto obc = ox->get_obc();
+ if (!obc->obs.exists ||
+ (obc->obs.oi.is_whiteout() &&
+ obc->ssc->snapset.clones.empty())) {
+ return -ENOENT;
+ }
+ *snap_seq = obc->ssc->snapset.seq;
return 0;
}
namespace crimson::osd {
class Watch;
+struct SnapSetContext;
+using SnapSetContextRef = boost::intrusive_ptr<SnapSetContext>;
template <typename OBC>
struct obc_to_hoid {
}
};
+struct SnapSetContext :
+ public boost::intrusive_ref_counter<SnapSetContext,
+ boost::thread_unsafe_counter>
+{
+ hobject_t oid;
+ SnapSet snapset;
+ bool exists = false;
+ /**
+ * exists
+ *
+ * Because ObjectContext's are cached, we need to be able to express the case
+ * where the object to which a cached ObjectContext refers does not exist.
+ * ObjectContext's for yet-to-be-created objects are initialized with exists=false.
+ * The ObjectContext for a deleted object will have exists set to false until it falls
+ * out of cache (or another write recreates the object).
+ */
+ explicit SnapSetContext(const hobject_t& o) :
+ oid(o), exists(false) {}
+};
+
class ObjectContext : public ceph::common::intrusive_lru_base<
ceph::common::intrusive_lru_config<
hobject_t, ObjectContext, obc_to_hoid<ObjectContext>>>
public:
Ref head; // Ref defined as part of ceph::common::intrusive_lru_base
ObjectState obs;
- std::optional<SnapSet> ss;
+ SnapSetContextRef ssc;
// the watch / notify machinery rather stays away from the hot and
// frequented paths. std::map is used mostly because of developer's
// convenience.
const SnapSet &get_ro_ss() const {
if (is_head()) {
- ceph_assert(ss);
- return *ss;
+ ceph_assert(ssc);
+ return ssc->snapset;
} else {
- ceph_assert(head);
return head->get_ro_ss();
}
}
- void set_head_state(ObjectState &&_obs, SnapSet &&_ss) {
+ void set_head_state(ObjectState &&_obs, SnapSetContextRef &&_ssc) {
ceph_assert(is_head());
obs = std::move(_obs);
- ss = std::move(_ss);
+ ssc = std::move(_ssc);
}
void set_clone_state(ObjectState &&_obs, Ref &&_head) {
OpsExecuter::interruptible_errorated_future<OpsExecuter::osd_op_errorator>
OpsExecuter::execute_op(OSDOp& osd_op)
{
+ head_existed = obc->obs.exists;
return do_execute_op(osd_op).handle_error_interruptible(
osd_op_errorator::all_same_way([&osd_op](auto e, auto&& e_raw)
-> OpsExecuter::osd_op_errorator::future<> {
return pg->get_last_user_version();
}
+void OpsExecuter::make_writeable(std::vector<pg_log_entry_t>& log_entries)
+{
+ const hobject_t& soid = obc->obs.oi.soid;
+ logger().debug("{} {} snapset={} snapc={}",
+ __func__, soid,
+ obc->ssc->snapset, snapc);
+
+ // clone?
+ if (head_existed && // old obs.exists
+ snapc.snaps.size() && // there are snaps
+ snapc.snaps[0] > obc->ssc->snapset.seq) { // existing obj is old
+
+ // clone object, the snap field is set to the seq of the SnapContext
+ // at its creation.
+ hobject_t coid = soid;
+ coid.snap = snapc.seq;
+
+ // existing snaps are stored in descending order in snapc,
+ // cloned_snaps vector will hold all the snaps stored until snapset.seq
+ const std::vector<snapid_t> cloned_snaps = [&] {
+ auto last = std::find_if(
+ std::begin(snapc.snaps), std::end(snapc.snaps),
+ [&](snapid_t snap_id) { return snap_id <= obc->ssc->snapset.seq; });
+ return std::vector<snapid_t>{std::begin(snapc.snaps), last};
+ }();
+
+ // version
+ osd_op_params->at_version = pg->next_version();
+
+ auto snap_oi = prepare_clone(coid);
+
+ // make clone
+ do_write_op([this, &snap_oi](auto& backend, auto& os, auto& txn) {
+ return backend.clone(snap_oi, os, clone_obc->obs, txn);
+ });
+
+ delta_stats.num_objects++;
+ if (snap_oi.is_omap()) {
+ delta_stats.num_objects_omap++;
+ }
+ delta_stats.num_object_clones++;
+ // newsnapset is obc's ssc
+ obc->ssc->snapset.clones.push_back(coid.snap);
+ obc->ssc->snapset.clone_size[coid.snap] = obc->obs.oi.size;
+ obc->ssc->snapset.clone_snaps[coid.snap] = cloned_snaps;
+
+ // clone_overlap should contain an entry for each clone
+ // (an empty interval_set if there is no overlap)
+ auto &overlap = obc->ssc->snapset.clone_overlap[coid.snap];
+ if (obc->obs.oi.size) {
+ overlap.insert(0, obc->obs.oi.size);
+ }
+
+ // log clone
+ logger().debug("cloning v {} to {} v {} snaps= {} snapset={}",
+ obc->obs.oi.version, coid,
+ osd_op_params->at_version, cloned_snaps, obc->ssc->snapset);
+
+ log_entries.emplace_back(pg_log_entry_t::CLONE,
+ coid, osd_op_params->at_version,
+ obc->obs.oi.version, obc->obs.oi.user_version,
+ osd_reqid_t(),
+ obc->obs.oi.mtime, 0);
+ encode(cloned_snaps, log_entries.back().snaps);
+ osd_op_params->at_version.version++;
+
+ // TODO: update most recent clone_overlap and usage stats
+
+ if (snapc.seq > obc->ssc->snapset.seq) {
+ // update snapset with latest snap context
+ obc->ssc->snapset.seq = snapc.seq;
+ obc->ssc->snapset.snaps.clear();
+ }
+ logger().debug("{} {} done, snapset={}",
+ __func__, soid, obc->ssc->snapset);
+ }
+}
+
+const object_info_t OpsExecuter::prepare_clone(
+ const hobject_t& coid)
+{
+ object_info_t static_snap_oi(coid);
+ static_snap_oi.version = osd_op_params->at_version;
+ static_snap_oi.prior_version = obc->obs.oi.version;
+ static_snap_oi.copy_user_bits(obc->obs.oi);
+
+ if (pg->is_primary()) {
+ // lookup_or_create
+ auto [c_obc, existed] =
+ pg->get_shard_services().get_cached_obc(
+ std::move(coid));
+ assert(!existed);
+ c_obc->obs.oi = static_snap_oi;
+ c_obc->obs.exists = true;
+ c_obc->ssc = obc->ssc;
+ c_obc->head = obc->head;
+ logger().debug("clone_obc: {}", c_obc->obs.oi);
+ clone_obc = std::move(c_obc);
+ }
+ return static_snap_oi;
+}
+
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
const std::string& type,
bufferlist::const_iterator& iter)
Ref<PG> pg; // for the sake of object class
ObjectContextRef obc;
+ ObjectContextRef clone_obc; // if we create a clone
const OpInfo& op_info;
ceph::static_ptr<ExecutableMessage,
sizeof(ExecutableMessagePimpl<void>)> msg;
std::optional<osd_op_params_t> osd_op_params;
bool user_modify = false;
+ bool head_existed = false;
ceph::os::Transaction txn;
size_t num_read = 0; ///< count read ops
size_t num_write = 0; ///< count update ops
+ SnapContext snapc; // writer snap context
+
// this gizmo could be wrapped in std::optional for the sake of lazy
// initialization. we don't need it for ops that doesn't have effect
// TODO: verify the init overhead of chunked_fifo
OpsExecuter(Ref<PG> pg,
ObjectContextRef obc,
const OpInfo& op_info,
- const MsgT& msg)
+ const MsgT& msg,
+ const SnapContext& snapc)
: pg(std::move(pg)),
obc(std::move(obc)),
op_info(op_info),
- msg(std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{}, &msg) {
+ msg(std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{}, &msg),
+ snapc(snapc) {
}
template <class Func>
const std::vector<OSDOp>& ops);
void fill_op_params_bump_pg_version();
+ ObjectContextRef get_obc() const {
+ return obc;
+ }
+
const object_info_t &get_object_info() const {
return obc->obs.oi;
}
}
version_t get_last_user_version() const;
+
+ const SnapContext& get_snapc() const {
+ return snapc;
+ }
+
+ void make_writeable(std::vector<pg_log_entry_t>& log_entries);
+
+ const object_info_t prepare_clone(
+ const hobject_t& coid);
};
template <class Context, class MainFunc, class EffectFunc>
if (want_mutate) {
fill_op_params_bump_pg_version();
auto log_entries = prepare_transaction(ops);
+ make_writeable(log_entries);
auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
std::move(obc),
std::move(*osd_op_params),
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
+ SnapContext snapc;
+ if (op_info.may_write() || op_info.may_cache()) {
+ // snap
+ if (get_pgpool().info.is_pool_snaps_mode()) {
+ // use pool's snapc
+ snapc = get_pgpool().snapc;
+ logger().debug("{} using pool's snapc snaps={}",
+ __func__, snapc.snaps);
+
+ } else {
+ // client specified snapc
+ snapc.seq = m->get_snap_seq();
+ snapc.snaps = m->get_snaps();
+ logger().debug("{} client specified snapc seq={} snaps={}",
+ __func__, snapc.seq, snapc.snaps);
+ }
+ }
return do_osd_ops_execute<MURef<MOSDOpReply>>(
seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, obc, op_info, *m),
+ Ref<PG>{this}, obc, op_info, *m, snapc),
m->ops,
[this, m, obc, may_write = op_info.may_write(),
may_read = op_info.may_read(), rvec = op_info.allows_returnvec()] {
do_osd_ops_success_func_t success_func,
do_osd_ops_failure_func_t failure_func)
{
- return seastar::do_with(std::move(msg_params), [=, this, &ops, &op_info]
- (auto &msg_params) {
+ // This overload is generally used for internal client requests,
+ // use an empty SnapContext.
+ return seastar::do_with(
+ std::move(msg_params),
+ [=, this, &ops, &op_info](auto &msg_params) {
return do_osd_ops_execute<void>(
seastar::make_lw_shared<OpsExecuter>(
- Ref<PG>{this}, std::move(obc), op_info, msg_params),
+ Ref<PG>{this}, std::move(obc), op_info, msg_params, SnapContext{}),
ops,
std::move(success_func),
std::move(failure_func));
const SnapSet &ss,
const hobject_t &oid)
{
+ logger().debug("{} oid.snap={},head snapset.seq={}",
+ __func__, oid.snap, ss.seq);
if (oid.snap > ss.seq) {
+ // Because oid.snap > ss.seq, we are trying to read from a snapshot
+ // taken after the most recent write to this object. Read from head.
return oid.get_head();
} else {
// which clone would it be?
- auto clone = std::upper_bound(
+ auto clone = std::lower_bound(
begin(ss.clones), end(ss.clones),
oid.snap);
if (clone == end(ss.clones)) {
return load_obc_iertr::future<>{crimson::ct_error::object_corrupted::make()};
}
auto [clone, existed] = shard_services.get_cached_obc(*coid);
- return clone->template with_lock<State>(
+ return clone->template with_lock<State, IOInterruptCondition>(
[coid=*coid, existed=existed,
head=std::move(head), clone=std::move(clone),
func=std::move(func), this]() -> load_obc_iertr::future<> {
logger().debug("with_clone_obc: found {} in cache", coid);
} else {
logger().debug("with_clone_obc: cache miss on {}", coid);
- loaded = clone->template with_promoted_lock<State>(
+ loaded = clone->template with_promoted_lock<State, IOInterruptCondition>(
[coid, clone, head, this] {
return backend->load_metadata(coid).safe_then_interruptible(
[coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
const hobject_t& oid = md->os.oi.soid;
logger().debug(
"load_head_obc: loaded obs {} for {}", md->os.oi, oid);
- if (!md->ss) {
+ if (!md->ssc) {
logger().error(
- "load_head_obc: oid {} missing snapset", oid);
+ "load_head_obc: oid {} missing snapsetcontext", oid);
return crimson::ct_error::object_corrupted::make();
+
}
- obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
+ obc->set_head_state(std::move(md->os), std::move(md->ssc));
logger().debug(
"load_head_obc: returning obc {} for {}",
obc->obs.oi, obc->obs.oi.soid);
__func__,
md->os.oi,
obc.get_oid());
- if (!md->ss) {
+ if (!md->ssc) {
logger().error(
- "{}: oid {} missing snapset",
+ "{}: oid {} missing snapsetcontext",
__func__,
obc.get_oid());
return crimson::ct_error::object_corrupted::make();
}
- obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
+ obc.set_head_state(std::move(md->os), std::move(md->ssc));
return load_obc_ertr::now();
});
}
oid);
return crimson::ct_error::object_corrupted::make();
}
-
- if (oid.is_head()) {
- if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
- bufferlist bl = std::move(ssiter->second);
- ret->ss = SnapSet(bl);
- } else {
- /* TODO: add support for writing out snapsets
- logger().error(
- "load_metadata: object {} present but missing snapset",
- oid);
- //return crimson::ct_error::object_corrupted::make();
- */
- ret->ss = SnapSet();
+
+ if (oid.is_head()) {
+ // Returning object_corrupted when the object exsits and the
+ // Snapset is either not found or empty.
+ bool object_corrupted = true;
+ if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
+ object_corrupted = false;
+ logger().debug(
+ "load_metadata: object {} and snapset {} present",
+ oid, ssiter->second);
+ bufferlist bl = std::move(ssiter->second);
+ if (bl.length()) {
+ ret->ssc = new crimson::osd::SnapSetContext(oid.get_snapdir());
+ try {
+ ret->ssc->snapset = SnapSet(bl);
+ ret->ssc->exists = true;
+ } catch (const buffer::error&) {
+ logger().warn("unable to decode SnapSet");
+ throw crimson::osd::invalid_argument();
+ }
+ } else {
+ object_corrupted = true;
+ }
+ }
+ if (object_corrupted) {
+ logger().error(
+ "load_metadata: object {} present but missing snapset",
+ oid);
+ return crimson::ct_error::object_corrupted::make();
}
}
ObjectState(
object_info_t(oid),
false),
- oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
+ oid.is_head() ? (new crimson::osd::SnapSetContext(oid)) : nullptr
});
}));
}
// TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
}
+
+ // snapset
+ if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
+ logger().debug("final snapset {} in {}",
+ obc->ssc->snapset, obc->obs.oi.soid);
+ ceph::bufferlist bss;
+ encode(obc->ssc->snapset, bss);
+ txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
+ obc->ssc->exists = true;
+ } else {
+ logger().debug("no snapset (this is a clone)");
+ }
} else {
// reset cached ObjectState without enforcing eviction
obc->obs.oi = object_info_t(obc->obs.oi.soid);
return rm_xattr_iertr::now();
}
+void PGBackend::clone(
+ object_info_t& snap_oi,
+ ObjectState& os,
+ ObjectState& d_os,
+ ceph::os::Transaction& txn)
+{
+ // Prepend the cloning operation to txn
+ ceph::os::Transaction c_txn;
+ c_txn.clone(coll->get_cid(), ghobject_t{os.oi.soid}, ghobject_t{d_os.oi.soid});
+ // Operations will be removed from txn while appending
+ c_txn.append(txn);
+ txn = std::move(c_txn);
+
+ ceph::bufferlist bv;
+ snap_oi.encode_no_oid(bv, CEPH_FEATURES_ALL);
+
+ txn.setattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, OI_ATTR, bv);
+ txn.rmattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, SS_ATTR);
+}
+
using get_omap_ertr =
crimson::os::FuturizedStore::read_errorator::extend<
crimson::ct_error::enodata>;
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans);
+ void clone(
+ object_info_t& snap_oi,
+ ObjectState& os,
+ ObjectState& d_os,
+ ceph::os::Transaction& trans);
interruptible_future<struct stat> stat(
CollectionRef c,
const ghobject_t& oid) const;
public:
struct loaded_object_md_t {
ObjectState os;
- std::optional<SnapSet> ss;
+ crimson::osd::SnapSetContextRef ssc;
using ref = std::unique_ptr<loaded_object_md_t>;
};
load_metadata_iertr::future<loaded_object_md_t::ref>