#include "tools/RadosDump.h"
#include "cls/cas/cls_cas_client.h"
+#include "include/stringify.h"
+#include "global/signal_handler.h"
using namespace librados;
unsigned default_op_size = 1 << 22;
unsigned default_max_thread = 2;
+int32_t default_report_period = 2;
map< string, pair <uint64_t, uint64_t> > chunk_statistics; // < key, <count, chunk_size> >
Mutex glock("chunk_statistics::Locker");
map< string, pair <uint64_t, uint64_t> > local_chunk_statistics; // < key, <count, chunk_size> >
typedef void (EstimateThread::*entry_func)();
entry_func func;
+ Mutex m_lock;
+ Cond m_cond;
+ int32_t timeout;
+ bool m_stop = false;
+#define COND_WAIT_INTERVAL 10
public:
EstimateThread(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
- string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx):
+ string chunk_algo, string fp_algo, uint64_t chunk_size, IoCtx chunk_io_ctx, int32_t timeout):
io_ctx(io_ctx), n(n), m(m), begin(begin), end(end), chunk_algo(chunk_algo), fp_algo(fp_algo),
- chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx) {}
+ chunk_size(chunk_size), chunk_io_ctx(chunk_io_ctx), m_lock("EstimateThread::Locker"),
+ timeout(timeout) {}
void set_estimate_dedup_ratio() {
func = &EstimateThread::estimate_dedup_ratio;
}
void estimate_dedup_ratio();
void chunk_scrub_common();
-
+ // Ask this thread to stop: set m_stop while holding m_lock, then signal
+ // m_cond. signum is accepted for the caller's convenience and is not
+ // inspected here.
+ void signal(int signum) {
+   Mutex::Locker locker(m_lock);
+   m_stop = true;
+   m_cond.Signal();
+ }
+ void estimate_print_status(Formatter *f, ostream &out);
};
vector<std::unique_ptr<EstimateThread>> estimate_threads;
+// Signal callback: forwards signum to every thread currently stored in
+// estimate_threads so that each one can stop.
+static void handle_signal(int signum)
+{
+  // Hold glock for the whole walk over estimate_threads.
+  Mutex::Locker guard(glock);
+  for (auto it = estimate_threads.begin(); it != estimate_threads.end(); ++it) {
+    (*it)->signal(signum);
+  }
+}
+
void* EstimateThread::entry() {
if (func) {
(this->*func)();
return NULL;
}
+// Dump this thread's chunk statistics through formatter f and flush the
+// result to out. Does nothing when f is null.
+//
+// Emitted layout: one "estimate_dedup_ratio" array section containing the
+// value of get_pid() followed by one "fingerprint object" section per entry
+// of local_chunk_statistics (fingerprint, count, chunk_size). Numeric values
+// are emitted as strings via stringify().
+void EstimateThread::estimate_print_status(Formatter *f, ostream &out)
+{
+  if (!f) {
+    return;
+  }
+
+  f->open_array_section("estimate_dedup_ratio");
+  f->dump_string("PID", stringify(get_pid()));
+  // Bind by const reference; copying each <key, <count, size>> entry per
+  // iteration is unnecessary.
+  for (const auto &p : local_chunk_statistics) {
+    f->open_object_section("fingerprint object");
+    f->dump_string("fingerprint", p.first);
+    f->dump_string("count", stringify(p.second.first));
+    f->dump_string("chunk_size", stringify(p.second.second));
+    // Close the per-entry section so sections stay balanced; without this
+    // every object nests inside the previous one and the outer array is
+    // never closed.
+    f->close_section();
+  }
+  f->close_section();
+  f->flush(out);
+  // Terminate the record on the stream that received the data, not on a
+  // hard-coded cout.
+  out << std::endl;
+}
+
void EstimateThread::estimate_dedup_ratio()
{
ObjectCursor shard_start;
ObjectCursor shard_end;
unsigned op_size = default_op_size;
int ret;
+ utime_t cur_time = ceph_clock_now();
io_ctx.object_list_slice(
begin,
const auto &oid = i.oid;
uint64_t offset = 0;
while (true) {
+ Mutex::Locker l(m_lock);
+ if (m_stop) {
+ Formatter *formatter = Formatter::create("json-pretty");
+ estimate_print_status(formatter, cout);
+ delete formatter;
+ return;
+ }
bufferlist outdata;
ret = io_ctx.read(oid, outdata, op_size, offset);
if (ret <= 0) {
break;
}
offset += outdata.length();
+ m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL));
+ if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
+ Formatter *formatter = Formatter::create("json-pretty");
+ estimate_print_status(formatter, cout);
+ delete formatter;
+ cur_time = ceph_clock_now();
+ }
}
}
}
ObjectCursor shard_start;
ObjectCursor shard_end;
int ret;
+ int total_object = 0, comp_object = 0;
+ utime_t cur_time = ceph_clock_now();
chunk_io_ctx.object_list_slice(
begin,
}
for (const auto & i : result) {
+ Mutex::Locker l(m_lock);
+ if (m_stop) {
+ cout << " Completed object : " << comp_object << std::endl;
+ return;
+ }
auto oid = i.oid;
set<hobject_t> refs;
set<hobject_t> real_refs;
continue;
}
}
+ comp_object++;
+ m_cond.WaitInterval(m_lock,utime_t(0, COND_WAIT_INTERVAL));
+ if (cur_time + utime_t(timeout, 0) < ceph_clock_now()) {
+ cout << " Completed object : " << comp_object << std::endl;
+ cur_time = ceph_clock_now();
+ }
}
+ total_object++;
}
+ cout << " Total object : " << total_object << std::endl;
+ cout << " Completed object : " << comp_object << std::endl;
}
int estimate_dedup_ratio(const std::map < std::string, std::string > &opts,
string pool_name;
uint64_t chunk_size = 0;
unsigned max_thread = default_max_thread;
+ uint32_t report_period = default_report_period;
int ret;
std::map<std::string, std::string>::const_iterator i;
+ bool debug = false;
i = opts.find("pool");
if (i != opts.end()) {
}
}
+ i = opts.find("report-period");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &report_period)) {
+ return -EINVAL;
+ }
+ }
+ i = opts.find("debug");
+ if (i != opts.end()) {
+ debug = true;
+ }
+
i = opts.find("pgid");
boost::optional<pg_t> pgid(i != opts.end(), pg_t());
goto out;
}
+ glock.Lock();
for (unsigned i = 0; i < max_thread; i++) {
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
- chunk_algo, fp_algo, chunk_size, IoCtx()));
+ chunk_algo, fp_algo, chunk_size, IoCtx(),
+ report_period));
ptr->set_estimate_dedup_ratio();
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
+ glock.Unlock();
for (auto &p : estimate_threads) {
p->join();
}
- print_dedup_estimate();
+ if (debug) {
+ print_dedup_estimate();
+ }
out:
return (ret < 0) ? 1 : 0;
}
int ret;
unsigned max_thread = default_max_thread;
std::map<std::string, std::string>::const_iterator i;
+ uint32_t report_period = default_report_period;
i = opts.find("pool");
if (i != opts.end()) {
return -EINVAL;
}
}
+ i = opts.find("report-period");
+ if (i != opts.end()) {
+ if (rados_sistrtoll(i, &report_period)) {
+ return -EINVAL;
+ }
+ }
i = opts.find("pgid");
boost::optional<pg_t> pgid(i != opts.end(), pg_t());
return ret;
}
+ glock.Lock();
for (unsigned i = 0; i < max_thread; i++) {
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
std::unique_ptr<EstimateThread> ptr (new EstimateThread(io_ctx, i, max_thread, begin, end,
- "", "", 0, chunk_io_ctx));
+ "", "", 0, chunk_io_ctx,
+ report_period));
ptr->set_chunk_scrub_common();
ptr->create("estimate_thread");
estimate_threads.push_back(move(ptr));
}
+ glock.Unlock();
for (auto &p : estimate_threads) {
p->join();
auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
+ init_async_signal_handler();
+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
std::map < std::string, std::string > opts;
std::string val;
std::vector<const char*>::iterator i;
opts["target-ref"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--max-thread", (char*)NULL)) {
opts["max-thread"] = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--report-period", (char*)NULL)) {
+ opts["report-period"] = val;
+ } else if (ceph_argparse_flag(args, i, "--debug", (char*)NULL)) {
+ opts["debug"] = "true";
} else {
if (val[0] == '-')
usage_exit();
usage();
exit(0);
}
+
+ unregister_async_signal_handler(SIGINT, handle_signal);
+ unregister_async_signal_handler(SIGTERM, handle_signal);
+ shutdown_async_signal_handler();
return 0;
}