From 6f34df45ed85ffa2b89e3c47248a31b9993cd235 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 13 Jan 2014 21:50:32 -0800 Subject: [PATCH] rgw: sync bucket stats thread a new thread that periodically sync stats of recently modified buckets. Signed-off-by: Yehuda Sadeh --- src/common/config_opts.h | 1 + src/rgw/rgw_quota.cc | 130 ++++++++++++++++++++++++++++++++++----- 2 files changed, 116 insertions(+), 15 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 0d03809777759..7a8c24cf88512 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -730,6 +730,7 @@ OPTION(rgw_replica_log_obj_prefix, OPT_STR, "replica_log") // OPTION(rgw_bucket_quota_ttl, OPT_INT, 600) // time for cached bucket stats to be cached within rgw instance OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE, 0.95) // threshold from which we don't rely on cached info for quota decisions OPTION(rgw_bucket_quota_cache_size, OPT_INT, 10000) // number of entries in bucket quota cache +OPTION(rgw_user_quota_bucket_sync_interval, OPT_INT, 180) // time period for accumulating modified buckets before syncing stats OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 78498bcd5328f..2854c312c7e46 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -16,10 +16,14 @@ #include "include/utime.h" #include "common/lru_map.h" #include "common/RefCountedObj.h" +#include "common/Thread.h" +#include "common/Mutex.h" +#include "common/RWLock.h" #include "rgw_common.h" #include "rgw_rados.h" #include "rgw_quota.h" +#include "rgw_bucket.h" #define dout_subsys ceph_subsys_rgw @@ -60,7 +64,7 @@ protected: virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, typename lru_map::UpdateContext *ctx) = 0; virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; - virtual int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { return 0; } + virtual void data_modified(const string& user, rgw_bucket& bucket) {} public: RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) { async_refcount = new RefCountedWaitObject; @@ -172,11 +176,6 @@ void RGWQuotaCache::set_stats(const string& user, rgw_bucket& bucket, RGWQuot qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; map_add(user, bucket, qs); - - int ret = handle_set_stats(user, bucket, stats); - if (ret < 0) { - /* can't really do much about it, user stats are going to be off a bit for now */ - } } template @@ -236,6 +235,8 @@ void RGWQuotaCache::adjust_stats(const string& user, rgw_bucket& bucket, int { RGWQuotaStatsUpdate update(objs_delta, added_bytes, removed_bytes); map_find_and_update(user, bucket, &update); + + data_modified(user, bucket); } class BucketAsyncRefreshHandler : public RGWQuotaCache::AsyncRefreshHandler, @@ -302,14 +303,6 @@ protected: } int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats); - int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { - int ret = store->update_user_bucket_stats(user, bucket, stats); - if (ret < 0) { - derr << "ERROR: store->update_bucket_stats() returned " << ret << dendl; - return ret; - } - return 0; - } public: RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) { @@ -387,6 +380,57 @@ void UserAsyncRefreshHandler::handle_response(int r) } class RGWUserStatsCache : public RGWQuotaCache { + atomic_t down_flag; + RWLock rwlock; + map modified_buckets; + + /* thread, sync recent modified buckets info */ + class BucketsSyncThread : public Thread { + CephContext *cct; + RGWUserStatsCache *stats; + + Mutex lock; + Cond cond; + public: + + BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {} + + void *entry() { + ldout(cct, 20) << "BucketsSyncThread: start" << dendl; + do { + map buckets; + + stats->swap_modified_buckets(buckets); + + for (map::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) { + rgw_bucket bucket = iter->first; + string& user = iter->second; + ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl; + int r = stats->sync_bucket(user, bucket); + if (r < 0) { + ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl; + } + } + + if (stats->going_down()) + break; + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0)); + lock.Unlock(); + } while (!stats->going_down()); + ldout(cct, 20) << "BucketsSyncThread: done" << dendl; + + return NULL; + } + + void stop() { + Mutex::Locker l(lock); + cond.Signal(); + } + }; + + BucketsSyncThread *buckets_sync_thread; protected: bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) { return stats_map.find(user, qs); @@ -401,9 +445,24 @@ protected: } int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats); + int sync_bucket(const string& user, rgw_bucket& bucket); + + void data_modified(const string& user, rgw_bucket& bucket); + + void swap_modified_buckets(map& out) { + rwlock.get_write(); + modified_buckets.swap(out); + rwlock.unlock(); + } public: - RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) { + RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), + rwlock("RGWUserStatsCache::rwlock") { + buckets_sync_thread = new BucketsSyncThread(store->ctx(), this); + buckets_sync_thread->create(); + } + ~RGWUserStatsCache() { + stop(); } AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) { @@ -416,6 +475,22 @@ public: */ return true; } + + bool going_down() { + return (down_flag.read() != 0); + } + + void stop() { + down_flag.set(1); + rwlock.get_write(); + if (buckets_sync_thread) { + buckets_sync_thread->stop(); + buckets_sync_thread->join(); + delete buckets_sync_thread; + buckets_sync_thread = NULL; + } + rwlock.unlock(); + } }; int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) @@ -429,6 +504,31 @@ int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& return 0; } +int RGWUserStatsCache::sync_bucket(const string& user, rgw_bucket& bucket) +{ + int r = rgw_bucket_sync_user_stats(store, user, bucket); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl; + return r; + } + + return 0; +} + +void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket) +{ + /* racy, but it's ok */ + rwlock.get_read(); + bool need_update = modified_buckets.find(bucket) == modified_buckets.end(); + rwlock.unlock(); + + if (need_update) { + rwlock.get_write(); + modified_buckets[bucket] = user; + rwlock.unlock(); + } +} + class RGWQuotaHandlerImpl : public RGWQuotaHandler { RGWRados *store; -- 2.39.5