From: myoungwon oh Date: Sun, 18 Nov 2018 07:01:39 +0000 (+0900) Subject: src/tools: use the slice thing and make parallel (chunk_scrub) X-Git-Tag: v14.1.0~83^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=68f452da52c15920547b83a799f19ff7f5ddac02;p=ceph.git src/tools: use the slice thing and make parallel (chunk_scrub) Signed-off-by: Myoungwon Oh --- diff --git a/src/tools/ceph_dedup_tool.cc b/src/tools/ceph_dedup_tool.cc index da3ac1ae06f..2b4189083ab 100644 --- a/src/tools/ceph_dedup_tool.cc +++ b/src/tools/ceph_dedup_tool.cc @@ -105,20 +105,25 @@ class EstimateThread : public Thread string chunk_algo; string fp_algo; uint64_t chunk_size; + IoCtx chunk_io_ctx; map< string, pair > local_chunk_statistics; // < key, > typedef void (EstimateThread::*entry_func)(); entry_func func; public: EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, - string chunk_algo, string fp_algo, uint64_t chunk_size): + string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx): io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo), - chunk_size(chunk_size) {} + chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx) {} void set_estimate_dedup_ratio() { func = &EstimateThread::estimate_dedup_ratio; } + void set_chunk_scrub_common() { + func = &EstimateThread::chunk_scrub_common; + } void estimate_dedup_ratio(); + void chunk_scrub_common(); }; @@ -160,7 +165,6 @@ void EstimateThread::estimate_dedup_ratio() const auto &oid = i.oid; uint64_t offset = 0; while (true) { - cout << __func__ << oid << std::endl; bufferlist outdata; ret = io_ctx.read(oid, outdata, op_size, offset); if (ret <= 0) { @@ -212,6 +216,58 @@ void EstimateThread::estimate_dedup_ratio() chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end()); } +void EstimateThread::chunk_scrub_common() +{ + ObjectCursor shard_start; + ObjectCursor shard_end; + int ret; + + chunk_io_ctx.object_list_slice( + begin, + end, + n, + m, + &shard_start, + &shard_end); + + ObjectCursor c(shard_start); + while(c < shard_end) + { + std::vector result; + int r = chunk_io_ctx.object_list(c, shard_end, 12, {}, &result, &c); + if (r < 0 ){ + cerr << "error object_list : " << cpp_strerror(r) << std::endl; + return; + } + + for (const auto & i : result) { + auto oid = i.oid; + set refs; + set real_refs; + ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs); + if (ret < 0) { + continue; + } + + for (auto pp : refs) { + ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid); + if (ret != -ENOENT) { + real_refs.insert(pp); + } + } + + if (refs.size() != real_refs.size()) { + ObjectWriteOperation op; + cls_chunk_refcount_set(op, real_refs); + ret = chunk_io_ctx.operate(oid, &op); + if (ret < 0) { + continue; + } + } + } + } +} + int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, std::vector &nargs) { @@ -300,7 +356,7 @@ int estimate_dedup_ratio(const std::map < std::string, std::string > &opts, ObjectCursor begin = io_ctx.object_list_begin(); ObjectCursor end = io_ctx.object_list_end(); std::unique_ptr ptr (new EstimateThread(io_ctx, i, max_thread, begin, end, - chunk_algo, fp_algo, chunk_size)); + chunk_algo, fp_algo, chunk_size, IoCtx())); ptr->set_estimate_dedup_ratio(); ptr->create("estimate_thread"); estimate_threads.push_back(move(ptr)); @@ -323,6 +379,7 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts, std::string object_name, target_object_name; string pool_name, chunk_pool_name, op_name; int ret; + unsigned max_thread = default_max_thread; std::map::const_iterator i; i = opts.find("pool"); @@ -344,6 +401,12 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts, } else { usage_exit(); } + i = opts.find("max-thread"); + if (i != opts.end()) { + if (rados_sistrtoll(i, &max_thread)) { + return -EINVAL; + } + } i = opts.find("pgid"); boost::optional pgid(i != opts.end(), pg_t()); @@ -426,7 +489,6 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts, } else { usage_exit(); } - cout << __func__ << " " << __LINE__ << std::endl; set refs; cout << " refs: " << std::endl; ret = cls_chunk_refcount_read(chunk_io_ctx, object_name, &refs); @@ -437,41 +499,18 @@ int chunk_scrub_common(const std::map < std::string, std::string > &opts, return ret; } - { - try { - librados::NObjectIterator i = pgid ? chunk_io_ctx.nobjects_begin(pgid->ps()) : chunk_io_ctx.nobjects_begin(); - librados::NObjectIterator i_end = chunk_io_ctx.nobjects_end(); - for (; i != i_end; ++i) { - set refs; - set real_refs; - string oid = i->get_oid(); - ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs); - if (ret < 0) { - continue; - } - - for (auto pp : refs) { - ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid); - if (ret != -ENOENT) { - real_refs.insert(pp); - } - } + for (unsigned i = 0; i < max_thread; i++) { + ObjectCursor begin = io_ctx.object_list_begin(); + ObjectCursor end = io_ctx.object_list_end(); + std::unique_ptr ptr (new EstimateThread(io_ctx, i, max_thread, begin, end, + "", "", 0, chunk_io_ctx)); + ptr->set_chunk_scrub_common(); + ptr->create("estimate_thread"); + estimate_threads.push_back(move(ptr)); + } - if (refs.size() != real_refs.size()) { - ObjectWriteOperation op; - cls_chunk_refcount_set(op, real_refs); - ret = chunk_io_ctx.operate(oid, &op); - if (ret < 0) { - continue; - } - } - } - } - catch (const std::runtime_error& e) { - cerr << e.what() << std::endl; - ret = -1; - goto out; - } + for (auto &p : estimate_threads) { + p->join(); } out: