]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: async quota update
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 8 Oct 2013 03:22:51 +0000 (20:22 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Thu, 10 Oct 2013 20:51:17 +0000 (13:51 -0700)
Asynchronously update bucket stats when a period passed, but bucket
stats are within the ttl window.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_quota.cc
src/rgw/rgw_quota.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 165ca4379878bb748d6558ff51513d0006290a97..2851f2bd70261a49a1a3f68c862b80c25f16d513 100644 (file)
@@ -2,6 +2,7 @@
 
 #include "include/types.h"
 #include "cls/rgw/cls_rgw_ops.h"
+#include "cls/rgw/cls_rgw_client.h"
 #include "include/rados/librados.hpp"
 
 #include "common/debug.h"
@@ -157,6 +158,44 @@ int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *he
  return r;
 }
 
+class GetDirHeaderCompletion : public ObjectOperationCompletion {
+  RGWGetDirHeader_CB *ret_ctx;
+public:
+  GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {}
+  ~GetDirHeaderCompletion() {
+    ret_ctx->put();
+  }
+  void handle_completion(int r, bufferlist& outbl) {
+    struct rgw_cls_list_ret ret;
+    try {
+      bufferlist::iterator iter = outbl.begin();
+      ::decode(ret, iter);
+    } catch (buffer::error& err) {
+      r = -EIO;
+    }
+
+    ret_ctx->handle_response(r, ret.dir.header);
+  };
+};
+
+int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx)
+{
+  bufferlist in, out;
+  struct rgw_cls_list_op call;
+  call.num_entries = 0;
+  ::encode(call, in);
+  ObjectReadOperation op;
+  GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx);
+  op.exec("rgw", "bucket_list", in, cb);
+  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+  int r = io_ctx.aio_operate(oid, c, &op, NULL);
+  c->release();
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
 int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
                     list<rgw_bi_log_entry>& entries, bool *truncated)
 {
index 2ea5d9ca771ecae015986f81ee0094e560aaa330..39bb3c9fc4a9eb58e1d5eeb8e0e2de0e912d38a5 100644 (file)
@@ -4,6 +4,13 @@
 #include "include/types.h"
 #include "include/rados/librados.hpp"
 #include "cls_rgw_types.h"
+#include "common/RefCountedObj.h"
+
+class RGWGetDirHeader_CB : public RefCountedObject {
+public:
+  virtual ~RGWGetDirHeader_CB() {}
+  virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
+};
 
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
@@ -27,6 +34,7 @@ int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
 int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
   
 int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
 
index 70ab0762ce61c05c7308de143aa9589c22acdb42..984b3b57dea13cc3a2d9ca8b65395342002a782b 100644 (file)
@@ -1,6 +1,7 @@
 
 #include "include/utime.h"
 #include "common/lru_map.h"
+#include "common/RefCountedObj.h"
 
 #include "rgw_common.h"
 #include "rgw_rados.h"
 struct RGWQuotaBucketStats {
   RGWBucketStats stats;
   utime_t expiration;
+  utime_t async_refresh_time;
 };
 
 class RGWBucketStatsCache {
   RGWRados *store;
   lru_map<rgw_bucket, RGWQuotaBucketStats> stats_map;
+  RefCountedWaitObject *async_refcount;
 
   int fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats);
 
 public:
 #warning FIXME configurable stats_map size
-  RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(10000) {}
+  RGWBucketStatsCache(RGWRados *_store) : store(_store), stats_map(10000) {
+    async_refcount = new RefCountedWaitObject;
+  }
+  ~RGWBucketStatsCache() {
+    async_refcount->put_wait(); /* wait for all pending async requests to complete */
+  }
 
   int get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats);
   void adjust_bucket_stats(rgw_bucket& bucket, int objs_delta, uint64_t added_bytes, uint64_t removed_bytes);
+
+  void set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats);
+  int async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs);
+  void async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats);
 };
 
 int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats& stats)
@@ -42,6 +54,8 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats&
     return r;
   }
 
