OPTION(rgw_bucket_quota_soft_threshold, OPT_DOUBLE, 0.95) // threshold from which we don't rely on cached info for quota decisions
OPTION(rgw_bucket_quota_cache_size, OPT_INT, 10000) // number of entries in bucket quota cache
OPTION(rgw_user_quota_bucket_sync_interval, OPT_INT, 180) // time period for accumulating modified buckets before syncing stats
+OPTION(rgw_user_quota_sync_interval, OPT_INT, 3600 * 24) // time period for accumulating modified buckets before syncing entire user stats
+OPTION(rgw_user_quota_sync_idle_users, OPT_BOOL, false) // whether stats for idle users be fully synced
+OPTION(rgw_user_quota_sync_wait_time, OPT_INT, 3600 * 24) // min time between two full stats syc for non-idle users
OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
return 0;
}
-static int sync_bucket_stats(RGWRados *store, string& bucket_name)
-{
- RGWBucketInfo bucket_info;
- int r = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL);
- if (r < 0) {
- cerr << "ERROR: could not fetch bucket info: " << cpp_strerror(-r) << std::endl;
- return r;
- }
-
- r = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket);
- if (r < 0) {
- cerr << "ERROR: could not sync user stats for bucket " << bucket_name << ": " << cpp_strerror(-r) << std::endl;
- return r;
- }
-
- return 0;
-}
-
int main(int argc, char **argv)
{
vector<const char*> args;
if (raw_storage_op) {
store = RGWStoreManager::get_raw_storage(g_ceph_context);
} else {
- store = RGWStoreManager::get_storage(g_ceph_context, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false);
}
if (!store) {
cerr << "couldn't init storage provider" << std::endl;
if (opt_cmd == OPT_USER_STATS) {
if (sync_stats) {
if (!bucket_name.empty()) {
- int ret = sync_bucket_stats(store, bucket_name);
+ int ret = rgw_bucket_sync_user_stats(store, bucket_name);
if (ret < 0) {
cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
} else {
- size_t max_entries = g_conf->rgw_list_buckets_max_chunk;
-
- bool done;
-
- do {
- RGWUserBuckets user_buckets;
- int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false);
- if (ret < 0) {
- cerr << "failed to read user buckets: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
- for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
- i != buckets.end();
- ++i) {
- marker = i->first;
-
- RGWBucketEnt& bucket_ent = i->second;
- ret = sync_bucket_stats(store, bucket_ent.bucket.name);
- if (ret < 0) {
- cerr << "ERROR: could not sync bucket stats: " << cpp_strerror(-ret) << std::endl;
- return -ret;
- }
- }
- done = (buckets.size() < max_entries);
- } while (!done);
-
- int ret = store->complete_sync_user_stats(user_id);
+ int ret = rgw_user_sync_all_stats(store, user_id);
if (ret < 0) {
- cerr << "ERROR: failed to complete syncing user stats: " << cpp_strerror(-ret) << std::endl;
+ cerr << "ERROR: failed to sync user stats: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
}
return store->cls_user_sync_bucket_stats(obj, bucket);
}
+int rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name)
+{
+ RGWBucketInfo bucket_info;
+ int ret = store->get_bucket_info(NULL, bucket_name, bucket_info, NULL);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
+ return ret;
+ }
+
+ ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info.bucket);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
int rgw_link_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time, bool update_entrypoint)
{
int ret;
extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker);
extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& user_id, rgw_bucket& bucket);
+extern int rgw_bucket_sync_user_stats(RGWRados *store, const string& bucket_name);
/**
* Store a list of the user's buckets, with associated functinos.
FCGX_Init();
int r = 0;
- RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true);
+ RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, true, true);
if (!store) {
derr << "Couldn't init storage provider (RADOS)" << dendl;
r = EIO;
#include "rgw_rados.h"
#include "rgw_quota.h"
#include "rgw_bucket.h"
+#include "rgw_user.h"
#define dout_subsys ceph_subsys_rgw
}
};
+ /*
+ * thread, full sync all users stats periodically
+ *
+ * only sync non idle users or ones that never got synced before, this is needed so that
+ * users that didn't have quota turned on before (or existed before the user objclass
+ * tracked stats) need to get their backend stats up to date.
+ */
+ class UserSyncThread : public Thread {
+ CephContext *cct;
+ RGWUserStatsCache *stats;
+
+ Mutex lock;
+ Cond cond;
+ public:
+
+ UserSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::UserSyncThread") {}
+
+ void *entry() {
+ ldout(cct, 20) << "UserSyncThread: start" << dendl;
+ do {
+
+ string key = "user";
+
+ int ret = stats->sync_all_users();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: sync_all_users() returned ret=" << ret << dendl;
+ }
+
+ lock.Lock();
+ cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_sync_interval, 0));
+ lock.Unlock();
+ } while (!stats->going_down());
+ ldout(cct, 20) << "UserSyncThread: done" << dendl;
+
+ return NULL;
+ }
+
+ void stop() {
+ Mutex::Locker l(lock);
+ cond.Signal();
+ }
+ };
+
BucketsSyncThread *buckets_sync_thread;
+ UserSyncThread *user_sync_thread;
protected:
bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
return stats_map.find(user, qs);
int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
int sync_bucket(const string& user, rgw_bucket& bucket);
+ int sync_user(const string& user);
+ int sync_all_users();
void data_modified(const string& user, rgw_bucket& bucket);
rwlock.unlock();
}
+ template<class T> /* easier doing it as a template, Thread doesn't have ->stop() */
+ void stop_thread(T **pthr) {
+ T *thread = *pthr;
+ if (!thread)
+ return;
+
+ thread->stop();
+ thread->join();
+ delete thread;
+ *pthr = NULL;
+ }
+
public:
- RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
+ RGWUserStatsCache(RGWRados *_store, bool quota_threads) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
rwlock("RGWUserStatsCache::rwlock") {
- buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
- buckets_sync_thread->create();
+ if (quota_threads) {
+ buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+ buckets_sync_thread->create();
+ user_sync_thread = new UserSyncThread(store->ctx(), this);
+ user_sync_thread->create();
+ } else {
+ buckets_sync_thread = NULL;
+ user_sync_thread = NULL;
+ }
}
~RGWUserStatsCache() {
stop();
void stop() {
down_flag.set(1);
rwlock.get_write();
- if (buckets_sync_thread) {
- buckets_sync_thread->stop();
- buckets_sync_thread->join();
- delete buckets_sync_thread;
- buckets_sync_thread = NULL;
- }
+ stop_thread(&buckets_sync_thread);
rwlock.unlock();
+ stop_thread(&user_sync_thread);
}
};
return 0;
}
+int RGWUserStatsCache::sync_user(const string& user)
+{
+ cls_user_header header;
+ int ret = store->cls_user_get_header(user, &header);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: can't read user header: ret=" << ret << dendl;
+ return ret;
+ }
+
+ if (!store->ctx()->_conf->rgw_user_quota_sync_idle_users &&
+ header.last_stats_update < header.last_stats_sync) {
+ ldout(store->ctx(), 20) << "user is idle, not doing a full sync (user=" << user << ")" << dendl;
+ return 0;
+ }
+
+ utime_t when_need_full_sync = header.last_stats_sync;
+ when_need_full_sync += store->ctx()->_conf->rgw_user_quota_sync_wait_time;
+
+ // check if enough time passed since last full sync
+
+ ret = rgw_user_sync_all_stats(store, user);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed user stats sync, ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+int RGWUserStatsCache::sync_all_users()
+{
+ string key = "user";
+ void *handle;
+
+ int ret = store->meta_mgr->list_keys_init(key, &handle);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: can't get key: ret=" << ret << dendl;
+ return ret;
+ }
+
+ bool truncated;
+ int max = 1000;
+
+ do {
+ list<string> keys;
+ ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: lists_keys_next(): ret=" << ret << dendl;
+ goto done;
+ }
+ for (list<string>::iterator iter = keys.begin();
+ iter != keys.end() && !going_down();
+ ++iter) {
+ string& user = *iter;
+ ldout(store->ctx(), 20) << "RGWUserStatsCache: sync user=" << user << dendl;
+ int ret = sync_user(user);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: sync_user() failed, user=" << user << " ret=" << ret << dendl;
+
+ /* continuing to next user */
+ continue;
+ }
+ }
+ } while (truncated);
+
+ ret = 0;
+done:
+ store->meta_mgr->list_keys_complete(handle);
+ return ret;
+}
+
void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket)
{
/* racy, but it's ok */
return 0;
}
public:
- RGWQuotaHandlerImpl(RGWRados *_store) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store) {}
+ RGWQuotaHandlerImpl(RGWRados *_store, bool quota_threads) : store(_store), bucket_stats_cache(_store), user_stats_cache(_store, quota_threads) {}
virtual int check_quota(const string& user, rgw_bucket& bucket,
RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota,
uint64_t num_objs, uint64_t size) {
};
-RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store)
+RGWQuotaHandler *RGWQuotaHandler::generate_handler(RGWRados *store, bool quota_threads)
{
- return new RGWQuotaHandlerImpl(store);
+ return new RGWQuotaHandlerImpl(store, quota_threads);
};
void RGWQuotaHandler::free_handler(RGWQuotaHandler *handler)
virtual void update_stats(const string& bucket_owner, rgw_bucket& bucket, int obj_delta, uint64_t added_bytes, uint64_t removed_bytes) = 0;
- static RGWQuotaHandler *generate_handler(RGWRados *store);
+ static RGWQuotaHandler *generate_handler(RGWRados *store, bool quota_threads);
static void free_handler(RGWQuotaHandler *handler);
};
if (use_gc_thread)
gc->start_processor();
- quota_handler = RGWQuotaHandler::generate_handler(this);
+ quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
return ret;
}
return 0;
}
-int RGWRados::get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info,
+int RGWRados::get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info,
time_t *pmtime, map<string, bufferlist> *pattrs)
{
bufferlist bl;
return ++max_bucket_id;
}
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads)
{
int use_cache = cct->_conf->rgw_cache_enabled;
RGWRados *store = NULL;
store = new RGWCache<RGWRados>;
}
- if (store->initialize(cct, use_gc_thread) < 0) {
+ if (store->initialize(cct, use_gc_thread, quota_threads) < 0) {
delete store;
return NULL;
}
RGWGC *gc;
bool use_gc_thread;
+ bool quota_threads;
int num_watchers;
RGWWatcher **watchers;
public:
RGWRados() : lock("rados_timer_lock"), timer(NULL),
- gc(NULL), use_gc_thread(false),
+ gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL), watch_handles(NULL),
watch_initialized(false),
bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
- int initialize(CephContext *_cct, bool _use_gc_thread) {
+ int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) {
set_context(_cct);
use_gc_thread = _use_gc_thread;
+ quota_threads = _quota_threads;
return initialize();
}
/** Initialize the RADOS instance and prepare to do other ops */
int get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketInfo& info, time_t *pmtime, map<string, bufferlist> *pattrs);
int get_bucket_instance_from_oid(void *ctx, string& oid, RGWBucketInfo& info, time_t *pmtime, map<string, bufferlist> *pattrs);
- virtual int get_bucket_info(void *ctx, string& bucket_name, RGWBucketInfo& info,
+ virtual int get_bucket_info(void *ctx, const string& bucket_name, RGWBucketInfo& info,
time_t *pmtime, map<string, bufferlist> *pattrs = NULL);
virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
map<string, bufferlist> *pattrs, bool create_entry_point);
class RGWStoreManager {
public:
RGWStoreManager() {}
- static RGWRados *get_storage(CephContext *cct, bool use_gc_thread) {
- RGWRados *store = init_storage_provider(cct, use_gc_thread);
+ static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) {
+ RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads);
return store;
}
static RGWRados *get_raw_storage(CephContext *cct) {
RGWRados *store = init_raw_storage_provider(cct);
return store;
}
- static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads);
static RGWRados *init_raw_storage_provider(CephContext *cct);
static void close_storage(RGWRados *store);
return (info.user_id != RGW_USER_ANON_ID);
}
+int rgw_user_sync_all_stats(RGWRados *store, const string& user_id)
+{
+ CephContext *cct = store->ctx();
+ size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
+ bool done;
+ string marker;
+ int ret;
+
+ do {
+ RGWUserBuckets user_buckets;
+ ret = rgw_read_user_buckets(store, user_id, user_buckets, marker, max_entries, false);
+ if (ret < 0) {
+ ldout(cct, 0) << "failed to read user buckets: ret=" << ret << dendl;
+ return ret;
+ }
+ map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
+ for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
+ i != buckets.end();
+ ++i) {
+ marker = i->first;
+
+ RGWBucketEnt& bucket_ent = i->second;
+ ret = rgw_bucket_sync_user_stats(store, user_id, bucket_ent.bucket);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: could not sync bucket stats: ret=" << ret << dendl;
+ return ret;
+ }
+ }
+ done = (buckets.size() < max_entries);
+ } while (!done);
+
+ ret = store->complete_sync_user_stats(user_id);
+ if (ret < 0) {
+ cerr << "ERROR: failed to complete syncing user stats: ret=" << ret << std::endl;
+ return ret;
+ }
+
+ return 0;
+}
+
/**
* Save the given user information to storage.
* Returns: 0 on success, -ERR# on failure.
};
WRITE_CLASS_ENCODER(RGWUID)
+extern int rgw_user_sync_all_stats(RGWRados *store, const string& user_id);
/**
* Get the anonymous (ie, unauthenticated) user info.
*/