#include "cls/cas/cls_cas_client.h"
#include "include/stringify.h"
#include "global/signal_handler.h"
+#include "common/rabin.h"
using namespace librados;
unsigned default_op_size = 1 << 22;
cout << " usage: [--op <estimate|chunk_scrub|add_chunk_ref|get_chunk_ref>] [--pool <pool_name> ] " << std::endl;
cout << " --object <object_name> " << std::endl;
cout << " --chunk-size <size> chunk-size (byte) " << std::endl;
- cout << " --chunk-algorithm <fixed> " << std::endl;
+ cout << " --chunk-algorithm <fixed|rabin> " << std::endl;
cout << " --fingerprint-algorithm <sha1> " << std::endl;
cout << " --chunk-pool <pool name> " << std::endl;
cout << " --max-thread <threads> " << std::endl;
cout << " --report-perioid <seconds> " << std::endl;
+ cout << " --max-read-size <bytes> " << std::endl;
+ cout << std::endl;
+ cout << " ***these options are for rabin chunk*** " << std::endl;
+ cout << " **rabin_hash = (rabin_hash * rabin_prime + new_byte - old_byte * pow) % (mod_prime) ** " << std::endl;
+ cout << " **default_chunk_mask = 7 ** " << std::endl;
+ cout << " **default_mod_prime = 6148914691236517051 ** " << std::endl;
+ cout << " **default_rabin_prime = 3 ** " << std::endl;
+ cout << " **default_pow = 907234050803559263 ** " << std::endl;
+ cout << " **default_window_size = 48** " << std::endl;
+ cout << " **default_min_chunk = 16384** " << std::endl;
+ cout << " **default_max_chunk = 4194304** " << std::endl;
+ cout << " --mod-prime <uint64_t> " << std::endl;
+ cout << " --rabin-prime <uint64_t> " << std::endl;
+ cout << " --pow <uint64_t> " << std::endl;
+ cout << " --chunk-mask-bit <uint32_t> " << std::endl;
+ cout << " --window-size <uint32_t> " << std::endl;
+ cout << " --min-chunk <uint32_t> " << std::endl;
+ cout << " --max-chunk <uint64_t> " << std::endl;
exit(1);
}
uint64_t total_bytes = 0;
uint64_t examined_objects = 0;
uint64_t total_objects = 0;
+ uint64_t max_read_size = 0;
+ bool debug = false;
#define COND_WAIT_INTERVAL 10
public:
- EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, int32_t timeout):
- io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), m_lock("EstimateThread::Locker"), timeout(timeout)
+ EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, int32_t timeout,
+ uint64_t max_read_size = default_op_size):
+ io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), m_lock("EstimateThread::Locker"),
+ timeout(timeout), max_read_size(max_read_size)
{}
void signal(int signum) {
Mutex::Locker l(m_lock);
string fp_algo;
uint64_t chunk_size;
map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
+ RabinChunk rabin;
public:
EstimateDedupRatio(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
- string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t timeout):
- EstimateThread(io_ctx, n, m, begin, end, timeout), chunk_algo(chunk_algo), fp_algo(fp_algo),
- chunk_size(chunk_size) { }
+ string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t timeout,
+ uint64_t max_read_size):
+ EstimateThread(io_ctx, n, m, begin, end, timeout, max_read_size), chunk_algo(chunk_algo),
+ fp_algo(fp_algo), chunk_size(chunk_size) { }
void* entry() {
count_objects(io_ctx, begin, end);
void print_status(Formatter *f, ostream &out);
map< string, pair <uint64_t, uint64_t> > &get_chunk_statistics() { return local_chunk_statistics; }
uint64_t fixed_chunk(string oid, uint64_t offset);
+ uint64_t rabin_chunk(string oid, uint64_t offset);
+ void add_chunk_fp_to_stat(bufferlist &chunk);
+ void set_rabin_options(uint64_t mod_prime, uint32_t rabin_prime, uint64_t pow,
+ uint64_t chunk_mask_bit, uint32_t window_size, uint32_t min_chunk,
+ uint64_t max_chunk);
};
class ChunkScrub: public EstimateThread
&shard_end);
ObjectCursor c(shard_start);
- while(c < shard_end)
+ while (c < shard_end)
{
std::vector<ObjectItem> result;
int r = ioctx.object_list(c, shard_end, 12, {}, &result, &c);
&shard_end);
ObjectCursor c(shard_start);
- while(c < shard_end)
+ while (c < shard_end)
{
std::vector<ObjectItem> result;
int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
uint64_t next_offset;
if (chunk_algo == "fixed") {
next_offset = fixed_chunk(oid, offset);
+ } else if (chunk_algo == "rabin") {
+ next_offset = rabin_chunk(oid, offset);
} else {
- // CDC ..
ceph_assert(0 == "no support chunk algorithm");
}
uint64_t EstimateDedupRatio::fixed_chunk(string oid, uint64_t offset)
{
- unsigned op_size = default_op_size;
+ unsigned op_size = max_read_size;
int ret;
bufferlist outdata;
ret = io_ctx.read(oid, outdata, op_size, offset);
return 0;
}
- if (fp_algo == "sha1") {
- uint64_t c_offset = 0;
- while (c_offset < outdata.length()) {
- bufferlist chunk;
- if (outdata.length() - c_offset > chunk_size) {
- bufferptr bptr(chunk_size);
- chunk.push_back(std::move(bptr));
- chunk.copy_in(0, chunk_size, outdata.c_str());
- } else {
- bufferptr bptr(outdata.length() - c_offset);
- chunk.push_back(std::move(bptr));
- chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());
- }
- sha1_digest_t sha1_val = chunk.sha1();
- string fp = sha1_val.to_str();
- auto p = local_chunk_statistics.find(fp);
- if (p != local_chunk_statistics.end()) {
- uint64_t count = p->second.first;
- count++;
- local_chunk_statistics[fp] = make_pair(count, chunk.length());
- } else {
- local_chunk_statistics[fp] = make_pair(1, chunk.length());
- }
- total_bytes += chunk.length();
- c_offset = c_offset + chunk_size;
+ uint64_t c_offset = 0;
+ while (c_offset < outdata.length()) {
+ bufferlist chunk;
+ if (outdata.length() - c_offset > chunk_size) {
+ bufferptr bptr(chunk_size);
+ chunk.push_back(std::move(bptr));
+ chunk.copy_in(0, chunk_size, outdata.c_str());
+ } else {
+ bufferptr bptr(outdata.length() - c_offset);
+ chunk.push_back(std::move(bptr));
+ chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());
}
+ add_chunk_fp_to_stat(chunk);
+ c_offset = c_offset + chunk_size;
+ }
+
+ if (outdata.length() < op_size) {
+ return 0;
+ }
+ return outdata.length();
+}
+
+void EstimateDedupRatio::add_chunk_fp_to_stat(bufferlist &chunk)
+{
+ string fp;
+ if (fp_algo == "sha1") {
+ sha1_digest_t sha1_val = chunk.sha1();
+ fp = sha1_val.to_str();
+ } else if (chunk_algo == "rabin") {
+ uint64_t hash = rabin.gen_rabin_hash(chunk.c_str(), 0, chunk.length());
+ fp = to_string(hash);
} else {
ceph_assert(0 == "no support fingerperint algorithm");
}
+ auto p = local_chunk_statistics.find(fp);
+ if (p != local_chunk_statistics.end()) {
+ uint64_t count = p->second.first;
+ count++;
+ local_chunk_statistics[fp] = make_pair(count, chunk.length());
+ } else {
+ local_chunk_statistics[fp] = make_pair(1, chunk.length());
+ }
+ total_bytes += chunk.length();
+}
+
+uint64_t EstimateDedupRatio::rabin_chunk(string oid, uint64_t offset)
+{
+ unsigned op_size = max_read_size;
+ int ret;
+ bufferlist outdata;
+ ret = io_ctx.read(oid, outdata, op_size, offset);
+ if (ret <= 0) {
+ return 0;
+ }
+
+ vector<pair<uint64_t, uint64_t>> chunks;
+ rabin.do_rabin_chunks(outdata, chunks, 0, 0); // use default value
+ for (auto p : chunks) {
+ bufferlist chunk;
+ bufferptr c_data = buffer::create(p.second);
+ c_data.zero();
+ chunk.append(c_data);
+ chunk.copy_in(0, p.second, outdata.c_str() + p.first);
+ add_chunk_fp_to_stat(chunk);
+ cout << " oid: " << oid << " offset: " << p.first << " length: " << p.second << std::endl;
+ }
+
if (outdata.length() < op_size) {
return 0;
}
return outdata.length();
}
+void EstimateDedupRatio::set_rabin_options(uint64_t mod_prime, uint32_t rabin_prime, uint64_t pow,
+ uint64_t chunk_mask_bit, uint32_t window_size,
+ uint32_t min_chunk, uint64_t max_chunk)
+{
+ if (mod_prime != 0) {
+ rabin.set_mod_prime(mod_prime);
+ }
+ if (rabin_prime != 0) {
+ rabin.set_rabin_prime(rabin_prime);
+ }
+ if (pow != 0) {
+ rabin.set_pow(pow);
+ }
+ if (chunk_mask_bit != 0) {
+ int index = rabin.add_rabin_mask(chunk_mask_bit);
+ rabin.set_numbits(index);
+ }
+ if (window_size != 0) {
+ rabin.set_window_size(window_size);
+ }
+ if (min_chunk != 0) {
+ rabin.set_min_chunk(min_chunk);
+ }
+ if (max_chunk != 0) {
+ rabin.set_max_chunk(max_chunk);
+ }
+}
+
void ChunkScrub::chunk_scrub_common()
{
ObjectCursor shard_start;
uint64_t chunk_size = 0;
unsigned max_thread = default_max_thread;
uint32_t report_period = default_report_period;
+ uint64_t max_read_size = default_op_size;
+ uint64_t mod_prime = 0, pow = 0, max_chunk = default_op_size;
+ uint32_t rabin_prime = 0, window_size = 0, chunk_mask_bit = 0, min_chunk = 16384;
int ret;
std::map<std::string, std::string>::const_iterator i;
bool debug = false;
i = opts.find("chunk-algorithm");
if (i != opts.end()) {
chunk_algo = i->second.c_str();
- if (chunk_algo != "fixed") {
+ if (chunk_algo != "fixed" && chunk_algo != "rabin") {
usage_exit();
}
} else {
i = opts.find("fingerprint-algorithm");
if (i != opts.end()) {
fp_algo = i->second.c_str();
- if (fp_algo != "sha1") {
+ if (fp_algo != "sha1" && fp_algo != "rabin") {
usage_exit();
}
} else {
return -EINVAL;
}
} else {
- usage_exit();
+ if (chunk_algo != "rabin") {
+ usage_exit();
+ }
}
i = opts.find("max-thread");
return -EINVAL;
}
}
+ i = opts.find("max-read-size");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &max_read_size)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("mod-prime");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &mod_prime)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("rabin-prime");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &rabin_prime)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("pow");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &pow)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("chunk-mask-bit");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &chunk_mask_bit)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("window-size");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &window_size)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("min-chunk");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &min_chunk)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("max-chunk");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &max_chunk)) {
+ return -EINVAL;
+ }
+ }
i = opts.find("debug");
if (i != opts.end()) {
debug = true;
for (unsigned i = 0; i < max_thread; i++) {
std::unique_ptr<EstimateThread> ptr (new EstimateDedupRatio(io_ctx, i, max_thread, begin, end,
chunk_algo, fp_algo, chunk_size,
- report_period));
+ report_period, max_read_size));
+ if (chunk_algo == "rabin") {
+ EstimateDedupRatio *ratio = NULL;
+ ratio = dynamic_cast<EstimateDedupRatio*>(ptr.get());
+ ratio->set_rabin_options(mod_prime, rabin_prime, pow, chunk_mask_bit, window_size,
+ min_chunk, max_chunk);
+ }
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
opts["max-thread"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--report-period", (char*)NULL)) {
opts["report-period"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-read-size", (char*)NULL)) {
+ opts["max-read-size"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--mod-prime", (char*)NULL)) {
+ opts["mod-prime"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--rabin-prime", (char*)NULL)) {
+ opts["rabin-prime"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--pow", (char*)NULL)) {
+ opts["pow"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--chunk-mask-bit", (char*)NULL)) {
+ opts["chunk-mask-bit"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--window-size", (char*)NULL)) {
+ opts["window-size"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--min-chunk", (char*)NULL)) {
+ opts["min-chunk"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-chunk", (char*)NULL)) {
+ opts["max-chunk"] = val;
} else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) {
opts["debug"] = "true";
} else {