("daemon", ": execute sample dedup in daemon mode")
("loop", ": execute sample dedup in a loop until terminated. Sleeps 'wakeup-period' seconds between iterations")
("wakeup-period", po::value<int>(), ": set the wakeup period of crawler thread (sec)")
+ ("fpstore-threshold", po::value<int>()->default_value(100_M), ": set max size of in-memory fingerprint store (bytes)")
;
desc.add(op_desc);
return desc;
bufferlist data;
};
- class FpStore {
+ using dup_count_t = ssize_t;
+
+ /// Maps a fingerprint key (K) to a reference count (V), split across two
+ /// hash maps: entries whose count is below dedup_threshold are kept in one
+ /// map and entries at or above it in the other, so that the low-count
+ /// entries can be discarded in bulk (drop_entries_below_threshold()).
+ ///
+ /// K must provide size() (used by estimate_total_size()). The class does
+ /// no locking of its own.
+ template <typename K, typename V>
+ class FpMap {
+   using map_t = std::unordered_map<K, V>;
public:
- using dup_count_t = ssize_t;
+   /// Represents a nullable reference into logical container
+   ///
+   /// A handle that refers to an UNDER entry is invalidated by
+   /// drop_entries_below_threshold() and by the increment_reference() call
+   /// that promotes it; always continue with the handle returned by
+   /// increment_reference().
+   class entry_t {
+     /// Entry may be into one of two maps or NONE, indicates which
+     enum entry_into_t {
+       UNDER, OVER, NONE
+     } entry_into = NONE;
+
+     /// Valid iterator into map for UNDER|OVER, default for NONE
+     typename map_t::iterator iter;
+
+     /// Only FpMap (a friend) builds non-empty handles
+     entry_t(entry_into_t entry_into, typename map_t::iterator iter) :
+       entry_into(entry_into), iter(iter) {
+       ceph_assert(entry_into != NONE);
+     }
+
+   public:
+     /// Constructs an invalid (NONE) handle
+     entry_t() = default;
+
+     /// Returns the referenced (key, count) pair; asserts the handle is valid
+     auto &operator*() {
+       ceph_assert(entry_into != NONE);
+       return *iter;
+     }
+     /// Member access to the referenced pair; asserts the handle is valid
+     auto operator->() {
+       ceph_assert(entry_into != NONE);
+       return iter.operator->();
+     }
+     /// true if the handle refers to an entry
+     bool is_valid() const {
+       return entry_into != NONE;
+     }
+     /// true if the referenced entry lives in the over-threshold map
+     bool is_above_threshold() const {
+       return entry_into == entry_t::OVER;
+     }
+     friend class FpMap;
+   };
+
+   /// inserts str, count into container, must not already be present
+   ///
+   /// The entry is placed in the under-threshold map when
+   /// count < dedup_threshold and in the over-threshold map otherwise.
+   /// Asserts that the chosen map did not already hold str.
+   entry_t insert(const K &str, V count) {
+     std::pair<typename map_t::iterator, bool> r;
+     typename entry_t::entry_into_t s;
+     if (count < dedup_threshold) {
+       r = under_threshold_fp_map.insert({str, count});
+       s = entry_t::UNDER;
+     } else {
+       r = over_threshold_fp_map.insert({str, count});
+       s = entry_t::OVER;
+     }
+     ceph_assert(r.second);
+     return entry_t{s, r.first};
+   }
+
+   /// increments refcount for entry, promotes as necessary, entry must be valid
+   ///
+   /// Returns the handle to use from now on: the same one if the entry
+   /// stayed in its map, or a new OVER handle if the increment made the
+   /// count reach dedup_threshold.
+   entry_t increment_reference(entry_t entry) {
+     ceph_assert(entry.is_valid());
+     entry.iter->second++;
+     if (entry.entry_into == entry_t::OVER ||
+         entry.iter->second < dedup_threshold) {
+       return entry;
+     } else {
+       // Copy into the over-threshold map first, then erase the source;
+       // the erase only touches the under-threshold map.
+       auto [over_iter, inserted] = over_threshold_fp_map.insert(
+         *entry);
+       ceph_assert(inserted);
+       under_threshold_fp_map.erase(entry.iter);
+       return entry_t{entry_t::OVER, over_iter};
+     }
+   }
+
+   /// returns entry for fp, return will be !is_valid() if not present
+   auto find(const K &fp) {
+     if (auto iter = under_threshold_fp_map.find(fp);
+         iter != under_threshold_fp_map.end()) {
+       return entry_t{entry_t::UNDER, iter};
+     } else if (auto iter = over_threshold_fp_map.find(fp);
+                iter != over_threshold_fp_map.end()) {
+       return entry_t{entry_t::OVER, iter};
+     } else {
+       return entry_t{};
+     }
+   }
+   /// true if container contains fp
+   bool contains(const K &fp) {
+     return find(fp).is_valid();
+   }
+
+   /// returns number of items
+   size_t get_num_items() const {
+     return under_threshold_fp_map.size() + over_threshold_fp_map.size();
+   }
+
+   /// returns estimate of total in-memory size (bytes)
+   ///
+   /// Per map: element count * (size() of that map's first key + sizeof(V)).
+   /// Only one key per map is sampled and container overhead is not
+   /// counted, so this is a rough figure.
+   size_t estimate_total_size() const {
+     size_t total = 0;
+     if (!under_threshold_fp_map.empty()) {
+       total += under_threshold_fp_map.size() *
+         (under_threshold_fp_map.begin()->first.size() + sizeof(V));
+     }
+     if (!over_threshold_fp_map.empty()) {
+       total += over_threshold_fp_map.size() *
+         (over_threshold_fp_map.begin()->first.size() + sizeof(V));
+     }
+     return total;
+   }
+
+   /// true if empty
+   bool empty() const {
+     return under_threshold_fp_map.empty() && over_threshold_fp_map.empty();
+   }
+
+   /// instructs container to drop entries with refcounts below threshold
+   ///
+   /// Clears the whole under-threshold map; any UNDER handle obtained
+   /// earlier must not be used afterwards.
+   void drop_entries_below_threshold() {
+     under_threshold_fp_map.clear();
+   }
+
+   /// @param dedup_threshold count at which an entry is classified as OVER.
+   /// NOTE(review): with a threshold <= 1 an entry inserted with count 1 is
+   /// classified as OVER immediately -- confirm no caller still passes -1
+   /// to mean "disabled".
+   explicit FpMap(ssize_t dedup_threshold) : dedup_threshold(dedup_threshold) {}
+   FpMap() = delete;
+ private:
+   map_t under_threshold_fp_map;
+   map_t over_threshold_fp_map;
+   const ssize_t dedup_threshold;
+ };
+
+ class FpStore {
+ public:
void maybe_print_status() {
utime_t now = ceph_clock_now();
if (next_report != utime_t() && now > next_report) {
}
}
- bool find(string& fp) {
+ bool contains(string& fp) {
std::shared_lock lock(fingerprint_lock);
- auto found_item = fp_map.find(fp);
- return found_item != fp_map.end();
+ return fp_map.contains(fp);
}
// return true if the chunk is duplicate
bool add(chunk_t& chunk) {
std::unique_lock lock(fingerprint_lock);
- auto found_iter = fp_map.find(chunk.fingerprint);
- ssize_t cur_reference = 1;
+ auto entry = fp_map.find(chunk.fingerprint);
total_bytes += chunk.size;
- maybe_print_status();
- if (found_iter == fp_map.end()) {
- fp_map.insert({chunk.fingerprint, 1});
+ if (!entry.is_valid()) {
+ if (is_fpmap_full()) {
+ fp_map.drop_entries_below_threshold();
+ if (is_fpmap_full()) {
+ return false;
+ }
+ }
+ entry = fp_map.insert(chunk.fingerprint, 1);
} else {
- cur_reference = ++found_iter->second;
+ entry = fp_map.increment_reference(entry);
}
- return cur_reference >= dedup_threshold && dedup_threshold != -1;
+ return entry.is_above_threshold();
}
- FpStore(size_t chunk_threshold, uint32_t report_period) :
- dedup_threshold(chunk_threshold), report_period(report_period) {
- next_report = start;
- next_report += report_period;
+ bool is_fpmap_full() const {
+ return fp_map.estimate_total_size() >= memory_threshold;
}
+ /// @param chunk_threshold   forwarded to fp_map as its dedup threshold
+ /// @param report_period     stored in report_period
+ /// @param memory_threshold  limit compared against
+ ///                          fp_map.estimate_total_size() by is_fpmap_full();
+ ///                          unsigned to match the member it initialises
+ // NOTE(review): next_report is not initialised here, so the
+ // `next_report != utime_t()` guard in maybe_print_status() presumably
+ // stays false unless next_report is set elsewhere -- confirm intended.
+ FpStore(size_t chunk_threshold,
+         uint32_t report_period,
+         uint64_t memory_threshold) :
+   report_period(report_period),
+   memory_threshold(memory_threshold),
+   fp_map(chunk_threshold) { }
+ FpStore() = delete;
+
private:
- ssize_t dedup_threshold = -1;
- std::unordered_map<std::string, dup_count_t> fp_map;
std::shared_mutex fingerprint_lock;
const utime_t start = ceph_clock_now();
utime_t next_report;
const uint32_t report_period;
size_t total_bytes = 0;
+ const uint64_t memory_threshold;
+ FpMap<std::string, dup_count_t> fp_map;
};
struct SampleDedupGlobal {
SampleDedupGlobal(
int chunk_threshold,
int sampling_ratio,
- uint32_t report_period) :
- fp_store(chunk_threshold, report_period),
+ uint32_t report_period,
+ unsigned fpstore_threshold) :
+ fp_store(chunk_threshold, report_period, fpstore_threshold),
sampling_ratio(static_cast<double>(sampling_ratio) / 100) { }
};
.data = chunk_data
};
- if (sample_dedup_global.fp_store.find(fingerprint)) {
+ if (sample_dedup_global.fp_store.contains(fingerprint)) {
duplicated_size += chunk_data.length();
}
if (sample_dedup_global.fp_store.add(chunk_info)) {
cout << "100 second is set as wakeup period by default" << std::endl;
}
+ const unsigned fp_threshold = opts["fpstore-threshold"].as<int>();
+
std::string fp_algo = get_opts_fp_algo(opts);
list<string> pool_names;
}
SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
- chunk_dedup_threshold, sampling_ratio, report_period);
+ chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
std::list<SampleDedupWorkerThread> threads;
size_t total_size = 0;