generic_server_usage();
}
-static inline utime_t get_last_run_time(void)
-{
- return utime_t();
-}
-
int main(const int argc, const char **argv)
{
vector<const char *> args;
/* Guard to not forget about closing the rados store. */
StoreDestructor store_dtor(store);
- utime_t last_run = get_last_run_time();
- ObjectExpirer objexp(store);
+ RGWObjectExpirer objexp(store);
+ objexp.start_processor();
+
+ const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0);
while (true) {
- const utime_t round_start = ceph_clock_now(g_ceph_context);
- objexp.inspect_all_shards(last_run, round_start);
-
- last_run = round_start;
-
- /* End of the real work for now. Prepare for sleep. */
- const utime_t round_time = ceph_clock_now(g_ceph_context) - round_start;
- const utime_t interval(g_ceph_context->_conf->rgw_objexp_gc_interval, 0);
-
- if (round_time < interval) {
- /* This should be the main path of execution. All currently expired
- * objects have been removed and we need go sleep waiting for the next
- * turn. If the check isn't true, it means we have to much hints
- * in relation to interval time. */
- const utime_t sleep_period = interval - round_time;
- dout(20) << "sleeping for " << sleep_period << dendl;
- sleep_period.sleep();
- }
+ interval.sleep();
}
+ /* unreachable */
+
return EXIT_SUCCESS;
}
#define dout_subsys ceph_subsys_rgw
-int ObjectExpirer::init_bucket_info(const string& bucket_name,
+int RGWObjectExpirer::init_bucket_info(const string& bucket_name,
const string& bucket_id,
RGWBucketInfo& bucket_info)
{
return ret;
}
-int ObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
+int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
{
RGWBucketInfo bucket_info;
return ret;
}
-void ObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
+void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
bool& need_trim) /* out */
{
need_trim = false;
return;
}
-void ObjectExpirer::trim_chunk(const string& shard,
+void RGWObjectExpirer::trim_chunk(const string& shard,
const utime_t& from,
const utime_t& to)
{
return;
}
-void ObjectExpirer::proceed_single_shard(const string& shard,
+void RGWObjectExpirer::proceed_single_shard(const string& shard,
const utime_t& last_run,
const utime_t& round_start)
{
return;
}
-void ObjectExpirer::inspect_all_shards(const utime_t& last_run,
+void RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
const utime_t& round_start)
{
bool is_next_available;
return;
}
+
+bool RGWObjectExpirer::going_down()
+{
+ return (down_flag.read() != 0);
+}
+
+void RGWObjectExpirer::start_processor()
+{
+ worker = new OEWorker(store->ctx(), this);
+ worker->create();
+}
+
+void RGWObjectExpirer::stop_processor()
+{
+ down_flag.set(1);
+ if (worker) {
+ worker->stop();
+ worker->join();
+ }
+ delete worker;
+ worker = NULL;
+}
+
+void *RGWObjectExpirer::OEWorker::entry() {
+ utime_t last_run;
+ do {
+ utime_t start = ceph_clock_now(cct);
+ dout(2) << "object expiration: start" << dendl;
+ oe->inspect_all_shards(last_run, start);
+ dout(2) << "object expiration: stop" << dendl;
+
+ last_run = start;
+
+ if (oe->going_down())
+ break;
+
+ utime_t end = ceph_clock_now(cct);
+ end -= start;
+ int secs = cct->_conf->rgw_objexp_gc_interval;
+
+ if (secs <= end.sec())
+ continue; // next round
+
+ secs -= end.sec();
+
+ lock.Lock();
+ cond.WaitInterval(cct, lock, utime_t(secs, 0));
+ lock.Unlock();
+ } while (!oe->going_down());
+
+ return NULL;
+}
+
+void RGWObjectExpirer::OEWorker::stop()
+{
+ Mutex::Locker l(lock);
+ cond.Signal();
+}
+
#include "common/Formatter.h"
#include "common/errno.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/Thread.h"
+
#include "global/global_init.h"
#include "include/utime.h"
#include "rgw_usage.h"
#include "rgw_replica_log.h"
-class ObjectExpirer {
+class RGWObjectExpirer {
protected:
RGWRados * const store;
const string& bucket_id,
RGWBucketInfo& bucket_info);
+ class OEWorker : public Thread {
+ CephContext *cct;
+ RGWObjectExpirer *oe;
+ Mutex lock;
+ Cond cond;
+
+ public:
+ OEWorker(CephContext *_cct, RGWObjectExpirer *_oe) : cct(_cct), oe(_oe), lock("OEWorker") {}
+ void *entry();
+ void stop();
+ };
+
+ OEWorker *worker;
+ atomic_t down_flag;
+
public:
- ObjectExpirer(RGWRados * const _store)
+ RGWObjectExpirer(RGWRados * const _store)
: store(_store)
{}
void inspect_all_shards(const utime_t& last_run,
const utime_t& round_start);
+
+ bool going_down();
+ void start_processor();
+ void stop_processor();
};
#endif /* CEPH_OBJEXP_H */