#define dout_prefix *_dout << "ceph_dedup_daemon: " \
<< __func__ << ": "
-ceph::shared_mutex glock = ceph::make_shared_mutex("glock");
-class SampleDedupWorkerThread;
-bool all_stop = false; // Accessed in the main thread and in other worker threads under glock
-
po::options_description make_usage() {
po::options_description desc("Usage");
desc.add_options()
};
struct SampleDedupGlobal {
- FpStore fp_store;
- const double sampling_ratio = -1;
+ public:
SampleDedupGlobal(
size_t chunk_threshold,
int sampling_ratio,
size_t fpstore_threshold) :
fp_store(chunk_threshold, report_period, fpstore_threshold),
sampling_ratio(static_cast<double>(sampling_ratio) / 100) { }
+
+ bool is_all_stop() {
+ std::shared_lock l{glock};
+ return all_stop;
+ }
+ void set_all_stop() {
+ std::unique_lock l{glock};
+ all_stop = true;
+ }
+ friend class SampleDedupWorkerThread;
+ private:
+ FpStore fp_store;
+ const double sampling_ratio = -1;
+ ceph::shared_mutex glock = ceph::make_shared_mutex("glock");
+ bool all_stop = false; // Accessed in the main thread and in other worker threads under glock
};
SampleDedupWorkerThread(
void SampleDedupWorkerThread::crawl()
{
ObjectCursor current_object = begin;
- std::shared_lock l{glock};
- while (!all_stop && current_object < end) {
- l.unlock();
+ while (!sample_dedup_global.is_all_stop() && current_object < end) {
std::vector<ObjectItem> objects;
// Get the list of object IDs to deduplicate
std::tie(objects, current_object) = get_objects(current_object, end, 100);
} else {
try_dedup_and_accumulate_result(target);
}
- l.lock();
- if (all_stop) {
+ if (sample_dedup_global.is_all_stop()) {
oid_for_evict.clear();
break;
}
- l.unlock();
}
- l.lock();
}
- l.unlock();
vector<AioCompRef> evict_completions(oid_for_evict.size());
int i = 0;
return ret;
}
+unique_ptr<SampleDedupWorkerThread::SampleDedupGlobal> state;
+
int make_crawling_daemon(const po::variables_map &opts)
{
string base_pool_name = get_opts_pool_name(opts);
<< ")"
<< dendl;
- std::shared_lock l(glock);
+ state = std::make_unique<SampleDedupWorkerThread::SampleDedupGlobal>(
+ chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
+ ret = 0;
- while (!all_stop) {
- l.unlock();
+ while (!state->is_all_stop()) {
ObjectCursor begin = io_ctx.object_list_begin();
ObjectCursor end = io_ctx.object_list_end();
- SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
- chunk_dedup_threshold, sampling_ratio, report_period, fp_threshold);
-
std::list<SampleDedupWorkerThread> threads;
size_t total_size = 0;
size_t total_duplicate_size = 0;
chunk_size,
fp_algo,
chunk_algo,
- sample_dedup_global,
+ *state,
snap);
threads.back().create("sample_dedup");
}
return -EINVAL;
}
- l.lock();
if (run_once) {
- all_stop = true;
+ assert(state);
+ state->set_all_stop();
break;
}
}
- l.unlock();
dout(0) << "done" << dendl;
- return 0;
+ return ret;
}
static void handle_signal(int signum)
{
- std::unique_lock l{glock};
switch (signum) {
case SIGINT:
case SIGTERM:
- all_stop = true;
+ if (state) {
+ state->set_all_stop();
+ }
dout(0) << "got a signal(" << signum << "), daemon wil be terminiated" << dendl;
break;