#include <climits>
#include <locale>
#include <memory>
+#include <math.h>
#include "tools/RadosDump.h"
#include "cls/cas/cls_cas_client.h"
#include "global/signal_handler.h"
#include "common/CDC.h"
+/// Accumulates dedup statistics for one target chunk size.
+///
+/// Holds the chunker (`cdc`) created for that size and a map with one entry
+/// per distinct chunk fingerprint seen so far. add_chunk() updates the map
+/// and `total_bytes` while holding `lock`; dump() does not take `lock`.
+struct EstimateResult {
+  std::unique_ptr<CDC> cdc;
+
+  // target chunk size in bytes (1 << the bit count given to the constructor)
+  uint64_t chunk_size;
+
+  ceph::mutex lock = ceph::make_mutex("EstimateResult::lock");
+
+  // < key, <count, chunk_size> >
+  map< string, pair <uint64_t, uint64_t> > chunk_statistics;
+  // sum of the lengths of every chunk passed to add_chunk(), duplicates included
+  uint64_t total_bytes = 0;
+
+  /// @param alg         chunking algorithm name, forwarded to CDC::create()
+  /// @param chunk_size  target chunk size expressed as a bit count; the
+  ///                    member of the same name stores 1 << chunk_size
+  EstimateResult(std::string alg, int chunk_size)
+    : cdc(CDC::create(alg, chunk_size)),
+      chunk_size(1ull << chunk_size) {}
+
+  /// Fingerprint `chunk` with `fp_algo` ("sha1", "sha256" or "sha512") and
+  /// record it. Any other algorithm name trips ceph_assert.
+  void add_chunk(bufferlist& chunk, const std::string& fp_algo) {
+    string fp;
+    if (fp_algo == "sha1") {
+      sha1_digest_t sha1_val = crypto::digest<crypto::SHA1>(chunk);
+      fp = sha1_val.to_str();
+    } else if (fp_algo == "sha256") {
+      sha256_digest_t sha256_val = crypto::digest<crypto::SHA256>(chunk);
+      fp = sha256_val.to_str();
+    } else if (fp_algo == "sha512") {
+      sha512_digest_t sha512_val = crypto::digest<crypto::SHA512>(chunk);
+      fp = sha512_val.to_str();
+    } else {
+      ceph_assert(0 == "no support fingerperint algorithm");
+    }
+
+    const uint64_t len = chunk.length();
+
+    std::lock_guard l(lock);
+    // Single lookup: inserts <1, len> for a new fingerprint, otherwise
+    // leaves the existing entry untouched and hands it back.
+    auto [it, inserted] = chunk_statistics.try_emplace(fp, 1u, len);
+    if (!inserted) {
+      it->second.first++;
+      if (it->second.second != len) {
+        cerr << "warning: hash collision on " << fp
+             << ": was " << it->second.second
+             << " now " << len << std::endl;
+      }
+    }
+    total_bytes += len;
+  }
+
+  /// Emit the statistics for this chunk size into `f`.
+  ///
+  /// Always writes target_chunk_size, dedup_bytes, dedup_ratio,
+  /// chunk_size_average and chunk_size_stddev. When nothing has been
+  /// recorded the ratio, average and stddev are reported as 0 instead of
+  /// dividing by zero.
+  void dump(Formatter *f) const {
+    f->dump_unsigned("target_chunk_size", chunk_size);
+
+    const uint64_t dedup_objects = chunk_statistics.size();
+    uint64_t dedup_bytes = 0;
+    for (const auto& j : chunk_statistics) {
+      dedup_bytes += j.second.second;
+    }
+
+    // total_bytes == 0 means no chunk was ever added; avoid 0/0 -> NaN.
+    const double dedup_ratio = total_bytes
+      ? 1.0 - (static_cast<double>(dedup_bytes) /
+               static_cast<double>(total_bytes))
+      : 0.0;
+    f->dump_unsigned("dedup_bytes", dedup_bytes);
+    //f->dump_unsigned("original_bytes", total_bytes);
+    f->dump_float("dedup_ratio", dedup_ratio);
+
+    uint64_t avg = 0;
+    uint64_t stddev = 0;
+    if (dedup_objects) {
+      // Mean and deviation are both taken over the unique chunks, so the
+      // mean must use dedup_bytes (unique bytes), not total_bytes.
+      avg = dedup_bytes / dedup_objects;
+      // Accumulate in double: squared deltas summed over many chunks can
+      // exceed 64 bits.
+      double sqsum = 0.0;
+      for (const auto& j : chunk_statistics) {
+        const double delta = static_cast<double>(j.second.second) -
+                             static_cast<double>(avg);
+        sqsum += delta * delta;
+      }
+      stddev = static_cast<uint64_t>(
+        sqrt(sqsum / static_cast<double>(dedup_objects)));
+    }
+    f->dump_unsigned("chunk_size_average", avg);
+    f->dump_unsigned("chunk_size_stddev", stddev);
+  }
+};
+
+map<uint64_t, EstimateResult> dedup_estimates; // chunk size -> result
+
using namespace librados;
unsigned default_op_size = 1 << 26;
unsigned default_max_thread = 2;
int32_t default_report_period = 2;
-map< string, pair <uint64_t, uint64_t> > chunk_statistics; // < key, <count, chunk_size> >
-ceph::mutex glock = ceph::make_mutex("chunk_statistics::Locker");
+ceph::mutex glock = ceph::make_mutex("glock");
void usage()
{
int32_t report_period;
bool m_stop = false;
uint64_t total_bytes = 0;
+ uint64_t total_objects = 0;
uint64_t examined_objects = 0;
- uint64_t total_objects;
+ uint64_t examined_bytes = 0;
uint64_t max_read_size = 0;
bool debug = false;
#define COND_WAIT_INTERVAL 10
m_stop = true;
m_cond.notify_all();
}
- virtual void print_status(Formatter *f, ostream &out) = 0;
+ // Status-reporting hook; this default implementation does nothing.
+ virtual void print_status(Formatter *f, ostream &out) {}
uint64_t get_examined_objects() { return examined_objects; }
+ // Returns the current value of the examined_bytes counter.
+ uint64_t get_examined_bytes() { return examined_bytes; }
uint64_t get_total_bytes() { return total_bytes; }
uint64_t get_total_objects() { return total_objects; }
friend class EstimateDedupRatio;
class EstimateDedupRatio : public CrawlerThread
{
- std::unique_ptr<CDC> cdc;
string chunk_algo;
string fp_algo;
uint64_t chunk_size;
uint64_t max_seconds;
- map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
public:
EstimateDedupRatio(
uint64_t max_seconds):
CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects,
max_read_size),
- cdc(CDC::create(chunk_algo, cbits(chunk_size) - 1)),
chunk_algo(chunk_algo),
fp_algo(fp_algo),
chunk_size(chunk_size),
return NULL;
}
void estimate_dedup_ratio();
- void print_status(Formatter *f, ostream &out);
- map< string, pair <uint64_t, uint64_t> > &get_chunk_statistics() { return local_chunk_statistics; }
- void add_chunk_fp_to_stat(bufferlist &chunk);
};
class ChunkScrub: public CrawlerThread
vector<std::unique_ptr<CrawlerThread>> estimate_threads;
-static void print_dedup_estimate(bool debug = false)
+// Print the dedup estimate as pretty-printed JSON on cout.
+//
+// Sums examined_objects / examined_bytes over every entry in
+// estimate_threads, then emits one "chunker" object per entry of
+// dedup_estimates followed by a "summary" section.
+//
+// chunk_algo: written out verbatim as the "chunk_algo" field.
+// debug:      not read by this function.
+static void print_dedup_estimate(std::string chunk_algo, bool debug = false)
{
- uint64_t total_size = 0;
- uint64_t dedup_size = 0;
- uint64_t examined_objects = 0;
+ /*
+ uint64_t total_bytes = 0;
uint64_t total_objects = 0;
- EstimateDedupRatio *ratio = NULL;
- for (auto &et : estimate_threads) {
- std::lock_guard l{glock};
- ratio = dynamic_cast<EstimateDedupRatio*>(et.get());
- assert(ratio);
- for (auto p : ratio->get_chunk_statistics()) {
- auto c = chunk_statistics.find(p.first);
- if (c != chunk_statistics.end()) {
- c->second.first += p.second.first;
- } else {
- chunk_statistics.insert(p);
- }
- }
- }
-
- if (debug) {
- for (auto p : chunk_statistics) {
- cout << " -- " << std::endl;
- cout << " key: " << p.first << std::endl;
- cout << " count: " << p.second.first << std::endl;
- cout << " chunk_size: " << p.second.second << std::endl;
- dedup_size += p.second.second;
- cout << " -- " << std::endl;
- }
- } else {
- for (auto p : chunk_statistics) {
- dedup_size += p.second.second;
- }
-
- }
+ */
+ uint64_t examined_objects = 0;
+ uint64_t examined_bytes = 0;
for (auto &et : estimate_threads) {
- total_size += et->get_total_bytes();
examined_objects += et->get_examined_objects();
- if (!total_objects) {
- total_objects = et->get_total_objects();
- }
- }
-
- cout << " result: " << total_size << " | " << dedup_size << " (total size | deduped size) " << std::endl;
- cout << " Dedup ratio: " << (100 - (double)(dedup_size)/total_size*100) << " % " << std::endl;
- cout << " Examined objects: " << examined_objects << std::endl;
- cout << " Total objects: " << total_objects << std::endl;
+ examined_bytes += et->get_examined_bytes();
+ }
+
+ // Formatter::create() returns a raw owning pointer; hold it in a
+ // unique_ptr so the formatter is freed when this function returns.
+ std::unique_ptr<Formatter> f(Formatter::create("json-pretty"));
+ f->open_object_section("results");
+ f->dump_string("chunk_algo", chunk_algo);
+ f->open_array_section("chunk_sizes");
+ for (auto& i : dedup_estimates) {
+ f->dump_object("chunker", i.second);
+ }
+ f->close_section();
+
+ f->open_object_section("summary");
+ f->dump_unsigned("examined_objects", examined_objects);
+ f->dump_unsigned("examined_bytes", examined_bytes);
+ /*
+ f->dump_unsigned("total_objects", total_objects);
+ f->dump_unsigned("total_bytes", total_bytes);
+ f->dump_float("examined_ratio", (float)examined_bytes / (float)total_bytes);
+ */
+ f->close_section();
+ f->close_section();
+ f->flush(cout);
}
static void handle_signal(int signum)
}
}
-void EstimateDedupRatio::print_status(Formatter *f, ostream &out)
-{
- if (f) {
- f->open_array_section("estimate_dedup_ratio");
- f->dump_string("PID", stringify(get_pid()));
- for (auto p : local_chunk_statistics) {
- f->open_object_section("fingerprint object");
- f->dump_string("fingerprint", p.first);
- f->dump_string("count", stringify(p.second.first));
- f->dump_string("chunk_size", stringify(p.second.second));
- f->close_section();
- }
- f->close_section();
- f->open_object_section("Status");
- f->dump_string("Total bytes", stringify(total_bytes));
- f->dump_string("Examined objectes", stringify(examined_objects));
- f->close_section();
- f->flush(out);
- cout << std::endl;
- }
-}
-
void EstimateDedupRatio::estimate_dedup_ratio()
{
ObjectCursor shard_start;
}
}
if (m_stop) {
- Formatter *formatter = Formatter::create("json-pretty");
- print_status(formatter, cout);
- delete formatter;
return;
}
offset += ret;
bl.claim_append(t);
}
-
- // chunk
- vector<pair<uint64_t, uint64_t>> chunks;
- cdc->calc_chunks(bl, &chunks);
- for (auto& p : chunks) {
- bufferlist chunk;
- chunk.substr_of(bl, p.first, p.second);
- add_chunk_fp_to_stat(chunk);
- if (debug) {
- cout << " " << oid << " " << p.first << "~" << p.second << std::endl;
+ examined_objects++;
+ examined_bytes += bl.length();
+
+ // do the chunking
+ for (auto& i : dedup_estimates) {
+ vector<pair<uint64_t, uint64_t>> chunks;
+ i.second.cdc->calc_chunks(bl, &chunks);
+ for (auto& p : chunks) {
+ bufferlist chunk;
+ chunk.substr_of(bl, p.first, p.second);
+ i.second.add_chunk(chunk, fp_algo);
+ if (debug) {
+ cout << " " << oid << " " << p.first << "~" << p.second << std::endl;
+ }
}
}
-
- std::unique_lock l{m_lock};
-
- examined_objects++;
- }
- }
-}
-
-void EstimateDedupRatio::add_chunk_fp_to_stat(bufferlist &chunk)
-{
- string fp;
- if (fp_algo == "sha1") {
- sha1_digest_t sha1_val = crypto::digest<crypto::SHA1>(chunk);
- fp = sha1_val.to_str();
- } else if (fp_algo == "sha256") {
- sha256_digest_t sha256_val = crypto::digest<crypto::SHA256>(chunk);
- fp = sha256_val.to_str();
- } else if (fp_algo == "sha512") {
- sha512_digest_t sha512_val = crypto::digest<crypto::SHA512>(chunk);
- fp = sha512_val.to_str();
- } else {
- ceph_assert(0 == "no support fingerperint algorithm");
- }
-
- auto p = local_chunk_statistics.find(fp);
- if (p != local_chunk_statistics.end()) {
- p->second.first++;
- if (p->second.second != chunk.length()) {
- cerr << "warning: hash collision on " << fp << ": was " << p->second.second
- << " now " << chunk.length() << std::endl;
}
- } else {
- local_chunk_statistics[fp] = make_pair(1, chunk.length());
}
- total_bytes += chunk.length();
}
void ChunkScrub::chunk_scrub_common()
Rados rados;
IoCtx io_ctx;
std::string chunk_algo;
- string fp_algo;
+ string fp_algo = "sha1";
string pool_name;
uint64_t chunk_size = 0;
+ uint64_t min_chunk_size = 8192;
+ uint64_t max_chunk_size = 4*1024*1024;
unsigned max_thread = default_max_thread;
uint32_t report_period = default_report_period;
uint64_t max_read_size = default_op_size;
cerr << "unrecognized fingerprint-algorithm " << fp_algo << std::endl;
exit(1);
}
- } else {
- cerr << "must specify fingerprint-algorithm" << std::endl;
- exit(1);
}
i = opts.find("chunk-size");
if (rados_sistrtoll(i, &chunk_size)) {
return -EINVAL;
}
- } else {
- cerr << "must specify chunk-size" << std::endl;
- exit(1);
+ }
+
+ i = opts.find("min-chunk-size");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &min_chunk_size)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("max-chunk-size");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &max_chunk_size)) {
+ return -EINVAL;
+ }
}
i = opts.find("max-thread");
goto out;
}
+ // set up chunkers
+ if (chunk_size) {
+ dedup_estimates.emplace(std::piecewise_construct,
+ std::forward_as_tuple(chunk_size),
+ std::forward_as_tuple(chunk_algo, cbits(chunk_size)-1));
+ } else {
+ for (size_t cs = min_chunk_size; cs <= max_chunk_size; cs *= 2) {
+ dedup_estimates.emplace(std::piecewise_construct,
+ std::forward_as_tuple(cs),
+ std::forward_as_tuple(chunk_algo, cbits(cs)-1));
+ }
+ }
+
glock.lock();
begin = io_ctx.object_list_begin();
end = io_ctx.object_list_end();
p->join();
}
- print_dedup_estimate(debug);
+ print_dedup_estimate(chunk_algo, debug);
out:
return (ret < 0) ? 1 : 0;
opts["max-seconds"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-seconds", (char*)NULL)) {
opts["max-seconds"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--min-chunk-size", (char*)NULL)) {
+ opts["min-chunk-size"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-chunk-size", (char*)NULL)) {
+ opts["max-chunk-size"] = val;
} else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) {
opts["debug"] = "true";
} else {