#include "common.h"
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph_dedup_daemon: " \
+ << __func__ << ": "
+
ceph::shared_mutex glock = ceph::make_shared_mutex("glock");
class SampleDedupWorkerThread;
bool all_stop = false; // Accessed in the main thread and in other worker threads under glock
void maybe_print_status() {
utime_t now = ceph_clock_now();
if (next_report != utime_t() && now > next_report) {
- cerr << (int)(now - start) << "s : read "
+ dout(5) << (int)(now - start) << "s : read "
<< total_bytes << " bytes so far..."
- << std::endl;
+ << dendl;
next_report = now;
next_report += report_period;
}
void SampleDedupWorkerThread::crawl()
{
- cout << "new iteration" << std::endl;
-
ObjectCursor current_object = begin;
std::shared_lock l{glock};
while (!all_stop && current_object < end) {
for (auto &completion : evict_completions) {
completion->wait_for_complete();
}
- cout << "done iteration" << std::endl;
}
AioCompRef SampleDedupWorkerThread::do_async_evict(string oid)
&objects,
&next);
if (ret < 0 ) {
- cerr << "error object_list" << std::endl;
+ derr << "error object_list" << dendl;
objects.clear();
}
{
bufferlist data = read_object(object);
if (data.length() == 0) {
- cerr << __func__ << " skip object " << object.oid
- << " read returned size 0" << std::endl;
+ derr << __func__ << " skip object " << object.oid
+ << " read returned size 0" << dendl;
return;
}
auto chunks = do_cdc(object, data);
chunk_total_amount += chunk_data.length();
}
if (chunk_total_amount != data.length()) {
- cerr << __func__ << " sum of chunked length(" << chunk_total_amount
+ derr << __func__ << " sum of chunked length(" << chunk_total_amount
<< ") is different from object data length(" << data.length() << ")"
- << std::endl;
+ << dendl;
return;
}
if (sample_dedup_global.fp_store.contains(fingerprint)) {
duplicated_size += chunk_data.length();
}
+
+ dout(20) << "generate a chunk (chunk oid: " << chunk_info.oid << ", offset: "
+ << chunk_info.start << ", length: " << chunk_info.size << ", fingerprint: "
+ << chunk_info.fingerprint << ")" << dendl;
+
if (sample_dedup_global.fp_store.add(chunk_info)) {
redundant_chunks.push_back(chunk_info);
+ dout(20) << chunk_info.fingerprint << "is duplicated, try to perform dedup" << dendl;
}
}
bufferlist partial_data;
ret = io_ctx.read(object.oid, partial_data, default_op_size, offset);
if (ret < 0) {
- cerr << "read object error " << object.oid << " offset " << offset
+ derr << "read object error " << object.oid << " offset " << offset
<< " size " << default_op_size << " error(" << cpp_strerror(ret)
- << std::endl;
+ << dendl;
bufferlist empty_buf;
return empty_buf;
}
offset += ret;
whole_data.claim_append(partial_data);
}
+ dout(20) << " got object: " << object.oid << " size: " << whole_data.length() << dendl;
return whole_data;
}
Rados rados;
int ret = rados.init_with_context(g_ceph_context);
if (ret < 0) {
- cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
+ derr << "couldn't initialize rados: " << cpp_strerror(ret) << dendl;
return -EINVAL;
}
ret = rados.connect();
if (ret) {
- cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
+ derr << "couldn't connect to cluster: " << cpp_strerror(ret) << dendl;
return -EINVAL;
}
int wakeup_period = 5;
pool_names.push_back(base_pool_name);
ret = rados.ioctx_create(base_pool_name.c_str(), io_ctx);
if (ret < 0) {
- cerr << "error opening base pool "
+ derr << "error opening base pool "
<< base_pool_name << ": "
- << cpp_strerror(ret) << std::endl;
+ << cpp_strerror(ret) << dendl;
return -EINVAL;
}
ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
if (ret < 0) {
- cerr << "error opening chunk pool "
+ derr << "error opening chunk pool "
<< chunk_pool_name << ": "
- << cpp_strerror(ret) << std::endl;
+ << cpp_strerror(ret) << dendl;
return -EINVAL;
}
- cout << "SampleRatio : " << sampling_ratio << std::endl
- << "Chunk Dedup Threshold : " << chunk_dedup_threshold << std::endl
- << "Chunk Size : " << chunk_size << std::endl
- << std::endl;
+ dout(0) << "ceph-dedup-daemon starts ( "
+ << "SampleRatio : " << sampling_ratio
+ << ", Chunk Dedup Threshold : " << chunk_dedup_threshold
+ << ", Chunk Size : " << chunk_size
+ << ", Fingperint Argorithm : " << fp_algo
+ << ", Chunk Argorithm : " << chunk_algo
+ << ", Chunk Dedup Threshold : " << chunk_dedup_threshold
+ << ", Fingerprint Store Threshold : " << fp_threshold
+ << ")"
+ << dendl;
std::shared_lock l(glock);
size_t total_size = 0;
size_t total_duplicate_size = 0;
for (unsigned i = 0; i < max_thread; i++) {
- cout << " add thread.. " << std::endl;
+ dout(15) << " spawn thread.. " << i << dendl;
ObjectCursor shard_start;
ObjectCursor shard_end;
io_ctx.object_list_slice(
total_duplicate_size += p.get_total_duplicated_size();
}
- cerr << "Summary: read "
+ dout(5) << "Summary: read "
<< total_size << " bytes so far and found saveable space ("
<< total_duplicate_size << " bytes)."
- << std::endl;
+ << dendl;
sleep(wakeup_period);
map<string, librados::pool_stat_t> stats;
ret = rados.get_pool_stats(pool_names, stats);
if (ret < 0) {
- cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
+ derr << "error fetching pool stats: " << cpp_strerror(ret) << dendl;
return -EINVAL;
}
if (stats.find(base_pool_name) == stats.end()) {
- cerr << "stats can not find pool name: " << base_pool_name << std::endl;
+ derr << "stats can not find pool name: " << base_pool_name << dendl;
return -EINVAL;
}
}
l.unlock();
+ dout(0) << "done" << dendl;
return 0;
}
case SIGINT:
case SIGTERM:
all_stop = true;
+ dout(0) << "got a signal(" << signum << "), daemon wil be terminiated" << dendl;
break;
default: