From 0f7b3c722fca4edf3a4be0d227588b695bb2aaed Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 14 Jan 2014 14:48:16 -0800 Subject: [PATCH] rgw: quota thread for full user stats sync Get user stats up to date periodically. Add configurables for different periods, whether we update idle users. Make sure radosgw-admin does not start the quota threads. Signed-off-by: Yehuda Sadeh --- src/common/config_opts.h | 3 + src/rgw/rgw_admin.cc | 53 +------------ src/rgw/rgw_bucket.cc | 18 +++++ src/rgw/rgw_bucket.h | 1 + src/rgw/rgw_main.cc | 2 +- src/rgw/rgw_quota.cc | 157 ++++++++++++++++++++++++++++++++++++--- src/rgw/rgw_quota.h | 2 +- src/rgw/rgw_rados.cc | 8 +- src/rgw/rgw_rados.h | 14 ++-- src/rgw/rgw_user.cc | 40 ++++++++++ src/rgw/rgw_user.h | 1 + 11 files changed, 226 insertions(+), 73 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 7a8c24cf88512..3a4ce74632608 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -731,6 +731,9 @@ OPTION(rgw_bucket_quota_ttl, OPT_INT, 600) // time for cached bucket stats to be OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE, 0.95) // threshold from which we don't rely on cached info for quota decisions OPTION(rgw_bucket_quota_cache_size, OPT_INT, 10000) // number of entries in bucket quota cache OPTION(rgw_user_quota_bucket_sync_interval, OPT_INT, 180) // time period for accumulating modified buckets before syncing stats +OPTION(rgw_user_quota_sync_interval, OPT_INT, 3600 * 24) // time period for accumulating modified buckets before syncing entire user stats +OPTION(rgw_user_quota_sync_idle_users, OPT_BOOL, false) // whether stats for idle users should be fully synced +OPTION(rgw_user_quota_sync_wait_time, OPT_INT, 3600 * 24) // min time between two full stats sync for non-idle users OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 9852558b8a455..c26afbfc5a031 100644 --- 
a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -740,24 +740,6 @@ int set_user_quota(int opt_cmd, RGWUser& user, RGWUserAdminOpState& op_state, in return 0; } -static int sync_bucket_stats(RGWRados *store, string& bucket_name) -{ - RGWBucketInfo bucket_info; - int r = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL); - if (r < 0) { - cerr << "ERROR: could not fetch bucket info: " << cpp_strerror(-r) << std::endl; - return r; - } - - r = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket); - if (r < 0) { - cerr << "ERROR: could not sync user stats for bucket " << bucket_name << ": " << cpp_strerror(-r) << std::endl; - return r; - } - - return 0; -} - int main(int argc, char **argv) { vector args; @@ -1052,7 +1034,7 @@ int main(int argc, char **argv) if (raw_storage_op) { store = RGWStoreManager::get_raw_storage(g_ceph_context); } else { - store = RGWStoreManager::get_storage(g_ceph_context, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false); } if (!store) { cerr << "couldn't init storage provider" << std::endl; @@ -1934,42 +1916,15 @@ next: if (opt_cmd == OPT_USER_STATS) { if (sync_stats) { if (!bucket_name.empty()) { - int ret = sync_bucket_stats(store, bucket_name); + int ret = rgw_bucket_sync_user_stats(store, bucket_name); if (ret < 0) { cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl; return -ret; } } else { - size_t max_entries = g_conf->rgw_list_buckets_max_chunk; - - bool done; - - do { - RGWUserBuckets user_buckets; - int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false); - if (ret < 0) { - cerr << "failed to read user buckets: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - map& buckets = user_buckets.get_buckets(); - for (map::iterator i = buckets.begin(); - i != buckets.end(); - ++i) { - marker = i->first; - - RGWBucketEnt& bucket_ent = i->second; - ret = sync_bucket_stats(store, bucket_ent.bucket.name); - if 
(ret < 0) { - cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl; - return -ret; - } - } - done = (buckets.size() < max_entries); - } while (!done); - - int ret = store->complete_sync_user_stats(user_id); + int ret = rgw_user_sync_all_stats(store, user_id); if (ret < 0) { - cerr << "ERROR: failed to complete syncing user stats: " << cpp_strerror(-ret) << std::endl; + cerr << "ERROR: failed to sync user stats: " << cpp_strerror(-ret) << std::endl; return -ret; } } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 3e7a44e15a42c..2a9e061d52902 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -87,6 +87,24 @@ int rgw_bucket_sync_user_stats(RGWRados *store, const string& user_id, rgw_bucke return store->cls_user_sync_bucket_stats(obj, bucket); } +int rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name) +{ + RGWBucketInfo bucket_info; + int ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl; + return ret; + } + + ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl; + return ret; + } + + return 0; +} + int rgw_link_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time, bool update_entrypoint) { int ret; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index cc941a595c0e0..b302238328b39 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -34,6 +34,7 @@ extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWO extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker); extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& user_id, rgw_bucket& bucket); +extern int 
rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name); /** * Store a list of the user's buckets, with associated functinos. diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 82568ff390924..b56f549925720 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -514,7 +514,7 @@ int main(int argc, const char **argv) FCGX_Init(); int r = 0; - RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true); + RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true, true); if (!store) { derr << "Couldn't init storage provider (RADOS)" << dendl; r = EIO; diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 3e677b30f8487..7a967cbe88a06 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -24,6 +24,7 @@ #include "rgw_rados.h" #include "rgw_quota.h" #include "rgw_bucket.h" +#include "rgw_user.h" #define dout_subsys ceph_subsys_rgw @@ -430,7 +431,51 @@ class RGWUserStatsCache : public RGWQuotaCache { } }; + /* + * thread, full sync all users stats periodically + * + * only sync non idle users or ones that never got synced before, this is needed so that + * users that didn't have quota turned on before (or existed before the user objclass + * tracked stats) need to get their backend stats up to date. 
+ */ + class UserSyncThread : public Thread { + CephContext *cct; + RGWUserStatsCache *stats; + + Mutex lock; + Cond cond; + public: + + UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {} + + void *entry() { + ldout(cct, 20) << "UserSyncThread: start" << dendl; + do { + + string key = "user"; + + int ret = stats->sync_all_users(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: sync_all_users() returned ret=" << ret << dendl; + } + + lock.Lock(); + cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0)); + lock.Unlock(); + } while (!stats->going_down()); + ldout(cct, 20) << "UserSyncThread: done" << dendl; + + return NULL; + } + + void stop() { + Mutex::Locker l(lock); + cond.Signal(); + } + }; + BucketsSyncThread *buckets_sync_thread; + UserSyncThread *user_sync_thread; protected: bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) { return stats_map.find(user, qs); @@ -446,6 +491,8 @@ protected: int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats); int sync_bucket(const string& user, rgw_bucket& bucket); + int sync_user(const string& user); + int sync_all_users(); void data_modified(const string& user, rgw_bucket& bucket); @@ -455,11 +502,30 @@ protected: rwlock.unlock(); } + template /* easier doing it as a template, Thread doesn't have ->stop() */ + void stop_thread(T **pthr) { + T *thread = *pthr; + if (!thread) + return; + + thread->stop(); + thread->join(); + delete thread; + *pthr = NULL; + } + public: - RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), + RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size), rwlock("RGWUserStatsCache::rwlock") { - buckets_sync_thread = new BucketsSyncThread(store->ctx(), this); - buckets_sync_thread->create(); + if 
(quota_threads) { + buckets_sync_thread = new BucketsSyncThread(store->ctx(), this); + buckets_sync_thread->create(); + user_sync_thread = new UserSyncThread(store->ctx(), this); + user_sync_thread->create(); + } else { + buckets_sync_thread = NULL; + user_sync_thread = NULL; + } } ~RGWUserStatsCache() { stop(); @@ -483,13 +549,9 @@ public: void stop() { down_flag.set(1); rwlock.get_write(); - if (buckets_sync_thread) { - buckets_sync_thread->stop(); - buckets_sync_thread->join(); - delete buckets_sync_thread; - buckets_sync_thread = NULL; - } + stop_thread(&buckets_sync_thread); rwlock.unlock(); + stop_thread(&user_sync_thread); } }; @@ -515,6 +577,77 @@ int RGWUserStatsCache::sync_bucket(const string& user, rgw_bucket& bucket) return 0; } +int RGWUserStatsCache::sync_user(const string& user) +{ + cls_user_header header; + int ret = store->cls_user_get_header(user, &header); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: can't read user header: ret=" << ret << dendl; + return ret; + } + + if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users && + header.last_stats_update < header.last_stats_sync) { + ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl; + return 0; + } + + utime_t when_need_full_sync = header.last_stats_sync; + when_need_full_sync += store->ctx()->_conf->rgw_user_quota_sync_wait_time; + + // check if enough time passed since last full sync (FIXME: check not implemented; when_need_full_sync is unused) + + ret = rgw_user_sync_all_stats(store, user); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl; + return ret; + } + + return 0; +} + +int RGWUserStatsCache::sync_all_users() +{ + string key = "user"; + void *handle; + + int ret = store->meta_mgr->list_keys_init(key, &handle); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: can't get key: ret=" << ret << dendl; + return ret; + } + + bool truncated; + int max = 1000; + + do { + list keys; + ret = store->meta_mgr->list_keys_next(handle, max, keys, 
&truncated); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl; + goto done; + } + for (list::iterator iter = keys.begin(); + iter != keys.end() && !going_down(); + ++iter) { + string& user = *iter; + ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl; + int ret = sync_user(user); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl; + + /* continuing to next user */ + continue; + } + } + } while (truncated); + + ret = 0; +done: + store->meta_mgr->list_keys_complete(handle); + return ret; +} + void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket) { /* racy, but it's ok */ @@ -557,7 +690,7 @@ class RGWQuotaHandlerImpl : public RGWQuotaHandler { return 0; } public: - RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store) {} + RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store, quota_threads) {} virtual int check_quota(const string& user, rgw_bucket& bucket, RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t num_objs, uint64_t size) { @@ -608,9 +741,9 @@ public: }; -RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store) +RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads) { - return new RGWQuotaHandlerImpl(store); + return new RGWQuotaHandlerImpl(store, quota_threads); }; void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler) diff --git a/src/rgw/rgw_quota.h b/src/rgw/rgw_quota.h index 5595a73739a64..abdb62ed90b34 100644 --- a/src/rgw/rgw_quota.h +++ b/src/rgw/rgw_quota.h @@ -68,7 +68,7 @@ public: virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) = 0; - static RGWQuotaHandler *generate_handler(RGWRados *store); + static RGWQuotaHandler 
*generate_handler(RGWRados *store, bool quota_threads); static void free_handler(RGWQuotaHandler *handler); }; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 5a10e42741655..2dbcd52595d1e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1005,7 +1005,7 @@ int RGWRados::init_complete() if (use_gc_thread) gc->start_processor(); - quota_handler = RGWQuotaHandler::generate_handler(this); + quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads); return ret; } @@ -4857,7 +4857,7 @@ int RGWRados::get_bucket_entrypoint_info(void *ctx, const string& bucket_name, return 0; } -int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, +int RGWRados::get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info, time_t *pmtime, map *pattrs) { bufferlist bl; @@ -6328,7 +6328,7 @@ uint64_t RGWRados::next_bucket_id() return ++max_bucket_id; } -RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread) +RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads) { int use_cache = cct->_conf->rgw_cache_enabled; RGWRados *store = NULL; @@ -6338,7 +6338,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t store = new RGWCache; } - if (store->initialize(cct, use_gc_thread) < 0) { + if (store->initialize(cct, use_gc_thread, quota_threads) < 0) { delete store; return NULL; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 34005392526bb..fc40eb2e80929 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -843,6 +843,7 @@ class RGWRados RGWGC *gc; bool use_gc_thread; + bool quota_threads; int num_watchers; RGWWatcher **watchers; @@ -914,7 +915,7 @@ protected: public: RGWRados() : lock("rados_timer_lock"), timer(NULL), - gc(NULL), use_gc_thread(false), + gc(NULL), use_gc_thread(false), quota_threads(false), num_watchers(0), watchers(NULL), watch_handles(NULL), 
watch_initialized(false), bucket_id_lock("rados_bucket_id"), max_bucket_id(0), @@ -967,9 +968,10 @@ public: CephContext *ctx() { return cct; } /** do all necessary setup of the storage device */ - int initialize(CephContext *_cct, bool _use_gc_thread) { + int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) { set_context(_cct); use_gc_thread = _use_gc_thread; + quota_threads = _quota_threads; return initialize(); } /** Initialize the RADOS instance and prepare to do other ops */ @@ -1361,7 +1363,7 @@ public: int get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketInfo& info, time_t *pmtime, map *pattrs); int get_bucket_instance_from_oid(void *ctx, string& oid, RGWBucketInfo& info, time_t *pmtime, map *pattrs); - virtual int get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info, + virtual int get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info, time_t *pmtime, map *pattrs = NULL); virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv, map *pattrs, bool create_entry_point); @@ -1527,15 +1529,15 @@ public: class RGWStoreManager { public: RGWStoreManager() {} - static RGWRados *get_storage(CephContext *cct, bool use_gc_thread) { - RGWRados *store = init_storage_provider(cct, use_gc_thread); + static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) { + RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads); return store; } static RGWRados *get_raw_storage(CephContext *cct) { RGWRados *store = init_raw_storage_provider(cct); return store; } - static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread); + static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads); static RGWRados *init_raw_storage_provider(CephContext *cct); static void close_storage(RGWRados *store); diff --git a/src/rgw/rgw_user.cc b/src/rgw/rgw_user.cc index 
7e769332cc3f7..3a70ee24b374c 100644 --- a/src/rgw/rgw_user.cc +++ b/src/rgw/rgw_user.cc @@ -41,6 +41,46 @@ bool rgw_user_is_authenticated(RGWUserInfo& info) return (info.user_id != RGW_USER_ANON_ID); } +int rgw_user_sync_all_stats(RGWRados *store, const string& user_id) +{ + CephContext *cct = store->ctx(); + size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk; + bool done; + string marker; + int ret; + + do { + RGWUserBuckets user_buckets; + ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false); + if (ret < 0) { + ldout(cct, 0) << "failed to read user buckets: ret=" << ret << dendl; + return ret; + } + map& buckets = user_buckets.get_buckets(); + for (map::iterator i = buckets.begin(); + i != buckets.end(); + ++i) { + marker = i->first; + + RGWBucketEnt& bucket_ent = i->second; + ret = rgw_bucket_sync_user_stats(store, user_id, bucket_ent.bucket); + if (ret < 0) { + ldout(cct, 0) << "ERROR: could not sync bucket stats: ret=" << ret << dendl; + return ret; + } + } + done = (buckets.size() < max_entries); + } while (!done); + + ret = store->complete_sync_user_stats(user_id); + if (ret < 0) { + cerr << "ERROR: failed to complete syncing user stats: ret=" << ret << std::endl; + return ret; + } + + return 0; +} + /** * Save the given user information to storage. * Returns: 0 on success, -ERR# on failure. diff --git a/src/rgw/rgw_user.h b/src/rgw/rgw_user.h index 9d4315bdd6650..be3ebcfdf02be 100644 --- a/src/rgw/rgw_user.h +++ b/src/rgw/rgw_user.h @@ -38,6 +38,7 @@ struct RGWUID }; WRITE_CLASS_ENCODER(RGWUID) +extern int rgw_user_sync_all_stats(RGWRados *store, const string& user_id); /** * Get the anonymous (ie, unauthenticated) user info. */ -- 2.39.5