": perform a chunk dedup---deduplicate only a chunk, which is a part of object.")
("op object-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --dedup-cdc-chunk-size <CHUNK_SIZE> [--snap]",
": perform a object dedup---deduplicate the entire object, not a chunk. Related snapshots are also deduplicated if --snap is given")
+ ("op sample-dedup --pool <POOL> --chunk-pool <POOL> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP>",
+ ": perform a sample dedup---make crawling threads which crawl objeccts in base pool and deduplicate them based on their deduplication efficiency")
;
po::options_description op_desc("Opational arguments");
op_desc.add_options()
("snap", ": deduplciate snapshotted object")
("debug", ": enable debug")
("pgid", ": set pgid")
+ ("chunk-dedup-threshold", po::value<uint32_t>(), ": set the threshold for chunk dedup (number of duplication) ")
;
desc.add(op_desc);
return desc;
io_ctx(io_ctx), n(n), m(m), begin(begin), end(end),
report_period(report_period), total_objects(num_objects), max_read_size(max_read_size)
{}
+ CrawlerThread() { }  // no-arg constructor with an empty body; SampleDedupWorkerThread's initializer list names no base constructor, so it relies on this one
+
void signal(int signum) {
  // Raise the stop flag while holding m_lock, then announce it on stdout.
  // The signal number itself is not inspected.
  const std::lock_guard stop_guard{m_lock};
  m_stop = true;
  cout << "--done--" << std::endl;
}
+using AioCompRef = unique_ptr<AioCompletion>;  // owning handle: the AioCompletion is destroyed when the unique_ptr goes away
+
+/**
+ * Worker thread for the "sample-dedup" op.
+ *
+ * Each worker crawls the object range [begin, end) of the base pool, splits
+ * every sampled object into chunks, fingerprints them and registers the
+ * fingerprints in a store shared by all workers (SampleDedupGlobal). Chunks
+ * whose fingerprint has been seen often enough are deduplicated into the
+ * chunk pool.
+ */
+class SampleDedupWorkerThread : public CrawlerThread
+{
+public:
+  // One chunk cut out of a base-pool object.
+  struct chunk_t {
+    string oid = "";          // object the chunk belongs to
+    size_t start = 0;         // offset of the chunk inside the object
+    size_t size = 0;          // length of the chunk
+    string fingerprint = "";  // digest of `data`; also used as the chunk object's name
+    bufferlist data;          // chunk payload
+  };
+
+  // Fingerprint -> occurrence counter, guarded by a shared_mutex so that
+  // several workers can query/update it concurrently.
+  class FpStore {
+  public:
+    using dup_count_t = ssize_t;
+
+    // Returns true if `fp` has already been registered via add().
+    bool find(const string& fp) {
+      std::shared_lock lock(fingerprint_lock);
+      return fp_map.find(fp) != fp_map.end();
+    }
+
+    // Registers one more occurrence of chunk.fingerprint.
+    // Returns true if the chunk is duplicate, i.e. the number of occurrences
+    // seen so far (including this one) has reached dedup_threshold.
+    // A threshold of -1 disables the check and always yields false.
+    bool add(chunk_t& chunk) {
+      std::unique_lock lock(fingerprint_lock);
+      // A new fingerprint starts at 0 so that its first sighting counts as
+      // exactly one occurrence (it must not be counted twice).
+      auto entry = fp_map.try_emplace(chunk.fingerprint, 0).first;
+      const dup_count_t occurrences = ++entry->second;
+      return dedup_threshold != -1 && occurrences >= dedup_threshold;
+    }
+
+    // Drops every recorded fingerprint and installs a new threshold.
+    void init(size_t dedup_threshold_) {
+      std::unique_lock lock(fingerprint_lock);
+      fp_map.clear();
+      dedup_threshold = dedup_threshold_;
+    }
+    FpStore(size_t chunk_threshold) : dedup_threshold(chunk_threshold) { }
+
+  private:
+    // -1 means "no threshold"; a size_t argument of SIZE_MAX converts to it.
+    ssize_t dedup_threshold = -1;
+    std::unordered_map<std::string, dup_count_t> fp_map;
+    std::shared_mutex fingerprint_lock;
+  };
+
+  // State shared by all workers of one crawl; passed to each worker by reference.
+  struct SampleDedupGlobal {
+    FpStore fp_store;
+    double object_dedup_threshold = -1;
+    SampleDedupGlobal(
+      int chunk_threshold) :
+      fp_store(chunk_threshold) { }
+  };
+
+  /**
+   * @param io_ctx              context of the base pool (copied)
+   * @param chunk_io_ctx        context of the chunk pool (copied)
+   * @param begin               first cursor of the range this worker crawls
+   * @param end                 cursor at which this worker stops
+   * @param chunk_size          chunk size handed to the chunking algorithm
+   * @param fp_algo             fingerprint algorithm name
+   * @param chunk_algo          chunking algorithm name
+   * @param sample_dedup_global shared state; must outlive the worker
+   */
+  SampleDedupWorkerThread(
+    IoCtx &io_ctx,
+    IoCtx &chunk_io_ctx,
+    ObjectCursor begin,
+    ObjectCursor end,
+    size_t chunk_size,
+    std::string &fp_algo,
+    std::string &chunk_algo,
+    SampleDedupGlobal &sample_dedup_global) :
+    io_ctx(io_ctx),
+    chunk_io_ctx(chunk_io_ctx),
+    chunk_size(chunk_size),
+    fp_type(pg_pool_t::get_fingerprint_from_str(fp_algo)),
+    chunk_algo(chunk_algo),
+    sample_dedup_global(sample_dedup_global),
+    begin(begin),
+    end(end) { }
+
+  ~SampleDedupWorkerThread() { }
+
+protected:
+  // Thread body: performs one crawl over [begin, end).
+  void* entry() override {
+    crawl();
+    return nullptr;
+  }
+
+private:
+  void crawl();
+  std::tuple<std::vector<ObjectItem>, ObjectCursor> get_objects(
+    ObjectCursor current,
+    ObjectCursor end,
+    size_t max_object_count);
+  // Declared with the same return type as its definition and its use in crawl().
+  std::set<size_t> sample_object(size_t count);
+  void try_dedup_and_accumulate_result(ObjectItem &object);
+  bool ok_to_dedup_all();
+  void do_object_dedup(ObjectItem &object);
+  int do_chunk_dedup(chunk_t &chunk);
+  void mark_non_dedup(ObjectCursor start, ObjectCursor end);
+  bufferlist read_object(ObjectItem &object);
+  std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> do_cdc(
+    ObjectItem &object,
+    bufferlist &data);
+  std::string generate_fingerprint(bufferlist chunk_data);
+  AioCompRef do_async_evict(string oid);
+
+  IoCtx io_ctx;
+  IoCtx chunk_io_ctx;
+  std::list<chunk_t> duplicable_chunks;
+  size_t total_duplicated_size = 0;
+  size_t total_object_size = 0;
+
+  // Objects that had at least one chunk dedup attempted; evicted at the end of crawl().
+  std::set<std::string> oid_for_evict;
+  size_t chunk_size = 0;
+  pg_pool_t::fingerprint_t fp_type = pg_pool_t::TYPE_FINGERPRINT_NONE;
+  std::string chunk_algo;
+  SampleDedupGlobal &sample_dedup_global;
+  ObjectCursor begin;
+  ObjectCursor end;
+};
+
+/**
+ * Crawl this worker's range [begin, end) once.
+ *
+ * Objects are listed in batches of up to 100, every sampled object is chunked
+ * and deduplicated, and finally an asynchronous tier-evict is issued for each
+ * object collected in oid_for_evict; the function returns after all submitted
+ * evictions have completed.
+ */
+void SampleDedupWorkerThread::crawl()
+{
+  cout << "new iteration" << std::endl;
+
+  for (ObjectCursor current_object = begin;
+       current_object < end;) {
+    std::vector<ObjectItem> objects;
+    // Get the list of object IDs to deduplicate
+    std::tie(objects, current_object) = get_objects(current_object, end, 100);
+
+    // Pick few objects to be processed. Crawling mode decides how many
+    // objects to pick (sampling ratio). Lower sampling ratio makes crawler
+    // have lower crawling overhead but find less duplication.
+    std::set<size_t> sampled_indexes = sample_object(objects.size());
+    for (size_t index : sampled_indexes) {
+      // Work on the listed item in place; no per-object copy is needed.
+      try_dedup_and_accumulate_result(objects[index]);
+    }
+  }
+
+  vector<AioCompRef> evict_completions;
+  evict_completions.reserve(oid_for_evict.size());
+  for (auto &oid : oid_for_evict) {
+    AioCompRef completion = do_async_evict(oid);
+    // A null handle means the eviction was never submitted, so there is
+    // nothing to wait for (waiting on it would never return).
+    if (completion) {
+      evict_completions.push_back(std::move(completion));
+    }
+  }
+  for (auto &completion : evict_completions) {
+    completion->wait_for_complete();
+  }
+  cout << "done iteration" << std::endl;
+}
+
+/**
+ * Submit an asynchronous tier-evict for `oid` on the base pool.
+ *
+ * @return the completion to wait on, or a null handle if the operation
+ *         could not be submitted (the error is reported on stderr).
+ */
+AioCompRef SampleDedupWorkerThread::do_async_evict(string oid)
+{
+  ObjectReadOperation op_tier;
+  op_tier.tier_evict();
+  // aio_create_completion() is a static factory; no Rados instance is needed.
+  AioCompRef completion(Rados::aio_create_completion());
+  int ret = io_ctx.aio_operate(
+    oid,
+    completion.get(),
+    &op_tier,
+    nullptr);
+  if (ret < 0) {
+    cerr << __func__ << " failed to submit tier_evict for " << oid
+         << " : " << cpp_strerror(ret) << std::endl;
+    return nullptr;
+  }
+  return completion;
+}
+
+/**
+ * List up to `max_object_count` objects of the base pool in [current, end).
+ *
+ * @return the listed objects together with the cursor at which the next call
+ *         should resume. If listing fails, the error is reported on stderr and
+ *         an empty list plus `end` is returned.
+ */
+std::tuple<std::vector<ObjectItem>, ObjectCursor> SampleDedupWorkerThread::get_objects(
+  ObjectCursor current, ObjectCursor end, size_t max_object_count)
+{
+  std::vector<ObjectItem> objects;
+  ObjectCursor next;
+  int ret = io_ctx.object_list(
+    current,
+    end,
+    max_object_count,
+    {},
+    &objects,
+    &next);
+  if (ret < 0) {
+    cerr << "error object_list : " << cpp_strerror(ret) << std::endl;
+    objects.clear();
+    // What object_list leaves in `next` on failure is not visible here, and
+    // it starts out default-constructed. Hand back `end` so that a caller
+    // looping "while cursor < end" stops instead of resuming from an
+    // unrelated position.
+    next = end;
+  }
+
+  return std::make_tuple(std::move(objects), next);
+}
+
+// Chooses which of `count` listed objects get processed. Every index in
+// [0, count) is selected, so the returned set holds 0 .. count-1.
+std::set<size_t> SampleDedupWorkerThread::sample_object(size_t count)
+{
+  std::set<size_t> picked;
+  size_t candidate = 0;
+  while (candidate < count) {
+    picked.insert(candidate);
+    ++candidate;
+  }
+  return picked;
+}
+
+/**
+ * Read `object`, split it into chunks, register every chunk fingerprint in
+ * the shared store and chunk-dedup those the store reports as duplicates.
+ *
+ * total_object_size grows by the object's length and total_duplicated_size by
+ * the bytes whose fingerprint was already known before this object added it.
+ * Objects that yield no data, or whose chunks do not add up to the object
+ * length, are skipped with a message on stderr.
+ */
+void SampleDedupWorkerThread::try_dedup_and_accumulate_result(ObjectItem &object)
+{
+  bufferlist data = read_object(object);
+  if (data.length() == 0) {
+    cerr << __func__ << " skip object " << object.oid
+         << " dedup (read failed)\n";
+    return;
+  }
+  auto chunks = do_cdc(object, data);
+  size_t chunk_total_amount = 0;
+
+  // First, check total size of created chunks
+  for (auto &chunk : chunks) {
+    chunk_total_amount += std::get<0>(chunk).length();
+  }
+  if (chunk_total_amount != data.length()) {
+    cerr << __func__ << " sum of chunked length(" << chunk_total_amount
+         << ") is different from object data length(" << data.length() << ")\n";
+    return;
+  }
+
+  size_t duplicated_size = 0;
+  list<chunk_t> redundant_chunks;
+  for (auto &chunk : chunks) {
+    auto &chunk_data = std::get<0>(chunk);
+    const std::pair<uint64_t, uint64_t> &chunk_boundary = std::get<1>(chunk);
+    chunk_t chunk_info = {
+      .oid = object.oid,
+      .start = chunk_boundary.first,
+      .size = chunk_boundary.second,
+      .fingerprint = generate_fingerprint(chunk_data),
+      .data = chunk_data
+    };
+
+    // find() must run before add(): it tells whether the fingerprint was
+    // known before this chunk registered itself.
+    if (sample_dedup_global.fp_store.find(chunk_info.fingerprint)) {
+      duplicated_size += chunk_data.length();
+    }
+    if (sample_dedup_global.fp_store.add(chunk_info)) {
+      // chunk_info is not used again in this iteration, so hand it over.
+      redundant_chunks.push_back(std::move(chunk_info));
+    }
+  }
+
+  // perform chunk-dedup
+  for (auto &p : redundant_chunks) {
+    int ret = do_chunk_dedup(p);
+    if (ret < 0) {
+      // Keep going with the remaining chunks, but do not hide the failure.
+      cerr << __func__ << " chunk dedup failed for object " << p.oid
+           << " offset " << p.start << " : " << cpp_strerror(ret) << std::endl;
+    }
+  }
+  total_duplicated_size += duplicated_size;
+  total_object_size += data.length();
+}
+
+/**
+ * Read the whole content of `object` from the base pool.
+ *
+ * Data is fetched in pieces of at most default_op_size bytes until a read
+ * returns 0.
+ *
+ * @return the object's data, or an empty bufferlist if any read fails (the
+ *         error is reported on stderr). An object without data also yields an
+ *         empty bufferlist, so callers cannot tell the two cases apart.
+ */
+bufferlist SampleDedupWorkerThread::read_object(ObjectItem &object)
+{
+  bufferlist whole_data;
+  size_t offset = 0;
+  int ret = -1;
+  while (ret != 0) {
+    bufferlist partial_data;
+    ret = io_ctx.read(object.oid, partial_data, default_op_size, offset);
+    if (ret < 0) {
+      cerr << "read object error " << object.oid << " offset " << offset
+           << " size " << default_op_size << " error(" << cpp_strerror(ret)
+           << ")" << std::endl;
+      return bufferlist();
+    }
+    // A non-negative result is the number of bytes just read.
+    offset += ret;
+    whole_data.claim_append(partial_data);
+  }
+  return whole_data;
+}
+
+/**
+ * Split `data` into chunks with the configured chunking algorithm.
+ *
+ * @param object not consulted; only `data` is chunked
+ * @param data   the bytes to split
+ * @return one entry per chunk: the chunk's bytes and the (offset, length)
+ *         pair locating it inside `data`
+ */
+std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> SampleDedupWorkerThread::do_cdc(
+  ObjectItem &object,
+  bufferlist &data)
+{
+  // NOTE(review): cbits(chunk_size) - 1 is presumably the chunker's size
+  // parameter expressed in bits -- confirm against CDC::create.
+  unique_ptr<CDC> chunker = CDC::create(chunk_algo, cbits(chunk_size) - 1);
+  vector<pair<uint64_t, uint64_t>> boundaries;
+  chunker->calc_chunks(data, &boundaries);
+
+  std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> pieces;
+  for (auto &boundary : boundaries) {
+    bufferlist piece;
+    piece.substr_of(data, boundary.first, boundary.second);
+    pieces.emplace_back(piece, boundary);
+  }
+  return pieces;
+}
+
+/**
+ * Compute the fingerprint of `chunk_data` with the digest selected by
+ * fp_type (SHA1, SHA256 or SHA512) and return it as a string.
+ * Any other fp_type trips an assertion.
+ */
+std::string SampleDedupWorkerThread::generate_fingerprint(bufferlist chunk_data)
+{
+  if (fp_type == pg_pool_t::TYPE_FINGERPRINT_SHA1) {
+    return crypto::digest<crypto::SHA1>(chunk_data).to_str();
+  }
+  if (fp_type == pg_pool_t::TYPE_FINGERPRINT_SHA256) {
+    return crypto::digest<crypto::SHA256>(chunk_data).to_str();
+  }
+  if (fp_type == pg_pool_t::TYPE_FINGERPRINT_SHA512) {
+    return crypto::digest<crypto::SHA512>(chunk_data).to_str();
+  }
+
+  ceph_assert(0 == "Invalid fp type");
+  return string();
+}
+
+// Issues a synchronous tier-flush for `object` on the base pool.
+// A failure is only reported on stderr; nothing is returned to the caller.
+void SampleDedupWorkerThread::do_object_dedup(ObjectItem &object)
+{
+  ObjectReadOperation flush_op;
+  flush_op.tier_flush();
+
+  const int rc = io_ctx.operate(object.oid, &flush_op, NULL);
+  if (rc >= 0) {
+    return;
+  }
+  cerr << " tier_flush fail : " << cpp_strerror(rc) << std::endl;
+}
+
+/**
+ * Deduplicate one chunk.
+ *
+ * If no object named after the chunk's fingerprint exists in the chunk pool,
+ * it is created from the chunk's data. Then the range [start, start + size)
+ * of the source object is pointed at that chunk object with set_chunk, and
+ * the source object is queued for eviction.
+ *
+ * @return 0 on success, a negative error code if the chunk object could not
+ *         be stat'ed or written, otherwise the result of the set_chunk
+ *         operation.
+ */
+int SampleDedupWorkerThread::do_chunk_dedup(chunk_t &chunk)
+{
+  uint64_t size;
+  time_t mtime;
+
+  int ret = chunk_io_ctx.stat(chunk.fingerprint, &size, &mtime);
+
+  if (ret == -ENOENT) {
+    bufferlist bl;
+    bl.append(chunk.data);
+    ObjectWriteOperation wop;
+    wop.write_full(bl);
+    ret = chunk_io_ctx.operate(chunk.fingerprint, &wop);
+    if (ret < 0) {
+      // Without the chunk object there is nothing for set_chunk to refer to.
+      cerr << __func__ << " failed to write chunk object " << chunk.fingerprint
+           << " : " << cpp_strerror(ret) << std::endl;
+      return ret;
+    }
+  } else if (ret < 0) {
+    // Report the failure to the caller instead of aborting the whole tool.
+    cerr << __func__ << " failed to stat chunk object " << chunk.fingerprint
+         << " : " << cpp_strerror(ret) << std::endl;
+    return ret;
+  }
+
+  ObjectReadOperation op;
+  op.set_chunk(
+    chunk.start,
+    chunk.size,
+    chunk_io_ctx,
+    chunk.fingerprint,
+    0,
+    CEPH_OSD_OP_FLAG_WITH_REFERENCE);
+  ret = io_ctx.operate(chunk.oid, &op, nullptr);
+  oid_for_evict.insert(chunk.oid);
+  return ret;
+}
+
void ChunkScrub::print_status(Formatter *f, ostream &out)
{
if (f) {
op_name = get_opts_op_name(opts);
chunk_pool_name = get_opts_chunk_pool(opts);
- max_thread = get_opts_max_thread(opts);
- report_period = get_opts_report_period(opts);
boost::optional<pg_t> pgid(opts.count("pgid"), pg_t());
ret = rados.init_with_context(g_ceph_context);
return 0;
}
+ max_thread = get_opts_max_thread(opts);
+ report_period = get_opts_report_period(opts);
glock.lock();
begin = chunk_io_ctx.object_list_begin();
end = chunk_io_ctx.object_list_end();
return (ret < 0) ? 1 : 0;
}
+/**
+ * Entry point of the "sample-dedup" op.
+ *
+ * Connects to the cluster, opens the base and chunk pools, sets the base
+ * pool's fingerprint algorithm, chunk algorithm and chunk size, then starts
+ * max_thread SampleDedupWorkerThread workers, each on its own slice of the
+ * base pool's object range, and waits for all of them to finish.
+ *
+ * @param opts parsed command-line options
+ * @return 0 on success, a negative error code otherwise
+ */
+int make_crawling_daemon(const po::variables_map &opts)
+{
+  string base_pool_name = get_opts_pool_name(opts);
+  string chunk_pool_name = get_opts_chunk_pool(opts);
+  unsigned max_thread = get_opts_max_thread(opts);
+
+  size_t chunk_size = 8192;
+  if (opts.count("chunk-size")) {
+    chunk_size = opts["chunk-size"].as<int>();
+  } else {
+    cout << "8192 is set as chunk size by default" << std::endl;
+  }
+
+  // -1 (all bits set) means "no threshold": SampleDedupGlobal takes an int,
+  // which receives it as -1 again, the value FpStore treats as disabled.
+  uint32_t chunk_dedup_threshold = -1;
+  if (opts.count("chunk-dedup-threshold")) {
+    // Store the option in the threshold itself; it must not touch chunk_size.
+    chunk_dedup_threshold = opts["chunk-dedup-threshold"].as<uint32_t>();
+  }
+
+  std::string chunk_algo = get_opts_chunk_algo(opts);
+
+  Rados rados;
+  int ret = rados.init_with_context(g_ceph_context);
+  if (ret < 0) {
+    cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
+    return -EINVAL;
+  }
+  ret = rados.connect();
+  if (ret) {
+    cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
+    return -EINVAL;
+  }
+
+  std::string fp_algo = get_opts_fp_algo(opts);
+
+  list<string> pool_names;
+  IoCtx io_ctx, chunk_io_ctx;
+  pool_names.push_back(base_pool_name);
+  ret = rados.ioctx_create(base_pool_name.c_str(), io_ctx);
+  if (ret < 0) {
+    cerr << "error opening base pool "
+         << base_pool_name << ": "
+         << cpp_strerror(ret) << std::endl;
+    return -EINVAL;
+  }
+
+  ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
+  if (ret < 0) {
+    cerr << "error opening chunk pool "
+         << chunk_pool_name << ": "
+         << cpp_strerror(ret) << std::endl;
+    return -EINVAL;
+  }
+
+  // Configure the base pool's dedup parameters through monitor commands.
+  bufferlist inbl;
+  ret = rados.mon_command(
+    make_pool_str(base_pool_name, "fingerprint_algorithm", fp_algo),
+    inbl, NULL, NULL);
+  if (ret < 0) {
+    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
+    return ret;
+  }
+  ret = rados.mon_command(
+    make_pool_str(base_pool_name, "dedup_chunk_algorithm", "fastcdc"),
+    inbl, NULL, NULL);
+  if (ret < 0) {
+    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
+    return ret;
+  }
+  ret = rados.mon_command(
+    make_pool_str(base_pool_name, "dedup_cdc_chunk_size", chunk_size),
+    inbl, NULL, NULL);
+  if (ret < 0) {
+    cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
+    return ret;
+  }
+
+  // NOTE(review): object_dedup_threshold is not declared in this function;
+  // confirm it is defined at file scope.
+  cout << "Object Dedup Threshold : " << object_dedup_threshold << std::endl
+       << "Chunk Dedup Threshold : " << chunk_dedup_threshold << std::endl
+       << "Chunk Size : " << chunk_size << std::endl
+       << std::endl;
+
+  // The loop body runs exactly once (it ends with an unconditional break);
+  // glock stays held until every worker has been joined.
+  while (true) {
+    lock_guard lock(glock);
+    ObjectCursor begin = io_ctx.object_list_begin();
+    ObjectCursor end = io_ctx.object_list_end();
+    map<string, librados::pool_stat_t> stats;
+    ret = rados.get_pool_stats(pool_names, stats);
+    if (ret < 0) {
+      cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
+      return -EINVAL;
+    }
+    if (stats.find(base_pool_name) == stats.end()) {
+      cerr << "stats can not find pool name: " << base_pool_name << std::endl;
+      return -EINVAL;
+    }
+
+    bool debug = false;
+    if (opts.count("debug")) {
+      debug = true;
+    }
+
+    estimate_threads.clear();
+    // Shared by reference with every worker; it lives until after the joins below.
+    SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
+      chunk_dedup_threshold);
+
+    for (unsigned i = 0; i < max_thread; i++) {
+      cout << " add thread.. " << std::endl;
+      // Give worker i the i-th of max_thread slices of [begin, end).
+      ObjectCursor shard_start;
+      ObjectCursor shard_end;
+      io_ctx.object_list_slice(
+        begin,
+        end,
+        i,
+        max_thread,
+        &shard_start,
+        &shard_end);
+
+      unique_ptr<CrawlerThread> ptr =
+        std::make_unique<SampleDedupWorkerThread>(
+          io_ctx,
+          chunk_io_ctx,
+          shard_start,
+          shard_end,
+          chunk_size,
+          fp_algo,
+          chunk_algo,
+          sample_dedup_global);
+      ptr->set_debug(debug);
+      ptr->create("sample_dedup");
+      estimate_threads.push_back(std::move(ptr));
+    }
+
+    for (auto &p : estimate_threads) {
+      p->join();
+    }
+    break;
+  }
+
+  return 0;
+}
+
int main(int argc, const char **argv)
{
auto args = argv_to_vec(argc, argv);
*
*/
return make_dedup_object(opts);
+ } else if (op_name == "sample-dedup") {
+ return make_crawling_daemon(opts);
} else {
cerr << "unrecognized op " << op_name << std::endl;
exit(1);