#include <errno.h>
+#include <common/CDC.h>
+
MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
using std::list;
}
};
+// Completion callback for one cls_cas "chunk_create_or_get_ref" call issued
+// by PrimaryLogPG::start_dedup().  One instance exists per chunk; finish()
+// records the per-chunk result in the shared ManifestOp and, once every
+// chunk has completed successfully, finalizes the dedup via
+// PrimaryLogPG::finish_set_dedup().
+struct C_SetDedupChunks : public Context {
+ PrimaryLogPGRef pg;
+ hobject_t oid; // head object being dedup'ed
+ epoch_t last_peering_reset; // PG epoch captured when the op was issued
+ ceph_tid_t tid; // objecter tid of this chunk op (filled in by caller)
+ uint64_t offset; // chunk offset within the object (filled in by caller)
+
+ C_SetDedupChunks(PrimaryLogPG *p, hobject_t o, epoch_t lpr)
+ : pg(p), oid(o), last_peering_reset(lpr),
+ tid(0)
+ {}
+ void finish(int r) override {
+ // -ECANCELED means the op was cancelled (e.g. via objecter op_cancel);
+ // the tracking entry is cleaned up elsewhere.
+ if (r == -ECANCELED)
+ return;
+ // manifest_ops is protected by the PG lock.
+ std::scoped_lock locker{*pg};
+ auto it = pg->manifest_ops.find(oid);
+ if (it == pg->manifest_ops.end()) {
+ // raced with cancel_manifest_ops
+ return;
+ }
+ // check if the previous op returns fail
+ // NOTE(review): results[0] uses operator[], which default-inserts an
+ // entry (value 0) if the offset-0 op has not completed yet; that also
+ // grows results.size().  Confirm this cannot make the size-based
+ // completion check below fire before all chunk ops have finished.
+ if (it->second->results[0] < 0) {
+ return;
+ }
+
+ it->second->results[offset] = r;
+ if (last_peering_reset == pg->get_last_peering_reset() &&
+ r >= 0) {
+ if (it->second->num_chunks != it->second->results.size()) {
+ // there are on-going works
+ return;
+ }
+ // All chunk ops completed successfully: commit the new chunk_map.
+ pg->finish_set_dedup(oid, it->second->chunks, r, tid);
+ } else {
+ // if any failure occurs, put a mark on the results to recognize the failure
+ it->second->results[0] = r;
+ // A peering reset invalidates the whole operation regardless of r.
+ if (last_peering_reset != pg->get_last_peering_reset())
+ it->second->results[0] = -EINVAL;
+ }
+ // Done (success or failure): drop the tracking entry.  Later
+ // completions for this oid will hit the "raced with cancel" path above.
+ pg->manifest_ops.erase(it);
+ }
+};
+
+
+// Start deduplicating object |obc|: read its data, split it into chunks via
+// CDC, derive a fingerprint object id per chunk, and issue one cls_cas
+// "chunk_create_or_get_ref" call per chunk to the dedup tier.  Per-chunk
+// completions are handled by C_SetDedupChunks; the overall operation is
+// tracked in manifest_ops via a ManifestOp.
+// Preconditions: the object must already be blocked by the caller.
+// Returns -EINPROGRESS while chunk ops are in flight, or a negative errno
+// on immediate failure.
+int PrimaryLogPG::start_dedup(OpRequestRef op, ObjectContextRef obc)
+{
+ bufferlist bl;
+ uint64_t cur_off = 0;
+ const object_info_t& oi = obc->obs.oi;
+ const hobject_t& soid = oi.soid;
+
+ // The caller is responsible for blocking the object before calling us.
+ if (!obc->is_blocked()) {
+ return -EINVAL;
+ }
+
+ /*
+ * The operations to make dedup chunks are tracked by a ManifestOp.
+ * This op will be finished if all the operations are completed.
+ */
+ ManifestOpRef mop(std::make_shared<ManifestOp>(nullptr, 0));
+
+ while (cur_off < oi.size) {
+ // cdc
+ vector<pair<uint64_t, uint64_t>> chunks;
+ int r = do_cdc(oi, bl, chunks);
+ if (r < 0) {
+ return r;
+ }
+ // No chunks means no (more) data to dedup.
+ if (!chunks.size()) {
+ break;
+ }
+
+ // get fingerprint
+ for (auto p : chunks) {
+ // p.first = chunk offset in bl, p.second = chunk length.
+ bufferlist chunk;
+ chunk.substr_of(bl, p.first, p.second);
+ hobject_t target = get_fpoid_from_chunk(soid, chunk);
+ if (target == hobject_t()) {
+ // Fingerprinting disabled on the pool; stop issuing chunk ops.
+ dout(0) << " fingerprint oid is null " << dendl;
+ break;
+ }
+
+ // make a create_or_get_ref op
+ bufferlist t;
+ ObjectOperation obj_op;
+ cls_cas_chunk_create_or_get_ref_op get_call;
+ get_call.source = soid.get_head();
+ get_call.data = chunk;
+ ::encode(get_call, t);
+ obj_op.call("cas", "chunk_create_or_get_ref", t);
+
+ // issue create_or_get_ref_op
+ C_SetDedupChunks *fin = new C_SetDedupChunks(this, soid, get_last_peering_reset());
+ fin->offset = p.first;
+ object_locator_t oloc(target);
+ // Go straight to the base tier and order against writes.
+ unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
+ CEPH_OSD_FLAG_RWORDERED;
+ ceph_tid_t tid = osd->objecter->mutate(
+ target.oid, oloc, obj_op, SnapContext(),
+ ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+ flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
+ fin->tid = tid;
+ // Record the tid (for cancellation) and the chunk -> (off, len) mapping
+ // that finish_set_dedup() will turn into the new chunk_map.
+ mop->tids[p.first] = tid;
+ mop->chunks[target] = make_pair(p.first, p.second);
+ mop->num_chunks++;
+ dout(10) << __func__ << " oid: " << soid << " tid: " << tid
+ << " target: " << target << " offset: " << p.first
+ << " length: " << p.second << dendl;
+ }
+
+ // NOTE(review): bl accumulates across iterations and do_cdc() always
+ // reads starting at object offset 0 (capped by the CDC window), so for
+ // objects larger than the window this appears to re-read/re-chunk the
+ // same first window — confirm the intended behavior for large objects.
+ cur_off += bl.length();
+ }
+
+ // Track the operation only if at least one chunk op was actually issued.
+ if (mop->tids.size()) {
+ manifest_ops[soid] = mop;
+ manifest_ops[soid]->op = op;
+ }
+
+ // NOTE(review): -EINPROGRESS is returned even when no chunk ops were
+ // issued (mop->tids empty) — confirm the caller copes with that case.
+ return -EINPROGRESS;
+}
+
+// Read up to the pool's CDC window of object data and run the pool's
+// content-defined chunking algorithm over it.
+//   oi     - object info (size, soid) of the object to chunk
+//   bl     - [out] data read from the object is appended here
+//   chunks - [out] (offset, length) pairs within bl produced by the CDC
+// Returns 0 on success (possibly with no chunks if a zero-length read was
+// hit), or a negative errno on read/setup failure.
+int PrimaryLogPG::do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks)
+{
+ // NOTE(review): cur_off is local and starts at 0, so each call reads
+ // from the beginning of the object regardless of the caller's progress.
+ uint64_t cur_off = 0;
+ string chunk_algo = pool.info.get_dedup_chunk_algorithm_name();
+ int64_t chunk_size = pool.info.get_dedup_cdc_chunk_size();
+ uint64_t max_window_size = static_cast<uint64_t>(pool.info.get_dedup_cdc_window_size());
+
+ // CDC::create takes the target chunk size as a power-of-two bit count.
+ std::unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size)-1);
+ if (!cdc) {
+ dout(0) << __func__ << " unrecognized chunk-algorithm " << dendl;
+ return -EINVAL;
+ }
+ // Accumulate at most max_window_size bytes (or the whole object if it
+ // is smaller) before chunking.
+ while (cur_off < oi.size && cur_off < max_window_size) {
+ bufferlist chunk_data;
+ // TODO: Do we need to support EC?
+ int r = pgbackend->objects_read_sync(
+ oi.soid, cur_off, max_window_size, 0, &chunk_data);
+ if (r < 0) {
+ dout(0) << __func__ << " read fail " << " offset: " << cur_off
+ << " len: " << max_window_size << " r: " << r << dendl;
+ return r;
+ }
+ if (chunk_data.length() == 0) {
+ // Zero-length read: treat as end of data.  Note we return without
+ // running calc_chunks, so |chunks| stays empty and the caller stops.
+ dout(0) << __func__ << " got 0 byte during chunking " << dendl;
+ return r;
+ }
+ bl.append(chunk_data);
+ cur_off += r;
+ }
+
+ dout(10) << __func__ << " oid: " << oi.soid << " len: " << bl.length()
+ << " oi.size: " << oi.size << " window_size: " << max_window_size
+ << " chunk_size: " << chunk_size << dendl;
+ cdc->calc_chunks(bl, &chunks);
+ return 0;
+}
+
+// Map a chunk's contents to the fingerprint object that will hold it in the
+// dedup tier: hash the chunk with the pool's configured fingerprint
+// algorithm, then place the resulting oid through the dedup tier's pg
+// mapping.  Returns an empty hobject_t when fingerprinting is disabled.
+hobject_t PrimaryLogPG::get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk)
+{
+  pg_pool_t::fingerprint_t fp_type = pool.info.get_fingerprint_type();
+  if (fp_type == pg_pool_t::TYPE_FINGERPRINT_NONE) {
+    return hobject_t();
+  }
+
+  // Digest the chunk with the selected algorithm.
+  string fp_name;
+  switch (fp_type) {
+  case pg_pool_t::TYPE_FINGERPRINT_SHA1:
+    fp_name = ceph::crypto::digest<ceph::crypto::SHA1>(chunk).to_str();
+    break;
+  case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+    fp_name = ceph::crypto::digest<ceph::crypto::SHA256>(chunk).to_str();
+    break;
+  case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+    fp_name = ceph::crypto::digest<ceph::crypto::SHA512>(chunk).to_str();
+    break;
+  default:
+    assert(0 == "unrecognized fingerprint type");
+    break;
+  }
+  object_t fp_oid = fp_name;
+
+  // Resolve the fingerprint oid to an hobject_t in the dedup tier pool.
+  object_locator_t oloc(soid);
+  oloc.pool = pool.info.get_dedup_tier();
+  pg_t raw_pg;
+  get_osdmap()->object_locator_to_pg(fp_oid, oloc, raw_pg);
+  return hobject_t(fp_oid, oloc.key, snapid_t(),
+                   raw_pg.ps(), raw_pg.pool(),
+                   oloc.nspace);
+}
+
+// Finalize a dedup started by start_dedup(): unblock the object and, on
+// success, rewrite its manifest chunk_map from |chunks| (fingerprint oid ->
+// (offset, length)), dropping references held by the old chunk_map once the
+// new state commits.  Replies to the originating client op if any.
+//   oid    - head object that was dedup'ed
+//   chunks - fingerprint-object -> (offset, length) map from the ManifestOp
+//   r      - overall result of the chunk ops
+//   tid    - objecter tid of the completing op (for logging)
+void PrimaryLogPG::finish_set_dedup(hobject_t oid, map<hobject_t, pair<uint64_t, uint64_t>>& chunks,
+ int r, ceph_tid_t tid)
+{
+ dout(10) << __func__ << " " << oid << " tid " << tid
+ << " " << cpp_strerror(r) << dendl;
+ map<hobject_t,ManifestOpRef>::iterator p = manifest_ops.find(oid);
+ if (p == manifest_ops.end()) {
+ dout(10) << __func__ << " no flush_op found" << dendl;
+ return;
+ }
+ ManifestOpRef mop = p->second;
+ ObjectContextRef obc = get_object_context(oid, false);
+ if (!obc) {
+ if (mop->op)
+ osd->reply_op_error(mop->op, -EINVAL);
+ return;
+ }
+ // Undo the block set up before start_dedup() and wake any waiters.
+ obc->stop_block();
+ kick_object_context_blocked(obc);
+ if (r < 0) {
+ if (mop->op)
+ osd->reply_op_error(mop->op, r);
+ return;
+ }
+
+ if (chunks.size()) {
+ OpContextUPtr ctx = simple_opc_create(obc);
+ ceph_assert(ctx);
+ // NOTE(review): if the write lock is not acquired and mop->op is null,
+ // neither branch fires and we proceed without the lock — confirm that
+ // is intended for internally-generated (client-less) dedup ops.
+ if (ctx->lock_manager.get_lock_type(
+ RWState::RWWRITE,
+ oid,
+ obc,
+ mop->op)) {
+ dout(20) << __func__ << " took write lock" << dendl;
+ } else if (mop->op) {
+ dout(10) << __func__ << " waiting on write lock " << mop->op << dendl;
+ close_op_ctx(ctx.release());
+ osd->reply_op_error(mop->op, -EAGAIN);
+ return;
+ }
+
+ // drop all references the current chunk_map has
+ object_ref_delta_t refs;
+ for (auto p : obc->obs.oi.manifest.chunk_map) {
+ refs.dec_ref(p.second.oid);
+ }
+ if (!refs.is_empty()) {
+ // Only decrement the old refs once the new manifest state commits.
+ ctx->register_on_commit(
+ [oid, this, refs](){
+ dec_refcount(oid, refs);
+ });
+ }
+
+ // Build the new object state: clean (no longer dirty) with a chunk_map
+ // pointing at the fingerprint objects created by the chunk ops.
+ ctx->at_version = get_next_version();
+ ctx->new_obs = obc->obs;
+ ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+ for (auto p : chunks) {
+ struct chunk_info_t info;
+ info.offset = 0;
+ info.length = p.second.second;
+ info.oid = p.first;
+ ctx->new_obs.oi.manifest.chunk_map[p.second.first] = info;
+ }
+ finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
+ simple_opc_submit(std::move(ctx));
+ }
+ // r >= 0 here, so this replies success to the client if one is waiting.
+ if (mop->op)
+ osd->reply_op_error(mop->op, r);
+}
+
int PrimaryLogPG::start_flush(
OpRequestRef op, ObjectContextRef obc,
bool blocking, hobject_t *pmissing,
bool preoctopus_compat =
get_osdmap()->require_osd_release < ceph_release_t::octopus;
SnapSet snapset;
- if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
- /*
- * TODO: "flush" for a manifest object means re-running the CDC algorithm on the portions of the
- * object that are not currently dedup'd (not in the manifest chunk_map) and re-deduping the resulting
- * chunks. Adding support for that operation here is future work.
- *
- */
- return -EOPNOTSUPP;
- }
if (preoctopus_compat) {
// for pre-octopus compatibility, filter SnapSet::snaps. not
// certain we need this, but let's be conservative.
osd->objecter->op_cancel(tids, -ECANCELED);
}
+ if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
+ int r = start_dedup(op, obc);
+ if (r != -EINPROGRESS) {
+ if (blocking)
+ obc->stop_block();
+ }
+ return r;
+ }
+
/**
* In general, we need to send a delete and a copyfrom.
* Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]