From: Yehuda Sadeh Date: Thu, 27 Aug 2015 18:05:47 +0000 (-0700) Subject: rgw: create a worker thread for object expiration X-Git-Tag: v9.1.0~229^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=faac0b1f164e43ba6bc8c11b7c84734fde30a58b;p=ceph.git rgw: create a worker thread for object expiration Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index b492e82eadc..63f4e967db4 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -54,11 +54,6 @@ static void usage() generic_server_usage(); } -static inline utime_t get_last_run_time(void) -{ - return utime_t(); -} - int main(const int argc, const char **argv) { vector args; @@ -95,28 +90,15 @@ int main(const int argc, const char **argv) /* Guard to not forget about closing the rados store. */ StoreDestructor store_dtor(store); - utime_t last_run = get_last_run_time(); - ObjectExpirer objexp(store); + RGWObjectExpirer objexp(store); + objexp.start_processor(); + + const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0); while (true) { - const utime_t round_start = ceph_clock_now(g_ceph_context); - objexp.inspect_all_shards(last_run, round_start); - - last_run = round_start; - - /* End of the real work for now. Prepare for sleep. */ - const utime_t round_time = ceph_clock_now(g_ceph_context) - round_start; - const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0); - - if (round_time < interval) { - /* This should be the main path of execution. All currently expired - * objects have been removed and we need go sleep waiting for the next - * turn. If the check isn't true, it means we have to much hints - * in relation to interval time. */ - const utime_t sleep_period = interval - round_time; - dout(20) << "sleeping for " << sleep_period << dendl; - sleep_period.sleep(); - } + interval.sleep(); } + /* unreachable */ + return EXIT_SUCCESS; } diff --git a/src/rgw/rgw_object_expirer_core.cc b/src/rgw/rgw_object_expirer_core.cc index e2cc6b9c206..5ae562ae27c 100644 --- a/src/rgw/rgw_object_expirer_core.cc +++ b/src/rgw/rgw_object_expirer_core.cc @@ -35,7 +35,7 @@ using namespace std; #define dout_subsys ceph_subsys_rgw -int ObjectExpirer::init_bucket_info(const string& bucket_name, +int RGWObjectExpirer::init_bucket_info(const string& bucket_name, const string& bucket_id, RGWBucketInfo& bucket_info) { @@ -48,7 +48,7 @@ int ObjectExpirer::init_bucket_info(const string& bucket_name, return ret; } -int ObjectExpirer::garbage_single_object(objexp_hint_entry& hint) +int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint) { RGWBucketInfo bucket_info; @@ -73,7 +73,7 @@ int ObjectExpirer::garbage_single_object(objexp_hint_entry& hint) return ret; } -void ObjectExpirer::garbage_chunk(list& entries, /* in */ +void RGWObjectExpirer::garbage_chunk(list& entries, /* in */ bool& need_trim) /* out */ { need_trim = false; @@ -107,7 +107,7 @@ void ObjectExpirer::garbage_chunk(list& entries, /* in return; } -void ObjectExpirer::trim_chunk(const string& shard, +void RGWObjectExpirer::trim_chunk(const string& shard, const utime_t& from, const utime_t& to) { @@ -121,7 +121,7 @@ void ObjectExpirer::trim_chunk(const string& shard, return; } -void ObjectExpirer::proceed_single_shard(const string& shard, +void RGWObjectExpirer::proceed_single_shard(const string& shard, const utime_t& last_run, const utime_t& round_start) { @@ -152,7 +152,7 @@ void ObjectExpirer::proceed_single_shard(const string& shard, return; } -void ObjectExpirer::inspect_all_shards(const utime_t& last_run, +void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run, const utime_t& round_start) { bool is_next_available; @@ -170,3 +170,62 @@ void ObjectExpirer::inspect_all_shards(const utime_t& last_run, return; } + +bool RGWObjectExpirer::going_down() +{ + return (down_flag.read() != 0); +} + +void RGWObjectExpirer::start_processor() +{ + worker = new OEWorker(store->ctx(), this); + worker->create(); +} + +void RGWObjectExpirer::stop_processor() +{ + down_flag.set(1); + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void *RGWObjectExpirer::OEWorker::entry() { + utime_t last_run; + do { + utime_t start = ceph_clock_now(cct); + dout(2) << "object expiration: start" << dendl; + oe->inspect_all_shards(last_run, start); + dout(2) << "object expiration: stop" << dendl; + + last_run = start; + + if (oe->going_down()) + break; + + utime_t end = ceph_clock_now(cct); + end -= start; + int secs = cct->_conf->rgw_objexp_gc_interval; + + if (secs <= end.sec()) + continue; // next round + + secs -= end.sec(); + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!oe->going_down()); + + return NULL; +} + +void RGWObjectExpirer::OEWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + diff --git a/src/rgw/rgw_object_expirer_core.h b/src/rgw/rgw_object_expirer_core.h index a1701057a6b..47ef376b33b 100644 --- a/src/rgw/rgw_object_expirer_core.h +++ b/src/rgw/rgw_object_expirer_core.h @@ -18,6 +18,10 @@ #include "common/Formatter.h" #include "common/errno.h" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "common/Thread.h" + #include "global/global_init.h" #include "include/utime.h" @@ -33,7 +37,7 @@ #include "rgw_usage.h" #include "rgw_replica_log.h" -class ObjectExpirer { +class RGWObjectExpirer { protected: RGWRados * const store; @@ -41,8 +45,23 @@ protected: const string& bucket_id, RGWBucketInfo& bucket_info); + class OEWorker : public Thread { + CephContext *cct; + RGWObjectExpirer *oe; + Mutex lock; + Cond cond; + + public: + OEWorker(CephContext *_cct, RGWObjectExpirer *_oe) : cct(_cct), oe(_oe), lock("OEWorker") {} + void *entry(); + void stop(); + }; + + OEWorker *worker; + atomic_t down_flag; + public: - ObjectExpirer(RGWRados * const _store) + RGWObjectExpirer(RGWRados * const _store) : store(_store) {} @@ -61,5 +80,9 @@ public: void inspect_all_shards(const utime_t& last_run, const utime_t& round_start); + + bool going_down(); + void start_processor(); + void stop_processor(); }; #endif /* CEPH_OBJEXP_H */