]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/dedup: Prevent the dup-counter from wrapping around after it reaches 64K of ident... dedup_high_dup_count_wip_vx
authorGabriel BenHanokh <gbenhano@redhat.com>
Mon, 24 Nov 2025 08:02:22 +0000 (08:02 +0000)
committerbenhanokh <gbenhano@redhat.com>
Thu, 25 Dec 2025 15:42:48 +0000 (17:42 +0200)
Limit dedup from a single SRC to 128 Target copies to prevent OMAP size
from growing out of control
Tests cleanup

Resolves: rhbz#2415656
Resolves: rhbz#2416043

Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
src/rgw/driver/rados/rgw_dedup.cc
src/rgw/driver/rados/rgw_dedup_table.cc
src/rgw/driver/rados/rgw_dedup_table.h
src/rgw/driver/rados/rgw_dedup_utils.cc
src/rgw/driver/rados/rgw_dedup_utils.h
src/test/rgw/dedup/test_dedup.py
src/test/rgw/dedup/tox.ini

index eaf0492cb4880fa4fa4e282628ae7eeabd209166..f841e8aad5a9efbbe06023c2039d0a610ce28bb7 100644 (file)
@@ -465,7 +465,9 @@ namespace rgw::dedup {
                        << "::ETAG=" << std::hex << p_rec->s.md5_high
                        << p_rec->s.md5_low << std::dec << dendl;
 
-    int ret = p_table->add_entry(&key, block_id, rec_id, has_shared_manifest);
+    int ret = p_table->add_entry(&key, block_id, rec_id, has_shared_manifest,
+                                 &p_stats->small_objs_stat, &p_stats->big_objs_stat,
+                                 &p_stats->dup_head_bytes_estimate);
     if (ret == 0) {
       p_stats->loaded_objects ++;
       ldpp_dout(dpp, 20) << __func__ << "::" << p_rec->bucket_name << "/"
@@ -1061,6 +1063,24 @@ namespace rgw::dedup {
       return 0;
     }
 
+    // limit the number of ref_count in the SRC-OBJ to MAX_COPIES_PER_OBJ
+    // check <= because we also count the SRC-OBJ
+    if (src_val.get_count() <= MAX_COPIES_PER_OBJ) {
+      disk_block_id_t src_block_id = src_val.get_src_block_id();
+      record_id_t     src_rec_id   = src_val.get_src_rec_id();
+      // update the number of identical copies we got
+      ldpp_dout(dpp, 20) << __func__ << "::Obj " << p_rec->obj_name
+                         << " has " << src_val.get_count() << " copies" << dendl;
+      p_table->inc_count(&key_from_bucket_index, src_block_id, src_rec_id);
+    }
+    else {
+      // We don't want more than @MAX_COPIES_PER_OBJ to prevent OMAP overload
+      p_stats->skipped_too_many_copies++;
+      ldpp_dout(dpp, 10) << __func__ << "::Obj " << p_rec->obj_name
+                         << " has too many copies already" << dendl;
+      return 0;
+    }
+
     // Every object after this point was counted as a dedup potential
     // If we conclude that it can't be dedup it should be accounted for
     rgw_bucket b{p_rec->tenant_name, p_rec->bucket_name, p_rec->bucket_id};
@@ -1250,8 +1270,8 @@ namespace rgw::dedup {
       return 0;
     }
 
-    disk_block_id_t src_block_id = src_val.block_idx;
-    record_id_t src_rec_id = src_val.rec_id;
+    disk_block_id_t src_block_id = src_val.get_src_block_id();
+    record_id_t     src_rec_id   = src_val.get_src_rec_id();
     if (block_id == src_block_id && rec_id == src_rec_id) {
       // the table entry point to this record which means it is a dedup source so nothing to do
       p_stats->skipped_source_record++;
@@ -1794,8 +1814,7 @@ namespace rgw::dedup {
         return -ECANCELED;
       }
     }
-    p_table->count_duplicates(&p_stats->small_objs_stat, &p_stats->big_objs_stat,
-                              &p_stats->dup_head_bytes_estimate);
+    p_table->count_duplicates(&p_stats->small_objs_stat, &p_stats->big_objs_stat);
     display_table_stat_counters(dpp, p_stats);
 
     ldpp_dout(dpp, 10) << __func__ << "::MD5 Loop::" << d_ctl.dedup_type << dendl;
index c4841303e337d1b3e23864612f892aa31bf85291..4f34b27d18edaad34f8b24f682e8cedd7b533850 100644 (file)
@@ -83,10 +83,14 @@ namespace rgw::dedup {
           redistributed_loopback++;
         }
 
+        // we no longer need the counter, reuse it to count actual dedup
+        hash_tab[idx].val.reset_count();
         redistributed_search_max = std::max(redistributed_search_max, count);
         redistributed_search_total += count;
       }
       else {
+        // we no longer need the counter, reuse it to count actual dedup
+        hash_tab[tab_idx].val.reset_count();
         redistributed_not_needed++;
       }
     }
@@ -104,11 +108,44 @@ namespace rgw::dedup {
     return idx;
   }
 
