using namespace librados;
unsigned default_op_size = 1 << 22;
+unsigned default_max_thread = 2;
map< string, pair <uint64_t, uint64_t> > chunk_statistics; // < key, <count, chunk_size> >
+Mutex glock("chunk_statistics::Locker");
void usage()
{
cout << " result: " << total_size << " / " << dedup_size << " (total size / dedup size) " << std::endl;
}
+class EstimateThread : public Thread
+{
+ void* entry() override;
+ IoCtx io_ctx;
+ int n;
+ int m;
+ ObjectCursor begin;
+ ObjectCursor end;
+ string chunk_algo;
+ string fp_algo;
+ uint64_t chunk_size;
+ map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
+ typedef void (EstimateThread::*entry_func)();
+ entry_func func;
+
+ public:
+ EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
+ string chunk_algo, string fp_algo, uint64_t chunk_size):
+ io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
+ chunk_size(chunk_size) {}
+
+ void set_estimate_dedup_ratio() {
+ func = &EstimateThread::estimate_dedup_ratio;
+ }
+ void estimate_dedup_ratio();
+
+};
+
+vector<std::unique_ptr<EstimateThread>> estimate_threads;
+
+void* EstimateThread::entry() {
+ if (func) {
+ (this->*func)();
+ }
+ return NULL;
+}
+
+void EstimateThread::estimate_dedup_ratio()
+{
+ ObjectCursor shard_start;
+ ObjectCursor shard_end;
+ unsigned op_size = default_op_size;
+ int ret;
+
+ io_ctx.object_list_slice(
+ begin,
+ end,
+ n,
+ m,
+ &shard_start,
+ &shard_end);
+
+ ObjectCursor c(shard_start);
+ while(c < shard_end)
+ {
+ std::vector<ObjectItem> result;
+ int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
+ if (r < 0 ){
+ cerr << "error object_list : " << cpp_strerror(r) << std::endl;
+ return;
+ }
+
+ for (const auto & i : result) {
+ const auto &oid = i.oid;
+ uint64_t offset = 0;
+ while (true) {
+ cout << __func__ << oid << std::endl;
+ bufferlist outdata;
+ ret = io_ctx.read(oid, outdata, op_size, offset);
+ if (ret <= 0) {
+ break;
+ }
+
+ if (chunk_algo == "fixed") {
+ if (fp_algo == "sha1") {
+ uint64_t c_offset = 0;
+ while (c_offset < outdata.length()) {
+ bufferlist chunk;
+ if (outdata.length() - c_offset > chunk_size) {
+ bufferptr bptr(chunk_size);
+ chunk.push_back(std::move(bptr));
+ chunk.copy_in(0, chunk_size, outdata.c_str());
+ } else {
+ bufferptr bptr(outdata.length() - c_offset);
+ chunk.push_back(std::move(bptr));
+ chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());
+ }
+ sha1_digest_t sha1_val = chunk.sha1();
+ string fp = sha1_val.to_str();
+ auto p = local_chunk_statistics.find(fp);
+ if (p != local_chunk_statistics.end()) {
+ uint64_t count = p->second.first;
+ count++;
+ local_chunk_statistics[fp] = make_pair(count, chunk.length());
+ } else {
+ local_chunk_statistics[fp] = make_pair(1, chunk.length());
+ }
+ c_offset = c_offset + chunk_size;
+ }
+ } else {
+ ceph_assert(0 == "no support fingerprint algorithm");
+ }
+ } else {
+ ceph_assert(0 == "no support chunk algorithm");
+ }
+
+ if (outdata.length() < op_size) {
+ break;
+ }
+ offset += outdata.length();
+ }
+ }
+ }
+
+ Mutex::Locker l(glock);
+ chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end());
+}
+
int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
std::vector<const char*> &nargs)
{
string fp_algo;
string pool_name;
uint64_t chunk_size = 0;
- unsigned op_size = default_op_size;
+ unsigned max_thread = default_max_thread;
int ret;
std::map<std::string, std::string>::const_iterator i;
usage_exit();
}
+ i = opts.find("max-thread");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &max_thread)) {
+ return -EINVAL;
+ }
+ }
+
i = opts.find("pgid");
boost::optional<pg_t> pgid(i != opts.end(), pg_t());
goto out;
}
- {
- try {
- librados::NObjectIterator i = pgid ? io_ctx.nobjects_begin(pgid->ps()) : io_ctx.nobjects_begin();
- librados::NObjectIterator i_end = io_ctx.nobjects_end();
- for (; i != i_end; ++i) {
- uint64_t offset = 0;
- while (true) {
- bufferlist outdata;
- ret = io_ctx.read(i->get_oid(), outdata, op_size, offset);
- if (ret <= 0) {
- break;
- }
+ for (unsigned i = 0; i < max_thread; i++) {
+ ObjectCursor begin = io_ctx.object_list_begin();
+ ObjectCursor end = io_ctx.object_list_end();
+ std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
+ chunk_algo, fp_algo, chunk_size));
+ ptr->set_estimate_dedup_ratio();
+ ptr->create("estimate_thread");
+ estimate_threads.push_back(move(ptr));
+ }
- if (chunk_algo == "fixed") {
- if (fp_algo == "sha1") {
- uint64_t c_offset = 0;
- while (c_offset < outdata.length()) {
- bufferlist chunk;
- if (outdata.length() - c_offset > chunk_size) {
- bufferptr bptr(chunk_size);
- chunk.push_back(std::move(bptr));
- chunk.copy_in(0, chunk_size, outdata.c_str());
- } else {
- bufferptr bptr(outdata.length() - c_offset);
- chunk.push_back(std::move(bptr));
- chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str());
- }
- sha1_digest_t sha1_val = chunk.sha1();
- string fp = sha1_val.to_str();
- auto p = chunk_statistics.find(fp);
- if (p != chunk_statistics.end()) {
- uint64_t count = p->second.first;
- count++;
- chunk_statistics[fp] = make_pair(count, chunk.length());
- } else {
- chunk_statistics[fp] = make_pair(1, chunk.length());
- }
- c_offset = c_offset + chunk_size;
- }
- } else {
- ceph_assert(0 == "no support fingerprint algorithm");
- }
- } else {
- ceph_assert(0 == "no support chunk algorithm");
- }
-
- if (outdata.length() < op_size) {
- break;
- }
- offset += outdata.length();
- }
- }
- }
- catch (const std::runtime_error& e) {
- cerr << e.what() << std::endl;
- ret = -1;
- goto out;
- }
+ for (auto &p : estimate_threads) {
+ p->join();
}
print_dedup_estimate();
opts["chunk-pool"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--target-ref", (char*)NULL)) {
opts["target-ref"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--max-thread", (char*)NULL)) {
+ opts["max-thread"] = val;
} else {
if (val[0] == '-')
usage_exit();