]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: quota thread for full user stats sync
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 14 Jan 2014 22:48:16 +0000 (14:48 -0800)
committerYehuda Sadeh <yehuda@inktank.com>
Fri, 24 Jan 2014 23:07:35 +0000 (15:07 -0800)
Get user stats up to date periodically. Add configurables for different
periods, whether we update idle users.
Make sure radosgw-admin does not start the quota threads.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_main.cc
src/rgw/rgw_quota.cc
src/rgw/rgw_quota.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_user.cc
src/rgw/rgw_user.h

index 7a8c24cf885123aa8865828af6937015fa5bf331..3a4ce74632608ab185e49dbdda28867feb8d0b3a 100644 (file)
@@ -731,6 +731,9 @@ OPTION(rgw_bucket_quota_ttl, OPT_INT, 600) // time for cached bucket stats to be
 OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE, 0.95) // threshold from which we don't rely on cached info for quota decisions
 OPTION(rgw_bucket_quota_cache_size, OPT_INT, 10000) // number of entries in bucket quota cache
 OPTION(rgw_user_quota_bucket_sync_interval, OPT_INT, 180) // time period for accumulating modified buckets before syncing stats
+OPTION(rgw_user_quota_sync_interval, OPT_INT, 3600 * 24) // time period for accumulating modified buckets before syncing entire user stats
+OPTION(rgw_user_quota_sync_idle_users, OPT_BOOL, false) // whether stats for idle users be fully synced
+OPTION(rgw_user_quota_sync_wait_time, OPT_INT, 3600 * 24) // min time between two full stats syc for non-idle users
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 
index 9852558b8a455a94074058b6e86187a239ca1764..c26afbfc5a0313044d427f24a435d3582a6cc8af 100644 (file)
@@ -740,24 +740,6 @@ int set_user_quota(int opt_cmd, RGWUser& user, RGWUserAdminOpState& op_state, in
   return 0;
 }
 
-static int sync_bucket_stats(RGWRados *store, string& bucket_name)
-{
-  RGWBucketInfo bucket_info;
-  int r = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL);
-  if (r < 0) {
-    cerr << "ERROR: could not fetch bucket info: " << cpp_strerror(-r) << std::endl;
-    return r;
-  }
-
-  r = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket);
-  if (r < 0) {
-    cerr << "ERROR: could not sync user stats for bucket " << bucket_name << ": " << cpp_strerror(-r) << std::endl;
-    return r;
-  }
-
-  return 0;
-}
-
 int main(int argc, char **argv) 
 {
   vector<const char*> args;
@@ -1052,7 +1034,7 @@ int main(int argc, char **argv)
   if (raw_storage_op) {
     store = RGWStoreManager::get_raw_storage(g_ceph_context);
   } else {
-    store = RGWStoreManager::get_storage(g_ceph_context, false);
+    store = RGWStoreManager::get_storage(g_ceph_context, false, false);
   }
   if (!store) {
     cerr << "couldn't init storage provider" << std::endl;
@@ -1934,42 +1916,15 @@ next:
   if (opt_cmd == OPT_USER_STATS) {
     if (sync_stats) {
       if (!bucket_name.empty()) {
-        int ret = sync_bucket_stats(store, bucket_name);
+        int ret = rgw_bucket_sync_user_stats(store, bucket_name);
         if (ret < 0) {
           cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl;
           return -ret;
         }
       } else {
-        size_t max_entries = g_conf->rgw_list_buckets_max_chunk;
-
-        bool done;
-
-        do {
-          RGWUserBuckets user_buckets;
-          int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false);
-          if (ret < 0) {
-            cerr << "failed to read user buckets: " << cpp_strerror(-ret) << std::endl;
-            return -ret;
-          }
-          map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
-          for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
-               i != buckets.end();
-               ++i) {
-            marker = i->first;
-
-            RGWBucketEnt& bucket_ent = i->second;
-            ret = sync_bucket_stats(store, bucket_ent.bucket.name);
-            if (ret < 0) {
-              cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl;
-              return -ret;
-            }
-          }
-          done = (buckets.size() < max_entries);
-        } while (!done);
-
-        int ret = store->complete_sync_user_stats(user_id);
+        int ret = rgw_user_sync_all_stats(store, user_id);
         if (ret < 0) {
-          cerr << "ERROR: failed to complete syncing user stats: " << cpp_strerror(-ret) << std::endl;
+          cerr << "ERROR: failed to sync user stats: " << cpp_strerror(-ret) << std::endl;
           return -ret;
         }
       }
index 3e7a44e15a42c2fc955aaaea98bf30b97ba768f8..2a9e061d529027dac48b6b3f707ce14cf94db9cc 100644 (file)
@@ -87,6 +87,24 @@ int rgw_bucket_sync_user_stats(RGWRados *store, const string& user_id, rgw_bucke
   return store->cls_user_sync_bucket_stats(obj, bucket);
 }
 
+int rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name)
+{
+  RGWBucketInfo bucket_info;
+  int ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
+    return ret;
+  }
+
+  ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
 int rgw_link_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time, bool update_entrypoint)
 {
   int ret;
index cc941a595c0e04d0e7b192f3f5ba298228e427b7..b302238328b39ffe84b421ab60d3d420c2a9565e 100644 (file)
@@ -34,6 +34,7 @@ extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWO
 extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker);
 
 extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& user_id, rgw_bucket& bucket);
+extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name);
 
 /**
  * Store a list of the user's buckets, with associated functinos.
index 82568ff3909244d4d53468387ba8ccc255aca4bb..b56f5499257200f51b6f5474b1036b58f9fef12b 100644 (file)
@@ -514,7 +514,7 @@ int main(int argc, const char **argv)
   FCGX_Init();
 
   int r = 0;
-  RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true);
+  RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true, true);
   if (!store) {
     derr << "Couldn't init storage provider (RADOS)" << dendl;
     r = EIO;
index 3e677b30f8487ae5364656bf0feec3f114220618..7a967cbe88a066b29a1d3ff290c89d0957fde86d 100644 (file)
@@ -24,6 +24,7 @@
 #include "rgw_rados.h"
 #include "rgw_quota.h"
 #include "rgw_bucket.h"
+#include "rgw_user.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -430,7 +431,51 @@ class RGWUserStatsCache : public RGWQuotaCache<string> {
     }
   };
 
+  /*
+   * thread, full sync all users stats periodically
+   *
+   * only sync non idle users or ones that never got synced before, this is needed so that
+   * users that didn't have quota turned on before (or existed before the user objclass
+   * tracked stats) need to get their backend stats up to date.
+   */
+  class UserSyncThread : public Thread {
+    CephContext *cct;
+    RGWUserStatsCache *stats;
+
+    Mutex lock;
+    Cond cond;
+  public:
+
+    UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
+
+    void *entry() {
+      ldout(cct, 20) << "UserSyncThread: start" << dendl;
+      do {
+
+        string key = "user";
+
+        int ret = stats->sync_all_users();
+        if (ret < 0) {
+          ldout(cct, 0) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
+        }
+
+        lock.Lock();
+        cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
+        lock.Unlock();
+      } while (!stats->going_down());
+      ldout(cct, 20) << "UserSyncThread: done" << dendl;
+
+      return NULL;
+    }
+
+    void stop() {
+      Mutex::Locker l(lock);
+      cond.Signal();
+    }
+  };
+
   BucketsSyncThread *buckets_sync_thread;
+  UserSyncThread *user_sync_thread;
 protected:
   bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
     return stats_map.find(user, qs);
@@ -446,6 +491,8 @@ protected:
 
   int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
   int sync_bucket(const string& user, rgw_bucket& bucket);
+  int sync_user(const string& user);
+  int sync_all_users();
 
   void data_modified(const string& user, rgw_bucket& bucket);
 
@@ -455,11 +502,30 @@ protected:
     rwlock.unlock();
   }
 
+  template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
+  void stop_thread(T **pthr) {
+    T *thread = *pthr;
+    if (!thread)
+      return;
+
+    thread->stop();
+    thread->join();
+    delete thread;
+    *pthr = NULL;
+  }
+
 public:
