#define dout_subsys ceph_subsys_rgw
-struct RGWQuotaBucketStats {
+struct RGWQuotaCacheStats {
RGWStorageStats stats;
utime_t expiration;
utime_t async_refresh_time;
};
-class RGWBucketStatsCache {
+template<class T>
+class RGWQuotaCache {
+protected:
RGWRados *store;
- lru_map<rgw_bucket, RGWQuotaBucketStats> stats_map;
+ lru_map<T, RGWQuotaCacheStats> stats_map;
RefCountedWaitObject *async_refcount;
- int fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats);
+ class StatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext {
+ int objs_delta;
+ uint64_t added_bytes;
+ uint64_t removed_bytes;
+ public:
+ StatsAsyncTestSet() {}
+ bool update(RGWQuotaCacheStats *entry) {
+ if (entry->async_refresh_time.sec() == 0)
+ return false;
+ entry->async_refresh_time = utime_t(0, 0);
+
+ return true;
+ }
+ };
+
+ virtual int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) = 0;
+
+ virtual bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+ virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
+ virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
public:
- RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+ RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
async_refcount = new RefCountedWaitObject;
}
- ~RGWBucketStatsCache() {
+ virtual ~RGWQuotaCache() {
async_refcount->put_wait(); /* wait for all pending async requests to complete */
}
- int get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
- void adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
+ int get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
+ void adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
- void set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats);
- int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs);
- void async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats);
+ void set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
+ int async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs);
+ void async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+
+ class AsyncRefreshHandler : public RGWGetBucketStats_CB {
+ protected:
+ RGWRados *store;
+ RGWQuotaCache<T> *cache;
+ string user;
+ public:
+ AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache,
+ const string& _user, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store),
+ cache(_cache), user(_user) {}
+
+ virtual int init_fetch() = 0;
+ virtual void handle_response(int r) = 0;
+ };
+
+ virtual AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) = 0;
};
-bool RGWBucketStatsCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
+template<class T>
+bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
{
if (quota.max_size_kb >= 0) {
if (quota.max_size_soft_threshold < 0) {
return true;
}
-int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats)
+class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler {
+public:
+ BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
+ const string& _user, rgw_bucket& _bucket) :
+ RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache, _user, _bucket) {}
+
+ int init_fetch();
+ void handle_response(int r);
+};
+
+class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
+protected:
+ bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
+ return stats_map.find(bucket, qs);
+ }
+
+ bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) {
+ return stats_map.find_and_update(bucket, NULL, ctx);
+ }
+
+ void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
+ stats_map.add(bucket, qs);
+ }
+
+ int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+
+public:
+ RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(store, store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+ }
+
+ AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) {
+ return new BucketAsyncRefreshHandler(store, this, user, bucket);
+ }
+};
+
+int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
RGWBucketInfo bucket_info;
return 0;
}
-class AsyncRefreshHandler : public RGWGetBucketStats_CB {
- RGWRados *store;
- RGWBucketStatsCache *cache;
-public:
- AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), cache(_cache) {}
-
- int init_fetch();
-
- void handle_response(int r);
-};
-
-
-int AsyncRefreshHandler::init_fetch()
+int BucketAsyncRefreshHandler::init_fetch()
{
ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
map<RGWObjCategory, RGWStorageStats> bucket_stats;
return 0;
}
-void AsyncRefreshHandler::handle_response(int r)
+void BucketAsyncRefreshHandler::handle_response(int r)
{
if (r < 0) {
ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
bs.num_objects += s.num_objects;
}
- cache->async_refresh_response(bucket, bs);
+ cache->async_refresh_response(user, bucket, bs);
}
-class RGWBucketStatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
- int objs_delta;
- uint64_t added_bytes;
- uint64_t removed_bytes;
-public:
- RGWBucketStatsAsyncTestSet() {}
- bool update(RGWQuotaBucketStats *entry) {
- if (entry->async_refresh_time.sec() == 0)
- return false;
-
- entry->async_refresh_time = utime_t(0, 0);
-
- return true;
- }
-};
-
-int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
+template<class T>
+int RGWQuotaCache<T>::async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs)
{
/* protect against multiple updates */
- RGWBucketStatsAsyncTestSet test_update;
- if (!stats_map.find_and_update(bucket, NULL, &test_update)) {
+ StatsAsyncTestSet test_update;
+ if (!map_find_and_update(user, bucket, &test_update)) {
/* most likely we just raced with another update */
return 0;
}
async_refcount->get();
- AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket);
+
+ AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
int ret = handler->init_fetch();
if (ret < 0) {
return 0;
}
-void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats)
+template<class T>
+void RGWQuotaCache<T>::async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
{
ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
- RGWQuotaBucketStats qs;
+ RGWQuotaCacheStats qs;
- stats_map.find(bucket, qs);
+ map_find(user, bucket, qs);
- set_stats(bucket, qs, stats);
+ set_stats(user, bucket, qs, stats);
async_refcount->put();
}
-void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats)
+template<class T>
+void RGWQuotaCache<T>::set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
{
qs.stats = stats;
qs.expiration = ceph_clock_now(store->ctx());
qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
- stats_map.add(bucket, qs);
+ map_add(user, bucket, qs);
}
-int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
- RGWQuotaBucketStats qs;
+template<class T>
+int RGWQuotaCache<T>::get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
+ RGWQuotaCacheStats qs;
utime_t now = ceph_clock_now(store->ctx());
- if (stats_map.find(bucket, qs)) {
+ if (map_find(user, bucket, qs)) {
if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
- int r = async_refresh(bucket, qs);
+ int r = async_refresh(user, bucket, qs);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
}
}
- int ret = fetch_bucket_totals(bucket, stats);
+ int ret = fetch_stats_from_storage(user, bucket, stats);
if (ret < 0 && ret != -ENOENT)
return ret;
- set_stats(bucket, qs, stats);
+ set_stats(user, bucket, qs, stats);
return 0;
}
-class RGWBucketStatsUpdate : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
+class RGWQuotaStatsUpdate : public lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext {
int objs_delta;
uint64_t added_bytes;
uint64_t removed_bytes;
public:
- RGWBucketStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) :
+ RGWQuotaStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) :
objs_delta(_objs_delta), added_bytes(_added_bytes), removed_bytes(_removed_bytes) {}
- bool update(RGWQuotaBucketStats *entry) {
+ bool update(RGWQuotaCacheStats *entry) {
uint64_t rounded_kb_added = rgw_rounded_kb(added_bytes);
uint64_t rounded_kb_removed = rgw_rounded_kb(removed_bytes);
};
-void RGWBucketStatsCache::adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes)
+template<class T>
+void RGWQuotaCache<T>::adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta,
+ uint64_t added_bytes, uint64_t removed_bytes)
{
- RGWBucketStatsUpdate update(objs_delta, added_bytes, removed_bytes);
- stats_map.find_and_update(bucket, NULL, &update);
+ RGWQuotaStatsUpdate update(objs_delta, added_bytes, removed_bytes);
+ map_find_and_update(user, bucket, &update);
}
RGWBucketStatsCache stats_cache;
public:
RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), stats_cache(_store) {}
- virtual int check_quota(const string& bucket_owner, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
+ virtual int check_quota(const string& user, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
uint64_t num_objs, uint64_t size) {
uint64_t size_kb = rgw_rounded_kb(size);
if (!bucket_quota.enabled) {
RGWStorageStats stats;
- int ret = stats_cache.get_bucket_stats(bucket, stats, bucket_quota);
+ int ret = stats_cache.get_stats(user, bucket, stats, bucket_quota);
if (ret < 0)
return ret;
return 0;
}
- virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) {
- stats_cache.adjust_bucket_stats(bucket_owner, bucket, obj_delta, added_bytes, removed_bytes);
+ virtual void update_stats(const string& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) {
+ stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
};
};