]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
tools/ceph_dedup_tool: limit memory used for fingerprint database
authormyoungwon oh <ohmyoungwon@gmail.com>
Thu, 22 Jun 2023 04:42:55 +0000 (04:42 +0000)
committermyoungwon oh <ohmyoungwon@gmail.com>
Thu, 2 Nov 2023 05:53:45 +0000 (05:53 +0000)
Adds an FpMap capable of discarding any entries which have not met the
dedup threshold.

Signed-off-by: Myoungwon Oh <myoungwon.oh@samsung.com>
Signed-off-by: Samuel Just <sjust@redhat.com>
src/tools/ceph_dedup_tool.cc

index b57c74cf45dd50a5202baa8fe8f6ac337fd947d7..0cb90699925dae8cd86038516e0e7ed3bccc8141 100644 (file)
@@ -185,6 +185,7 @@ po::options_description make_usage() {
     ("daemon", ": execute sample dedup in daemon mode")
     ("loop", ": execute sample dedup in a loop until terminated. Sleeps 'wakeup-period' seconds between iterations")
     ("wakeup-period", po::value<int>(), ": set the wakeup period of crawler thread (sec)")
+    ("fpstore-threshold", po::value<int>()->default_value(100_M), ": set max size of in-memory fingerprint store (bytes)")
   ;
   desc.add(op_desc);
   return desc;
@@ -566,10 +567,135 @@ public:
     bufferlist data;
   };
 
