public:
using dup_count_t = ssize_t;
+ // Writes a progress line to cerr when the current time has passed
+ // next_report, then schedules the following report at now + report_period.
+ void maybe_print_status() {
+ utime_t now = ceph_clock_now();
+ // Nothing to do while next_report equals a default-constructed utime_t
+ // or has not been reached yet.
+ const bool report_due = next_report != utime_t() && now > next_report;
+ if (!report_due) {
+ return;
+ }
+ const int elapsed_seconds = static_cast<int>(now - start);
+ cerr << elapsed_seconds << "s : read "
+ << total_bytes << " bytes so far..."
+ << std::endl;
+ next_report = now;
+ next_report += report_period;
+ }
+
bool find(string& fp) {
std::shared_lock lock(fingerprint_lock);
auto found_item = fp_map.find(fp);
std::unique_lock lock(fingerprint_lock);
auto found_iter = fp_map.find(chunk.fingerprint);
ssize_t cur_reference = 1;
+ total_bytes += chunk.size;
+ maybe_print_status();
if (found_iter == fp_map.end()) {
fp_map.insert({chunk.fingerprint, 1});
} else {
return cur_reference >= dedup_threshold && dedup_threshold != -1;
}
- FpStore(size_t chunk_threshold) : dedup_threshold(chunk_threshold) { }
+ // Constructs the store with the given dedup threshold and report period.
+ // The first progress report is scheduled at start + report_period.
+ FpStore(size_t chunk_threshold, uint32_t report_period)
+ : dedup_threshold(chunk_threshold),
+ // start is declared before next_report, so it is already initialized here.
+ next_report(start),
+ report_period(report_period) {
+ next_report += report_period;
+ }
private:
ssize_t dedup_threshold = -1;
std::unordered_map<std::string, dup_count_t> fp_map;
std::shared_mutex fingerprint_lock;
+ const utime_t start = ceph_clock_now();
+ utime_t next_report;
+ const uint32_t report_period = default_report_period;
+ size_t total_bytes = 0;
};
// Holds the fp_store and sampling_ratio values built from the constructor
// arguments. The fp_store member itself is declared outside this excerpt.
struct SampleDedupGlobal {
const double sampling_ratio = -1; // default; the constructor below overrides it
SampleDedupGlobal(
int chunk_threshold,
- int sampling_ratio) :
- fp_store(chunk_threshold),
+ int sampling_ratio,
+ uint32_t report_period) :
+ fp_store(chunk_threshold, report_period), // both values are forwarded to the fp_store constructor
sampling_ratio(static_cast<double>(sampling_ratio) / 100) { } // integer argument divided by 100 (presumably a percentage — confirm)
};
~SampleDedupWorkerThread() { }; // empty destructor body; no explicit cleanup is done here
+ // Returns the current value of total_duplicated_size.
+ // NOTE(review): no locking is visible here; presumably intended to be read
+ // after the worker thread has finished — confirm against the caller.
+ size_t get_total_duplicated_size() const { return total_duplicated_size; }
+
+ // Returns the current value of total_object_size.
+ // NOTE(review): no locking is visible here; presumably intended to be read
+ // after the worker thread has finished — confirm against the caller.
+ size_t get_total_object_size() const { return total_object_size; }
+
protected:
void* entry() override {
crawl();
string base_pool_name = get_opts_pool_name(opts);
string chunk_pool_name = get_opts_chunk_pool(opts);
unsigned max_thread = get_opts_max_thread(opts);
+ uint32_t report_period = default_report_period;
bool loop = false;
if (opts.count("loop")) {
}
std::string chunk_algo = get_opts_chunk_algo(opts);
+ report_period = get_opts_report_period(opts);
Rados rados;
int ret = rados.init_with_context(g_ceph_context);
}
SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
- chunk_dedup_threshold, sampling_ratio);
+ chunk_dedup_threshold, sampling_ratio, report_period);
std::list<SampleDedupWorkerThread> threads;
+ size_t total_size = 0;
+ size_t total_duplicate_size = 0;
for (unsigned i = 0; i < max_thread; i++) {
cout << " add thread.. " << std::endl;
ObjectCursor shard_start;
}
for (auto &p : threads) {
+ // Join before reading the counters: a worker that is still running may not
+ // have counted all of its bytes yet, and reading its totals concurrently
+ // is an unsynchronized access.
p.join();
+ total_size += p.get_total_object_size();
+ total_duplicate_size += p.get_total_duplicated_size();
}
+
+ cerr << "Summary: read "
+ << total_size << " bytes so far and found saveable space ("
+ << total_duplicate_size << " bytes)."
+ << std::endl;
+
if (loop) {
sleep(wakeup_period);
} else {