+  stats = RGWBucketStats();
+
   map<RGWObjCategory, RGWBucketStats>::iterator iter;
   for (iter = bucket_stats.begin(); iter != bucket_stats.end(); ++iter) {
     RGWBucketStats& s = iter->second;
@@ -53,23 +67,124 @@ int RGWBucketStatsCache::fetch_bucket_totals(rgw_bucket& bucket, RGWBucketStats&
   return 0;
 }
 
+class AsyncRefreshHandler : public RGWGetBucketStats_CB {
+  RGWRados *store;
+  RGWBucketStatsCache *cache;
+public:
+  AsyncRefreshHandler(RGWRados *_store, RGWBucketStatsCache *_cache, rgw_bucket& _bucket) : RGWGetBucketStats_CB(_bucket), store(_store), cache(_cache) {}
+
+  int init_fetch();
+
+  void handle_response(int r);
+};
+
+
+int AsyncRefreshHandler::init_fetch()
+{
+  ldout(store->ctx(), 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
+  map<RGWObjCategory, RGWBucketStats> bucket_stats;
+  int r = store->get_bucket_stats_async(bucket, this);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
+
+    /* get_bucket_stats_async() dropped our reference already */
+    return r;
+  }
+
+  return 0;
+}
+
+void AsyncRefreshHandler::handle_response(int r)
+{
+  if (r < 0) {
+    ldout(store->ctx(), 20) << "AsyncRefreshHandler::handle_response() r=" << r << dendl;
+    return; /* nothing to do here */
+  }
+
+  RGWBucketStats bs;
+
+  map<RGWObjCategory, RGWBucketStats>::iterator iter;
+  for (iter = stats->begin(); iter != stats->end(); ++iter) {
+    RGWBucketStats& s = iter->second;
+    bs.num_kb += s.num_kb;
+    bs.num_kb_rounded += s.num_kb_rounded;
+    bs.num_objects += s.num_objects;
+  }
+
+  cache->async_refresh_response(bucket, bs);
+}
+
+int RGWBucketStatsCache::async_refresh(rgw_bucket& bucket, RGWQuotaBucketStats& qs)
+{
+#if 0
+  if (qs.async_update_flag.inc() != 1) { /* are we the first one here? */
+    qs.async_update_flag.dec();
+    return 0;
+  }
+#endif
+#warning protect against multiple updates
+
+  async_refcount->get();
+
+  AsyncRefreshHandler *handler = new AsyncRefreshHandler(store, this, bucket);
+
+  int ret = handler->init_fetch();
+  if (ret < 0) {
+    async_refcount->put();
+    handler->put();
+    return ret;
+  }
+
+  return 0;
+}
+
+void RGWBucketStatsCache::async_refresh_response(rgw_bucket& bucket, RGWBucketStats& stats)
+{
+  ldout(store->ctx(), 20) << "async stats refresh response for bucket=" << bucket << dendl;
+
+  RGWQuotaBucketStats qs;
+
+  stats_map.find(bucket, qs);
+
+  set_stats(bucket, qs, stats);
+
+  async_refcount->put();
+}
+
+void RGWBucketStatsCache::set_stats(rgw_bucket& bucket, RGWQuotaBucketStats& qs, RGWBucketStats& stats)
+{
+  qs.stats = stats;
+  qs.expiration = ceph_clock_now(store->ctx());
+  qs.async_refresh_time = qs.expiration;
+  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
+  qs.async_refresh_time += store->ctx()->_conf->rgw_bucket_quota_ttl / 2;
+
+  stats_map.add(bucket, qs);
+}
+
 int RGWBucketStatsCache::get_bucket_stats(rgw_bucket& bucket, RGWBucketStats& stats) {
   RGWQuotaBucketStats qs;
+  utime_t now = ceph_clock_now(store->ctx());
   if (stats_map.find(bucket, qs)) {
+    if (now >= qs.async_refresh_time) {
+      int r = async_refresh(bucket, qs);
+      if (r < 0) {
+        ldout(store->ctx(), 0) << "ERROR: quota async refresh returned ret=" << r << dendl;
+
+        /* continue processing, might be a transient error, async refresh is just optimization */
+      }
+    }
     if (qs.expiration > ceph_clock_now(store->ctx())) {
       stats = qs.stats;
       return 0;
     }
   }
 
-  int ret = fetch_bucket_totals(bucket, qs.stats);
+  int ret = fetch_bucket_totals(bucket, stats);
   if (ret < 0 && ret != -ENOENT)
     return ret;
 
-  qs.expiration = ceph_clock_now(store->ctx());
-  qs.expiration += store->ctx()->_conf->rgw_bucket_quota_ttl;
-
-  stats_map.add(bucket, qs);
+  set_stats(bucket, qs, stats);
 
   return 0;
 }
index 39cfc62de5579937309d42ff27d975c5ca9ea817..9af91e3986d8821f7171b38833070f3648f4ce0b 100644 (file)
@@ -3,6 +3,7 @@
 
 
 #include "include/utime.h"
+#include "include/atomic.h"
 #include "common/lru_map.h"
 
 class RGWRados;
@@ -42,7 +43,8 @@ class rgw_bucket;
 class RGWQuotaHandler {
 public:
   RGWQuotaHandler() {}
-  virtual ~RGWQuotaHandler() {}
+  virtual ~RGWQuotaHandler() {
+  }
   virtual int check_quota(rgw_bucket& bucket, RGWQuotaInfo& bucket_quota,
                          uint64_t num_objs, uint64_t size) = 0;
 
index 8035e0589dea7aa50c7aff4e06ee8e0d70405180..811c7ee57cc30a78d2b026d4cf735bb2da8f82db 100644 (file)
@@ -4614,6 +4614,38 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_
   return 0;
 }
 
+class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
+  RGWGetBucketStats_CB *cb;
+
+public:
+  RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+  void handle_response(int r, rgw_bucket_dir_header& header) {
+    map<RGWObjCategory, RGWBucketStats> stats;
+
+    if (r >= 0) {
+      translate_raw_stats(header, stats);
+      cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+    }
+
+    cb->handle_response(r);
+
+    cb->put();
+  }
+};
+
+int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
+{
+  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
+  int r = cls_bucket_head_async(bucket, get_ctx);
+  if (r < 0) {
+    ctx->put();
+    delete get_ctx;
+    return r;
+  }
+
+  return 0;
+}
+
 void RGWRados::get_bucket_instance_entry(rgw_bucket& bucket, string& entry)
 {
   entry = bucket.name + ":" + bucket.bucket_id;
@@ -5496,6 +5528,21 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header&
   return 0;
 }
 
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
+{
+  librados::IoCtx index_ctx;
+  string oid;
+  int r = open_bucket_index(bucket, index_ctx, oid);
+  if (r < 0)
+    return r;
+
+  r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
+  if (r < 0)
+    return r;
+
+  return 0;
+}
+
 int RGWRados::check_quota(rgw_bucket& bucket, RGWQuotaInfo& quota_info, uint64_t obj_size)
 {
   return quota_handler->check_quota(bucket, quota_info, 1, obj_size);
index a23f90f1f23eb6512862cdd20ab9132d249490d9..52b898123d4ef5ccff4284c433726dcaaf727738 100644 (file)
@@ -761,6 +761,29 @@ public:
   int renew_state();
 };
 