-  class FpStore {
+  using dup_count_t = ssize_t;
+
+  template <typename K, typename V>
+  class FpMap {
+    using map_t = std::unordered_map<K, V>;
   public:
-    using dup_count_t = ssize_t;
+    /// Represents a nullable reference into logical container
+    class entry_t {
+      /// Entry may be into one of two maps or NONE, indicates which
+      enum entry_into_t {
+       UNDER, OVER, NONE
+      } entry_into = NONE;
+
+      /// Valid iterator into map for UNDER|OVER, default for NONE
+      map_t::iterator iter;
+
+      entry_t(entry_into_t entry_into, map_t::iterator iter) :
+       entry_into(entry_into), iter(iter) {
+       ceph_assert(entry_into != NONE);
+      }
+
+    public:
+      entry_t() = default;
+
+      auto &operator*() {
+       ceph_assert(entry_into != NONE);
+       return *iter;
+      }
+      auto operator->() {
+       ceph_assert(entry_into != NONE);
+       return iter.operator->();
+      }
+      bool is_valid() const {
+       return entry_into != NONE;
+      }
+      bool is_above_threshold() const {
+       return entry_into == entry_t::OVER;
+      }
+      friend class FpMap;
+    };
+
+    /// inserts str, count into container, must not already be present
+    entry_t insert(const K &str, V count) {
+      std::pair<typename map_t::iterator, bool> r;
+      typename entry_t::entry_into_t s;
+      if (count < dedup_threshold) {
+       r = under_threshold_fp_map.insert({str, count});
+       s = entry_t::UNDER;
+      } else {
+       r = over_threshold_fp_map.insert({str, count});
+       s = entry_t::OVER;
+      }
+      ceph_assert(r.second);
+      return entry_t{s, r.first};
+    }
+
+    /// increments refcount for entry, promotes as necessary, entry must be valid
+    entry_t increment_reference(entry_t entry) {
+      ceph_assert(entry.is_valid());
+      entry.iter->second++;
+      if (entry.entry_into == entry_t::OVER ||
+         entry.iter->second < dedup_threshold) {
+       return entry;
+      } else {
+       auto [over_iter, inserted] = over_threshold_fp_map.insert(
+         *entry);
+       ceph_assert(inserted);
+       under_threshold_fp_map.erase(entry.iter);
+       return entry_t{entry_t::OVER, over_iter};
+      }
+    }
+
+    /// returns entry for fp, return will be !is_valid() if not present
+    auto find(const K &fp) {
+      if (auto iter = under_threshold_fp_map.find(fp);
+         iter != under_threshold_fp_map.end()) {
+       return entry_t{entry_t::UNDER, iter};
+      } else if (auto iter = over_threshold_fp_map.find(fp);
+                iter != over_threshold_fp_map.end()) {
+       return entry_t{entry_t::OVER, iter};
+      }  else {
+       return entry_t{};
+      }
+    }
 
+    /// true if container contains fp
+    bool contains(const K &fp) {
+      return find(fp).is_valid();
+    }
+
+    /// returns number of items
+    size_t get_num_items() const {
+      return under_threshold_fp_map.size() + over_threshold_fp_map.size();
+    }
+
+    /// returns estimate of total in-memory size (bytes)
+    size_t estimate_total_size() const {
+      size_t total = 0;
+      if (!under_threshold_fp_map.empty()) {
+       total += under_threshold_fp_map.size() *
+         (under_threshold_fp_map.begin()->first.size() + sizeof(V));
+      }
+      if (!over_threshold_fp_map.empty()) {
+       total += over_threshold_fp_map.size() *
+         (over_threshold_fp_map.begin()->first.size() + sizeof(V));
+      }
+      return total;
+    }
+
+    /// true if empty
+    bool empty() const {
+      return under_threshold_fp_map.empty() && over_threshold_fp_map.empty();
+    }
+
+    /// instructs container to drop entries with refcounts below threshold
+    void drop_entries_below_threshold() {
+      under_threshold_fp_map.clear();
+    }
+
+    FpMap(ssize_t dedup_threshold) : dedup_threshold(dedup_threshold) {}
+    FpMap() = delete;
+  private:
+    map_t under_threshold_fp_map;
+    map_t over_threshold_fp_map;
+    const ssize_t dedup_threshold;
+  };
+
+  class FpStore {
+  public:
     void maybe_print_status() {
       utime_t now = ceph_clock_now();
       if (next_report != utime_t() && now > next_report) {
@@ -581,41 +707,50 @@ public:
       }
     }
 
-    bool find(string& fp) {
+    bool contains(string& fp) {
       std::shared_lock lock(fingerprint_lock);
-      auto found_item = fp_map.find(fp);
-      return found_item != fp_map.end();
+      return fp_map.contains(fp);
     }
 
     // return true if the chunk is duplicate
     bool add(chunk_t& chunk) {
       std::unique_lock lock(fingerprint_lock);
-      auto found_iter = fp_map.find(chunk.fingerprint);
-      ssize_t cur_reference = 1;
+      auto entry = fp_map.find(chunk.fingerprint);
       total_bytes += chunk.size;
-      maybe_print_status();
-      if (found_iter == fp_map.end()) {
-        fp_map.insert({chunk.fingerprint, 1});
+      if (!entry.is_valid()) {
+       if (is_fpmap_full()) {
+         fp_map.drop_entries_below_threshold();
+         if (is_fpmap_full()) {
+           return false;
+         }
+       }
+       entry = fp_map.insert(chunk.fingerprint, 1);
       } else {
-       cur_reference = ++found_iter->second;
+       entry = fp_map.increment_reference(entry);
       }
-      return cur_reference >= dedup_threshold && dedup_threshold != -1;
+      return entry.is_above_threshold();
     }
 
-    FpStore(size_t chunk_threshold, uint32_t report_period) :
-      dedup_threshold(chunk_threshold), report_period(report_period) {
-      next_report = start;
-      next_report += report_period;
+    bool is_fpmap_full() const {
+      return fp_map.estimate_total_size() >= memory_threshold;
     }
 
+    FpStore(size_t chunk_threshold,
+      uint32_t report_period,  
+      ssize_t memory_threshold) :
+      report_period(report_period),
+      memory_threshold(memory_threshold),
+      fp_map(chunk_threshold) { }
+    FpStore() = delete;
+
   private:
-    ssize_t dedup_threshold = -1;
-    std::unordered_map<std::string, dup_count_t> fp_map;
     std::shared_mutex fingerprint_lock;
     const utime_t start = ceph_clock_now();
     utime_t next_report;
     const uint32_t report_period;
     size_t total_bytes = 0;
+    const uint64_t memory_threshold;
+    FpMap<std::string, dup_count_t> fp_map;
   };
 
   struct SampleDedupGlobal {
@@ -624,8 +759,9 @@ public:
     SampleDedupGlobal(
       int chunk_threshold,
       int sampling_ratio,
-      uint32_t report_period) :
-      fp_store(chunk_threshold, report_period),
+      uint32_t report_period,
+      unsigned fpstore_threshold) :
+      fp_store(chunk_threshold, report_period, fpstore_threshold),
       sampling_ratio(static_cast<double>(sampling_ratio) / 100) { }
   };
 
@@ -836,7 +972,7 @@ void SampleDedupWorkerThread::try_dedup_and_accumulate_result(
       .data = chunk_data
       };
 
-    if (sample_dedup_global.fp_store.find(fingerprint)) {
+    if (sample_dedup_global.fp_store.contains(fingerprint)) {
       duplicated_size += chunk_data.length();
     }
     if (sample_dedup_global.fp_store.add(chunk_info)) {
@@ -1643,6 +1779,8 @@ int make_crawling_daemon(const po::variables_map &opts)
     cout << "100 second is set as wakeup period by default" << std::endl;
   }
 
+  const unsigned fp_threshold = opts["fpstore-threshold"].as<int>();
+
   std::string fp_algo = get_opts_fp_algo(opts);
 
   list<string> pool_names;
@@ -1714,7 +1852,7 @@ int make_crawling_daemon(const po::variables_map &opts)
     }
 
     SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
-      chunk_dedup_threshold, sampling_ratio, report_period);
+      chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
 
     std::list<SampleDedupWorkerThread> threads;
     size_t total_size = 0;