}
}
+
+void PrimaryLogPG::get_adjacent_clones(const object_info_t& oi, OpContext* ctx,
+ ObjectContextRef& _l, ObjectContextRef& _g)
+{
+ const SnapSet& snapset = ctx->obc->ssc->snapset;
+
+ auto get_context = [this, &oi, &snapset](auto iter)
+ -> ObjectContextRef {
+ hobject_t cid = oi.soid;
+ cid.snap = (iter == snapset.clones.end()) ? snapid_t(CEPH_NOSNAP) : *iter;
+ ObjectContextRef obc = get_object_context(cid, false, NULL);
+ ceph_assert(obc);
+ return obc;
+ };
+
+ // check adjacent clones
+ auto s = std::find(snapset.clones.begin(), snapset.clones.end(), oi.soid.snap);
+
+ // We *must* find the clone iff it's not head,
+ // let s == snapset.clones.end() mean head
+ ceph_assert((s == snapset.clones.end()) == oi.soid.is_head());
+
+ if (s != snapset.clones.begin()) {
+ _l = get_context(s - 1);
+ }
+
+ if (s != snapset.clones.end()) {
+ _g = get_context(s + 1);
+ }
+}
+
+bool PrimaryLogPG::inc_refcount_by_set(OpContext* ctx, object_manifest_t& set_chunk,
+ RefCountCallback* fin)
+{
+ object_ref_delta_t refs;
+ ObjectContextRef obc_l, obc_g;
+ get_adjacent_clones(ctx->obs->oi, ctx, obc_l, obc_g);
+ set_chunk.calc_refs_to_inc_on_set(
+ obc_l ? &(obc_l->obs.oi.manifest) : nullptr,
+ obc_g ? &(obc_g->obs.oi.manifest) : nullptr,
+ refs);
+ if (!refs.is_empty()) {
+ /* This is called by set-chunk, so we only consider a single chunk for the time being */
+ ceph_assert(refs.size() == 1);
+ auto p = refs.begin();
+ int inc_ref_count = p->second;
+ if (inc_ref_count > 0) {
+ /*
+ * In set-chunk case, the first thing we should do is to increment
+ * the reference the targe object has prior to update object_manifest in object_info_t.
+ * So, call directly refcount_manifest.
+ */
+ refcount_manifest(ctx->obs->oi.soid, p->first,
+ refcount_t::INCREMENT_REF, fin);
+ return true;
+ } else if (inc_ref_count < 0) {
+ hobject_t src = ctx->obs->oi.soid;
+ hobject_t tgt = p->first;
+ ctx->register_on_commit(
+ [src, tgt, this](){
+ refcount_manifest(src, tgt, refcount_t::DECREMENT_REF, NULL);
+ });
+ return false;
+ }
+ }
+
+ return false;
+}
+
void PrimaryLogPG::dec_refcount_by_dirty(OpContext* ctx)
{
object_ref_delta_t refs;
refs);
if (!refs.is_empty()) {
hobject_t soid = obc->obs.oi.soid;
- ceph_assert(ctx);
ctx->register_on_commit(
[soid, this, refs](){
dec_refcount(soid, refs);
ceph_assert(ctx->obc->ssc);
if (oi.manifest.is_chunked()) {
- const SnapSet& snapset = ctx->obc->ssc->snapset;
-
- auto get_context = [this, &oi, &snapset](auto iter)
- -> ObjectContextRef {
- hobject_t cid = oi.soid;
- cid.snap = (iter == snapset.clones.end()) ? snapid_t(CEPH_NOSNAP) : *iter;
- ObjectContextRef obc = get_object_context(cid, false, NULL);
- ceph_assert(obc);
- ceph_assert(obc->obs.oi.has_manifest());
- ceph_assert(obc->obs.oi.manifest.is_chunked());
- return obc;
- };
- ObjectContextRef obc_l, obc_g;
-
- // check adjacent clones
- auto s = std::find(snapset.clones.begin(), snapset.clones.end(), oi.soid.snap);
-
- // We *must* find the clone iff it's not head,
- // let s == snapset.clones.end() mean head
- ceph_assert((s == snapset.clones.end()) == oi.soid.is_head());
-
- if (s != snapset.clones.begin()) {
- obc_l = get_context(s - 1);
- }
-
- if (s != snapset.clones.end()) {
- obc_g = get_context(s + 1);
- }
-
object_ref_delta_t refs;
+ ObjectContextRef obc_l, obc_g;
+ get_adjacent_clones(oi, ctx, obc_l, obc_g);
oi.manifest.calc_refs_to_drop_on_removal(
obc_l ? &(obc_l->obs.oi.manifest) : nullptr,
obc_g ? &(obc_g->obs.oi.manifest) : nullptr,
tgt_oloc.nspace);
bool need_reference = (osd_op.op.flags & CEPH_OSD_OP_FLAG_WITH_REFERENCE);
bool has_reference = (oi.manifest.chunk_map.find(src_offset) != oi.manifest.chunk_map.end()) &&
- (oi.manifest.chunk_map[src_offset].flags & chunk_info_t::FLAG_HAS_REFERENCE);
+ (oi.manifest.chunk_map[src_offset].test_flag(chunk_info_t::FLAG_HAS_REFERENCE));
if (has_reference) {
result = -EINVAL;
dout(5) << " the object is already a manifest " << dendl;
break;
}
+ chunk_info.oid = target;
+ chunk_info.offset = tgt_offset;
+ chunk_info.length = src_length;
if (op_finisher == nullptr && need_reference) {
// start
ctx->op_finishers[ctx->current_osd_subop_num].reset(
new SetManifestFinisher(osd_op));
RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
- refcount_manifest(ctx->obc->obs.oi.soid, target,
- refcount_t::INCREMENT_REF, fin);
- result = -EINPROGRESS;
- } else {
- if (op_finisher) {
- result = op_finisher->execute();
- ceph_assert(result == 0);
+ object_manifest_t set_chunk;
+ bool need_inc_ref = false;
+ set_chunk.chunk_map[src_offset] = chunk_info;
+ need_inc_ref = inc_refcount_by_set(ctx, set_chunk, fin);
+ if (need_inc_ref) {
+ result = -EINPROGRESS;
+ break;
}
+ }
+ if (op_finisher) {
+ result = op_finisher->execute();
+ ceph_assert(result == 0);
+ }
- chunk_info_t chunk_info;
- chunk_info.oid = target;
- chunk_info.offset = tgt_offset;
- chunk_info.length = src_length;
- oi.manifest.chunk_map[src_offset] = chunk_info;
- if (!oi.has_manifest() && !oi.manifest.is_chunked())
- ctx->delta_stats.num_objects_manifest++;
- oi.set_flag(object_info_t::FLAG_MANIFEST);
- oi.manifest.type = object_manifest_t::TYPE_CHUNKED;
- if (!has_reference && need_reference) {
- oi.manifest.chunk_map[src_offset].set_flag(chunk_info_t::FLAG_HAS_REFERENCE);
- }
- if (need_reference && pool.info.get_fingerprint_type() != pg_pool_t::TYPE_FINGERPRINT_NONE) {
- oi.manifest.chunk_map[src_offset].set_flag(chunk_info_t::FLAG_HAS_FINGERPRINT);
- }
- ctx->modify = true;
- ctx->cache_operation = true;
+ oi.manifest.chunk_map[src_offset] = chunk_info;
+ if (!oi.has_manifest() && !oi.manifest.is_chunked())
+ ctx->delta_stats.num_objects_manifest++;
+ oi.set_flag(object_info_t::FLAG_MANIFEST);
+ oi.manifest.type = object_manifest_t::TYPE_CHUNKED;
+ if (!has_reference && need_reference) {
+ oi.manifest.chunk_map[src_offset].set_flag(chunk_info_t::FLAG_HAS_REFERENCE);
+ }
+ if (need_reference && pool.info.get_fingerprint_type() != pg_pool_t::TYPE_FINGERPRINT_NONE) {
+ oi.manifest.chunk_map[src_offset].set_flag(chunk_info_t::FLAG_HAS_FINGERPRINT);
+ }
+ ctx->modify = true;
+ ctx->cache_operation = true;
- dout(10) << "set-chunked oid:" << oi.soid << " user_version: " << oi.user_version
- << " chunk_info: " << chunk_info << dendl;
- if (op_finisher) {
- ctx->op_finishers.erase(ctx->current_osd_subop_num);
- }
+ dout(10) << "set-chunked oid:" << oi.soid << " user_version: " << oi.user_version
+ << " chunk_info: " << chunk_info << dendl;
+ if (op_finisher) {
+ ctx->op_finishers.erase(ctx->current_osd_subop_num);
}
}
void dec_refcount_by_dirty(OpContext* ctx);
bool is_dedup_chunk(const chunk_info_t& chunk);
ObjectContextRef get_prev_clone_obc(ObjectContextRef obc);
+ void get_adjacent_clones(const object_info_t& oi, OpContext* ctx,
+ ObjectContextRef& _l, ObjectContextRef& _g);
+ bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
+ RefCountCallback* fin = NULL);
friend struct C_ProxyChunkRead;
friend class PromoteManifestCallback;
return out << ci.ref_delta << std::endl;
}
+void object_manifest_t::calc_refs_to_inc_on_set(
+ const object_manifest_t* _g,
+ const object_manifest_t* _l,
+ object_ref_delta_t &refs) const
+{
+ /* avoid to increment the same reference on adjacent clones */
+ auto iter = chunk_map.begin();
+ auto find_chunk = [](decltype(iter) &i, const object_manifest_t* cur)
+ -> bool {
+ if (cur) {
+ auto c = cur->chunk_map.find(i->first);
+ if (c != cur->chunk_map.end() && c->second == i->second) {
+ return true;
+
+ }
+ }
+ return false;
+ };
+
+ /* If at least a same chunk exists on either _g or _l, do not increment
+ * the reference
+ *
+ * head: [0, 2) ccc, [6, 2) bbb, [8, 2) ccc
+ * 20: [0, 2) aaa, <- set_chunk
+ * 30: [0, 2) abc, [6, 2) bbb, [8, 2) ccc
+ * --> incremnt the reference
+ *
+ * head: [0, 2) ccc, [6, 2) bbb, [8, 2) ccc
+ * 20: [0, 2) ccc, <- set_chunk
+ * 30: [0, 2) abc, [6, 2) bbb, [8, 2) ccc
+ * --> do not need to increment
+ *
+ * head: [0, 2) ccc, [6, 2) bbb, [8, 2) ccc
+ * 20: [0, 2) ccc, <- set_chunk
+ * 30: [0, 2) ccc, [6, 2) bbb, [8, 2) ccc
+ * --> decrement the reference of ccc
+ *
+ */
+ for (; iter != chunk_map.end(); ++iter) {
+ auto found_g = find_chunk(iter, _g);
+ auto found_l = find_chunk(iter, _l);
+ if (!found_g && !found_l) {
+ refs.inc_ref(iter->second.oid);
+ } else if (found_g && found_l) {
+ refs.dec_ref(iter->second.oid);
+ }
+ }
+}
+
void object_manifest_t::calc_refs_to_drop_on_modify(
const object_manifest_t* _l,
const ObjectCleanRegions& clean_regions,
bool is_empty() {
return ref_delta.empty();
}
+ uint64_t size() {
+ return ref_delta.size();
+ }
friend std::ostream& operator<<(std::ostream& out, const object_ref_delta_t & ci);
};
chunk_map.clear();
}
+ /**
+ * calc_refs_to_inc_on_set
+ *
+ * Takes a manifest and returns the set of refs to
+ * increment upon set-chunk
+ *
+ * l should be nullptr if there are no clones, or
+ * l and g may each be null if the corresponding clone does not exist.
+ * *this contains the set of new references to set
+ *
+ */
+ void calc_refs_to_inc_on_set(
+ const object_manifest_t* g, ///< [in] manifest for clone > *this
+ const object_manifest_t* l, ///< [in] manifest for clone < *this
+ object_ref_delta_t &delta ///< [out] set of refs to drop
+ ) const;
+
/**
* calc_refs_to_drop_on_modify
*
* drop upon modification
*
* l should be nullptr if there are no clones, or
- * l should be a manifest for previous clone
+ * l may be null if the corresponding clone does not exist.
*
*/
void calc_refs_to_drop_on_modify(
}
}
+void ci_ref_test_inc_on_set(
+ object_manifest_t l,
+ object_manifest_t added_set,
+ object_manifest_t g,
+ object_ref_delta_t expected_delta)
+{
+ {
+ object_ref_delta_t delta;
+ added_set.calc_refs_to_inc_on_set(
+ &l,
+ &g,
+ delta);
+ ASSERT_EQ(
+ expected_delta,
+ delta);
+ }
+}
+
hobject_t mk_hobject(string name)
{
return hobject_t(
mk_delta({{"bar", -1}, {"ttt", -1}}));
}
+TEST(chunk_info_test, calc_refs_inc) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{1024, {0, 1024, "bar"}}}),
+ mk_manifest({{4096, {0, 1024, "foo"}}}),
+ mk_delta({{"bar", 1}}));
+}
+
+TEST(chunk_info_test, calc_refs_inc2) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({{512, {0, 1024, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{1024, {0, 1024, "bar"}}, {4096, {0, 1024, "bbb"}}}),
+ mk_manifest({{512, {0, 1024, "foo"}}}),
+ mk_delta({{"bar", 1}, {"bbb", 1}}));
+}
+
+TEST(chunk_info_test, calc_refs_inc_no_l) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({}),
+ mk_manifest({{1024, {0, 1024, "bar"}}, {4096, {0, 1024, "bbb"}}}),
+ mk_manifest({{512, {0, 1024, "foo"}}}),
+ mk_delta({{"bar", 1}, {"bbb", 1}}));
+}
+
+TEST(chunk_info_test, calc_refs_inc_no_g) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({{512, {0, 1024, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{1024, {0, 1024, "bar"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({}),
+ mk_delta({{"bar", 1}}));
+}
+
+TEST(chunk_info_test, calc_refs_inc_match_g_l) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_delta({{"aaa", -1}, {"foo", -1}}));
+}
+
+TEST(chunk_info_test, calc_refs_inc_match) {
+ ci_ref_test_inc_on_set(
+ mk_manifest({{256, {0, 256, "bbb"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "foo"}}}),
+ mk_manifest({{256, {0, 256, "aaa"}}, {4096, {0, 1024, "ccc"}}}),
+ mk_delta({}));
+}
+
/*
* Local Variables:
* compile-command: "cd ../.. ;