]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: create a worker thread for object expiration
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 27 Aug 2015 18:05:47 +0000 (11:05 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 27 Aug 2015 18:05:47 +0000 (11:05 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_object_expirer_core.cc
src/rgw/rgw_object_expirer_core.h

index b492e82eadc87b70454bb9b6ec7945ffa9a7994a..63f4e967db47a3e58713825eaa186366cf3d6797 100644 (file)
@@ -54,11 +54,6 @@ static void usage()
   generic_server_usage();
 }
 
-static inline utime_t get_last_run_time(void)
-{
-  return utime_t();
-}
-
 int main(const int argc, const char **argv)
 {
   vector<const char *> args;
@@ -95,28 +90,15 @@ int main(const int argc, const char **argv)
   /* Guard to not forget about closing the rados store. */
   StoreDestructor store_dtor(store);
 
-  utime_t last_run = get_last_run_time();
-  ObjectExpirer objexp(store);
+  RGWObjectExpirer objexp(store);
+  objexp.start_processor();
+
+  const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0);
   while (true) {
-    const utime_t round_start = ceph_clock_now(g_ceph_context);
-    objexp.inspect_all_shards(last_run, round_start);
-
-    last_run = round_start;
-
-    /* End of the real work for now. Prepare for sleep. */
-    const utime_t round_time = ceph_clock_now(g_ceph_context) - round_start;
-    const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0);
-
-    if (round_time < interval) {
-      /* This should be the main path of execution. All currently expired
-       * objects have been removed and we need go sleep waiting for the next
-       * turn. If the check isn't true, it means we have to much hints
-       * in relation to interval time. */
-      const utime_t sleep_period = interval - round_time;
-      dout(20) << "sleeping for " << sleep_period << dendl;
-      sleep_period.sleep();
-    }
+    interval.sleep();
   }
 
+  /* unreachable */
+
   return EXIT_SUCCESS;
 }
index e2cc6b9c206c4c0d12a3b90f2e3c340112fe766e..5ae562ae27c72568a2278171c4b9b3a2034cb285 100644 (file)
@@ -35,7 +35,7 @@ using namespace std;
 
 #define dout_subsys ceph_subsys_rgw
 
-int ObjectExpirer::init_bucket_info(const string& bucket_name,
+int RGWObjectExpirer::init_bucket_info(const string& bucket_name,
                                     const string& bucket_id,
                                     RGWBucketInfo& bucket_info)
 {
@@ -48,7 +48,7 @@ int ObjectExpirer::init_bucket_info(const string& bucket_name,
   return ret;
 }
 
-int ObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
+int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
 {
   RGWBucketInfo bucket_info;
 
@@ -73,7 +73,7 @@ int ObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
   return ret;
 }
 
-void ObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /* in  */
+void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /* in  */
                                   bool& need_trim)                         /* out */
 {
   need_trim = false;
@@ -107,7 +107,7 @@ void ObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries,      /* in
   return;
 }
 
-void ObjectExpirer::trim_chunk(const string& shard,
+void RGWObjectExpirer::trim_chunk(const string& shard,
                                const utime_t& from,
                                const utime_t& to)
 {
@@ -121,7 +121,7 @@ void ObjectExpirer::trim_chunk(const string& shard,
   return;
 }
 
-void ObjectExpirer::proceed_single_shard(const string& shard,
+void RGWObjectExpirer::proceed_single_shard(const string& shard,
                                          const utime_t& last_run,
                                          const utime_t& round_start)
 {
@@ -152,7 +152,7 @@ void ObjectExpirer::proceed_single_shard(const string& shard,
   return;
 }
 
-void ObjectExpirer::inspect_all_shards(const utime_t& last_run,
+void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
                                        const utime_t& round_start)
 {
   bool is_next_available;
@@ -170,3 +170,62 @@ void ObjectExpirer::inspect_all_shards(const utime_t& last_run,
 
   return;
 }
+
+bool RGWObjectExpirer::going_down()
+{
+  return (down_flag.read() != 0);
+}
+
+void RGWObjectExpirer::start_processor()
+{
+  worker = new OEWorker(store->ctx(), this);
+  worker->create();
+}
+
+void RGWObjectExpirer::stop_processor()
+{
+  down_flag.set(1);
+  if (worker) {
+    worker->stop();
+    worker->join();
+  }
+  delete worker;
+  worker = NULL;
+}
+
+void *RGWObjectExpirer::OEWorker::entry() {
+  utime_t last_run;
+  do {
+    utime_t start = ceph_clock_now(cct);
+    dout(2) << "object expiration: start" << dendl;
+    oe->inspect_all_shards(last_run, start);
+    dout(2) << "object expiration: stop" << dendl;
+
+    last_run = start;
+
+    if (oe->going_down())
+      break;
+
+    utime_t end = ceph_clock_now(cct);
+    end -= start;
+    int secs = cct->_conf->rgw_objexp_gc_interval;
+
+    if (secs <= end.sec())
+      continue; // next round
+
+    secs -= end.sec();
+
+    lock.Lock();
+    cond.WaitInterval(cct, lock, utime_t(secs, 0));
+    lock.Unlock();
+  } while (!oe->going_down());
+
+  return NULL;
+}
+
+void RGWObjectExpirer::OEWorker::stop()
+{
+  Mutex::Locker l(lock);
+  cond.Signal();
+}
+
index a1701057a6bce3e4ab453708b741e82f03c1c459..47ef376b33bb8debbea1b47d2e909408c370f424 100644 (file)
 #include "common/Formatter.h"
 #include "common/errno.h"
 
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
 #include "global/global_init.h"
 
 #include "include/utime.h"
@@ -33,7 +37,7 @@
 #include "rgw_usage.h"
 #include "rgw_replica_log.h"
 
-class ObjectExpirer {
+class RGWObjectExpirer {
 protected:
   RGWRados * const store;
 
@@ -41,8 +45,23 @@ protected:
                        const string& bucket_id,
                        RGWBucketInfo& bucket_info);
 
+  class OEWorker : public Thread {
+    CephContext *cct;
+    RGWObjectExpirer *oe;
+    Mutex lock;
+    Cond cond;
+
+  public:
+    OEWorker(CephContext *_cct, RGWObjectExpirer *_oe) : cct(_cct), oe(_oe), lock("OEWorker") {}
+    void *entry();
+    void stop();
+  };
+
+  OEWorker *worker;
+  atomic_t down_flag;
+
 public:
-  ObjectExpirer(RGWRados * const _store)
+  RGWObjectExpirer(RGWRados * const _store)
     : store(_store)
   {}
 
@@ -61,5 +80,9 @@ public:
 
   void inspect_all_shards(const utime_t& last_run,
                           const utime_t& round_start);
+
+  bool going_down();
+  void start_processor();
+  void stop_processor();
 };
 #endif /* CEPH_OBJEXP_H */