]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: refactor quota cache
authorYehuda Sadeh <yehuda@inktank.com>
Fri, 10 Jan 2014 18:42:13 +0000 (10:42 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 24 Jan 2014 18:28:49 +0000 (10:28 -0800)
bucket quota cache is going to be reused, add a new abstract layer.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_quota.cc

index 5c16c7895d16eefbef323a2ea9f06c5e59a70f8e..828199bd31b2a0642ed546d6772cc0b759516420 100644 (file)
 #define dout_subsys ceph_subsys_rgw
 
 
-struct RGWQuotaBucketStats {
+struct RGWQuotaCacheStats {
   RGWStorageStats stats;
   utime_t expiration;
   utime_t async_refresh_time;
 };
 
-class RGWBucketStatsCache {
+template<class T>
+class RGWQuotaCache {
+protected:
   RGWRados *store;
-  lru_map<rgw_bucket, RGWQuotaBucketStats> stats_map;
+  lru_map<T, RGWQuotaCacheStats> stats_map;
   RefCountedWaitObject *async_refcount;
 
-  int fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats);
+  class StatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext {
+    int objs_delta;
+    uint64_t added_bytes;
+    uint64_t removed_bytes;
+  public:
+    StatsAsyncTestSet() {}
+    bool update(RGWQuotaCacheStats *entry) {
+      if (entry->async_refresh_time.sec() == 0)
+        return false;
 
+      entry->async_refresh_time = utime_t(0, 0);
+
+      return true;
+    }
+  };
+
+  virtual int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) = 0;
+
+  virtual bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
+  virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
+  virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
 public:
-  RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+  RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
     async_refcount = new RefCountedWaitObject;
   }
-  ~RGWBucketStatsCache() {
+  virtual ~RGWQuotaCache() {
     async_refcount->put_wait(); /* wait for all pending async requests to complete */
   }
 
-  int get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
-  void adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
+  int get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota);
+  void adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
 
   bool can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& stats);
 
-  void set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats);
-  int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs);
-  void async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats);
+  void set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats);
+  int async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs);
+  void async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+
+  class AsyncRefreshHandler : public RGWGetBucketStats_CB {
+  protected:
+    RGWRados *store;
+    RGWQuotaCache<T> *cache;
+    string user;
+  public:
+    AsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<T> *_cache,
+                        const string& _user, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store),
+                                                                    cache(_cache), user(_user) {}
+
+    virtual int init_fetch() = 0;
+    virtual void handle_response(int r) = 0;
+  };
+
+  virtual AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) = 0;
 };
 
-bool RGWBucketStatsCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
+template<class T>
+bool RGWQuotaCache<T>::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageStats& cached_stats)
 {
   if (quota.max_size_kb >= 0) {
     if (quota.max_size_soft_threshold < 0) {
@@ -84,7 +122,42 @@ bool RGWBucketStatsCache::can_use_cached_stats(RGWQuotaInfo& quota, RGWStorageSt
   return true;
 }
 
-int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats& stats)
+class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler {
+public:
+  BucketAsyncRefreshHandler(RGWRados *_store, RGWQuotaCache<rgw_bucket> *_cache,
+                            const string& _user, rgw_bucket& _bucket) :
+                                      RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler(_store, _cache, _user, _bucket) {}
+
+  int init_fetch();
+  void handle_response(int r);
+};
+
+class RGWBucketStatsCache : public RGWQuotaCache<rgw_bucket> {
+protected:
+  bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
+    return stats_map.find(bucket, qs);
+  }
+
+  bool map_find_and_update(const string& user, rgw_bucket& bucket, lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext *ctx) {
+    return stats_map.find_and_update(bucket, NULL, ctx);
+  }
+
+  void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
+    stats_map.add(bucket, qs);
+  }
+
+  int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+
+public:
+  RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(store, store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+  }
+
+  AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) {
+    return new BucketAsyncRefreshHandler(store, this, user, bucket);
+  }
+};
+
+int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
 {
   RGWBucketInfo bucket_info;
 
@@ -111,19 +184,7 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWStorageStats
   return 0;
 }
 
-class AsyncRefreshHandler : public RGWGetBucketStats_CB {
-  RGWRados *store;
-  RGWBucketStatsCache *cache;
-public:
-  AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), cache(_cache) {}
-
-  int init_fetch();
-
-  void handle_response(int r);
-};
-
-
-int AsyncRefreshHandler::init_fetch()
+int BucketAsyncRefreshHandler::init_fetch()
 {
   ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
   map<RGWObjCategory, RGWStorageStats> bucket_stats;
@@ -138,7 +199,7 @@ int AsyncRefreshHandler::init_fetch()
   return 0;
 }
 
-void AsyncRefreshHandler::handle_response(int r)
+void BucketAsyncRefreshHandler::handle_response(int r)
 {
   if (r < 0) {
     ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
@@ -155,37 +216,23 @@ void AsyncRefreshHandler::handle_response(int r)
     bs.num_objects += s.num_objects;
   }
 
-  cache->async_refresh_response(bucket, bs);
+  cache->async_refresh_response(user, bucket, bs);
 }
 