+  //---------------------------------------------------------------------------
+  static void inc_counters(const key_t *p_key,
+                           uint32_t head_object_size,
+                           dedup_stats_t *p_small_objs,
+                           dedup_stats_t *p_big_objs,
+                           uint64_t *p_duplicate_head_bytes)
+  {
+    // This is an approximation only since size is stored in 4KB resolution
+    uint64_t byte_size_approx = disk_blocks_to_byte_size(p_key->size_4k_units);
+
+    // skip small single part objects which we can't dedup
+    if (!p_key->multipart_object() && (byte_size_approx <= head_object_size)) {
+      p_small_objs->duplicate_count ++;
+      p_small_objs->dedup_bytes_estimate += byte_size_approx;
+      return;
+    }
+    else {
+      uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
+                                                     p_key->num_parts,
+                                                     byte_size_approx);
+      p_big_objs->duplicate_count ++;
+      p_big_objs->dedup_bytes_estimate += dup_bytes_approx;
+
+      if (!p_key->multipart_object()) {
+        // single part objects duplicate the head object when dedup is used
+        *p_duplicate_head_bytes += head_object_size;
+      }
+    }
+  }
+
   //---------------------------------------------------------------------------
   int dedup_table_t::add_entry(key_t *p_key,
                                disk_block_id_t block_id,
                                record_id_t rec_id,
-                               bool shared_manifest)
+                               bool shared_manifest,
+                               dedup_stats_t *p_small_objs,
+                               dedup_stats_t *p_big_objs,
+                               uint64_t *p_duplicate_head_bytes)
   {
     value_t new_val(block_id, rec_id, shared_manifest);
     uint32_t idx = find_entry(p_key);
@@ -128,7 +165,13 @@ namespace rgw::dedup {
     }
     else {
       ceph_assert(hash_tab[idx].key == *p_key);
-      val.count ++;
+      if (val.count <= MAX_COPIES_PER_OBJ) {
+        inc_counters(p_key, head_object_size, p_small_objs, p_big_objs,
+                     p_duplicate_head_bytes);
+      }
+      if (val.count < std::numeric_limits<std::uint16_t>::max()) {
+        val.count ++;
+      }
       if (!val.has_shared_manifest() && shared_manifest) {
         // replace value!
         ldpp_dout(dpp, 20) << __func__ << "::Replace with shared_manifest::["
@@ -154,8 +197,6 @@ namespace rgw::dedup {
     ceph_assert(hash_tab[idx].key == *p_key);
     value_t &val = hash_tab[idx].val;
     ceph_assert(val.is_occupied());
-    // we only update non-singletons since we purge singletons after the first pass
-    ceph_assert(val.count > 1);
 
     // need to overwrite the block_idx/rec_id from the first pass
     // unless already set with shared_manifest with the correct block-id/rec-id
@@ -189,23 +230,46 @@ namespace rgw::dedup {
     return -ENOENT;
   }
 
+  //---------------------------------------------------------------------------
+  int dedup_table_t::inc_count(const key_t *p_key,
+                               disk_block_id_t block_id,
+                               record_id_t rec_id)
+  {
+    uint32_t idx = find_entry(p_key);
+    value_t &val = hash_tab[idx].val;
+    if (val.is_occupied()) {
+      if (val.block_idx == block_id && val.rec_id == rec_id) {
+        val.inc_count();
+        return 0;
+      }
+      else {
+        ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies bloc/rec" << dendl;
+      }
+    }
+    else {
+      ldpp_dout(dpp, 5) << __func__ << "::ERR Failed Ncopies key" << dendl;
+    }
+
+    return -ENOENT;
+  }
+
   //---------------------------------------------------------------------------
   int dedup_table_t::get_val(const key_t *p_key, struct value_t *p_val /*OUT*/)
   {
     uint32_t idx = find_entry(p_key);
     const value_t &val = hash_tab[idx].val;
-    if (!val.is_occupied()) {
+    if (val.is_occupied()) {
+      *p_val = val;
+      return 0;
+    }
+    else {
       return -ENOENT;
     }
-
-    *p_val = val;
-    return 0;
   }
 
   //---------------------------------------------------------------------------
   void dedup_table_t::count_duplicates(dedup_stats_t *p_small_objs,
-                                       dedup_stats_t *p_big_objs,
-                                       uint64_t *p_duplicate_head_bytes)
+                                       dedup_stats_t *p_big_objs)
   {
     for (uint32_t tab_idx = 0; tab_idx < entries_count; tab_idx++) {
       if (!hash_tab[tab_idx].val.is_occupied()) {
@@ -215,7 +279,6 @@ namespace rgw::dedup {
       const key_t &key = hash_tab[tab_idx].key;
       // This is an approximation only since size is stored in 4KB resolution
       uint64_t byte_size_approx = disk_blocks_to_byte_size(key.size_4k_units);
-      uint32_t duplicate_count = (hash_tab[tab_idx].val.count -1);
 
       // skip small single part objects which we can't dedup
       if (!key.multipart_object() && (byte_size_approx <= head_object_size)) {
@@ -223,29 +286,16 @@ namespace rgw::dedup {
           p_small_objs->singleton_count++;
         }
         else {
-          p_small_objs->duplicate_count += duplicate_count;
           p_small_objs->unique_count ++;
-          p_small_objs->dedup_bytes_estimate += (duplicate_count * byte_size_approx);
         }
-        continue;
-      }
-
-      if (hash_tab[tab_idx].val.is_singleton()) {
-        p_big_objs->singleton_count++;
       }
       else {
-        ceph_assert(hash_tab[tab_idx].val.count > 1);
-        uint64_t dup_bytes_approx = calc_deduped_bytes(head_object_size,
-                                                       key.num_parts,
-                                                       byte_size_approx);
-        p_big_objs->dedup_bytes_estimate += (duplicate_count * dup_bytes_approx);
-        p_big_objs->duplicate_count += duplicate_count;
-        p_big_objs->unique_count ++;
-
-        if (!key.multipart_object()) {
-          // single part objects duplicate the head object when dedup is used
-          uint64_t dup_head_bytes = duplicate_count * head_object_size;
-          *p_duplicate_head_bytes += dup_head_bytes;
+        if (hash_tab[tab_idx].val.is_singleton()) {
+          p_big_objs->singleton_count++;
+        }
+        else {
+          ceph_assert(hash_tab[tab_idx].val.count > 1);
+          p_big_objs->unique_count ++;
         }
       }
     }
index 85fd3295f5e5393b378a51f4e36fcf1278cbda44..4a46db6e5b7912f5dd09fde275d87a054a1c7946 100644 (file)
@@ -66,6 +66,7 @@ namespace rgw::dedup {
   public:
     // 8 Bytes Value
     struct value_t {
+      friend class dedup_table_t;
       value_t() {
         this->block_idx = 0xFFFFFFFF;
         this->count  = 0;
@@ -83,15 +84,21 @@ namespace rgw::dedup {
           flags.set_shared_manifest();
         }
       }
-
-      inline void clear_flags() { flags.clear(); }
       inline bool has_shared_manifest() const {return flags.has_shared_manifest(); }
+      inline uint16_t        get_count() { return this->count; }
+      inline disk_block_id_t get_src_block_id() { return this->block_idx; }
+      inline record_id_t     get_src_rec_id() { return this->rec_id; }
+    private:
       inline void set_shared_manifest_src() { this->flags.set_shared_manifest(); }
+      inline void inc_count() { count ++; }
+      inline void reset_count() { count = 0; }
+      inline void clear_flags() { flags.clear(); }
       inline bool is_singleton() const { return (count == 1); }
       inline bool is_occupied() const { return flags.is_occupied(); }
       inline void set_occupied() { this->flags.set_occupied();  }
       inline void clear_occupied() { this->flags.clear_occupied(); }
 
+
       disk_block_id_t block_idx; // 32 bits
       uint16_t        count;     // 16 bits
       record_id_t     rec_id;    //  8 bits
@@ -103,20 +110,27 @@ namespace rgw::dedup {
                   uint32_t _head_object_size,
                   uint8_t *p_slab,
                   uint64_t slab_size);
-    int add_entry(key_t *p_key, disk_block_id_t block_id, record_id_t rec_id,
-                  bool shared_manifest);
+    int add_entry(key_t *p_key,
+                  disk_block_id_t block_id,
+                  record_id_t rec_id,
+                  bool shared_manifest,
+                  dedup_stats_t *p_small_objs_stat,
+                  dedup_stats_t *p_big_objs_stat,
+                  uint64_t *p_duplicate_head_bytes);
+
     void update_entry(key_t *p_key, disk_block_id_t block_id, record_id_t rec_id,
                       bool shared_manifest);
 
     int  get_val(const key_t *p_key, struct value_t *p_val /*OUT*/);
 
+    int inc_count(const key_t *p_key, disk_block_id_t block_id, record_id_t rec_id);
+
     int set_shared_manifest_src_mode(const key_t *p_key,
                                      disk_block_id_t block_id,
                                      record_id_t rec_id);
 
     void count_duplicates(dedup_stats_t *p_small_objs_stat,
-                          dedup_stats_t *p_big_objs_stat,
-                          uint64_t *p_duplicate_head_bytes);
+                          dedup_stats_t *p_big_objs_stat);
 
     void remove_singletons_and_redistribute_keys();
   private:
index 04c842a0fbaac733d7914aa779e6df9ed0f5fdfa..61ad6b91c516ef6cff68dc0ddca89000fefdf44c 100644 (file)
@@ -557,6 +557,7 @@ namespace rgw::dedup {
     this->skipped_purged_small    += other.skipped_purged_small;
     this->skipped_singleton       += other.skipped_singleton;
     this->skipped_singleton_bytes += other.skipped_singleton_bytes;
+    this->skipped_too_many_copies += other.skipped_too_many_copies;
     this->skipped_source_record   += other.skipped_source_record;
     this->duplicate_records       += other.duplicate_records;
     this->size_mismatch           += other.size_mismatch;
@@ -671,6 +672,9 @@ namespace rgw::dedup {
       if (this->skipped_singleton) {
         f->dump_unsigned("Skipped singleton Bytes", this->skipped_singleton_bytes);
       }
+      if (this->skipped_too_many_copies) {
+        f->dump_unsigned("Skipped Too Many Copies", this->skipped_too_many_copies);
+      }
       f->dump_unsigned("Skipped source record", this->skipped_source_record);
 
       if (this->ingress_skip_encrypted) {
@@ -755,6 +759,7 @@ namespace rgw::dedup {
     encode(m.skipped_purged_small, bl);
     encode(m.skipped_singleton, bl);
     encode(m.skipped_singleton_bytes, bl);
+    encode(m.skipped_too_many_copies, bl);
     encode(m.skipped_source_record, bl);
     encode(m.duplicate_records, bl);
     encode(m.size_mismatch, bl);
@@ -808,6 +813,7 @@ namespace rgw::dedup {
     decode(m.skipped_purged_small, bl);
     decode(m.skipped_singleton, bl);
     decode(m.skipped_singleton_bytes, bl);
+    decode(m.skipped_too_many_copies, bl);
     decode(m.skipped_source_record, bl);
     decode(m.duplicate_records, bl);
     decode(m.size_mismatch, bl);
index ae10ffd6c69e606a866d282313f38d931f16b482..abe624321225b337378f4495a5eac857f47c990c 100644 (file)
@@ -56,6 +56,10 @@ namespace rgw::dedup {
   static_assert(MAX_MD5_SHARD  < NULL_SHARD);
   static_assert(MAX_MD5_SHARD  <= MD5_SHARD_HARD_LIMIT);
 
+  // Limit the number of duplicates allowed per unique object to control
+  // the number of ref_count entries in the SRC-OBJ
+  const uint8_t MAX_COPIES_PER_OBJ = 128;
+
   //---------------------------------------------------------------------------
   enum dedup_req_type_t {
     DEDUP_TYPE_NONE     = 0,
@@ -249,6 +253,7 @@ namespace rgw::dedup {
     uint64_t skipped_purged_small = 0;
     uint64_t skipped_singleton = 0;
     uint64_t skipped_singleton_bytes = 0;
+    uint64_t skipped_too_many_copies = 0;
     uint64_t skipped_source_record = 0;
     uint64_t duplicate_records = 0;
     uint64_t size_mismatch = 0;
index 4838f91056fb016e2191e3a491eeaf83bdf8c192..3f3a3d606dd27f9825a065f2f83b5b460c4a695e 100644 (file)
@@ -49,8 +49,6 @@ class Dedup_Stats:
     unique_obj : int = 0
     dedup_bytes_estimate : int = 0
     duplicate_obj : int = 0
-    dup_head_size_estimate : int = 0
-    dup_head_size : int = 0
     deduped_obj_bytes : int = 0
     non_default_storage_class_objs_bytes : int = 0
     potential_singleton_obj : int = 0
@@ -278,6 +276,7 @@ default_config = TransferConfig(multipart_threshold=MULTIPART_SIZE, multipart_ch
 ETAG_ATTR="user.rgw.etag"
 POOLNAME="default.rgw.buckets.data"
 
+MAX_COPIES_PER_OBJ=128
 #-------------------------------------------------------------------------------
 def write_file(filename, size):
     full_filename = OUT_DIR + filename
@@ -541,7 +540,6 @@ def calc_rados_obj_count(num_copies, obj_size, config):
 
 #-------------------------------------------------------------------------------
 def calc_dedupable_space(obj_size, config):
-    dup_head_size=0
     threshold = config.multipart_threshold
     # Objects with size bigger than MULTIPART_SIZE are uploaded as multi-part
     # multi-part objects got a zero size Head objects
@@ -549,13 +547,12 @@ def calc_dedupable_space(obj_size, config):
         dedupable_space = obj_size
     elif obj_size > RADOS_OBJ_SIZE:
         dedupable_space = obj_size - RADOS_OBJ_SIZE
-        dup_head_size = RADOS_OBJ_SIZE
     else:
         dedupable_space = 0
 
     log.debug("obj_size=%.2f MiB, dedupable_space=%.2f MiB",
               float(obj_size)/MB, float(dedupable_space)/MB)
-    return (dedupable_space, dup_head_size)
+    return dedupable_space
 
 BLOCK_SIZE=4096
 #-------------------------------------------------------------------------------
@@ -566,6 +563,7 @@ def calc_on_disk_byte_size(byte_size):
 #-------------------------------------------------------------------------------
 def calc_expected_stats(dedup_stats, obj_size, num_copies, config):
     dups_count = (num_copies - 1)
+    dups_count = min(dups_count, MAX_COPIES_PER_OBJ)
     on_disk_byte_size = calc_on_disk_byte_size(obj_size)
     log.debug("obj_size=%d, on_disk_byte_size=%d", obj_size, on_disk_byte_size)
     threshold = config.multipart_threshold
@@ -595,17 +593,13 @@ def calc_expected_stats(dedup_stats, obj_size, num_copies, config):
     else:
         dedup_stats.skip_src_record += 1
         dedup_stats.set_shared_manifest_src += 1
-        dedup_stats.set_hash += num_copies
-        dedup_stats.invalid_hash += num_copies
+        dedup_stats.set_hash += (dups_count + 1)
+        dedup_stats.invalid_hash += (dups_count + 1)
         dedup_stats.unique_obj += 1
         dedup_stats.duplicate_obj += dups_count
         dedup_stats.deduped_obj += dups_count
-        ret=calc_dedupable_space(on_disk_byte_size, config)
-        deduped_obj_bytes=ret[0]
-        dup_head_size=ret[1]
+        deduped_obj_bytes=calc_dedupable_space(on_disk_byte_size, config)
         dedup_stats.deduped_obj_bytes += (deduped_obj_bytes * dups_count)
-        dedup_stats.dup_head_size += (dup_head_size * dups_count)
-        dedup_stats.dup_head_size_estimate += (dup_head_size * dups_count)
         deduped_block_bytes=((deduped_obj_bytes+BLOCK_SIZE-1)//BLOCK_SIZE)*BLOCK_SIZE
         dedup_stats.dedup_bytes_estimate += (deduped_block_bytes * dups_count)
 
@@ -649,9 +643,7 @@ def upload_objects(bucket_name, files, indices, conn, config, check_obj_count=Tr
         assert(obj_size)
         calc_expected_stats(dedup_stats, obj_size, num_copies, config)
         total_space += (obj_size * num_copies)
-        ret=calc_dedupable_space(obj_size, config)
-        dedupable_space=ret[0]
-        dup_head_size=ret[1]
+        dedupable_space=calc_dedupable_space(obj_size, config)
         duplicated_space += ((num_copies-1) * dedupable_space)
         rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
         rados_objects_total += (rados_obj_count * num_copies)
@@ -664,10 +656,10 @@ def upload_objects(bucket_name, files, indices, conn, config, check_obj_count=Tr
         for i in range(idx, num_copies):
             key = gen_object_name(filename, i)
             #log.debug("upload_file %s/%s with crc32", bucket_name, key)
-            conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config, ExtraArgs={'ChecksumAlgorithm': 'crc32'})
+            conn.upload_file(OUT_DIR + filename, bucket_name, key, Config=config)
 
     log.debug("==========================================")
-    log.debug("Summery:\n%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+    log.debug("Summary: %d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
              s3_objects_total, rados_objects_total, total_space/MB)
     log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
     log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
@@ -701,9 +693,7 @@ def upload_objects_multi(files, conns, bucket_names, indices, config, check_obj_
         assert(obj_size)
         calc_expected_stats(dedup_stats, obj_size, num_copies, config)
         total_space += (obj_size * num_copies)
-        ret=calc_dedupable_space(obj_size, config)
-        dedupable_space=ret[0]
-        dup_head_size=ret[1]
+        dedupable_space=calc_dedupable_space(obj_size, config)
         duplicated_space += ((num_copies-1) * dedupable_space)
         rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
         rados_objects_total += (rados_obj_count * num_copies)
@@ -720,7 +710,7 @@ def upload_objects_multi(files, conns, bucket_names, indices, config, check_obj_
             log.debug("upload_objects::<%s/%s>", bucket_names[ten_id], key)
 
     log.debug("==========================================")
-    log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+    log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
              s3_objects_total, rados_objects_total, total_space/MB)
     log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
     log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
@@ -786,9 +776,7 @@ def procs_upload_objects(files, conns, bucket_names, indices, config, check_obj_
         assert(obj_size)
         calc_expected_stats(dedup_stats, obj_size, num_copies, config)
         total_space += (obj_size * num_copies)
-        ret=calc_dedupable_space(obj_size, config)
-        dedupable_space=ret[0]
-        dup_head_size=ret[1]
+        dedupable_space=calc_dedupable_space(obj_size, config)
         duplicated_space += ((num_copies-1) * dedupable_space)
         rados_obj_count=calc_rados_obj_count(num_copies, obj_size, config)
         rados_objects_total += (rados_obj_count * num_copies)
@@ -801,7 +789,7 @@ def procs_upload_objects(files, conns, bucket_names, indices, config, check_obj_
         proc_list[idx].join()
 
     log.debug("==========================================")
-    log.debug("Summery:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
+    log.debug("Summary:%d S3 objects were uploaded (%d rados objects), total size = %.2f MiB",
              s3_objects_total, rados_objects_total, total_space/MB)
     log.debug("Based on calculation we should have %d rados objects", rados_objects_total)
     log.debug("Based on calculation we should have %d duplicated tail objs", duplicated_tail_objs)
@@ -827,42 +815,42 @@ def procs_upload_objects(files, conns, bucket_names, indices, config, check_obj_
 
 #-------------------------------------------------------------------------------
 def verify_objects(bucket_name, files, conn, expected_results, config):
-    tempfile = OUT_DIR + "temp"
+    tmpfile = OUT_DIR + "temp"
     for f in files:
         filename=f[0]
         obj_size=f[1]
         num_copies=f[2]
         log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies)
         for i in range(0, num_copies):
+            filecmp.clear_cache()
             key = gen_object_name(filename, i)
-            #log.debug("download_file(%s) with crc32", key)
-            conn.download_file(bucket_name, key, tempfile, Config=config, ExtraArgs={'ChecksumMode': 'crc32'})
-            #conn.download_file(bucket_name, key, tempfile, Config=config)
-            result = bash(['cmp', tempfile, OUT_DIR + filename])
-            assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
-            os.remove(tempfile)
+            conn.download_file(bucket_name, key, tmpfile, Config=config)
+            equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+            assert equal ,"Files %s and %s differ!!" % (key, tmpfile)
+            os.remove(tmpfile)
 
+    log.debug("verify_objects: finished reading all objects")
     assert expected_results == count_object_parts_in_all_buckets(True)
     log.debug("verify_objects::completed successfully!!")
 
-
 #-------------------------------------------------------------------------------
 def verify_objects_multi(files, conns, bucket_names, expected_results, config):
     max_tenants=len(conns)
-    tempfile = OUT_DIR + "temp"
+    tmpfile = OUT_DIR + "temp"
     for f in files:
         filename=f[0]
         obj_size=f[1]
         num_copies=f[2]
         log.debug("comparing file=%s, size=%d, copies=%d", filename, obj_size, num_copies)
         for i in range(0, num_copies):
+            filecmp.clear_cache()
             key = gen_object_name(filename, i)
             log.debug("comparing object %s with file %s", key, filename)
             ten_id = i % max_tenants
-            conns[ten_id].download_file(bucket_names[ten_id], key, tempfile, Config=config)
-            result = bash(['cmp', tempfile, OUT_DIR + filename])
-            assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
-            os.remove(tempfile)
+            conns[ten_id].download_file(bucket_names[ten_id], key, tmpfile, Config=config)
+            equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+            assert equal ,"Files %s and %s differ!!" % (key, tmpfile)
+            os.remove(tmpfile)
 
     assert expected_results == count_object_parts_in_all_buckets(True)
     log.debug("verify_objects::completed successfully!!")
@@ -870,7 +858,7 @@ def verify_objects_multi(files, conns, bucket_names, expected_results, config):
 
 #-------------------------------------------------------------------------------
 def thread_verify(thread_id, num_threads, files, conn, bucket, config):
-    tempfile = OUT_DIR + "temp" + str(thread_id)
+    tmpfile = OUT_DIR + "temp" + str(thread_id)
     count = 0
     for f in files:
         filename=f[0]
@@ -883,10 +871,10 @@ def thread_verify(thread_id, num_threads, files, conn, bucket, config):
             if thread_id == target_thread:
                 key = gen_object_name(filename, i)
                 log.debug("comparing object %s with file %s", key, filename)
-                conn.download_file(bucket, key, tempfile, Config=config)
-                result = bash(['cmp', tempfile, OUT_DIR + filename])
-                assert result[1] == 0 ,"Files %s and %s differ!!" % (key, tempfile)
-                os.remove(tempfile)
+                conn.download_file(bucket, key, tmpfile, Config=config)
+                equal = filecmp.cmp(tmpfile, OUT_DIR + filename, shallow=False)
+                assert equal ,"Files %s and %s differ!!" % (key, tmpfile)
+                os.remove(tmpfile)
 
 
 #-------------------------------------------------------------------------------
@@ -932,7 +920,6 @@ def reset_full_dedup_stats(dedup_stats):
     dedup_stats.total_processed_objects = 0
     dedup_stats.set_shared_manifest_src = 0
     dedup_stats.deduped_obj = 0
-    dedup_stats.dup_head_size = 0
     dedup_stats.deduped_obj_bytes = 0
     dedup_stats.skip_shared_manifest = 0
     dedup_stats.skip_src_record = 0
@@ -1053,8 +1040,6 @@ def read_dedup_stats(dry_run):
         dedup_stats.dedup_bytes_estimate = main['Dedup Bytes Estimate']
 
         potential = md5_stats['Potential Dedup']
-        dedup_stats.dup_head_size_estimate = potential['Duplicated Head Bytes Estimate']
-        dedup_stats.dup_head_size = potential['Duplicated Head Bytes']
         dedup_stats.potential_singleton_obj = potential['Singleton Obj (64KB-4MB)']
         dedup_stats.potential_unique_obj = potential['Unique Obj (64KB-4MB)']
         dedup_stats.potential_duplicate_obj = potential['Duplicate Obj (64KB-4MB)']
@@ -1239,7 +1224,7 @@ def simple_dedup(conn, files, bucket_name, run_cleanup_after, config, dry_run):
         ret = upload_objects(bucket_name, files, indices, conn, config)
         expected_results = ret[0]
         dedup_stats = ret[1]
-
+        log.info("%d S3 objects were uploaded", ret[2])
         exec_dedup(dedup_stats, dry_run)
         if dry_run == False:
             log.debug("Verify all objects")
@@ -1512,6 +1497,17 @@ def verify_objects_with_version(bucket_name, op_log, conn, config):
         conn.delete_object(Bucket=bucket_name, Key=key, VersionId=version_id)
 
 
+#-------------------------------------------------------------------------------
+def print_bucket_versioning(conn, bucket_name):
+    resp = conn.get_bucket_versioning(Bucket=bucket_name)
+    status = resp.get('Status')
+    mfadelete = resp.get('MFADelete')
+
+    if status is None:
+        log.info("%s: versioning not configured", bucket_name)
+    else:
+        log.info("%s: Status=%s, MFADelete=%s", bucket_name, status, mfadelete)
+
 #-------------------------------------------------------------------------------
 # generate @num_files objects with @ver_count versions each of @obj_size
 # verify that we got the correct number of rados-objects
@@ -1527,7 +1523,7 @@ def test_dedup_with_versions():
         return
 
     prepare_test()
-    bucket_name = "bucket1"
+    bucket_name = "bucketwithversions"
     files=[]
     op_log=[]
     num_files=43
@@ -1567,9 +1563,12 @@ def test_dedup_with_versions():
     finally:
         # cleanup must be executed even after a failure
         if success == False:
+            # otherwise, objects been removed by verify_objects_with_version()
             delete_all_versions(conn, bucket_name, dry_run=False)
 
-        # otherwise, objects been removed by verify_objects_with_version()
+        conn.put_bucket_versioning(Bucket=bucket_name,
+                                   VersioningConfiguration={"Status": "Suspended"})
+        print_bucket_versioning(conn, bucket_name)
         cleanup(bucket_name, conn)
 
 #==============================================================================
@@ -1890,7 +1889,6 @@ def test_dedup_inc_0_with_tenants():
         s3_objects_total = ret[2]
 
         dedup_stats2 = dedup_stats
-        dedup_stats2.dup_head_size = 0
         dedup_stats2.skip_shared_manifest=dedup_stats.deduped_obj
         dedup_stats2.skip_src_record=dedup_stats.set_shared_manifest_src
         dedup_stats2.set_shared_manifest_src=0
@@ -1939,7 +1937,6 @@ def test_dedup_inc_0():
         s3_objects_total = ret[2]
 
         dedup_stats2 = dedup_stats
-        dedup_stats2.dup_head_size = 0
         dedup_stats2.skip_shared_manifest=dedup_stats.deduped_obj
         dedup_stats2.skip_src_record=dedup_stats.set_shared_manifest_src
         dedup_stats2.set_shared_manifest_src=0
@@ -2007,7 +2004,6 @@ def test_dedup_inc_1_with_tenants():
         stats_combined=ret[1]
         stats_combined.skip_shared_manifest = stats_base.deduped_obj
         stats_combined.skip_src_record     -= stats_base.skip_src_record
-        stats_combined.dup_head_size       -= stats_base.dup_head_size
         stats_combined.skip_src_record     += stats_base.set_shared_manifest_src
 
         stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
@@ -2071,7 +2067,6 @@ def test_dedup_inc_1():
         expected_results = ret[0]
         stats_combined = ret[1]
         stats_combined.skip_shared_manifest = stats_base.deduped_obj
-        stats_combined.dup_head_size       -= stats_base.dup_head_size
         stats_combined.skip_src_record     -= stats_base.skip_src_record
         stats_combined.skip_src_record     += stats_base.set_shared_manifest_src
 
@@ -2149,7 +2144,6 @@ def test_dedup_inc_2_with_tenants():
         expected_results = ret[0]
         stats_combined = ret[1]
         stats_combined.skip_shared_manifest = stats_base.deduped_obj
-        stats_combined.dup_head_size       -= stats_base.dup_head_size
         stats_combined.skip_src_record     -= stats_base.skip_src_record
         stats_combined.skip_src_record     += stats_base.set_shared_manifest_src
 
@@ -2223,7 +2217,6 @@ def test_dedup_inc_2():
         stats_combined = ret[1]
         stats_combined.skip_shared_manifest = stats_base.deduped_obj
         stats_combined.skip_src_record     -= stats_base.skip_src_record
-        stats_combined.dup_head_size       -= stats_base.dup_head_size
         stats_combined.skip_src_record     += stats_base.set_shared_manifest_src
 
         stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
@@ -2311,7 +2304,6 @@ def test_dedup_inc_with_remove_multi_tenants():
         # run dedup again
         dedup_stats.set_shared_manifest_src=0
         dedup_stats.deduped_obj=0
-        dedup_stats.dup_head_size=0
         dedup_stats.deduped_obj_bytes=0
         dedup_stats.skip_src_record=src_record
         dedup_stats.skip_shared_manifest=shared_manifest
@@ -2398,7 +2390,6 @@ def test_dedup_inc_with_remove():
         # run dedup again
         dedup_stats.set_shared_manifest_src=0
         dedup_stats.deduped_obj=0
-        dedup_stats.dup_head_size=0
         dedup_stats.deduped_obj_bytes=0
         dedup_stats.skip_src_record=src_record
         dedup_stats.skip_shared_manifest=shared_manifest
@@ -2661,7 +2652,6 @@ def inc_step_with_tenants(stats_base, files, conns, bucket_names, config):
     stats_combined.skip_shared_manifest = stats_base.deduped_obj
     stats_combined.skip_src_record      = src_record
     stats_combined.set_shared_manifest_src -= stats_base.set_shared_manifest_src
-    stats_combined.dup_head_size       -= stats_base.dup_head_size
     stats_combined.deduped_obj         -= stats_base.deduped_obj
     stats_combined.deduped_obj_bytes   -= stats_base.deduped_obj_bytes
 
@@ -2704,12 +2694,11 @@ def test_dedup_inc_loop_with_tenants():
         ret=simple_dedup_with_tenants(files, conns, bucket_names, config)
         stats_base=ret[1]
 
-        for idx in range(0, 9):
+        for idx in range(0, 7):
             ret = inc_step_with_tenants(stats_base, files, conns, bucket_names, config)
             files=ret[0]
             stats_last=ret[1]
             stats_base.set_shared_manifest_src += stats_last.set_shared_manifest_src
-            stats_base.dup_head_size     += stats_last.dup_head_size
             stats_base.deduped_obj       += stats_last.deduped_obj
             stats_base.deduped_obj_bytes += stats_last.deduped_obj_bytes
             stats_base.set_hash          += stats_last.set_hash
@@ -2773,9 +2762,8 @@ def test_dedup_dry_multipart():
 
     num_files=8
     min_size=MULTIPART_SIZE
-    # create files in range [MULTIPART_SIZE, 4*MULTIPART_SIZE] aligned on RADOS_OBJ_SIZE
-    # create files in range [MULTIPART_SIZE, 1GB] aligned on RADOS_OBJ_SIZE
-    gen_files_in_range(files, num_files, min_size, 1024*MB)
+    #gen_files_in_range(files, num_files, min_size, 1024*MB)
+    gen_files_in_range(files, num_files, min_size, 128*MB)
 
     # add files in range [MULTIPART_SIZE, 4*MULTIPART_SIZE] aligned on MULTIPART_SIZE
     gen_files_in_range(files, num_files, min_size, min_size*8, MULTIPART_SIZE)
@@ -2995,3 +2983,161 @@ def test_dedup_dry_large_scale():
 def test_cleanup():
     close_all_connections()
 
+#---------------------------------------------------------------------------
+def proc_upload_identical(proc_id, num_procs, filename, conn, bucket_name, num_copies, config):
+    log.debug("Proc_ID=%d/%d::num_copies=%d", proc_id, num_procs, num_copies)
+    for idx in range(num_copies):
+        log.debug("upload_objects::%s::idx=%d", filename, idx);
+        target_proc = (idx % num_procs)
+        if (proc_id == target_proc):
+            key = gen_object_name(filename, idx)
+            conn.upload_file(OUT_DIR+filename, bucket_name, key, Config=config)
+            #log.info("[%d]upload_objects::<%s/%s>", proc_id, bucket_name, key)
+
+#---------------------------------------------------------------------------
+def proc_parallel_upload_identical(files, conns, bucket_name, config):
+    num_procs=len(conns)
+    proc_list=list()
+    f = files[0]
+    filename=f[0]
+    num_copies=f[2]
+    for idx in range(num_procs):
+        log.debug("Create proc_id=%d", idx)
+        p=Process(target=proc_upload_identical,
+                  args=(idx, num_procs, filename, conns[idx], bucket_name, num_copies, config))
+        proc_list.append(p)
+        proc_list[idx].start()
+
+
+    # wait for all worker proc to join
+    for idx in range(num_procs):
+        proc_list[idx].join()
+
+#---------------------------------------------------------------------------
+def calc_identical_copies_stats(files, conns, bucket_name, config):
+    f = files[0]
+    obj_size=f[1]
+    filename=f[0]
+    copies_count=f[2]
+    dedup_stats = Dedup_Stats()
+    s3_objects_total=copies_count
+    calc_expected_stats(dedup_stats, obj_size, copies_count, config)
+    dups_count = min(copies_count, MAX_COPIES_PER_OBJ)
+    total_space = (obj_size * copies_count)
+    dedupable_space=calc_dedupable_space(obj_size, config)
+    duplicated_space = (dups_count * dedupable_space)
+    rados_obj_count=calc_rados_obj_count(copies_count, obj_size, config)
+    rados_objects_total = (rados_obj_count * copies_count)
+    duplicated_tail_objs = (dups_count * (rados_obj_count-1))
+    log.info("upload_objects::%s::size=%d, copies_count=%d",
+             filename, obj_size, copies_count);
+
+    s3_object_count = count_objects_in_bucket(bucket_name, conns[0])
+    assert rados_objects_total == count_object_parts_in_all_buckets()
+    assert (s3_object_count == s3_objects_total)
+    expected_rados_obj_count_post_dedup=(rados_objects_total-duplicated_tail_objs)
+    return (expected_rados_obj_count_post_dedup, dedup_stats, s3_objects_total)
+
+#-------------------------------------------------------------------------------
+def __test_dedup_identical_copies(files, config, dry_run, verify, force_clean=False):
+    num_threads=32
+    bucket_name = "bucket1"
+    conns=get_connections(num_threads)
+    bucket_names=[bucket_name] * num_threads
+    try:
+        if dry_run:
+            conns[0].create_bucket(Bucket=bucket_name)
+            start = time.time_ns()
+            proc_parallel_upload_identical(files, conns, bucket_name, config)
+            upload_time_sec = (time.time_ns() - start) / (1000*1000*1000)
+            log.info("upload time = %d sec", upload_time_sec)
+
+        ret=calc_identical_copies_stats(files, conns, bucket_name, config)
+        expected_results = ret[0]
+        dedup_stats = ret[1]
+
+        exec_dedup(dedup_stats, dry_run)
+        if verify:
+            log.info("Verify all objects")
+            start_time = time.time_ns()
+            threads_verify_objects(files, conns, bucket_names, expected_results, config)
+            end_time = time.time_ns()
+            log.info("Verify all objects time = %d(sec)",
+                     (end_time - start_time)/1_000_000_000)
+    finally:
+        # cleanup must be executed even after a failure
+        if not dry_run or force_clean:
+            log.info("cleanup bucket")
+            cleanup(bucket_name, conns[0])
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_1():
+    num_files=1
+    copies_count=64*1024+1
+    size=64*KB
+    config=default_config
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    # no need to read-verify data since min size for single-part dedup is 4MB
+    verify=False
+    force=False
+    log.info("test_dedup_identical_copies:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart():
+    num_files=1
+    copies_count=64*1024+1
+    size=16*KB
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+    config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies_multipart:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    verify=False
+    force_clean=True
+    log.info("test_dedup_identical_copies_multipart:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
+#-------------------------------------------------------------------------------
+@pytest.mark.basic_test
+def test_dedup_identical_copies_multipart_small():
+    num_files=1
+    copies_count=1024
+    size=16*KB
+    prepare_test()
+    files=[]
+    gen_files_fixed_copies(files, num_files, size, copies_count)
+    config=TransferConfig(multipart_threshold=size, multipart_chunksize=size)
+    # start with a dry_run
+    dry_run=True
+    verify=False
+    log.info("test_dedup_identical_copies_multipart:dry test")
+    __test_dedup_identical_copies(files, config, dry_run, verify)
+
+    # and then perform a full dedup
+    dry_run=False
+    verify=True
+    force_clean=True
+    log.info("test_dedup_identical_copies_multipart:full test")
+    __test_dedup_identical_copies(files, config, dry_run, verify, force_clean)
+
index a6aaa980d484fdf8301d8f17a620c903f3a08631..ba86319fa318eee165cdaa9c857ac86e3494e3c2 100644 (file)
@@ -6,4 +6,6 @@ skipsdist = True
 deps = -rrequirements.txt
 passenv =
   DEDUPTESTS_CONF
+setenv =
+  PYTEST_ADDOPTS = --maxfail=1
 commands = pytest {posargs}