}
}();
bufferlist in;
- if (fp_oid != tgt_soid.oid) {
+ if (fp_oid != tgt_soid.oid && (!obc->ssc && !obc->ssc->snapset.clones.size())) {
// TODO: don't retain reference to dirty extents
// decrement old chunk's reference count
ObjectOperation dec_op;
cls_cas_chunk_put_ref_op put_call;
- put_call.source = soid;
+ put_call.source = soid.get_head();
::encode(put_call, in);
dec_op.call("cas", "chunk_put_ref", in);
// we don't care dec_op's completion. scrub for dedup will fix this.
{
bufferlist t;
cls_cas_chunk_create_or_get_ref_op get_call;
- get_call.source = soid;
+ get_call.source = soid.get_head();
get_call.data = chunk_data;
::encode(get_call, t);
obj_op.call("cas", "chunk_create_or_get_ref", t);
}
}
-void PrimaryLogPG::dec_refcount_non_intersection(ObjectContextRef obc, const object_info_t& oi,
- const set<uint64_t>& intersection_set)
+void PrimaryLogPG::dec_refcount(ObjectContextRef obc, const object_info_t& oi,
+ const object_ref_delta_t& refs)
{
- for (auto c : oi.manifest.chunk_map) {
- auto iter = intersection_set.find(c.first);
- if (intersection_set.end() == iter) {
- dout(10) << __func__ << ": decrement reference on offset " << c.first << " length: "
- << c.second.length << " oid: " << c.second.oid << dendl;
- object_locator_t target_oloc(c.second.oid);
- refcount_manifest(obc, obc->obs.oi.soid, target_oloc, c.second.oid,
- SnapContext(), refcount_t::DECREMENT_REF, NULL);
- }
+ for (auto p = refs.begin(); p != refs.end(); ++p) {
+ dout(10) << __func__ << ": decrement reference on offset oid: " << p->first << dendl;
+ object_locator_t target_oloc(p->first);
+ refcount_manifest(obc, obc->obs.oi.soid, target_oloc, p->first,
+ SnapContext(), refcount_t::DECREMENT_REF, NULL);
}
}
-void PrimaryLogPG::dec_all_refcount_head_manifest(object_info_t& oi, OpContext* ctx)
+
+void PrimaryLogPG::dec_all_refcount_head_manifest(object_info_t& oi, OpContext* ctx, const hobject_t &coid)
{
SnapSetContext* ssc = ctx->obc->ssc;
ceph_assert(oi.has_manifest());
if (ssc && ssc->snapset.clones.size() > 0) {
ceph_assert(ssc);
dout(15) << __func__ << " has snapset " << dendl;
- set<uint64_t> refs;
+ object_ref_delta_t refs;
ceph_assert(oi.manifest.is_chunked());
ceph_assert(!oi.manifest.is_redirect());
- hobject_t clone_oid = oi.soid;
- clone_oid.snap = ssc->snapset.clone_overlap.rbegin()->first;
- ObjectContextRef cobc = get_object_context(clone_oid, false, NULL);
- if (!cobc) {
- dout(0) << __func__ << ": Can not find clone obc " << clone_oid << dendl;;
- return;
+ SnapSet& snapset = ssc->snapset;
+
+ // check adjacent clones
+ auto s = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
+ int index = std::distance(snapset.clones.begin(), s);
+ snapid_t l = *snapset.clones.begin(), g = CEPH_NOSNAP;
+ object_manifest_t* manifest_g = nullptr, * manifest_l = nullptr;
+ hobject_t t_oid = coid;
+
+ auto get_context = [this](const hobject_t & oid)
+ -> ObjectContextRef {
+ ObjectContextRef obc = get_object_context(oid, false, NULL);
+ ceph_assert(obc);
+ ceph_assert(obc->obs.oi.has_manifest());
+ ceph_assert(obc->obs.oi.manifest.is_chunked());
+ return obc;
+ };
+
+ if (index) {
+ l = snapset.clones[index - 1];
+ t_oid.snap = l;
+ manifest_l = &get_context(t_oid)->obs.oi.manifest;
+ }
+
+ if (*s != snapset.clones.back() && s != snapset.clones.end()) {
+ g = snapset.clones[index + 1];
+ t_oid.snap = g;
+ manifest_g = &get_context(t_oid)->obs.oi.manifest;
+ } else if (*s == snapset.clones.back()) {
+ manifest_g = &oi.manifest;
+ }
+
+ if (!manifest_g) {
+ oi.manifest.calc_refs_to_drop_on_removal(manifest_g, manifest_l, refs);
+ } else {
+ get_context(coid)->obs.oi.manifest.calc_refs_to_drop_on_removal(manifest_g, manifest_l, refs);
}
- object_info_t& coi = cobc->obs.oi;
- oi.manifest.build_intersection_set(coi.manifest.chunk_map, refs);
- ctx->register_on_commit(
- [oi, ctx, this, refs](){
- dec_refcount_non_intersection(ctx->obc, oi, refs);
- });
+ if (!refs.is_empty()) {
+ ctx->register_on_commit(
+ [oi, ctx, this, refs](){
+ dec_refcount(ctx->obc, oi, refs);
+ });
+ }
return;
}
bufferlist in;
if (type == refcount_t::INCREMENT_REF) {
cls_cas_chunk_get_ref_op call;
- call.source = src_soid;
+ call.source = src_soid.get_head();
::encode(call, in);
obj_op.call("cas", "chunk_get_ref", in);
} else if (type == refcount_t::DECREMENT_REF) {
cls_cas_chunk_put_ref_op call;
- call.source = src_soid;
+ call.source = src_soid.get_head();
::encode(call, in);
obj_op.call("cas", "chunk_put_ref", in);
}
if (coi.is_cache_pinned())
ctx->delta_stats.num_objects_pinned--;
if (coi.has_manifest()) {
- set<uint64_t> refs;
- object_info_t oi;
- for (auto p : snapset.clones) {
- hobject_t clone_oid = coid;
- if (clone_oid.snap == p) {
- continue;
- }
- clone_oid.snap = p;
- ObjectContextRef cobc = get_object_context(clone_oid, false, NULL);
- if (!cobc) {
- close_op_ctx(ctx.release());
- dout(0) << __func__ << ": Can not find obc " << coid << dendl;;
- return -ENOENT;
- }
-
- // check if the references is still used
- oi = cobc->obs.oi;
- if (oi.has_manifest()) {
- coi.manifest.build_intersection_set(oi.manifest.chunk_map, refs);
- }
- }
- // head
- oi = head_obc->obs.oi;
- if (oi.has_manifest()) {
- coi.manifest.build_intersection_set(oi.manifest.chunk_map, refs);
- }
-
- dec_refcount_non_intersection(head_obc, coi, refs);
+ dec_all_refcount_head_manifest(head_obc->obs.oi, ctx.get(), coid);
ctx->delta_stats.num_objects_manifest--;
}
obc->obs.exists = false;
}
if (oi.has_manifest()) {
ctx->delta_stats.num_objects_manifest--;
- dec_all_refcount_head_manifest(oi, ctx.get());
+ dec_all_refcount_head_manifest(oi, ctx.get(), hobject_t());
}
head_obc->obs.exists = false;
head_obc->obs.oi = object_info_t(head_oid);
break;
}
- dec_all_refcount_head_manifest(oi, ctx);
+ dec_all_refcount_head_manifest(oi, ctx, hobject_t());
oi.clear_flag(object_info_t::FLAG_MANIFEST);
oi.manifest = object_manifest_t();
if (oi.has_manifest()) {
ctx->delta_stats.num_objects_manifest--;
- dec_all_refcount_head_manifest(oi, ctx);
+ dec_all_refcount_head_manifest(oi, ctx, hobject_t());
}
if (whiteout) {
completion->release();
}
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("er");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"er", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(1u, refs.count());
+ }
+
// create a snapshot, clone
vector<uint64_t> my_snaps(1);
ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
my_snaps));
- // foo: [cc] [hi]
+ // foo: [er] [hi]
// make a dirty chunks
{
bufferlist bl;
- bl.append("Thcce");
+ bl.append("There");
ASSERT_EQ(0, ioctx.write("foo", bl, bl.length(), 0));
}
// flush
completion->release();
}
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("er");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"er", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(2u, refs.count());
+ }
+
+ // and another
+ my_snaps.resize(3);
+ my_snaps[2] = my_snaps[1];
+ my_snaps[1] = my_snaps[0];
+ ASSERT_EQ(0, ioctx.selfmanaged_snap_create(&my_snaps[0]));
+ ASSERT_EQ(0, ioctx.selfmanaged_snap_set_write_ctx(my_snaps[0],
+ my_snaps));
+
+ // foo: [bb] [hi]
+ // make a dirty chunks
+ {
+ bufferlist bl;
+ bl.append("Thbbe");
+ ASSERT_EQ(0, ioctx.write("foo", bl, bl.length(), 0));
+ }
+ // flush
+ {
+ ObjectReadOperation op;
+ op.tier_flush();
+ librados::AioCompletion *completion = cluster.aio_create_completion();
+ ASSERT_EQ(0, ioctx.aio_operate(
+ "foo", completion, &op,
+ librados::OPERATION_IGNORE_CACHE, NULL));
+ completion->wait_for_complete();
+ ASSERT_EQ(0, completion->get_return_value());
+ completion->release();
+ }
+
+ /*
+ * snap[2]: [er] [hi]
+ * snap[1]: [bb] [hi]
+ * snap[0]: [er] [hi]
+ * head: [bb] [hi]
+ */
+
// check chunk's refcount
{
bufferlist t;
ASSERT_EQ(1u, refs.count());
}
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("er");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"er", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(2u, refs.count());
+ }
+
+ // remove snap
+ ioctx.selfmanaged_snap_remove(my_snaps[2]);
+
+ /*
+ * snap[1]: [bb] [hi]
+ * snap[0]: [er] [hi]
+ * head: [bb] [hi]
+ */
+
+ sleep(10);
+
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("hi");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"hi", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(1u, refs.count());
+ }
+
+ // remove snap
+ ioctx.selfmanaged_snap_remove(my_snaps[0]);
+
+ /*
+ * snap[1]: [bb] [hi]
+ * head: [bb] [hi]
+ */
+
+ sleep(10);
+
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("bb");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"bb", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(1u, refs.count());
+ }
+
// remove snap
ioctx.selfmanaged_snap_remove(my_snaps[1]);
+ /*
+ * snap[1]: [bb] [hi]
+ */
+
sleep(10);
+ // check chunk's refcount
+ {
+ bufferlist t;
+ SHA1 sha1_gen;
+ int size = strlen("bb");
+ unsigned char fingerprint[CEPH_CRYPTO_SHA1_DIGESTSIZE + 1];
+ char p_str[CEPH_CRYPTO_SHA1_DIGESTSIZE*2+1] = {0};
+ sha1_gen.Update((const unsigned char *)"bb", size);
+ sha1_gen.Final(fingerprint);
+ buf_to_hex(fingerprint, CEPH_CRYPTO_SHA1_DIGESTSIZE, p_str);
+ cache_ioctx.getxattr(p_str, CHUNK_REFCOUNT_ATTR, t);
+ chunk_refs_t refs;
+ try {
+ auto iter = t.cbegin();
+ decode(refs, iter);
+ } catch (buffer::error& err) {
+ ASSERT_TRUE(0);
+ }
+ ASSERT_EQ(1u, refs.count());
+ }
+
// check chunk's refcount
{
bufferlist t;
completion->release();
}
-
// foo head: [er] [hi]
// make a dirty chunks
{
ObjectReadOperation op;
op.tier_flush();
librados::AioCompletion *completion = cluster.aio_create_completion();
- //ASSERT_EQ(0, ioctx.aio_operate("foo", completion, &op));
ASSERT_EQ(0, ioctx.aio_operate(
"foo", completion, &op,
librados::OPERATION_IGNORE_CACHE, NULL));