while (cur_off < oi.size) {
// cdc
- vector<pair<uint64_t, uint64_t>> chunks;
- int r = do_cdc(oi, bl, chunks);
+ std::map<uint64_t, bufferlist> chunks;
+ int r = do_cdc(oi, cur_off, mop->new_manifest.chunk_map, chunks);
if (r < 0) {
return r;
}
break;
}
- // get fingerprint
+ // chunks issued here are different with chunk_map newly generated
+ // because the same chunks in previous snap will not be issued
+ // So, we need two data structures; the first is the issued chunk list to track
+ // issued operations, and the second is the new chunk_map to update chunk_map after
+ // all operations are finished
+ object_ref_delta_t refs;
+ ObjectContextRef obc_l, obc_g;
+ get_adjacent_clones(obc, obc_l, obc_g);
+ // skip if the same content exits in prev snap at same offset
+ mop->new_manifest.calc_refs_to_inc_on_set(
+ obc_l ? &(obc_l->obs.oi.manifest) : nullptr,
+ obc_g ? &(obc_g->obs.oi.manifest) : nullptr,
+ refs,
+ chunks.begin()->first); // to avoid unnecessary search
+
for (auto p : chunks) {
- bufferlist chunk;
- chunk.substr_of(bl, p.first, p.second);
- hobject_t target = get_fpoid_from_chunk(soid, chunk);
-
- chunk_info_t chunk_info(0, p.second, target);
- // chunks issued here are different with chunk_map newly generated
- // because the same chunks in previous snap will not be issued
- // So, we need two data structures; the first is the issued chunk list to track
- // issued operations, and the second is the new chunk_map to update chunk_map after
- // all operations are finished
- mop->new_chunk_map[p.first] = chunk_info;
- // skip if the same content exits in prev snap at same offset
- object_ref_delta_t refs;
- ObjectContextRef obc_l, obc_g;
- object_manifest_t set_chunk;
- set_chunk.chunk_map[p.first] = chunk_info;
- get_adjacent_clones(obc, obc_l, obc_g);
- set_chunk.calc_refs_to_inc_on_set(
- obc_l ? &(obc_l->obs.oi.manifest) : nullptr,
- obc_g ? &(obc_g->obs.oi.manifest) : nullptr,
- refs);
- if (refs.is_empty()) {
- dout(15) << " found same chunk " << refs << dendl;
+ hobject_t target = mop->new_manifest.chunk_map[p.first].oid;
+ if (refs.find(target) == refs.end()) {
continue;
- }
-
+ }
// make a create_or_get_ref op
bufferlist t;
ObjectOperation obj_op;
cls_cas_chunk_create_or_get_ref_op get_call;
get_call.source = soid.get_head();
- get_call.data = chunk;
+ get_call.data = chunks[p.first];
::encode(get_call, t);
obj_op.call("cas", "chunk_create_or_get_ref", t);
flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
fin->tid = tid;
mop->tids[p.first] = tid;
- mop->chunks[target] = make_pair(p.first, p.second);
+ mop->chunks[target] = make_pair(p.first, p.second.length());
mop->num_chunks++;
dout(10) << __func__ << " oid: " << soid << " tid: " << tid
<< " target: " << target << " offset: " << p.first
- << " length: " << p.second << dendl;
+ << " length: " << p.second.length() << dendl;
}
-
- cur_off += bl.length();
+ cur_off += r;
}
if (mop->tids.size()) {
return -EINPROGRESS;
}
-int PrimaryLogPG::do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks)
+int PrimaryLogPG::do_cdc(const object_info_t& oi, uint64_t off,
+ std::map<uint64_t, chunk_info_t>& chunk_map,
+ std::map<uint64_t, bufferlist>& chunks)
{
- uint64_t cur_off = 0;
+ uint64_t cur_off = off;
string chunk_algo = pool.info.get_dedup_chunk_algorithm_name();
int64_t chunk_size = pool.info.get_dedup_cdc_chunk_size();
uint64_t max_window_size = static_cast<uint64_t>(pool.info.get_dedup_cdc_window_size());
+ uint64_t total_length = 0;
std::unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size)-1);
if (!cdc) {
dout(0) << __func__ << " unrecognized chunk-algorithm " << dendl;
return -EINVAL;
}
- while (cur_off < oi.size && cur_off < max_window_size) {
- bufferlist chunk_data;
- /**
- * We disable EC pool as a base tier of distributed dedup.
- * The reason why we disallow erasure code pool here is that the EC pool does not support objects_read_sync().
- * Therefore, we should change the current implementation totally to make EC pool compatible.
- * As s result, we leave this as a future work.
- */
- int r = pgbackend->objects_read_sync(
- oi.soid, cur_off, max_window_size, 0, &chunk_data);
- if (r < 0) {
- dout(0) << __func__ << " read fail " << " offset: " << cur_off
- << " len: " << max_window_size << " r: " << r << dendl;
- return r;
- }
- if (chunk_data.length() == 0) {
- dout(0) << __func__ << " got 0 byte during chunking " << dendl;
- return r;
- }
- bl.append(chunk_data);
- cur_off += r;
+
+ if (cur_off >= oi.size) {
+ return -ERANGE;
+ }
+ bufferlist bl;
+ /**
+ * We disable EC pool as a base tier of distributed dedup.
+ * The reason why we disallow erasure code pool here is that the EC pool does not support objects_read_sync().
+ * Therefore, we should change the current implementation totally to make EC pool compatible.
+ * As s result, we leave this as a future work.
+ */
+ int r = pgbackend->objects_read_sync(
+ oi.soid, cur_off, max_window_size, 0, &bl);
+ if (r < 0) {
+ dout(0) << __func__ << " read fail " << " offset: " << cur_off
+ << " len: " << max_window_size << " r: " << r << dendl;
+ return r;
+ }
+ if (bl.length() == 0) {
+ dout(0) << __func__ << " got 0 byte during chunking " << dendl;
+ return r;
}
dout(10) << __func__ << " oid: " << oi.soid << " len: " << bl.length()
<< " oi.size: " << oi.size << " window_size: " << max_window_size
<< " chunk_size: " << chunk_size << dendl;
- cdc->calc_chunks(bl, &chunks);
- return 0;
+
+ vector<pair<uint64_t, uint64_t>> cdc_chunks;
+ cdc->calc_chunks(bl, &cdc_chunks);
+
+ // get fingerprint
+ for (auto p : cdc_chunks) {
+ bufferlist chunk;
+ uint64_t object_offset = off + p.first;
+ chunk.substr_of(bl, p.first, p.second);
+ hobject_t target = get_fpoid_from_chunk(oi.soid, chunk);
+ chunks[object_offset] = move(chunk);
+ chunk_map[object_offset] = chunk_info_t(0, p.second, target);
+ total_length += p.second;
+ }
+ return total_length;
}
hobject_t PrimaryLogPG::get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk)
ctx->new_obs.oi.manifest.chunk_map.clear();
// set new references
- ctx->new_obs.oi.manifest.chunk_map = mop->new_chunk_map;
+ ctx->new_obs.oi.manifest.chunk_map = mop->new_manifest.chunk_map;
finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
simple_opc_submit(std::move(ctx));