]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: add tier_flush for dedup
authormyoungwon oh <ohmyoungwon@gmail.com>
Mon, 5 Oct 2020 12:28:23 +0000 (21:28 +0900)
committermyoungwon oh <ohmyoungwon@gmail.com>
Tue, 13 Oct 2020 09:56:05 +0000 (18:56 +0900)
As described in TODO, tier_flush triggers running CDC, then
increases reference count after CDC

Signed-off-by: Myoungwon Oh <myoungwon.oh@samsumg.com>
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 47127382280043046df9f1d3e5422ba49b595c78..41e08205cd771ad140d698bd6546870dba5f4fbe 100644 (file)
@@ -70,6 +70,8 @@
 
 #include <errno.h>
 
+#include <common/CDC.h>
+
 MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
 
 using std::list;
@@ -9976,6 +9978,264 @@ struct C_Flush : public Context {
   }
 };
 
+struct C_SetDedupChunks : public Context {
+  PrimaryLogPGRef pg;
+  hobject_t oid;
+  epoch_t last_peering_reset;
+  ceph_tid_t tid;
+  uint64_t offset;
+  
+  C_SetDedupChunks(PrimaryLogPG *p, hobject_t o, epoch_t lpr)
+    : pg(p), oid(o), last_peering_reset(lpr),
+      tid(0)
+  {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    std::scoped_lock locker{*pg};
+    auto it = pg->manifest_ops.find(oid);
+    if (it == pg->manifest_ops.end()) {
+      // raced with cancel_manifest_ops
+      return;
+    }
+    // check if the previous op returns fail
+    if (it->second->results[0] < 0) {
+      return;
+    }
+
+    it->second->results[offset] = r;
+    if (last_peering_reset == pg->get_last_peering_reset() &&
+       r >= 0) {
+      if (it->second->num_chunks != it->second->results.size()) {
+       // there are on-going works
+       return;
+      }
+      pg->finish_set_dedup(oid, it->second->chunks, r, tid);
+    } else {
+      // if any failure occurs, put a mark on the results to recognize the failure
+      it->second->results[0] = r;
+      if (last_peering_reset != pg->get_last_peering_reset()) 
+       it->second->results[0] = -EINVAL;
+    }
+    pg->manifest_ops.erase(it);
+  }
+};
+
+
+int PrimaryLogPG::start_dedup(OpRequestRef op, ObjectContextRef obc)
+{
+  bufferlist bl;
+  uint64_t cur_off = 0;
+  const object_info_t& oi = obc->obs.oi;
+  const hobject_t& soid = oi.soid;
+
+  if (!obc->is_blocked()) {
+    return -EINVAL;
+  }
+
+  /*
+   * The operations to make dedup chunks are tracked by a ManifestOp.
+   * This op will be finished if all the operations are completed.
+   */
+  ManifestOpRef mop(std::make_shared<ManifestOp>(nullptr, 0));
+
+  while (cur_off < oi.size) {
+    // cdc
+    vector<pair<uint64_t, uint64_t>> chunks;
+    int r = do_cdc(oi, bl, chunks);
+    if (r < 0) {
+      return r;
+    }
+    if (!chunks.size()) {
+      break;
+    }
+
+    // get fingerprint 
+    for (auto p : chunks) {
+      bufferlist chunk;
+      chunk.substr_of(bl, p.first, p.second);
+      hobject_t target = get_fpoid_from_chunk(soid, chunk);
+      if (target == hobject_t()) {
+       dout(0) << " fingerprint oid is null " << dendl;
+       break;
+      }
+
+      // make a create_or_get_ref op
+      bufferlist t;
+      ObjectOperation obj_op;
+      cls_cas_chunk_create_or_get_ref_op get_call;
+      get_call.source = soid.get_head();
+      get_call.data = chunk;
+      ::encode(get_call, t);
+      obj_op.call("cas", "chunk_create_or_get_ref", t);
+
+      // issue create_or_get_ref_op
+      C_SetDedupChunks *fin = new C_SetDedupChunks(this, soid, get_last_peering_reset());
+      fin->offset = p.first;
+      object_locator_t oloc(target);
+      unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
+                       CEPH_OSD_FLAG_RWORDERED;
+      ceph_tid_t tid = osd->objecter->mutate(
+         target.oid, oloc, obj_op, SnapContext(),
+         ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
+         flags, new C_OnFinisher(fin, osd->get_objecter_finisher(get_pg_shard())));
+      fin->tid = tid;
+      mop->tids[p.first] = tid;
+      mop->chunks[target] = make_pair(p.first, p.second);
+      mop->num_chunks++;
+      dout(10) << __func__ << " oid: " << soid << " tid: " << tid 
+             << " target: " << target << " offset: " << p.first
+             << " length: " << p.second << dendl;
+    }
+    
+    cur_off += bl.length();
+  }
+
+  if (mop->tids.size()) {
+    manifest_ops[soid] = mop;
+    manifest_ops[soid]->op = op;
+  } 
+
+  return -EINPROGRESS;
+}
+
+int PrimaryLogPG::do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks)
+{
+  uint64_t cur_off = 0;
+  string chunk_algo = pool.info.get_dedup_chunk_algorithm_name();
+  int64_t chunk_size = pool.info.get_dedup_cdc_chunk_size();
+  uint64_t max_window_size = static_cast<uint64_t>(pool.info.get_dedup_cdc_window_size());
+
+  std::unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size)-1);
+  if (!cdc) {
+    dout(0) << __func__ << " unrecognized chunk-algorithm " << dendl;
+    return -EINVAL;
+  }
+  while (cur_off < oi.size && cur_off < max_window_size) {
+    bufferlist chunk_data;
+    // TODO: Do we need to support EC?
+    int r = pgbackend->objects_read_sync(
+       oi.soid, cur_off, max_window_size, 0, &chunk_data);
+    if (r < 0) {
+      dout(0) << __func__ << " read fail " << " offset: " << cur_off
+       << " len: " << max_window_size << " r: " << r << dendl;
+      return r;
+    }
+    if (chunk_data.length() == 0) {
+      dout(0) << __func__ << " got 0 byte during chunking " << dendl;
+      return r;
+    }
+    bl.append(chunk_data);
+    cur_off += r;
+  }
+
+  dout(10) << __func__ << " oid: " << oi.soid << " len: " << bl.length() 
+         << " oi.size: " << oi.size << " window_size: " << max_window_size 
+         << " chunk_size: " << chunk_size << dendl;
+  cdc->calc_chunks(bl, &chunks);
+  return 0;
+}
+
+hobject_t PrimaryLogPG::get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk)
+{
+  pg_pool_t::fingerprint_t fp_algo = pool.info.get_fingerprint_type();
+  if (fp_algo == pg_pool_t::TYPE_FINGERPRINT_NONE) {
+    return hobject_t();
+  }
+  object_t fp_oid = [&fp_algo, &chunk]() -> string {
+    switch (fp_algo) {
+      case pg_pool_t::TYPE_FINGERPRINT_SHA1:
+       return ceph::crypto::digest<ceph::crypto::SHA1>(chunk).to_str();
+      case pg_pool_t::TYPE_FINGERPRINT_SHA256:
+       return ceph::crypto::digest<ceph::crypto::SHA256>(chunk).to_str();
+      case pg_pool_t::TYPE_FINGERPRINT_SHA512:
+       return ceph::crypto::digest<ceph::crypto::SHA512>(chunk).to_str();
+      default:
+       assert(0 == "unrecognized fingerprint type");
+       return {};
+    }
+  }();    
+
+  pg_t raw_pg;
+  object_locator_t oloc(soid);
+  oloc.pool = pool.info.get_dedup_tier();
+  get_osdmap()->object_locator_to_pg(fp_oid, oloc, raw_pg);
+  hobject_t target(fp_oid, oloc.key, snapid_t(),
+                   raw_pg.ps(), raw_pg.pool(),
+                   oloc.nspace);
+  return target;
+}
+
+void PrimaryLogPG::finish_set_dedup(hobject_t oid, map<hobject_t, pair<uint64_t, uint64_t>>& chunks,
+                                   int r, ceph_tid_t tid)
+{
+  dout(10) << __func__ << " " << oid << " tid " << tid
+          << " " << cpp_strerror(r) << dendl;
+  map<hobject_t,ManifestOpRef>::iterator p = manifest_ops.find(oid);
+  if (p == manifest_ops.end()) {
+    dout(10) << __func__ << " no flush_op found" << dendl;
+    return;
+  }
+  ManifestOpRef mop = p->second;
+  ObjectContextRef obc = get_object_context(oid, false);
+  if (!obc) {
+    if (mop->op)
+      osd->reply_op_error(mop->op, -EINVAL);
+    return;
+  }
+  obc->stop_block();
+  kick_object_context_blocked(obc);
+  if (r < 0) {
+    if (mop->op)
+      osd->reply_op_error(mop->op, r);
+    return;
+  }
+
+  if (chunks.size()) {
+    OpContextUPtr ctx = simple_opc_create(obc);
+    ceph_assert(ctx);
+    if (ctx->lock_manager.get_lock_type(
+         RWState::RWWRITE,
+         oid,
+         obc,
+         mop->op)) {
+      dout(20) << __func__ << " took write lock" << dendl;
+    } else if (mop->op) {
+      dout(10) << __func__ << " waiting on write lock " << mop->op << dendl;
+      close_op_ctx(ctx.release());
+      osd->reply_op_error(mop->op, -EAGAIN);
+      return;    
+    }
+
+    // drop all references the current chunk_map has
+    object_ref_delta_t refs;
+    for (auto p : obc->obs.oi.manifest.chunk_map) {
+      refs.dec_ref(p.second.oid);
+    }
+    if (!refs.is_empty()) {
+      ctx->register_on_commit(
+       [oid, this, refs](){
+         dec_refcount(oid, refs);
+       });
+    }
+  
+    ctx->at_version = get_next_version();
+    ctx->new_obs = obc->obs;
+    ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
+    for (auto p : chunks) {
+      struct chunk_info_t info;
+      info.offset = 0;
+      info.length = p.second.second;
+      info.oid =  p.first;
+      ctx->new_obs.oi.manifest.chunk_map[p.second.first] = info;
+    }
+    finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
+    simple_opc_submit(std::move(ctx));
+  }
+  if (mop->op)
+    osd->reply_op_error(mop->op, r);
+}
+
 int PrimaryLogPG::start_flush(
   OpRequestRef op, ObjectContextRef obc,
   bool blocking, hobject_t *pmissing,
@@ -9992,15 +10252,6 @@ int PrimaryLogPG::start_flush(
   bool preoctopus_compat =
     get_osdmap()->require_osd_release < ceph_release_t::octopus;
   SnapSet snapset;
-  if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
-    /*
-     * TODO: "flush" for a manifest object means re-running the CDC algorithm on the portions of the
-     * object that are not currently dedup'd (not in the manifest chunk_map) and re-deduping the resulting
-     * chunks.  Adding support for that operation here is future work.
-     *
-     */
-    return -EOPNOTSUPP;
-  }
   if (preoctopus_compat) {
     // for pre-octopus compatibility, filter SnapSet::snaps.  not
     // certain we need this, but let's be conservative.
@@ -10079,6 +10330,15 @@ int PrimaryLogPG::start_flush(
     osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
+  if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
+    int r = start_dedup(op, obc);
+    if (r != -EINPROGRESS) {
+      if (blocking)
+       obc->stop_block();
+    }
+    return r;
+  }
+
   /**
    * In general, we need to send a delete and a copyfrom.
    * Consider snapc 10:[10, 9, 8, 4, 3, 2]:[10(10, 9), 4(4,3,2)]
index 479994d80eb8425754760bc237d644a93d764963..7decb5704d327deff9b7938b1ce4034af3c4f318 100644 (file)
@@ -260,6 +260,11 @@ public:
   struct ManifestOp {
     RefCountCallback *cb;
     ceph_tid_t objecter_tid;
+    OpRequestRef op;
+    std::map<uint64_t, int> results;
+    std::map<uint64_t, ceph_tid_t> tids; 
+    std::map<hobject_t, pair<uint64_t, uint64_t>> chunks;
+    uint64_t num_chunks = 0;
 
     ManifestOp(RefCountCallback* cb, ceph_tid_t tid)
       : cb(cb), objecter_tid(tid) {}
@@ -1504,11 +1509,17 @@ protected:
                           ObjectContextRef& _l, ObjectContextRef& _g);
   bool inc_refcount_by_set(OpContext* ctx, object_manifest_t& tgt,
                           OSDOp& osd_op);
+  int do_cdc(const object_info_t& oi, bufferlist& bl, vector<pair<uint64_t, uint64_t>>& chunks);
+  int start_dedup(OpRequestRef op, ObjectContextRef obc);
+  hobject_t get_fpoid_from_chunk(const hobject_t soid, bufferlist& chunk);
+  void finish_set_dedup(hobject_t oid, map<hobject_t, pair<uint64_t, uint64_t>>& chunks,
+                       int r, ceph_tid_t tid);
 
   friend struct C_ProxyChunkRead;
   friend class PromoteManifestCallback;
   friend struct C_CopyChunk;
   friend struct RefCountCallback;
+  friend struct C_SetDedupChunks;
 
 public:
   PrimaryLogPG(OSDService *o, OSDMapRef curmap,