From: myoungwon oh Date: Sun, 18 Nov 2018 04:36:05 +0000 (+0900) Subject: src/tools: use the slice thing and make parallel X-Git-Tag: v14.1.0~83^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d2fb0c2c75bd5d253f1fe380f68f544ef6b06670;p=ceph.git src/tools: use the slice thing and make parallel Signed-off-by: Myoungwon Oh --- diff --git a/src/tools/ceph_dedup_tool.cc b/src/tools/ceph_dedup_tool.cc index 4a960e5e4490..da3ac1ae06f3 100644 --- a/src/tools/ceph_dedup_tool.cc +++ b/src/tools/ceph_dedup_tool.cc @@ -45,7 +45,9 @@ using namespace librados; unsigned default_op_size = 1 << 22; +unsigned default_max_thread = 2; map< string, pair > chunk_statistics; // < key, > +Mutex glock("chunk_statistics::Locker"); void usage() { @@ -92,6 +94,124 @@ static void print_dedup_estimate() cout << " result: " << total_size << " / " << dedup_size << " (total size / dedup size) " << std::endl; } +class EstimateThread : public Thread +{ + void* entry() override; + IoCtx io_ctx; + int n; + int m; + ObjectCursor begin; + ObjectCursor end; + string chunk_algo; + string fp_algo; + uint64_t chunk_size; + map< string, pair > local_chunk_statistics; // < key, > + typedef void (EstimateThread::*entry_func)(); + entry_func func; + + public: + EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, + string chunk_algo, string fp_algo, uint64_t chunk_size): + io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo), + chunk_size(chunk_size) {} + + void set_estimate_dedup_ratio() { + func = &EstimateThread::estimate_dedup_ratio; + } + void estimate_dedup_ratio(); + +}; + +vector> estimate_threads; + +void* EstimateThread::entry() { + if (func) { + (this->*func)(); + } + return NULL; +} + +void EstimateThread::estimate_dedup_ratio() +{ + ObjectCursor shard_start; + ObjectCursor shard_end; + unsigned op_size = default_op_size; + int ret; + + io_ctx.object_list_slice( + begin, + end, + n, + m, + &shard_start, + &shard_end); + + ObjectCursor c(shard_start); + while(c < shard_end) + { + std::vector result; + int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c); + if (r < 0 ){ + cerr << "error object_list : " << cpp_strerror(r) << std::endl; + return; + } + + for (const auto & i : result) { + const auto &oid = i.oid; + uint64_t offset = 0; + while (true) { + cout << __func__ << oid << std::endl; + bufferlist outdata; + ret = io_ctx.read(oid, outdata, op_size, offset); + if (ret <= 0) { + break; + } + + if (chunk_algo == "fixed") { + if (fp_algo == "sha1") { + uint64_t c_offset = 0; + while (c_offset < outdata.length()) { + bufferlist chunk; + if (outdata.length() - c_offset > chunk_size) { + bufferptr bptr(chunk_size); + chunk.push_back(std::move(bptr)); + chunk.copy_in(0, chunk_size, outdata.c_str()); + } else { + bufferptr bptr(outdata.length() - c_offset); + chunk.push_back(std::move(bptr)); + chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str()); + } + sha1_digest_t sha1_val = chunk.sha1(); + string fp = sha1_val.to_str(); + auto p = local_chunk_statistics.find(fp); + if (p != local_chunk_statistics.end()) { + uint64_t count = p->second.first; + count++; + local_chunk_statistics[fp] = make_pair(count, chunk.length()); + } else { + local_chunk_statistics[fp] = make_pair(1, chunk.length()); + } + c_offset = c_offset + chunk_size; + } + } else { + ceph_assert(0 == "no support fingerprint algorithm"); + } + } else { + ceph_assert(0 == "no support chunk algorithm"); + } + + if (outdata.length() < op_size) { + break; + } + offset += outdata.length(); + } + } + } + + Mutex::Locker l(glock); + chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end()); +} + int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, std::vector &nargs) { @@ -102,7 +222,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, string fp_algo; string pool_name; uint64_t chunk_size = 0; - unsigned op_size = default_op_size; + unsigned max_thread = default_max_thread; int ret; std::map::const_iterator i; @@ -143,6 +263,13 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, usage_exit(); } + i = opts.find("max-thread"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &max_thread)) { + return -EINVAL; + } + } + i = opts.find("pgid"); boost::optional pgid(i != opts.end(), pg_t()); @@ -169,64 +296,18 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, goto out; } - { - try { - librados::NObjectIterator i = pgid ? io_ctx.nobjects_begin(pgid->ps()) : io_ctx.nobjects_begin(); - librados::NObjectIterator i_end = io_ctx.nobjects_end(); - for (; i != i_end; ++i) { - uint64_t offset = 0; - while (true) { - bufferlist outdata; - ret = io_ctx.read(i->get_oid(), outdata, op_size, offset); - if (ret <= 0) { - break; - } + for (unsigned i = 0; i < max_thread; i++) { + ObjectCursor begin = io_ctx.object_list_begin(); + ObjectCursor end = io_ctx.object_list_end(); + std::unique_ptr ptr (new EstimateThread(io_ctx, i, max_thread, begin, end, + chunk_algo, fp_algo, chunk_size)); + ptr->set_estimate_dedup_ratio(); + ptr->create("estimate_thread"); + estimate_threads.push_back(move(ptr)); + } - if (chunk_algo == "fixed") { - if (fp_algo == "sha1") { - uint64_t c_offset = 0; - while (c_offset < outdata.length()) { - bufferlist chunk; - if (outdata.length() - c_offset > chunk_size) { - bufferptr bptr(chunk_size); - chunk.push_back(std::move(bptr)); - chunk.copy_in(0, chunk_size, outdata.c_str()); - } else { - bufferptr bptr(outdata.length() - c_offset); - chunk.push_back(std::move(bptr)); - chunk.copy_in(0, outdata.length() - c_offset, outdata.c_str()); - } - sha1_digest_t sha1_val = chunk.sha1(); - string fp = sha1_val.to_str(); - auto p = chunk_statistics.find(fp); - if (p != chunk_statistics.end()) { - uint64_t count = p->second.first; - count++; - chunk_statistics[fp] = make_pair(count, chunk.length()); - } else { - chunk_statistics[fp] = make_pair(1, chunk.length()); - } - c_offset = c_offset + chunk_size; - } - } else { - ceph_assert(0 == "no support fingerprint algorithm"); - } - } else { - ceph_assert(0 == "no support chunk algorithm"); - } - - if (outdata.length() < op_size) { - break; - } - offset += outdata.length(); - } - } - } - catch (const std::runtime_error& e) { - cerr << e.what() << std::endl; - ret = -1; - goto out; - } + for (auto &p : estimate_threads) { + p->join(); } print_dedup_estimate(); @@ -439,6 +520,8 @@ int main(int argc, const char **argv) opts["chunk-pool"] = val; } else if (ceph_argparse_witharg(args, i, &val, "--target-ref", (char*)NULL)) { opts["target-ref"] = val; + } else if (ceph_argparse_witharg(args, i, &val, "--max-thread", (char*)NULL)) { + opts["max-thread"] = val; } else { if (val[0] == '-') usage_exit();