+class RGWGetBucketStats_CB : public RefCountedObject {
+protected:
+  rgw_bucket bucket;
+  uint64_t bucket_ver;
+  uint64_t master_ver;
+  map<RGWObjCategory, RGWBucketStats> *stats;
+  string max_marker;
+public:
+  RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
+  virtual ~RGWGetBucketStats_CB() {}
+  virtual void handle_response(int r) = 0;
+  virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
+                            map<RGWObjCategory, RGWBucketStats> *_stats,
+                            const string &_max_marker) {
+    bucket_ver = _bucket_ver;
+    master_ver = _master_ver;
+    stats = _stats;
+    max_marker = _max_marker;
+  }
+};
+
+class RGWGetDirHeader_CB;
+
 
 class RGWRados
 {
@@ -1295,6 +1318,7 @@ public:
   int decode_policy(bufferlist& bl, ACLOwner *owner);
   int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWBucketStats>& stats,
                        string *max_marker);
+  int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
   void get_bucket_instance_obj(rgw_bucket& bucket, rgw_obj& obj);
   void get_bucket_instance_entry(rgw_bucket& bucket, string& entry);
   void get_bucket_meta_oid(rgw_bucket& bucket, string& oid);
@@ -1326,6 +1350,7 @@ public:
                       map<string, RGWObjEnt>& m, bool *is_truncated,
                       string *last_entry, bool (*force_check_filter)(const string&  name) = NULL);
   int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
+  int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
   int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
                            RGWModifyOp op, rgw_obj& oid, string& tag);
   int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,