{
for (auto p = refs.begin(); p != refs.end(); ++p) {
int dec_ref_count = p->second;
+ ceph_assert(dec_ref_count < 0);
while (dec_ref_count < 0) {
dout(10) << __func__ << ": decrement reference on offset oid: " << p->first << dendl;
refcount_manifest(obc->obs.oi.soid, p->first,
}
-void PrimaryLogPG::dec_all_refcount_manifest(object_info_t& oi, OpContext* ctx)
+void PrimaryLogPG::dec_all_refcount_manifest(const object_info_t& oi, OpContext* ctx)
{
- SnapSetContext* ssc = ctx->obc->ssc;
ceph_assert(oi.has_manifest());
- // has snapshot
- if (ssc && ssc->snapset.clones.size() > 0) {
- ceph_assert(ssc);
- dout(15) << __func__ << " has snapset " << dendl;
- object_ref_delta_t refs;
- ceph_assert(oi.manifest.is_chunked());
- ceph_assert(!oi.manifest.is_redirect());
-
- SnapSet& snapset = ssc->snapset;
+ ceph_assert(ctx->obc->ssc);
- // check adjacent clones
- auto s = std::find(snapset.clones.begin(), snapset.clones.end(), oi.soid.snap);
- int index = std::distance(snapset.clones.begin(), s);
- snapid_t l = *snapset.clones.begin(), g = CEPH_NOSNAP;
- object_manifest_t* manifest_g = nullptr, * manifest_l = nullptr;
- hobject_t t_oid = oi.soid;
+ if (oi.manifest.is_chunked()) {
+ const SnapSet& snapset = ctx->obc->ssc->snapset;
- auto get_context = [this](const hobject_t & oid)
+ auto get_context = [this, &oi, &snapset](auto iter)
-> ObjectContextRef {
- ObjectContextRef obc = get_object_context(oid, false, NULL);
+ hobject_t cid = oi.soid;
+ cid.snap = (iter == snapset.clones.end()) ? snapid_t(CEPH_NOSNAP) : *iter;
+ ObjectContextRef obc = get_object_context(cid, false, NULL);
ceph_assert(obc);
ceph_assert(obc->obs.oi.has_manifest());
ceph_assert(obc->obs.oi.manifest.is_chunked());
return obc;
};
+ ObjectContextRef obc_l, obc_g;
- if (index) {
- l = snapset.clones[index - 1];
- t_oid.snap = l;
- manifest_l = &get_context(t_oid)->obs.oi.manifest;
- }
+ // check adjacent clones
+ auto s = std::find(snapset.clones.begin(), snapset.clones.end(), oi.soid.snap);
- if (snapset.clones.size() != 0 && s != snapset.clones.end()
- && *s != snapset.clones.back()) {
- g = snapset.clones[index + 1];
- t_oid.snap = g;
- manifest_g = &get_context(t_oid)->obs.oi.manifest;
- } else if (*s == snapset.clones.back()) {
- manifest_g = &get_context(oi.soid.get_head())->obs.oi.manifest;
+ // We *must* find the clone iff it's not head,
+ // let s == snapset.clones.end() mean head
+ ceph_assert((s == snapset.clones.end()) == oi.soid.is_head());
+
+ if (s != snapset.clones.begin()) {
+ obc_l = get_context(s - 1);
}
-
-
- oi.manifest.calc_refs_to_drop_on_removal(manifest_g, manifest_l, refs);
+
+ if (s != snapset.clones.end()) {
+ obc_g = get_context(s + 1);
+ }
+
+ object_ref_delta_t refs;
+ oi.manifest.calc_refs_to_drop_on_removal(
+ obc_l ? &(obc_l->obs.oi.manifest) : nullptr,
+ obc_g ? &(obc_g->obs.oi.manifest) : nullptr,
+ refs);
if (!refs.is_empty()) {
ctx->register_on_commit(
[ctx, this, refs](){
dec_refcount(ctx->obc, refs);
- });
+ });
}
- return;
- }
-
- // no snapshot
- if ((oi.flags & object_info_t::FLAG_REDIRECT_HAS_REFERENCE) && oi.manifest.is_redirect()) {
+ } else if (oi.manifest.is_redirect()) {
+ ceph_assert(oi.flags & object_info_t::FLAG_REDIRECT_HAS_REFERENCE);
ctx->register_on_commit(
[oi, this](){
- refcount_manifest(oi.soid, oi.manifest.redirect_target,
- refcount_t::DECREMENT_REF, NULL);
- });
- } else if (oi.manifest.is_chunked()) {
- ctx->register_on_commit(
- [oi, this](){
- for (auto p : oi.manifest.chunk_map) {
- if (p.second.has_reference()) {
- refcount_manifest(oi.soid, p.second.oid,
- refcount_t::DECREMENT_REF, NULL);
- }
- }
- });
+ refcount_manifest(oi.soid, oi.manifest.redirect_target,
+ refcount_t::DECREMENT_REF, NULL);
+ });
} else {
ceph_abort_msg("unrecognized manifest type");
}