cout << " --chunk-size <size> chunk-size (byte) " << std::endl;
cout << " --chunk-algorithm <fixed> " << std::endl;
cout << " --fingerprint-algorithm <sha1> " << std::endl;
+ cout << " --chunk-pool <pool name> " << std::endl;
+ cout << " --max-thread <threads> " << std::endl;
+ cout << " --report-perioid <seconds> " << std::endl;
exit(1);
}
}
}
-static void print_dedup_estimate()
-{
- uint64_t total_size = 0;
- uint64_t dedup_size = 0;
- for (auto p : chunk_statistics) {
- cout << " -- " << std::endl;
- cout << " key: " << p.first << std::endl;
- cout << " count: " << p.second.first << std::endl;
- cout << " chunk_size: " << p.second.second << std::endl;
- cout << " -- " << std::endl;
- total_size += p.second.second * p.second.first;
- dedup_size += p.second.second;
- }
-
- cout << " result: " << total_size << " / " << dedup_size << " (total size / dedup size) " << std::endl;
-}
-
+class EstimateDedupRatio; // forward declaration so EstimateThread can name it as a friend below
+class ChunkScrub; // forward declaration so EstimateThread can name it as a friend below
class EstimateThread : public Thread
{
- void* entry() override;
IoCtx io_ctx;
int n;
int m;
ObjectCursor begin;
ObjectCursor end;
- string chunk_algo;
- string fp_algo;
- uint64_t chunk_size;
- IoCtx chunk_io_ctx;
- map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
- typedef void (EstimateThread::*entry_func)();
- entry_func func;
Mutex m_lock;
Cond m_cond;
int32_t timeout;
bool m_stop = false;
+ uint64_t total_bytes = 0; // grows by chunk.length() for every chunk handled in estimate_dedup_ratio()
+ uint64_t deduped_bytes = 0; // grows by chunk.length() on the branch that records a fingerprint with count 1
+ uint64_t examined_objects = 0; // incremented inside the worker loops; reported by print_status() and the getter
+ uint64_t total_objects = 0; // incremented inside the worker loops; reported through get_total_objects()
#define COND_WAIT_INTERVAL 10
- public:
- EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
- string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx, int32_t timeout):
- io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
- chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx), m_lock("EstimateThread::Locker"),
- timeout(timeout) {}
-
- void set_estimate_dedup_ratio() {
- func = &EstimateThread::estimate_dedup_ratio;
- }
- void set_chunk_scrub_common() {
- func = &EstimateThread::chunk_scrub_common;
- }
- void estimate_dedup_ratio();
- void chunk_scrub_common();
+public: // interface used by the code that creates, signals and summarises the worker threads
+ EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end, int32_t timeout):
+ io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), m_lock("EstimateThread::Locker"), timeout(timeout)
+ {} // only stores its arguments; timeout is the interval after which the worker loops call print_status() again
void signal(int signum) {
Mutex::Locker l(m_lock);
m_stop = true;
m_cond.Signal();
}
- void estimate_print_status(Formatter *f, ostream &out);
+ virtual void print_status(Formatter *f, ostream &out) = 0; // pure virtual: each worker type formats its own progress report
+ uint64_t get_examined_objects() { return examined_objects; } // read by print_dedup_estimate() and print_chunk_scrub()
+ uint64_t get_total_bytes() { return total_bytes; } // read by print_dedup_estimate()
+ uint64_t get_total_objects() { return total_objects; } // read by print_chunk_scrub()
+ uint64_t get_deduped_bytes() { return deduped_bytes; } // read by print_dedup_estimate()
+ friend class EstimateDedupRatio; // the members above are private, so the derived worker needs friendship to use them
+ friend class ChunkScrub; // same: lets the scrub worker use m_lock, m_stop and the other private members
+}; // EstimateThread: Thread subclass holding the state and accessors shared by the two worker types
+
+// Worker thread that runs estimate_dedup_ratio() and records, per chunk
+// fingerprint, how many times the chunk was seen and how long it is.
+class EstimateDedupRatio : public EstimateThread
+{
+  string chunk_algo;   // chunking algorithm name supplied by the caller
+  string fp_algo;      // fingerprint algorithm name supplied by the caller
+  uint64_t chunk_size; // step by which the chunk offset advances
+  map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
+
+public:
+  EstimateDedupRatio(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
+		     string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t timeout):
+    EstimateThread(io_ctx, n, m, begin, end, timeout),
+    // The string parameters are already by-value copies; move them into the
+    // members instead of copying a second time.
+    chunk_algo(std::move(chunk_algo)), fp_algo(std::move(fp_algo)),
+    chunk_size(chunk_size) { }
+
+  // Thread entry point. `override` makes the compiler verify that this
+  // really replaces the base-class virtual.
+  void* entry() override {
+    estimate_dedup_ratio();
+    return NULL;
+  }
+  // Worker body invoked by entry().
+  void estimate_dedup_ratio();
+  // Implements the pure virtual declared in EstimateThread.
+  void print_status(Formatter *f, ostream &out) override;
+  // Gives callers direct (mutable) access to this thread's statistics map.
+  map< string, pair <uint64_t, uint64_t> > &get_chunk_statistics() { return local_chunk_statistics; }
+};
+
+// Worker thread that runs chunk_scrub_common(), walking the objects listed
+// through chunk_io_ctx.
+class ChunkScrub: public EstimateThread
+{
+  IoCtx chunk_io_ctx;
+  // Deliberately no examined_objects/total_objects members here: redeclaring
+  // them would hide the EstimateThread counters, so the increments made by the
+  // scrub code would never be visible through get_examined_objects() and
+  // get_total_objects(), and the final summary would always report 0.
+
+public:
+  ChunkScrub(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
+	     IoCtx& chunk_io_ctx, int32_t timeout):
+    EstimateThread(io_ctx, n, m, begin, end, timeout), chunk_io_ctx(chunk_io_ctx)
+  { }
+
+  // Thread entry point. `override` makes the compiler verify that this
+  // really replaces the base-class virtual.
+  void* entry() override {
+    chunk_scrub_common();
+    return NULL;
+  }
+  // Worker body invoked by entry().
+  void chunk_scrub_common();
+  // Implements the pure virtual declared in EstimateThread.
+  void print_status(Formatter *f, ostream &out) override;
};
vector<std::unique_ptr<EstimateThread>> estimate_threads;
+// Print the combined result of all estimate threads.
+//
+// debug: when true, first merge every EstimateDedupRatio thread's
+//        per-fingerprint statistics into the global chunk_statistics map and
+//        dump each entry.
+//
+// Always prints the summed total/deduped byte counts, their ratio and the
+// number of examined objects, taken from the per-thread getters.
+static void print_dedup_estimate(bool debug = false)
+{
+  uint64_t total_size = 0;
+  uint64_t dedup_size = 0;
+  uint64_t examined_objects = 0;
+
+  if (debug) {
+    for (auto &et : estimate_threads) {
+      Mutex::Locker l(glock);
+      // estimate_threads holds base-class pointers; skip anything that is not
+      // an EstimateDedupRatio instead of dereferencing a null cast result.
+      EstimateDedupRatio *ratio = dynamic_cast<EstimateDedupRatio*>(et.get());
+      if (!ratio) {
+	continue;
+      }
+      // NOTE(review): if chunk_statistics is a std::map, insert() keeps the
+      // existing entry for a key that is already present, so counts for the
+      // same fingerprint from different threads are not summed -- confirm
+      // that this is intended.
+      auto &local_stats = ratio->get_chunk_statistics();
+      chunk_statistics.insert(local_stats.begin(), local_stats.end());
+    }
+
+    for (const auto &p : chunk_statistics) {
+      cout << " -- " << std::endl;
+      cout << " key: " << p.first << std::endl;
+      cout << " count: " << p.second.first << std::endl;
+      cout << " chunk_size: " << p.second.second << std::endl;
+      cout << " -- " << std::endl;
+    }
+  }
+
+  for (auto &et : estimate_threads) {
+    total_size += et->get_total_bytes();
+    examined_objects += et->get_examined_objects();
+    dedup_size += et->get_deduped_bytes();
+  }
+
+  cout << " result: " << total_size << " / " << dedup_size << " (total size / dedup size) " << std::endl;
+  // Compute the ratio in floating point: the integer form truncates to 0 or 1,
+  // and dividing by a zero total (nothing scanned) would crash the tool.
+  double ratio_value = 0;
+  if (total_size > 0) {
+    ratio_value = static_cast<double>(dedup_size) / static_cast<double>(total_size);
+  }
+  cout << " dedup ratio: " << ratio_value << std::endl;
+  cout << " examined objects: " << examined_objects << std::endl;
+}
+
static void handle_signal(int signum)
{
Mutex::Locker l(glock);
}
}
-void* EstimateThread::entry() {
- if (func) {
- (this->*func)();
- }
- return NULL;
-}
-
-void EstimateThread::estimate_print_status(Formatter *f, ostream &out)
+void EstimateDedupRatio::print_status(Formatter *f, ostream &out)
{
if (f) {
f->open_array_section("estimate_dedup_ratio");
f->dump_string("chunk_size", stringify(p.second.second));
}
f->close_section();
+ f->open_object_section("Status");
+ f->dump_string("Total bytes", stringify(total_bytes));
+ f->dump_string("Deduped bytes", stringify(deduped_bytes));
+ f->dump_string("Examined objectes", stringify(examined_objects));
+ f->close_section();
f->flush(out);
cout << std::endl;
}
}
-void EstimateThread::estimate_dedup_ratio()
+void EstimateDedupRatio::estimate_dedup_ratio()
{
ObjectCursor shard_start;
ObjectCursor shard_end;
Mutex::Locker l(m_lock);
if (m_stop) {
Formatter *formatter = Formatter::create("json-pretty");
- estimate_print_status(formatter, cout);
+ print_status(formatter, cout);
delete formatter;
return;
}
local_chunk_statistics[fp] = make_pair(count, chunk.length());
} else {
local_chunk_statistics[fp] = make_pair(1, chunk.length());
+ deduped_bytes += chunk.length();
}
+ total_bytes += chunk.length();
c_offset = c_offset + chunk_size;
}
} else {
m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL));
if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
Formatter *formatter = Formatter::create("json-pretty");
- estimate_print_status(formatter, cout);
+ print_status(formatter, cout);
delete formatter;
cur_time = ceph_clock_now();
}
}
+ examined_objects++;
}
+ total_objects++;
}
-
- Mutex::Locker l(glock);
- chunk_statistics.insert(local_chunk_statistics.begin(), local_chunk_statistics.end());
}
-void EstimateThread::chunk_scrub_common()
+void ChunkScrub::chunk_scrub_common()
{
ObjectCursor shard_start;
ObjectCursor shard_end;
int ret;
- int total_object = 0, comp_object = 0;
utime_t cur_time = ceph_clock_now();
chunk_io_ctx.object_list_slice(
for (const auto & i : result) {
Mutex::Locker l(m_lock);
if (m_stop) {
- cout << " Completed object : " << comp_object << std::endl;
+ Formatter *formatter = Formatter::create("json-pretty");
+ print_status(formatter, cout);
+ delete formatter;
return;
}
auto oid = i.oid;
continue;
}
}
- comp_object++;
+ examined_objects++;
m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL));
if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
- cout << " Completed object : " << comp_object << std::endl;
+ Formatter *formatter = Formatter::create("json-pretty");
+ print_status(formatter, cout);
+ delete formatter;
cur_time = ceph_clock_now();
}
}
- total_object++;
+ total_objects++;
+ }
+ cout << " Total object : " << total_objects << std::endl;
+ cout << " Completed object : " << examined_objects << std::endl;
+}
+
+// Write this scrub worker's progress (PID plus object counters) to `out`
+// through formatter `f`. Does nothing when `f` is null.
+void ChunkScrub::print_status(Formatter *f, ostream &out)
+{
+ if (f) {
+ f->open_array_section("chunk_scrub");
+ f->dump_string("PID", stringify(get_pid()));
+ f->open_object_section("Status");
+ f->dump_string("Total object", stringify(total_objects));
+ f->dump_string("Examined objectes", stringify(examined_objects));
+ f->close_section(); // closes the "Status" object
+ f->close_section(); // closes the "chunk_scrub" array so the section is balanced before flushing
+ f->flush(out);
+ cout << std::endl;
}
- cout << " Total object : " << total_object << std::endl;
- cout << " Completed object : " << comp_object << std::endl;
}
int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
{
Rados rados;
IoCtx io_ctx;
- std::string object_name;
std::string chunk_algo;
string fp_algo;
string pool_name;
if (i != opts.end()) {
pool_name = i->second.c_str();
}
- i = opts.find("object");
- if (i != opts.end()) {
- object_name = i->second.c_str();
- }
i = opts.find("chunk-algorithm");
if (i != opts.end()) {
chunk_algo = i->second.c_str();
for (unsigned i = 0; i < max_thread; i++) {
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
- std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
- chunk_algo, fp_algo, chunk_size, IoCtx(),
+ std::unique_ptr<EstimateThread> ptr (new EstimateDedupRatio(io_ctx, i, max_thread, begin, end,
+ chunk_algo, fp_algo, chunk_size,
report_period));
- ptr->set_estimate_dedup_ratio();
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
p->join();
}
- if (debug) {
- print_dedup_estimate();
- }
+ print_dedup_estimate(debug);
+
out:
return (ret < 0) ? 1 : 0;
}
+// Add up the object counters of every thread in estimate_threads and print
+// the two totals.
+static void print_chunk_scrub()
+{
+  uint64_t sum_total = 0;
+  uint64_t sum_examined = 0;
+
+  for (auto it = estimate_threads.begin(); it != estimate_threads.end(); ++it) {
+    EstimateThread *worker = it->get();
+    sum_total = sum_total + worker->get_total_objects();
+    sum_examined = sum_examined + worker->get_examined_objects();
+  }
+
+  cout << " Total object : " << sum_total << std::endl;
+  cout << " Completed object : " << sum_examined << std::endl;
+}
+
int chunk_scrub_common(const std::map < std::string, std::string > &opts,
std::vector<const char*> &nargs)
{
for (unsigned i = 0; i < max_thread; i++) {
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
- std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
- "", "", 0, chunk_io_ctx,
- report_period));
- ptr->set_chunk_scrub_common();
+ std::unique_ptr<EstimateThread> ptr (new ChunkScrub(io_ctx, i, max_thread, begin, end, chunk_io_ctx,
+ report_period));
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
p->join();
}
+ print_chunk_scrub();
+
out:
return (ret < 0) ? 1 : 0;
}