]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: protect against concurrent async quota updates
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 8 Oct 2013 21:05:59 +0000 (14:05 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 10 Oct 2013 20:51:23 +0000 (13:51 -0700)
Leverage the cache lru_map locking for making sure that we don't end
up with more than a single concurrent async update on the same bucket
within the same update window.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/lru_map.h
src/rgw/rgw_quota.cc

index 62182dd26e83d87cc83268c743521e3f9ec6479e..98fb44a932e4102b258869e0373ae6b2b99c52c4 100644 (file)
@@ -24,7 +24,7 @@ public:
   class UpdateContext {
     public:
       virtual ~UpdateContext() {}
-      virtual void update(V& v) = 0;
+      virtual bool update(V& v) = 0;
   };
 
   bool _find(const K& key, V *value, UpdateContext *ctx);
@@ -51,8 +51,10 @@ bool lru_map<K, V>::_find(const K& key, V *value, UpdateContext *ctx)
   entry& e = iter->second;
   entries_lru.erase(e.lru_iter);
 
+  bool r = true;
+
   if (ctx)
-    ctx->update(e.value);
+    r = ctx->update(e.value);
 
   if (value)
     *value = e.value;
@@ -60,7 +62,7 @@ bool lru_map<K, V>::_find(const K& key, V *value, UpdateContext *ctx)
   entries_lru.push_front(key);
   e.lru_iter = entries_lru.begin();
 
-  return true;
+  return r;
 }
 
 template <class K, class V>
index 984b3b57dea13cc3a2d9ca8b65395342002a782b..11da4bf0177395606b6c196179d48d8714b4645a 100644 (file)
@@ -114,15 +114,30 @@ void AsyncRefreshHandler::handle_response(int r)
   cache->async_refresh_response(bucket, bs);
 }
 
+class RGWBucketStatsAsyncTestSet : public lru_map<rgw_bucket, RGWQuotaBucketStats>::UpdateContext {
+  int objs_delta;
+  uint64_t added_bytes;
+  uint64_t removed_bytes;
+public:
+  RGWBucketStatsAsyncTestSet() {}
+  bool update(RGWQuotaBucketStats& entry) {
+    if (entry.async_refresh_time.sec() == 0)
+      return false;
+
+    entry.async_refresh_time = utime_t(0, 0);
+
+    return true;
+  }
+};
+
 int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
 {
-#if 0
-  if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */
-    qs.async_update_flag.dec();
+  /* protect against multiple updates */
+  RGWBucketStatsAsyncTestSet test_update;
+  if (!stats_map.find_and_update(bucket, NULL, &test_update)) {
+    /* most likely we just raced with another update */
     return 0;
   }
-#endif
-#warning protect against multiple updates
 
   async_refcount->get();
 
@@ -166,7 +181,7 @@ int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& st
   RGWQuotaBucketStats qs;
   utime_t now = ceph_clock_now(store->ctx());
   if (stats_map.find(bucket, qs)) {
-    if (now >= qs.async_refresh_time) {
+    if (qs.async_refresh_time.sec() > 0 && now >= qs.async_refresh_time) {
       int r = async_refresh(bucket, qs);
       if (r < 0) {
         ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
@@ -197,13 +212,15 @@ class RGWBucketStatsUpdate : public lru_map<rgw_bucket, RGWQuotaBucketStats>::Up
 public:
   RGWBucketStatsUpdate(int _objs_delta, uint64_t _added_bytes, uint64_t _removed_bytes) : 
                     objs_delta(_objs_delta), added_bytes(_added_bytes), removed_bytes(_removed_bytes) {}
-  void update(RGWQuotaBucketStats& entry) {
+  bool update(RGWQuotaBucketStats& entry) {
     uint64_t rounded_kb_added = rgw_rounded_kb(added_bytes);
     uint64_t rounded_kb_removed = rgw_rounded_kb(removed_bytes);
 
     entry.stats.num_kb_rounded += (rounded_kb_added - rounded_kb_removed);
     entry.stats.num_kb += (added_bytes - removed_bytes) / 1024;
     entry.stats.num_objects += objs_delta;
+
+    return true;
   }
 };