-class RGWBucketStatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
-  int objs_delta;
-  uint64_t added_bytes;
-  uint64_t removed_bytes;
-public:
-  RGWBucketStatsAsyncTestSet() {}
-  bool update(RGWQuotaBucketStats *entry) {
-    if (entry->async_refresh_time.sec() == 0)
-      return false;
-
-    entry->async_refresh_time = utime_t(0, 0);
-
-    return true;
-  }
-};
-
-int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
+template<class T>
+int RGWQuotaCache<T>::async_refresh(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs)
 {
   /* protect against multiple updates */
-  RGWBucketStatsAsyncTestSet test_update;
-  if (!stats_map.find_and_update(bucket, NULL, &test_update)) {
+  StatsAsyncTestSet test_update;
+  if (!map_find_and_update(user, bucket, &test_update)) {
     /* most likely we just raced with another update */
     return 0;
   }
 
   async_refcount->get();
 
-  AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket);
+
+  AsyncRefreshHandler *handler = allocate_refresh_handler(user, bucket);
 
   int ret = handler->init_fetch();
   if (ret < 0) {
@@ -197,20 +244,22 @@ int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats&
   return 0;
 }
 
-void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWStorageStats& stats)
+template<class T>
+void RGWQuotaCache<T>::async_refresh_response(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
 {
   ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
 
-  RGWQuotaBucketStats qs;
+  RGWQuotaCacheStats qs;
 
-  stats_map.find(bucket, qs);
+  map_find(user, bucket, qs);
 
-  set_stats(bucket, qs, stats);
+  set_stats(user, bucket, qs, stats);
 
   async_refcount->put();
 }
 
-void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWStorageStats& stats)
+template<class T>
+void RGWQuotaCache<T>::set_stats(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs, RGWStorageStats& stats)
 {
   qs.stats = stats;
   qs.expiration = ceph_clock_now(store->ctx());
@@ -218,15 +267,16 @@ void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs,
   qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
   qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
 
-  stats_map.add(bucket, qs);
+  map_add(user, bucket, qs);
 }
 
-int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
-  RGWQuotaBucketStats qs;
+template<class T>
+int RGWQuotaCache<T>::get_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats, RGWQuotaInfo& quota) {
+  RGWQuotaCacheStats qs;
   utime_t now = ceph_clock_now(store->ctx());
-  if (stats_map.find(bucket, qs)) {
+  if (map_find(user, bucket, qs)) {
     if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
-      int r = async_refresh(bucket, qs);
+      int r = async_refresh(user, bucket, qs);
       if (r < 0) {
         ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
 
@@ -240,24 +290,24 @@ int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWStorageStats& s
     }
   }
 
-  int ret = fetch_bucket_totals(bucket, stats);
+  int ret = fetch_stats_from_storage(user, bucket, stats);
   if (ret < 0 && ret != -ENOENT)
     return ret;
 
-  set_stats(bucket, qs, stats);
+  set_stats(user, bucket, qs, stats);
 
   return 0;
 }
 
 
-class RGWBucketStatsUpdate : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
+class RGWQuotaStatsUpdate : public lru_map<rgw_bucket, RGWQuotaCacheStats>::UpdateContext {
   int objs_delta;
   uint64_t added_bytes;
   uint64_t removed_bytes;
 public:
-  RGWBucketStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) : 
+  RGWQuotaStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) : 
                     objs_delta(_objs_delta), added_bytes(_added_bytes), removed_bytes(_removed_bytes) {}
-  bool update(RGWQuotaBucketStats *entry) {
+  bool update(RGWQuotaCacheStats *entry) {
     uint64_t rounded_kb_added = rgw_rounded_kb(added_bytes);
     uint64_t rounded_kb_removed = rgw_rounded_kb(removed_bytes);
 
@@ -270,10 +320,12 @@ public:
 };
 
 
-void RGWBucketStatsCache::adjust_bucket_stats(const string& bucket_owner, rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes)
+template<class T>
+void RGWQuotaCache<T>::adjust_stats(const string& user, rgw_bucket& bucket, int objs_delta,
+                                 uint64_t added_bytes, uint64_t removed_bytes)
 {
-  RGWBucketStatsUpdate update(objs_delta, added_bytes, removed_bytes);
-  stats_map.find_and_update(bucket, NULL, &update);
+  RGWQuotaStatsUpdate update(objs_delta, added_bytes, removed_bytes);
+  map_find_and_update(user, bucket, &update);
 }
 
 
@@ -282,7 +334,7 @@ class RGWQuotaHandlerImpl : public RGWQuotaHandler {
   RGWBucketStatsCache stats_cache;
 public:
   RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), stats_cache(_store) {}
-  virtual int check_quota(const string& bucket_owner, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
+  virtual int check_quota(const string& user, rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, uint64_t size) {
     uint64_t size_kb = rgw_rounded_kb(size);
     if (!bucket_quota.enabled) {
@@ -291,7 +343,7 @@ public:
 
     RGWStorageStats stats;
 
-    int ret = stats_cache.get_bucket_stats(bucket, stats, bucket_quota);
+    int ret = stats_cache.get_stats(user, bucket, stats, bucket_quota);
     if (ret < 0)
       return ret;
 
@@ -315,8 +367,8 @@ public:
     return 0;
   }
 
-  virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) {
-    stats_cache.adjust_bucket_stats(bucket_owner, bucket, obj_delta, added_bytes, removed_bytes);
+  virtual void update_stats(const string& user, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) {
+    stats_cache.adjust_stats(user, bucket, obj_delta, added_bytes, removed_bytes);
   };
 };