-  RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
+  RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
                                         rwlock("RGWUserStatsCache::rwlock") {
-    buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
-    buckets_sync_thread->create();
+    if (quota_threads) {
+      buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+      buckets_sync_thread->create();
+      user_sync_thread = new UserSyncThread(store->ctx(), this);
+      user_sync_thread->create();
+    } else {
+      buckets_sync_thread = NULL;
+      user_sync_thread = NULL;
+    }
   }
   ~RGWUserStatsCache() {
     stop();
@@ -483,13 +549,9 @@ public:
   void stop() {
     down_flag.set(1);
     rwlock.get_write();
-    if (buckets_sync_thread) {
-      buckets_sync_thread->stop();
-      buckets_sync_thread->join();
-      delete buckets_sync_thread;
-      buckets_sync_thread = NULL;
-    }
+    stop_thread(&buckets_sync_thread);
     rwlock.unlock();
+    stop_thread(&user_sync_thread);
   }
 };
 
@@ -515,6 +577,77 @@ int RGWUserStatsCache::sync_bucket(const string& user, rgw_bucket& bucket)
   return 0;
 }
 
+int RGWUserStatsCache::sync_user(const string& user)
+{
+  cls_user_header header;
+  int ret = store->cls_user_get_header(user, &header);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: can't read user header: ret=" << ret << dendl;
+    return ret;
+  }
+
+  if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
+      header.last_stats_update < header.last_stats_sync) {
+    ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
+    return 0;
+  }
+
+  utime_t when_need_full_sync = header.last_stats_sync;
+  when_need_full_sync += store->ctx()->_conf->rgw_user_quota_sync_wait_time;
+  
+  // check if enough time passed since last full sync
+
+  ret = rgw_user_sync_all_stats(store, user);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWUserStatsCache::sync_all_users()
+{
+  string key = "user";
+  void *handle;
+
+  int ret = store->meta_mgr->list_keys_init(key, &handle);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: can't get key: ret=" << ret << dendl;
+    return ret;
+  }
+
+  bool truncated;
+  int max = 1000;
+
+  do {
+    list<string> keys;
+    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
+      goto done;
+    }
+    for (list<string>::iterator iter = keys.begin();
+         iter != keys.end() && !going_down(); 
+         ++iter) {
+      string& user = *iter;
+      ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
+      int ret = sync_user(user);
+      if (ret < 0) {
+        ldout(store->ctx(), 0) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
+
+        /* continuing to next user */
+        continue;
+      }
+    }
+  } while (truncated);
+
+  ret = 0;
+done:
+  store->meta_mgr->list_keys_complete(handle);
+  return ret;
+}
+
 void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket)
 {
   /* racy, but it's ok */
@@ -557,7 +690,7 @@ class RGWQuotaHandlerImpl : public RGWQuotaHandler {
     return 0;
   }
 public:
-  RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store) {}
+  RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store, quota_threads) {}
   virtual int check_quota(const string& user, rgw_bucket& bucket,
                           RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, uint64_t size) {
@@ -608,9 +741,9 @@ public:
 };
 
 
-RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store)
+RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
 {
-  return new RGWQuotaHandlerImpl(store);
+  return new RGWQuotaHandlerImpl(store, quota_threads);
 };
 
 void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
index 5595a73739a640eb3f23dc6d5b4fdf93d6618cce..abdb62ed90b34f3b64e6db28f7bbfe0b3816f141 100644 (file)
@@ -68,7 +68,7 @@ public:
 
   virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) = 0;
 
-  static RGWQuotaHandler *generate_handler(RGWRados *store);
+  static RGWQuotaHandler *generate_handler(RGWRados *store, bool quota_threads);
   static void free_handler(RGWQuotaHandler *handler);
 };
 
index 5a10e42741655c815d0259b944eb96759c2b4885..2dbcd52595d1e6e2bea24ca80bd82672a76ce282 100644 (file)
@@ -1005,7 +1005,7 @@ int RGWRados::init_complete()
   if (use_gc_thread)
     gc->start_processor();
 
-  quota_handler = RGWQuotaHandler::generate_handler(this);
+  quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
   return ret;
 }
@@ -4857,7 +4857,7 @@ int RGWRados::get_bucket_entrypoint_info(void *ctx, const string& bucket_name,
   return 0;
 }
 
-int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info,
+int RGWRados::get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info,
                               time_t *pmtime, map<string, bufferlist> *pattrs)
 {
   bufferlist bl;
@@ -6328,7 +6328,7 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
@@ -6338,7 +6338,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t
     store = new RGWCache<RGWRados>; 
   }
 
