]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync bucket stats thread
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 14 Jan 2014 05:50:32 +0000 (21:50 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 24 Jan 2014 18:28:53 +0000 (10:28 -0800)
a new thread that periodically sync stats of recently modified buckets.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/rgw/rgw_quota.cc

index 0d03809777759ef295a5fb64e66964ac923ed59d..7a8c24cf885123aa8865828af6937015fa5bf331 100644 (file)
@@ -730,6 +730,7 @@ OPTION(rgw_replica_log_obj_prefix, OPT_STR, "replica_log") //
 OPTION(rgw_bucket_quota_ttl, OPT_INT, 600) // time for cached bucket stats to be cached within rgw instance
 OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE, 0.95) // threshold from which we don't rely on cached info for quota decisions
 OPTION(rgw_bucket_quota_cache_size, OPT_INT, 10000) // number of entries in bucket quota cache
+OPTION(rgw_user_quota_bucket_sync_interval, OPT_INT, 180) // time period for accumulating modified buckets before syncing stats
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 
index 78498bcd5328f03bef5ad830e7812748172273df..2854c312c7e46df0d6b0e3471ed1b8bf9751e5da 100644 (file)
 #include "include/utime.h"
 #include "common/lru_map.h"
 #include "common/RefCountedObj.h"
+#include "common/Thread.h"
+#include "common/Mutex.h"
+#include "common/RWLock.h"
 
 #include "rgw_common.h"
 #include "rgw_rados.h"
 #include "rgw_quota.h"
+#include "rgw_bucket.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -60,7 +64,7 @@ protected:
   virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
   virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
 
-  virtual int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { return 0; }
+  virtual void data_modified(const string& user, rgw_bucket& bucket) {}
 public:
   RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
     async_refcount = new RefCountedWaitObject;
@@ -172,11 +176,6 @@ void RGWQuotaCache<T>::set_stats(const string& user, rgw_bucket& bucket, RGWQuot
   qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
 
   map_add(user, bucket, qs);
-
-  int ret = handle_set_stats(user, bucket, stats);
-  if (ret < 0) {
-    /* can't really do much about it, user stats are going to be off a bit for now */
-  }
 }
 
 template<class T>
@@ -236,6 +235,8 @@ void RGWQuotaCache<T>::adjust_stats(const string& user, rgw_bucket& bucket, int
 {
   RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
   map_find_and_update(user, bucket, &update);
+
+  data_modified(user, bucket);
 }
 
 class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
@@ -302,14 +303,6 @@ protected:
   }
 
   int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
-  int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) {
-    int ret = store->update_user_bucket_stats(user, bucket, stats);
-    if (ret < 0) {
-      derr << "ERROR: store->update_bucket_stats() returned " << ret << dendl;
-      return ret;
-    }
-    return 0;
-  }
 
 public:
   RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
@@ -387,6 +380,57 @@ void UserAsyncRefreshHandler::handle_response(int r)
 }
 
 class RGWUserStatsCache : public RGWQuotaCache<string> {
+  atomic_t down_flag;
+  RWLock rwlock;
+  map<rgw_bucket, string> modified_buckets;
+
+  /* thread, sync recent modified buckets info */
+  class BucketsSyncThread : public Thread {
+    CephContext *cct;
+    RGWUserStatsCache *stats;
+
+    Mutex lock;
+    Cond cond;
+  public:
+
+    BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
+
+    void *entry() {
+      ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
+      do {
+        map<rgw_bucket, string> buckets;
+
+        stats->swap_modified_buckets(buckets);
+
+        for (map<rgw_bucket, string>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
+          rgw_bucket bucket = iter->first;
+          string& user = iter->second;
+          ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
+          int r = stats->sync_bucket(user, bucket);
+          if (r < 0) {
+            ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
+          }
+        }
+
+        if (stats->going_down())
+          break;
+
+        lock.Lock();
+        cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
+        lock.Unlock();
+      } while (!stats->going_down());
+      ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
+
+      return NULL;
+    }
+
+    void stop() {
+      Mutex::Locker l(lock);
+      cond.Signal();
+    }
+  };
+
+  BucketsSyncThread *buckets_sync_thread;
 protected:
   bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
     return stats_map.find(user, qs);
@@ -401,9 +445,24 @@ protected:
   }
 
   int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+  int sync_bucket(const string& user, rgw_bucket& bucket);
+
+  void data_modified(const string& user, rgw_bucket& bucket);
+
+  void swap_modified_buckets(map<rgw_bucket, string>& out) {
+    rwlock.get_write();
+    modified_buckets.swap(out);
+    rwlock.unlock();
+  }
 
 public:
-  RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+  RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
+                                        rwlock("RGWUserStatsCache::rwlock") {
+    buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+    buckets_sync_thread->create();
+  }
+  ~RGWUserStatsCache() {
+    stop();
   }
 
   AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) {
@@ -416,6 +475,22 @@ public:
      */
     return true;
   }
+
+  bool going_down() {
+    return (down_flag.read() != 0);
+  }
+
+  void stop() {
+    down_flag.set(1);
+    rwlock.get_write();
+    if (buckets_sync_thread) {
+      buckets_sync_thread->stop();
+      buckets_sync_thread->join();
+      delete buckets_sync_thread;
+      buckets_sync_thread = NULL;
+    }
+    rwlock.unlock();
+  }
 };
 
 int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
@@ -429,6 +504,31 @@ int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket&
   return 0;
 }
 
+int RGWUserStatsCache::sync_bucket(const string& user, rgw_bucket& bucket)
+{
+  int r = rgw_bucket_sync_user_stats(store, user, bucket);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket)
+{
+  /* racy, but it's ok */
+  rwlock.get_read();
+  bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
+  rwlock.unlock();
+
+  if (need_update) {
+    rwlock.get_write();
+    modified_buckets[bucket] = user;
+    rwlock.unlock();
+  }
+}
+
 
 class RGWQuotaHandlerImpl : public RGWQuotaHandler {
   RGWRados *store;