#include "include/utime.h"
#include "common/lru_map.h"
#include "common/RefCountedObj.h"
+#include "common/Thread.h"
+#include "common/Mutex.h"
+#include "common/RWLock.h"
#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_quota.h"
+#include "rgw_bucket.h"
#define dout_subsys ceph_subsys_rgw
virtual bool map_find_and_update(const string& user, rgw_bucket& bucket, typename lru_map<T, RGWQuotaCacheStats>::UpdateContext *ctx) = 0;
virtual void map_add(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) = 0;
- virtual int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) { return 0; }
+ virtual void data_modified(const string& user, rgw_bucket& bucket) {}
public:
RGWQuotaCache(RGWRados *_store, int size) : store(_store), stats_map(size) {
async_refcount = new RefCountedWaitObject;
qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
map_add(user, bucket, qs);
-
- int ret = handle_set_stats(user, bucket, stats);
- if (ret < 0) {
- /* can't really do much about it, user stats are going to be off a bit for now */
- }
}
template<class T>
{
RGWQuotaStatsUpdate<T> update(objs_delta, added_bytes, removed_bytes);
map_find_and_update(user, bucket, &update);
+
+ data_modified(user, bucket);
}
class BucketAsyncRefreshHandler : public RGWQuotaCache<rgw_bucket>::AsyncRefreshHandler,
}
int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
- int handle_set_stats(const string& user, rgw_bucket& bucket, RGWStorageStats& stats) {
- int ret = store->update_user_bucket_stats(user, bucket, stats);
- if (ret < 0) {
- derr << "ERROR: store->update_bucket_stats() returned " << ret << dendl;
- return ret;
- }
- return 0;
- }
public:
RGWBucketStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
}
class RGWUserStatsCache : public RGWQuotaCache<string> {
+ atomic_t down_flag;
+ RWLock rwlock;
+ map<rgw_bucket, string> modified_buckets;
+
+ /* thread, sync recent modified buckets info */
+ class BucketsSyncThread : public Thread {
+ CephContext *cct;
+ RGWUserStatsCache *stats;
+
+ Mutex lock;
+ Cond cond;
+ public:
+
+ BucketsSyncThread(CephContext *_cct, RGWUserStatsCache *_s) : cct(_cct), stats(_s), lock("RGWUserStatsCache::BucketsSyncThread") {}
+
+ void *entry() {
+ ldout(cct, 20) << "BucketsSyncThread: start" << dendl;
+ do {
+ map<rgw_bucket, string> buckets;
+
+ stats->swap_modified_buckets(buckets);
+
+ for (map<rgw_bucket, string>::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) {
+ rgw_bucket bucket = iter->first;
+ string& user = iter->second;
+ ldout(cct, 20) << "BucketsSyncThread: sync user=" << user << " bucket=" << bucket << dendl;
+ int r = stats->sync_bucket(user, bucket);
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: sync_bucket() returned r=" << r << dendl;
+ }
+ }
+
+ if (stats->going_down())
+ break;
+
+ lock.Lock();
+ cond.WaitInterval(cct, lock, utime_t(cct->_conf->rgw_user_quota_bucket_sync_interval, 0));
+ lock.Unlock();
+ } while (!stats->going_down());
+ ldout(cct, 20) << "BucketsSyncThread: done" << dendl;
+
+ return NULL;
+ }
+
+ void stop() {
+ Mutex::Locker l(lock);
+ cond.Signal();
+ }
+ };
+
+ BucketsSyncThread *buckets_sync_thread;
protected:
bool map_find(const string& user, rgw_bucket& bucket, RGWQuotaCacheStats& qs) {
return stats_map.find(user, qs);
}
int fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats);
+ int sync_bucket(const string& user, rgw_bucket& bucket);
+
+ void data_modified(const string& user, rgw_bucket& bucket);
+
+ void swap_modified_buckets(map<rgw_bucket, string>& out) {
+ rwlock.get_write();
+ modified_buckets.swap(out);
+ rwlock.unlock();
+ }
public:
- RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size) {
+ RGWUserStatsCache(RGWRados *_store) : RGWQuotaCache(_store, _store->ctx()->_conf->rgw_bucket_quota_cache_size),
+ rwlock("RGWUserStatsCache::rwlock") {
+ buckets_sync_thread = new BucketsSyncThread(store->ctx(), this);
+ buckets_sync_thread->create();
+ }
+ ~RGWUserStatsCache() {
+ stop();
}
AsyncRefreshHandler *allocate_refresh_handler(const string& user, rgw_bucket& bucket) {
*/
return true;
}
+
+ bool going_down() {
+ return (down_flag.read() != 0);
+ }
+
+ void stop() {
+ down_flag.set(1);
+ rwlock.get_write();
+ if (buckets_sync_thread) {
+ buckets_sync_thread->stop();
+ buckets_sync_thread->join();
+ delete buckets_sync_thread;
+ buckets_sync_thread = NULL;
+ }
+ rwlock.unlock();
+ }
};
int RGWUserStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket& bucket, RGWStorageStats& stats)
return 0;
}
+int RGWUserStatsCache::sync_bucket(const string& user, rgw_bucket& bucket)
+{
+ int r = rgw_bucket_sync_user_stats(store, user, bucket);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: rgw_bucket_sync_user_stats() for user=" << user << ", bucket=" << bucket << " returned " << r << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+void RGWUserStatsCache::data_modified(const string& user, rgw_bucket& bucket)
+{
+ /* racy, but it's ok */
+ rwlock.get_read();
+ bool need_update = modified_buckets.find(bucket) == modified_buckets.end();
+ rwlock.unlock();
+
+ if (need_update) {
+ rwlock.get_write();
+ modified_buckets[bucket] = user;
+ rwlock.unlock();
+ }
+}
+
class RGWQuotaHandlerImpl : public RGWQuotaHandler {
RGWRados *store;