-  if (store->initialize(cct, use_gc_thread) < 0) {
+  if (store->initialize(cct, use_gc_thread, quota_threads) < 0) {
     delete store;
     return NULL;
   }
index 34005392526bbabd00639cb175455a7cc827a71e..fc40eb2e80929c1ac7a46d58c398d602edbf5b28 100644 (file)
@@ -843,6 +843,7 @@ class RGWRados
 
   RGWGC *gc;
   bool use_gc_thread;
+  bool quota_threads;
 
   int num_watchers;
   RGWWatcher **watchers;
@@ -914,7 +915,7 @@ protected:
 
 public:
   RGWRados() : lock("rados_timer_lock"), timer(NULL),
-               gc(NULL), use_gc_thread(false),
+               gc(NULL), use_gc_thread(false), quota_threads(false),
                num_watchers(0), watchers(NULL), watch_handles(NULL),
                watch_initialized(false),
                bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
@@ -967,9 +968,10 @@ public:
 
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct, bool _use_gc_thread) {
+  int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) {
     set_context(_cct);
     use_gc_thread = _use_gc_thread;
+    quota_threads = _quota_threads;
     return initialize();
   }
   /** Initialize the RADOS instance and prepare to do other ops */
@@ -1361,7 +1363,7 @@ public:
   int get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketInfo& info, time_t *pmtime, map<string, bufferlist> *pattrs);
   int get_bucket_instance_from_oid(void *ctx, string& oid, RGWBucketInfo& info, time_t *pmtime, map<string, bufferlist> *pattrs);
 
-  virtual int get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info,
+  virtual int get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info,
                               time_t *pmtime, map<string, bufferlist> *pattrs = NULL);
   virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
                                      map<string, bufferlist> *pattrs, bool create_entry_point);
@@ -1527,15 +1529,15 @@ public:
 class RGWStoreManager {
 public:
   RGWStoreManager() {}
-  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread) {
-    RGWRados *store = init_storage_provider(cct, use_gc_thread);
+  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) {
+    RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads);
     return store;
   }
   static RGWRados *get_raw_storage(CephContext *cct) {
     RGWRados *store = init_raw_storage_provider(cct);
     return store;
   }
-  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads);
   static RGWRados *init_raw_storage_provider(CephContext *cct);
   static void close_storage(RGWRados *store);
 
index 7e769332cc3f7132f4b69daa5ce441b3bb2326d0..3a70ee24b374c519aacdb93cd537d5b152fec749 100644 (file)
@@ -41,6 +41,46 @@ bool rgw_user_is_authenticated(RGWUserInfo& info)
   return (info.user_id != RGW_USER_ANON_ID);
 }
 
+int rgw_user_sync_all_stats(RGWRados *store, const string& user_id)
+{
+  CephContext *cct = store->ctx();
+  size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
+  bool done;
+  string marker;
+  int ret;
+
+  do {
+    RGWUserBuckets user_buckets;
+    ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false);
+    if (ret < 0) {
+      ldout(cct, 0) << "failed to read user buckets: ret=" << ret << dendl;
+      return ret;
+    }
+    map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
+    for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
+         i != buckets.end();
+         ++i) {
+      marker = i->first;
+
+      RGWBucketEnt& bucket_ent = i->second;
+      ret = rgw_bucket_sync_user_stats(store, user_id, bucket_ent.bucket);
+      if (ret < 0) {
+        ldout(cct, 0) << "ERROR: could not sync bucket stats: ret=" << ret << dendl;
+        return ret;
+      }
+    }
+    done = (buckets.size() < max_entries);
+  } while (!done);
+
+  ret = store->complete_sync_user_stats(user_id);
+  if (ret < 0) {
+    cerr << "ERROR: failed to complete syncing user stats: ret=" << ret << std::endl;
+    return ret;
+  }
+
+  return 0;
+}
+
 /**
  * Save the given user information to storage.
  * Returns: 0 on success, -ERR# on failure.
index 9d4315bdd6650ec2d704326505840b116c8cfed3..be3ebcfdf02bea1501f9a5454eb4b27339993fc6 100644 (file)
@@ -38,6 +38,7 @@ struct RGWUID
 };
 WRITE_CLASS_ENCODER(RGWUID)
 
+extern int rgw_user_sync_all_stats(RGWRados *store, const string& user_id);
 /**
  * Get the anonymous (ie, unauthenticated) user info.
  */