string chunk_algo;
string fp_algo;
uint64_t chunk_size;
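+ // I/O context for the chunk pool that holds the dedup chunk objects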
+ IoCtx chunk_io_ctx;
map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
typedef void (EstimateThread::*entry_func)();
entry_func func;
public:
EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
- string chunk_algo, string fp_algo, uint64_t chunk_size):
+ string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx):
io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
- chunk_size(chunk_size) {}
+ chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx) {}
void set_estimate_dedup_ratio() {
func = &EstimateThread::estimate_dedup_ratio;
}
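+ // Select chunk_scrub_common() as this worker thread's entry point.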
+ void set_chunk_scrub_common() {
+ func = &EstimateThread::chunk_scrub_common;
+ }
void estimate_dedup_ratio();
+ void chunk_scrub_common();
};
const auto &oid = i.oid;
uint64_t offset = 0;
while (true) {
- cout << __func__ << oid << std::endl;
bufferlist outdata;
ret = io_ctx.read(oid, outdata, op_size, offset);
if (ret <= 0) {
chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end());
}
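+// Scrub this thread's shard of the chunk pool: re-check every back reference
+// recorded on each chunk object and drop references whose source object no
+// longer contains the chunk.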
+void EstimateThread::chunk_scrub_common()
+{
+ ObjectCursor shard_start;
+ ObjectCursor shard_end;
+ int ret;
+
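+ // Restrict this worker (shard n of m) to its own slice of the chunk pool listing.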
+ chunk_io_ctx.object_list_slice(
+ begin,
+ end,
+ n,
+ m,
+ &shard_start,
+ &shard_end);
+
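+ // Page through the shard; object_list() fills result and advances the cursor c.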
+ ObjectCursor c(shard_start);
+ while (c < shard_end) {
+ std::vector<ObjectItem> result;
+ int r = chunk_io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
+ if (r < 0) {
+ cerr << "error in object_list: " << cpp_strerror(r) << std::endl;
+ return;
+ }
+
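+ // Validate the recorded references of each chunk object in this batch.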
+ for (const auto & i : result) {
+ auto oid = i.oid;
+ set<hobject_t> refs;
+ set<hobject_t> real_refs;
+ ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs);
+ if (ret < 0) {
+ continue;
+ }
+
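+ // A reference is stale only if the source object reports -ENOENT for this chunk.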
+ for (const auto &pp : refs) {
+ ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid);
+ if (ret != -ENOENT) {
+ real_refs.insert(pp);
+ }
+ }
+
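+ // Some recorded references were stale; write back the pruned reference set.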
+ if (refs.size() != real_refs.size()) {
+ ObjectWriteOperation op;
+ cls_chunk_refcount_set(op, real_refs);
+ ret = chunk_io_ctx.operate(oid, &op);
+ if (ret < 0) {
+ continue;
+ }
+ }
+ }
+ }
+}
+
int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
std::vector<const char*> &nargs)
{
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
- chunk_algo, fp_algo, chunk_size));
+ chunk_algo, fp_algo, chunk_size, IoCtx()));
ptr->set_estimate_dedup_ratio();
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
std::string object_name, target_object_name;
string pool_name, chunk_pool_name, op_name;
int ret;
+ unsigned max_thread = default_max_thread;
std::map<std::string, std::string>::const_iterator i;
i = opts.find("pool");
} else {
usage_exit();
}
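+ // An optional max-thread option overrides the default worker-thread count.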
+ i = opts.find("max-thread");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &max_thread)) {
+ return -EINVAL;
+ }
+ }
i = opts.find("pgid");
boost::optional<pg_t> pgid(i != opts.end(), pg_t());
} else {
usage_exit();
}
- cout << __func__ << " " << __LINE__ << std::endl;
set<hobject_t> refs;
cout << " refs: " << std::endl;
ret = cls_chunk_refcount_read(chunk_io_ctx, object_name, &refs);
return ret;
}
- {
- try {
- librados::NObjectIterator i = pgid ? chunk_io_ctx.nobjects_begin(pgid->ps()) : chunk_io_ctx.nobjects_begin();
- librados::NObjectIterator i_end = chunk_io_ctx.nobjects_end();
- for (; i != i_end; ++i) {
- set<hobject_t> refs;
- set<hobject_t> real_refs;
- string oid = i->get_oid();
- ret = cls_chunk_refcount_read(chunk_io_ctx, oid, &refs);
- if (ret < 0) {
- continue;
- }
-
- for (auto pp : refs) {
- ret = cls_chunk_has_chunk(io_ctx, pp.oid.name, oid);
- if (ret != -ENOENT) {
- real_refs.insert(pp);
- }
- }
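+ // Spawn max_thread scrub workers; each scrubs its own shard of the chunk pool.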
+ for (unsigned i = 0; i < max_thread; i++) {
+ ObjectCursor begin = chunk_io_ctx.object_list_begin();
+ ObjectCursor end = chunk_io_ctx.object_list_end();
+ std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
+ "", "", 0, chunk_io_ctx));
+ ptr->set_chunk_scrub_common();
+ ptr->create("estimate_thread");
+ estimate_threads.push_back(move(ptr));
+ }
- if (refs.size() != real_refs.size()) {
- ObjectWriteOperation op;
- cls_chunk_refcount_set(op, real_refs);
- ret = chunk_io_ctx.operate(oid, &op);
- if (ret < 0) {
- continue;
- }
- }
- }
- }
- catch (const std::runtime_error& e) {
- cerr << e.what() << std::endl;
- ret = -1;
- goto out;
- }
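+ // Wait for all scrub worker threads to finish.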
+ for (auto &p : estimate_threads) {
+ p->join();
}
out: