From: Yehuda Sadeh Date: Fri, 10 Jan 2014 18:42:13 +0000 (-0800) Subject: rgw: refactor quota cache X-Git-Tag: v0.78~270^2~26 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=92a87b3dfd5dcfeb8e6b801b909749f7a49115d2;p=ceph.git rgw: refactor quota cache bucket quota cache is going to be reused, add a new abstract layer. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 5c16c7895d16..828199bd31b2 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -24,38 +24,76 @@ #define dout_subsys ceph_subsys_rgw -struct RGWQuotaBucketStats { +struct RGWQuotaCacheStats { RGWStorageStats stats; utime_t expiration; utime_t async_refresh_time; }; -class RGWBucketStatsCache { +template +class RGWQuotaCache { +protected: RGWRados *store; - lru_map stats_map; + lru_map stats_map; RefCountedWaitObject *async_refcount; - int fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats); + class StatsAsyncTestSet : public lru_map::UpdateContext { + int objs_delta; + uint64_t added_bytes; + uint64_t removed_bytes; + public: + StatsAsyncTestSet() {} + bool update(RGWQuotaCacheStats *entry) { + if (entry->async_refresh_time.sec() == 0) + return false; + entry->async_refresh_time = utime_t(0, 0); + + return true; + } + }; + + virtual int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) = 0; + + virtual bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; + virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map::UpdateContext *ctx) = 0; + virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0; public: - RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(store->ctx()->_conf->rgw_bucket_quota_cache_size) { + RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) { async_refcount = new RefCountedWaitObject; } - ~RGWBucketStatsCache() { + virtual ~RGWQuotaCache() { async_refcount->put_wait(); /* wait for all pending async requests to complete */ } - int get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota); - void adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); + int get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota); + void adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes); bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats); - void set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats); - int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs); - void async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats); + void set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats); + int async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs); + void async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats); + + class AsyncRefreshHandler : public RGWGetBucketStats_CB { + protected: + RGWRados *store; + RGWQuotaCache *cache; + string user; + public: + AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache *_cache, + const string& _user, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), + cache(_cache), user(_user) {} + + virtual int init_fetch() = 0; + virtual void handle_response(int r) = 0; + }; + + virtual AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) = 0; }; -bool RGWBucketStatsCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats) +template +bool RGWQuotaCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats) { if (quota.max_size_kb >= 0) { if (quota.max_size_soft_threshold < 0) { @@ -84,7 +122,42 @@ bool RGWBucketStatsCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageSt return true; } -int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats) +class BucketAsyncRefreshHandler : public RGWQuotaCache::AsyncRefreshHandler { +public: + BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache *_cache, + const string& _user, rgw_bucket& _bucket) : + RGWQuotaCache::AsyncRefreshHandler(_store, _cache, _user, _bucket) {} + + int init_fetch(); + void handle_response(int r); +}; + +class RGWBucketStatsCache : public RGWQuotaCache { +protected: + bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) { + return stats_map.find(bucket, qs); + } + + bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map::UpdateContext *ctx) { + return stats_map.find_and_update(bucket, NULL, ctx); + } + + void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) { + stats_map.add(bucket, qs); + } + + int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats); + +public: + RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(store, store->ctx()->_conf->rgw_bucket_quota_cache_size) { + } + + AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) { + return new BucketAsyncRefreshHandler(store, this, user, bucket); + } +}; + +int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { RGWBucketInfo bucket_info; @@ -111,19 +184,7 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats return 0; } -class AsyncRefreshHandler : public RGWGetBucketStats_CB { - RGWRados *store; - RGWBucketStatsCache *cache; -public: - AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), cache(_cache) {} - - int init_fetch(); - - void handle_response(int r); -}; - - -int AsyncRefreshHandler::init_fetch() +int BucketAsyncRefreshHandler::init_fetch() { ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl; map bucket_stats; @@ -138,7 +199,7 @@ int AsyncRefreshHandler::init_fetch() return 0; } -void AsyncRefreshHandler::handle_response(int r) +void BucketAsyncRefreshHandler::handle_response(int r) { if (r < 0) { ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl; @@ -155,37 +216,23 @@ void AsyncRefreshHandler::handle_response(int r) bs.num_objects += s.num_objects; } - cache->async_refresh_response(bucket, bs); + cache->async_refresh_response(user, bucket, bs); } -class RGWBucketStatsAsyncTestSet : public lru_map::UpdateContext { - int objs_delta; - uint64_t added_bytes; - uint64_t removed_bytes; -public: - RGWBucketStatsAsyncTestSet() {} - bool update(RGWQuotaBucketStats *entry) { - if (entry->async_refresh_time.sec() == 0) - return false; - - entry->async_refresh_time = utime_t(0, 0); - - return true; - } -}; - -int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs) +template +int RGWQuotaCache::async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) { /* protect against multiple updates */ - RGWBucketStatsAsyncTestSet test_update; - if (!stats_map.find_and_update(bucket, NULL, &test_update)) { + StatsAsyncTestSet test_update; + if (!map_find_and_update(user, bucket, &test_update)) { /* most likely we just raced with another update */ return 0; } async_refcount->get(); - AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket); + + AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket); int ret = handler->init_fetch(); if (ret < 0) { @@ -197,20 +244,22 @@ int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& return 0; } -void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats) +template +void RGWQuotaCache::async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl; - RGWQuotaBucketStats qs; + RGWQuotaCacheStats qs; - stats_map.find(bucket, qs); + map_find(user, bucket, qs); - set_stats(bucket, qs, stats); + set_stats(user, bucket, qs, stats); async_refcount->put(); } -void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats) +template +void RGWQuotaCache::set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats) { qs.stats = stats; qs.expiration = ceph_clock_now(store->ctx()); @@ -218,15 +267,16 @@ void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl; qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2; - stats_map.add(bucket, qs); + map_add(user, bucket, qs); } -int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) { - RGWQuotaBucketStats qs; +template +int RGWQuotaCache::get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) { + RGWQuotaCacheStats qs; utime_t now = ceph_clock_now(store->ctx()); - if (stats_map.find(bucket, qs)) { + if (map_find(user, bucket, qs)) { if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) { - int r = async_refresh(bucket, qs); + int r = async_refresh(user, bucket, qs); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl; @@ -240,24 +290,24 @@ int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& s } } - int ret = fetch_bucket_totals(bucket, stats); + int ret = fetch_stats_from_storage(user, bucket, stats); if (ret < 0 && ret != -ENOENT) return ret; - set_stats(bucket, qs, stats); + set_stats(user, bucket, qs, stats); return 0; } -class RGWBucketStatsUpdate : public lru_map::UpdateContext { +class RGWQuotaStatsUpdate : public lru_map::UpdateContext { int objs_delta; uint64_t added_bytes; uint64_t removed_bytes; public: - RGWBucketStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) : + RGWQuotaStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) : objs_delta(_objs_delta), added_bytes(_added_bytes), removed_bytes(_removed_bytes) {} - bool update(RGWQuotaBucketStats *entry) { + bool update(RGWQuotaCacheStats *entry) { uint64_t rounded_kb_added = rgw_rounded_kb(added_bytes); uint64_t rounded_kb_removed = rgw_rounded_kb(removed_bytes); @@ -270,10 +320,12 @@ public: }; -void RGWBucketStatsCache::adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes) +template +void RGWQuotaCache::adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta, + uint64_t added_bytes, uint64_t removed_bytes) { - RGWBucketStatsUpdate update(objs_delta, added_bytes, removed_bytes); - stats_map.find_and_update(bucket, NULL, &update); + RGWQuotaStatsUpdate update(objs_delta, added_bytes, removed_bytes); + map_find_and_update(user, bucket, &update); } @@ -282,7 +334,7 @@ class RGWQuotaHandlerImpl : public RGWQuotaHandler { RGWBucketStatsCache stats_cache; public: RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), stats_cache(_store) {} - virtual int check_quota(const string& bucket_owner, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota, + virtual int check_quota(const string& user, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota, uint64_t num_objs, uint64_t size) { uint64_t size_kb = rgw_rounded_kb(size); if (!bucket_quota.enabled) { @@ -291,7 +343,7 @@ public: RGWStorageStats stats; - int ret = stats_cache.get_bucket_stats(bucket, stats, bucket_quota); + int ret = stats_cache.get_stats(user, bucket, stats, bucket_quota); if (ret < 0) return ret; @@ -315,8 +367,8 @@ public: return 0; } - virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) { - stats_cache.adjust_bucket_stats(bucket_owner, bucket, obj_delta, added_bytes, removed_bytes); + virtual void update_stats(const string& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) { + stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes); }; };