From 9f23086234cef1eedba4064747fec14bc767925f Mon Sep 17 00:00:00 2001
From: myoungwon oh <ohmyoungwon@gmail.com>
Date: Mon, 5 Oct 2020 21:28:23 +0900
Subject: [PATCH] osd: add tier_flush for dedup

As described in TODO, tier_flush triggers running CDC,
then increases reference count after CDC

Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
---
 src/osd/PrimaryLogPG.cc | 278 ++++++++++++++++++++++++++++++++++++++--
 src/osd/PrimaryLogPG.h  |  11 ++
 2 files changed, 280 insertions(+), 9 deletions(-)

diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 47127382280..41e08205cd7 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -70,6 +70,8 @@
 
 #include <sstream>
 
+#include "common/CDC.h"
+
 MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
 
 using std::list;
@@ -9976,6 +9978,264 @@ struct C_Flush : public Context {
   }
 };
 
+struct C_SetDedupChunks : public Context {
+  PrimaryLogPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  uint64_t offset;
+
+  C_SetDedupChunks(PrimaryLogPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0)
+  {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    std::scoped_lock locker{*pg};
+    auto it = pg->manifest_ops.find(oid);
+    if (it == pg->manifest_ops.end()) {
+      // raced with cancel_manifest_ops
+      return;
+    }
+    // check if the previous op returns fail
+    if (it->second->results[0] < 0) {
+      return;
+    }
+
+    it->second->results[offset] = r;
+    if (last_peering_reset == pg->get_last_peering_reset() &&
+        r >= 0) {
+      if (it->second->num_chunks != it->second->results.size()) {
+        // there are on-going works
+        return;
+      }
+      pg->finish_set_dedup(oid, it->second->chunks, r, tid);
+    } else {
+      // if any failure occurs, put a mark on the results to recognize the failure
+      it->second->results[0] = r;
+      if (last_peering_reset != pg->get_last_peering_reset())
+        it->second->results[0] = -EINVAL;
+    }
+    pg->manifest_ops.erase(it);
+  }
+};
+
+
+int PrimaryLogPG::start_dedup(OpRequestRef op, ObjectContextRef obc)
+{
+  bufferlist bl;
+  uint64_t cur_off = 0;
+  const object_info_t& oi = obc->obs.oi;
+  const hobject_t& soid = oi.soid;
+
+  if (!obc->is_blocked()) {
+    return -EINVAL;
+  }
+
+  /*
+   * The operations to make dedup chunks are tracked by a ManifestOp.
+   * This op will be finished if all the operations are completed.
+   */
+  ManifestOpRef mop(std::make_shared<ManifestOp>(nullptr, 0));
+
+  while (cur_off < oi.size) {
+    // cdc
+    vector<pair<uint64_t, uint64_t>> chunks;
+    int r = do_cdc(oi, bl, chunks);
+    if (r < 0) {
+      return r;
+    }
+    if (!chunks.size()) {
+      break;
+    }
+
+    // get fingerprint
+    for (auto p : chunks) {
+      bufferlist chunk;
+      chunk.substr_of(bl, p.first, p.second);
+      hobject_t target = get_fpoid_from_chunk(soid, chunk);
+      if (target == hobject_t()) {
+        dout(0) << " fingerprint oid is null " << dendl;
+        break;
+      }
+
+      // make a create_or_get_ref op
+      bufferlist t;
+      ObjectOperation obj_op;
+      cls_cas_chunk_create_or_get_ref_op get_call;
+      get_call.source = soid.get_head();
+      get_call.data = chunk;
+      ::encode(get_call, t);
+      obj_op.call("cas", "chunk_create_or_get_ref", t);
+
+      // issue create_or_get_ref_op
+      C_SetDedupChunks *fin = new C_SetDedupChunks(this, soid, get_last_peering_reset());
+      fin->offset = p.first;
+      object_locator_t oloc(target);
+      unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
+                       CEPH_OSD_FLAG_RWORDERED;
+      ceph_tid_t tid = osd->objecter->mutate(
+        target.oid, oloc, obj_op, SnapContext(),
+        ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+        flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
+      fin->tid = tid;
+      mop->tids[p.first] = tid;
+      mop->chunks[target] = make_pair(p.first, p.second);
+      mop->num_chunks++;
+      dout(10) << __func__ << " oid: " << soid << " tid: " << tid
+               << " target: " << target << " offset: " << p.first
+               << " length: " << p.second << dendl;
+    }
+
+    cur_off += bl.length();
+  }
+
+  if (mop->tids.size()) {
+    manifest_ops[soid] = mop;
+    manifest_ops[soid]->op = op;
+  }
+
+  return -EINPROGRESS;
+}
+
+int PrimaryLogPG::do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks)
+{
+  uint64_t cur_off = 0;
+  string chunk_algo = pool.info.get_dedup_chunk_algorithm_name();
+  int64_t chunk_size = pool.info.get_dedup_cdc_chunk_size();
+  uint64_t max_window_size = static_cast<uint64_t>(pool.info.get_dedup_cdc_window_size());
+
+  std::unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size)-1);
+  if (!cdc) {
+    dout(0) << __func__ << " unrecognized chunk-algorithm " << dendl;
+    return -EINVAL;
+  }
+  while (cur_off < oi.size && cur_off < max_window_size) {
+    bufferlist chunk_data;
+    // TODO: Do we need to support EC?
+    int r = pgbackend->objects_read_sync(
+      oi.soid, cur_off, max_window_size, 0, &chunk_data);
+    if (r < 0) {
+      dout(0) << __func__ << " read fail " << " offset: " << cur_off
+              << " len: " << max_window_size << " r: " << r << dendl;
+      return r;
+    }
+    if (chunk_data.length() == 0) {
+      dout(0) << __func__ << " got 0 byte during chunking " << dendl;
+      return r;
+    }
+    bl.append(chunk_data);
+    cur_off += r;
+  }
+
+  dout(10) << __func__ << " oid: " << oi.soid << " len: " << bl.length()
+           << " oi.size: " << oi.size << " window_size: " << max_window_size
+           << " chunk_size: " << chunk_size << dendl;
+  cdc->calc_chunks(bl, &chunks);
+  return 0;
+}
+
+hobject_t PrimaryLogPG::get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk)
+{
+  pg_pool_t::fingerprint_t fp_algo = pool.info.get_fingerprint_type();
+  if (fp_algo == pg_pool_t::TYPE_FINGERPRINT_NONE) {
+    return hobject_t();
+  }
+  object_t fp_oid = [&fp_algo, &chunk]() -> string {
+    switch (fp_algo) {
+      case pg_pool_t::TYPE_FINGERPRINT_SHA1:
+        return ceph::crypto::digest<ceph::crypto::SHA1>(chunk).to_str();
+      case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+        return ceph::crypto::digest<ceph::crypto::SHA256>(chunk).to_str();
+      case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+        return ceph::crypto::digest<ceph::crypto::SHA512>(chunk).to_str();
+      default:
+        assert(0 == "unrecognized fingerprint type");
+        return {};
+    }
+  }();
+
+  pg_t raw_pg;
+  object_locator_t oloc(soid);
+  oloc.pool = pool.info.get_dedup_tier();
+  get_osdmap()->object_locator_to_pg(fp_oid, oloc, raw_pg);
+  hobject_t target(fp_oid, oloc.key, snapid_t(),
+                   raw_pg.ps(), raw_pg.pool(),
+                   oloc.nspace);
+  return target;
+}
+
+void PrimaryLogPG::finish_set_dedup(hobject_t oid, map<hobject_t, pair<uint64_t, uint64_t>>& chunks,
+                                    int r, ceph_tid_t tid)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+           << " " << cpp_strerror(r) << dendl;
+  map<hobject_t, ManifestOpRef>::iterator p = manifest_ops.find(oid);
+  if (p == manifest_ops.end()) {
+    dout(10) << __func__ << " no flush_op found" << dendl;
+    return;
+  }
+  ManifestOpRef mop = p->second;
+  ObjectContextRef obc = get_object_context(oid, false);
+  if (!obc) {
+    if (mop->op)
+      osd->reply_op_error(mop->op, -EINVAL);
+    return;
+  }
+  obc->stop_block();
+  kick_object_context_blocked(obc);
+  if (r < 0) {
+    if (mop->op)
+      osd->reply_op_error(mop->op, r);
+    return;
+  }
+
+  if (chunks.size()) {
+    OpContextUPtr ctx = simple_opc_create(obc);
+    ceph_assert(ctx);
+    if (ctx->lock_manager.get_lock_type(
+          RWState::RWWRITE,
+          oid,
+          obc,
+          mop->op)) {
+      dout(20) << __func__ << " took write lock" << dendl;
+    } else if (mop->op) {
+      dout(10) << __func__ << " waiting on write lock " << mop->op << dendl;
+      close_op_ctx(ctx.release());
+      osd->reply_op_error(mop->op, -EAGAIN);
+      return;
+    }
+
+    // drop all references the current chunk_map has
+    object_ref_delta_t refs;
+    for (auto p : obc->obs.oi.manifest.chunk_map) {
+      refs.dec_ref(p.second.oid);
+    }
+    if (!refs.is_empty()) {
+      ctx->register_on_commit(
+        [oid, this, refs](){
+          dec_refcount(oid, refs);
+        });
+    }
+
+    ctx->at_version = get_next_version();
+    ctx->new_obs = obc->obs;
+    ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+    for (auto p : chunks) {
+      struct chunk_info_t info;
+      info.offset = 0;
+      info.length = p.second.second;
+      info.oid = p.first;
+      ctx->new_obs.oi.manifest.chunk_map[p.second.first] = info;
+    }
+    finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
+    simple_opc_submit(std::move(ctx));
+  }
+  if (mop->op)
+    osd->reply_op_error(mop->op, r);
+}
+
 int PrimaryLogPG::start_flush(
   OpRequestRef op, ObjectContextRef obc,
   bool blocking, hobject_t *pmissing,
@@ -9992,15 +10252,6 @@ int PrimaryLogPG::start_flush(
   bool preoctopus_compat =
     get_osdmap()->require_osd_release < ceph_release_t::octopus;
   SnapSet snapset;
-  if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
-    /*
-     * TODO: "flush" for a manifest object means re-running the CDC algorithm on the portions of the
-     * object that are not currently dedup'd (not in the manifest chunk_map) and re-deduping the resulting
-     * chunks. Adding support for that operation here is future work.
-     *
-     */
-    return -EOPNOTSUPP;
-  }
   if (preoctopus_compat) {
     // for pre-octopus compatibility, filter SnapSet::snaps. not
     // certain we need this, but let's be conservative.
@@ -10079,6 +10330,15 @@ int PrimaryLogPG::start_flush(
     osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
+  if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
+    int r = start_dedup(op, obc);
+    if (r != -EINPROGRESS) {
+      if (blocking)
+        obc->stop_block();
+    }
+    return r;
+  }
+
   /**
    * In general, we need to send a delete and a copyfrom.
    * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h
index 479994d80eb..7decb5704d3 100644
--- a/src/osd/PrimaryLogPG.h
+++ b/src/osd/PrimaryLogPG.h
@@ -260,6 +260,11 @@ public:
   struct ManifestOp {
     RefCountCallback *cb;
     ceph_tid_t objecter_tid;
+    OpRequestRef op;
+    std::map<uint64_t, int> results;
+    std::map<uint64_t, ceph_tid_t> tids;
+    std::map<hobject_t, std::pair<uint64_t, uint64_t>> chunks;
+    uint64_t num_chunks = 0;
 
     ManifestOp(RefCountCallback* cb, ceph_tid_t tid)
       : cb(cb), objecter_tid(tid) {}
@@ -1504,11 +1509,17 @@ protected:
 			       ObjectContextRef& _l, ObjectContextRef& _g);
   bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
 			   OSDOp& osd_op);
+  int do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks);
+  int start_dedup(OpRequestRef op, ObjectContextRef obc);
+  hobject_t get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk);
+  void finish_set_dedup(hobject_t oid, map<hobject_t, pair<uint64_t, uint64_t>>& chunks,
+			int r, ceph_tid_t tid);
 
   friend struct C_ProxyChunkRead;
   friend class PromoteManifestCallback;
   friend struct C_CopyChunk;
   friend struct RefCountCallback;
+  friend struct C_SetDedupChunks;
 
 public:
   PrimaryLogPG(OSDService *o, OSDMapRef curmap,
-- 
2.39.5
