]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Adjust bucket stats/index checking/index rebuild/tag timeout implementation to work...
authorGuang Yang <yguang@yahoo-inc.com>
Fri, 29 Aug 2014 10:22:50 +0000 (10:22 +0000)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:23 +0000 (19:21 -0800)
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_quota.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_log.h
src/test/cls_rgw/test_cls_rgw.cc

index 0d698e3c8150417b285189d04fc97e2bd8868c79..ed937bc52eb0f8aea354f90eada6104ddd3b2210 100644 (file)
@@ -143,13 +143,50 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
   return ret;
 }
 
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+    const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
   bufferlist in;
   struct rgw_cls_tag_timeout_op call;
-  call.tag_timeout = tag_timeout;
+  call.tag_timeout = timeout;
   ::encode(call, in);
-  o.exec("rgw", "bucket_set_tag_timeout", in);
+  ObjectWriteOperation op;
+  op.exec("rgw", "bucket_set_tag_timeout", in);
+  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+  int r = io_ctx.aio_operate(oid, c, &op);
+  if (r >= 0) {
+    manager->add_pending(arg->id, c);
+  }
+  return r;
+}
+
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+    uint64_t tag_timeout, uint32_t max_aio)
+{
+  int ret = 0;
+  vector<string>::const_iterator iter = bucket_objs.begin();
+  BucketIndexAioManager manager;
+  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions, r = 0;
+  while (manager.wait_for_completions(0, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+        if(issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
+  return ret;
 }
 
 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -238,39 +275,91 @@ int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
   return ret;
 }
 
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
-                                 rgw_bucket_dir_header *existing_header,
-                                 rgw_bucket_dir_header *calculated_header)
-{
-  bufferlist in, out;
-  int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
-  if (r < 0)
-    return r;
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+    struct rgw_cls_check_index_ret *pdata) {
+  bufferlist in;
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+        pdata, NULL));
+  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+  int r = io_ctx.aio_operate(oid, c, &op, NULL);
+  if (r >= 0) {
+    manager->add_pending(arg->id, c);
+  }
+  return r;
+}
 
-  struct rgw_cls_check_index_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
+int cls_rgw_bucket_check_index_op(IoCtx& io_ctx,
+    map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio)
+{
+  int ret = 0;
+  BucketIndexAioManager manager;
+  map<string, struct rgw_cls_check_index_ret>::iterator iter = bucket_objs_ret.begin();
+  for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+    if (ret < 0)
+      break;
   }
 
-  if (existing_header)
-    *existing_header = ret.existing_header;
-  if (calculated_header)
-    *calculated_header = ret.calculated_header;
+  int num_completions, r = 0;
+  while (manager.wait_for_completions(0, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
+  return ret;
+}
 
-  return 0;
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+    BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.exec("rgw", "bucket_rebuild_index", in);
+  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+  int r = io_ctx.aio_operate(oid, c, &op);
+  if (r >= 0) {
+    manager->add_pending(arg->id, c);
+  }
+  return r;
 }
 
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector<string>& bucket_objs,
+    uint32_t max_aio)
 {
-  bufferlist in, out;
-  int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
-  if (r < 0)
-    return r;
+  int ret = 0;
+  BucketIndexAioManager manager;
+  vector<string>::const_iterator iter = bucket_objs.begin();
+  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+    if (ret < 0)
+      break;
+  }
 
-  return 0;
+  int num_completions, r = 0;
+  while (manager.wait_for_completions(0, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
+  return ret;
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -284,28 +373,33 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec("rgw", "dir_suggest_changes", updates);
 }
 
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+int cls_rgw_get_dir_header(IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+    uint32_t max_aio)
 {
-  bufferlist in, out;
-  struct rgw_cls_list_op call;
-  call.num_entries = 0;
-  ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
-  if (r < 0)
-    return r;
-
-  struct rgw_cls_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
+  int ret = 0;
+  BucketIndexAioManager manager;
+  map<string, rgw_cls_list_ret>::iterator iter = dir_headers.begin();
+  for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+    if (ret < 0)
+      break;
   }
 
-  if (header)
-    *header = ret.dir.header;
-
- return r;
+  int num_completions, r = 0;
+  while (manager.wait_for_completions(0, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
+  return ret;
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
index a49b6422e7e80853a9a63bb63584d8964904b834..b28425e1fba6553fe525772411e4421cec2ff331 100644 (file)
@@ -77,6 +77,37 @@ public:
   virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
 };
 
+class BucketIndexShardsManager {
+private:
+  // Per shard setting manager, for example, marker.
+  map<string, string> value_by_shards;
+  const static char KEY_VALUE_SEPARATOR = '#';
+  const static char SHARDS_SEPARATOR = ',';
+public:
+  void add_item(const string& shard, const string& value) {
+    value_by_shards[shard] = value;
+  }
+  void to_string(string *out) const {
+    if (out) {
+      map<string, string>::const_iterator iter = value_by_shards.begin();
+      // No shards
+      if (value_by_shards.size() == 1) {
+        *out = iter->second;
+      } else {
+        for (; iter != value_by_shards.end(); ++iter) {
+          if (out->length()) {
+            // Not the first item, append a separator first
+            out->append(1, SHARDS_SEPARATOR);
+          }
+          out->append(iter->first);
+          out->append(1, KEY_VALUE_SEPARATOR);
+          out->append(iter->second);
+        }
+      }
+    }
+  }
+};
+
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 
@@ -92,7 +123,8 @@ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
         const vector<string>& bucket_objs, uint32_t max_aio);
 
-void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx,
+    const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio);
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                string& name, string& locator, bool log_op);
@@ -122,12 +154,22 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
                     map<string, struct rgw_cls_list_ret>& list_results,
                     uint32_t max_aio);
 
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
-                                 rgw_bucket_dir_header *existing_header,
-                                 rgw_bucket_dir_header *calculated_header);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
+/**
+ * Check the bucket index.
+ *
+ * io_ctx          - IO context for rados.
+ * bucket_objs_ret - check result for all shards.
+ * max_aio         - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx,
+    map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio);
+int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+    uint32_t max_aio);
   
-int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+    uint32_t max_aio);
 int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
index 2c775ca1e25f2593c4a721937ddc6356e5e5adcb..9cf4a734c33eeacfe9b76b021f9193af29da7a32 100644 (file)
@@ -519,7 +519,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter)
     return r;
 
   map<RGWObjCategory, RGWStorageStats> stats;
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
   string max_marker;
   int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
   if (ret < 0) {
@@ -535,8 +535,8 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter)
   formatter->dump_string("marker", bucket.marker);
   formatter->dump_string("owner", bucket_info.owner);
   formatter->dump_int("mtime", mtime);
-  formatter->dump_int("ver", bucket_ver);
-  formatter->dump_int("master_ver", master_ver);
+  formatter->dump_string("ver", bucket_ver);
+  formatter->dump_string("master_ver", master_ver);
   formatter->dump_string("max_marker", max_marker);
   dump_bucket_usage(stats, formatter);
   formatter->close_section();
index c09cdc2f26c809df07864a8e5fa0845fb3b939ff..83e451378b4a7d12afe265fb76aa75d1c60035da 100644 (file)
@@ -358,7 +358,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b
   RGWBucketInfo info;
   bufferlist bl;
 
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
 
   ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
   if (ret < 0)
@@ -957,7 +957,7 @@ static int bucket_stats(RGWRados *store, std::string&  bucket_name, Formatter *f
 
   bucket = bucket_info.bucket;
 
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
   string max_marker;
   int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
   if (ret < 0) {
@@ -972,8 +972,8 @@ static int bucket_stats(RGWRados *store, std::string&  bucket_name, Formatter *f
   formatter->dump_string("id", bucket.bucket_id);
   formatter->dump_string("marker", bucket.marker);
   formatter->dump_string("owner", bucket_info.owner);
-  formatter->dump_int("ver", bucket_ver);
-  formatter->dump_int("master_ver", master_ver);
+  formatter->dump_string("ver", bucket_ver);
+  formatter->dump_string("master_ver", master_ver);
   formatter->dump_int("mtime", mtime);
   formatter->dump_string("max_marker", max_marker);
   dump_bucket_usage(stats, formatter);
index a48ce69890bdf34031808c239e42cb9f409c32b8..910da2fffb7c2f15d30da68aee2856c4e4d1dfc8 100644 (file)
@@ -318,8 +318,8 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket
 {
   RGWBucketInfo bucket_info;
 
-  uint64_t bucket_ver;
-  uint64_t master_ver;
+  string bucket_ver;
+  string master_ver;
 
   map<RGWObjCategory, RGWStorageStats> bucket_stats;
   int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL);
index 235047e296f5ce96ca7b1f44c526d035a4b7de08..a5afa49b916e00cf19516083854cc4643c909dae 100644 (file)
@@ -80,7 +80,6 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
 
 #define RGW_STATELOG_OBJ_PREFIX "statelog."
 
-
 #define dout_subsys ceph_subsys_rgw
 
 void RGWDefaultRegionInfo::dump(Formatter *f) const {
@@ -3851,7 +3850,7 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index
   return 0;
 }
 
-static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
+static void accumulate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
 {
   map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
   for (; iter != header.stats.end(); ++iter) {
@@ -3859,9 +3858,9 @@ static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategor
     RGWStorageStats& s = stats[category];
     struct rgw_bucket_category_stats& header_stats = iter->second;
     s.category = (RGWObjCategory)iter->first;
-    s.num_kb = ((header_stats.total_size + 1023) / 1024);
-    s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024);
-    s.num_objects = header_stats.num_entries;
+    s.num_kb += ((header_stats.total_size + 1023) / 1024);
+    s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024);
+    s.num_objects += header_stats.num_entries;
   }
 }
 
@@ -3870,21 +3869,23 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
                                 map<RGWObjCategory, RGWStorageStats> *calculated_stats)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int ret = open_bucket_index(bucket, index_ctx, oid);
+  // key - bucket index object id
+  // value - bucket index check OP returned result with the given bucket index object (shard)
+  map<string, struct rgw_cls_check_index_ret> bucket_objs_ret;
+  int ret = open_bucket_index(bucket, index_ctx, bucket_objs_ret);
   if (ret < 0)
     return ret;
 
-  rgw_bucket_dir_header existing_header;
-  rgw_bucket_dir_header calculated_header;
-
-  ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header);
+  ret = cls_rgw_bucket_check_index_op(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio);
   if (ret < 0)
     return ret;
 
-  translate_raw_stats(existing_header, *existing_stats);
-  translate_raw_stats(calculated_header, *calculated_stats);
+  // Aggregate results (from different shards if there is any)
+  map<string, struct rgw_cls_check_index_ret>::iterator iter;
+  for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
+    accumulate_raw_stats(iter->second.existing_header, *existing_stats);
+    accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
+  }
 
   return 0;
 }
@@ -3892,13 +3893,12 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
 int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int ret = open_bucket_index(bucket, index_ctx, oid);
-  if (ret < 0)
-    return ret;
+  vector<string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+  if (r < 0)
+    return r;
 
-  return cls_rgw_bucket_rebuild_index_op(index_ctx, oid);
+  return cls_rgw_bucket_rebuild_index_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
 }
 
 
@@ -5463,57 +5463,91 @@ int RGWRados::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime,
   return 0;
 }
 
-int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
-                               string *max_marker)
+int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+    map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
 {
-  rgw_bucket_dir_header header;
-  int r = cls_bucket_head(bucket, header);
+  map<string, rgw_bucket_dir_header> headers;
+  int r = cls_bucket_head(bucket, headers);
   if (r < 0)
     return r;
 
-  stats.clear();
-
-  translate_raw_stats(header, stats);
-
-  *bucket_ver = header.ver;
-  *master_ver = header.master_ver;
-
-  if (max_marker)
-    *max_marker = header.max_marker;
-
+  map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
+  BucketIndexShardsManager ver_mgr;
+  BucketIndexShardsManager master_ver_mgr;
+  BucketIndexShardsManager marker_mgr;
+  char buf[64];
+  for(; iter != headers.end(); ++iter) {
+    accumulate_raw_stats(iter->second, stats);
+    snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
+    ver_mgr.add_item(iter->first, string(buf));
+    snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
+    master_ver_mgr.add_item(iter->first, string(buf));
+    marker_mgr.add_item(iter->first, iter->second.max_marker);
+  }
+  ver_mgr.to_string(bucket_ver);
+  master_ver_mgr.to_string(master_ver);
+  marker_mgr.to_string(max_marker);
   return 0;
 }
 
 class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
   RGWGetBucketStats_CB *cb;
+  uint32_t pendings;
+  map<RGWObjCategory, RGWStorageStats> stats;
+  int ret_code;
+  bool should_cb;
+  Mutex lock;
 
 public:
-  RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+  RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
+    : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
+    lock("RGWGetBucketStatsContext") {}
+
   void handle_response(int r, rgw_bucket_dir_header& header) {
-    map<RGWObjCategory, RGWStorageStats> stats;
+    Mutex::Locker l(lock);
+    if (should_cb) {
+      if ( r >= 0) {
+        accumulate_raw_stats(header, stats);
+      } else {
+        ret_code = r;
+      }
 
-    if (r >= 0) {
-      translate_raw_stats(header, stats);
-      cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+      // Are we all done?
+      if (--pendings == 0) {
+        if (!ret_code) {
+          cb->set_response(&stats);
+        }
+        cb->handle_response(ret_code);
+        cb->put();
+      }
     }
+  }
 
-    cb->handle_response(r);
-
-    cb->put();
+  void unset_cb() {
+    Mutex::Locker l(lock);
+    should_cb = false;
   }
 };
 
 int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
 {
-  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
-  int r = cls_bucket_head_async(bucket, get_ctx);
+  RGWBucketInfo binfo;
+  int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  if (r < 0)
+    return r;
+
+  int num_aio = 0;
+  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards);
+  assert(get_ctx);
+  r = cls_bucket_head_async(bucket, get_ctx, &num_aio);
+  get_ctx->put();
   if (r < 0) {
     ctx->put();
-    delete get_ctx;
-    return r;
+    if (num_aio) {
+      get_ctx->unset_cb();
+    }
   }
-
-  return 0;
+  return r;
 }
 
 class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
@@ -5922,21 +5956,21 @@ int RGWRados::update_containers_stats(map<string, RGWBucketEnt>& m)
     RGWBucketEnt& ent = iter->second;
     rgw_bucket& bucket = ent.bucket;
 
-    rgw_bucket_dir_header header;
-    int r = cls_bucket_head(bucket, header);
+    map<string, rgw_bucket_dir_header> headers;
+    int r = cls_bucket_head(bucket, headers);
     if (r < 0)
       return r;
 
-    ent.count = 0;
-    ent.size = 0;
-
-    RGWObjCategory category = main_category;
-    map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.find((uint8_t)category);
-    if (iter != header.stats.end()) {
-      struct rgw_bucket_category_stats& stats = iter->second;
-      ent.count = stats.num_entries;
-      ent.size = stats.total_size;
-      ent.size_rounded = stats.total_size_rounded;
+    map<string, rgw_bucket_dir_header>::iterator hiter = headers.begin();
+    for (; hiter != headers.end(); ++hiter) {
+      RGWObjCategory category = main_category;
+      map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = (hiter->second.stats).find((uint8_t)category);
+      if (iter != hiter->second.stats.end()) {
+        struct rgw_bucket_category_stats& stats = iter->second;
+        ent.count += stats.num_entries;
+        ent.size += stats.total_size;
+        ent.size_rounded += stats.total_size_rounded;
+      }
     }
   }
 
@@ -6210,18 +6244,12 @@ int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& n
 int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  vector<string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
   if (r < 0)
     return r;
 
-  ObjectWriteOperation o;
-  cls_rgw_bucket_set_tag_timeout(o, timeout);
-
-  r = index_ctx.operate(oid, &o);
-
-  return r;
+  return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio);
 }
 
 int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
@@ -6522,34 +6550,44 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
   return 0;
 }
 
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
+int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<string, struct rgw_cls_list_ret> list_results;
+  int r = open_bucket_index(bucket, index_ctx, list_results);
   if (r < 0)
     return r;
 
-  r = cls_rgw_get_dir_header(index_ctx, oid, &header);
+  r = cls_rgw_get_dir_header(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio);
   if (r < 0)
     return r;
 
+  map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+  for(; iter != list_results.end(); ++iter) {
+    headers[iter->first] = iter->second.dir.header;
+  }
   return 0;
 }
 
-int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
-  if (r < 0)
-    return r;
-
-  r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
+  vector<string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
   if (r < 0)
     return r;
 
-  return 0;
+  vector<string>::iterator iter = bucket_objs.begin();
+  for (; iter != bucket_objs.end(); ++iter) {
+    r = cls_rgw_get_dir_header_async(index_ctx, *iter, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
+    if (r < 0) {
+      ctx->put();
+      break;
+    } else {
+      (*num_aio)++;
+    }
+  }
+  return r;
 }
 
 int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
@@ -6600,8 +6638,8 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_
 
 int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
 {
-  rgw_bucket_dir_header header;
-  int r = cls_bucket_head(bucket, header);
+  map<string, struct rgw_bucket_dir_header> headers;
+  int r = cls_bucket_head(bucket, headers);
   if (r < 0) {
     ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl;
     return r;
@@ -6611,12 +6649,15 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
 
   bucket.convert(&entry.bucket);
 
-  map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
-  for (; iter != header.stats.end(); ++iter) {
-    struct rgw_bucket_category_stats& header_stats = iter->second;
-    entry.size += header_stats.total_size;
-    entry.size_rounded += header_stats.total_size_rounded;
-    entry.count += header_stats.num_entries;
+  map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
+  for (; hiter != headers.end(); ++hiter) {
+    map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
+    for (; iter != hiter->second.stats.end(); ++iter) {
+      struct rgw_bucket_category_stats& header_stats = iter->second;
+      entry.size += header_stats.total_size;
+      entry.size_rounded += header_stats.total_size_rounded;
+      entry.count += header_stats.num_entries;
+    }
   }
 
   list<cls_user_bucket_entry> entries;
index cec51c95367d9974780c95fda534f09f0b7b62df..a8f7c0b6de85c4e32b860712d656bf671cc5f756 100644 (file)
@@ -1189,21 +1189,13 @@ public:
 class RGWGetBucketStats_CB : public RefCountedObject {
 protected:
   rgw_bucket bucket;
-  uint64_t bucket_ver;
-  uint64_t master_ver;
   map<RGWObjCategory, RGWStorageStats> *stats;
-  string max_marker;
 public:
   RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
   virtual ~RGWGetBucketStats_CB() {}
   virtual void handle_response(int r) = 0;
-  virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
-                            map<RGWObjCategory, RGWStorageStats> *_stats,
-                            const string &_max_marker) {
-    bucket_ver = _bucket_ver;
-    master_ver = _master_ver;
+  virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) {
     stats = _stats;
-    max_marker = _max_marker;
   }
 };
 
@@ -1269,6 +1261,7 @@ class RGWRados
   template<typename T>
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
       map<string, T>& bucket_objs);
+
   struct GetObjState {
     librados::IoCtx io_ctx;
     bool sent_data;
@@ -1823,8 +1816,8 @@ public:
   }
 
   int decode_policy(bufferlist& bl, ACLOwner *owner);
-  int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
-                       string *max_marker);
+  int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+      map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
   int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
   int get_user_stats(const string& user, RGWStorageStats& stats);
   int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
@@ -1860,8 +1853,8 @@ public:
   int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
                       map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
                       bool (*force_check_filter)(const string&  name) = NULL);
-  int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
-  int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
+  int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers);
+  int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
   int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
                            RGWModifyOp op, rgw_obj& oid, string& tag);
   int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
index ff1bf3466d378558aab620613a97d3a6f903dc68..22221d4078c7dec1e0143c9651a0064a9b41193b 100644 (file)
@@ -38,11 +38,11 @@ public:
 };
 
 class RGWOp_BILog_Info : public RGWRESTOp {
-  uint64_t bucket_ver;
-  uint64_t master_ver;
+  string bucket_ver;
+  string master_ver;
   string max_marker;
 public:
-  RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {}
+  RGWOp_BILog_Info() : bucket_ver(), master_ver() {}
   ~RGWOp_BILog_Info() {}
 
   int check_caps(RGWUserCaps& caps) {
index 44cb30307245d2117e4adb7cb628d1d08719aa1c..fd77ccd087f494a0681eb862374d0cd90e4bd5cb 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "include/types.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "cls/rgw/cls_rgw_ops.h"
 
 #include "gtest/gtest.h"
 #include "test/librados/test.h"
@@ -10,6 +11,7 @@
 #include <errno.h>
 #include <string>
 #include <vector>
+#include <map>
 
 using namespace librados;
 
@@ -66,12 +68,18 @@ public:
 
 void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size)
 {
-  rgw_bucket_dir_header header;
-  ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header));
-
-  rgw_bucket_category_stats& stats = header.stats[category];
-  ASSERT_EQ(total_size, stats.total_size);
-  ASSERT_EQ(num_entries, stats.num_entries);
+  map<string, struct rgw_cls_list_ret> results;
+  ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, results, 8));
+
+  uint64_t entries = 0;
+  uint64_t size = 0;
+  map<string, struct rgw_cls_list_ret>::iterator iter = results.begin();
+  for (; iter != results.end(); ++iter) {
+    entries += (iter->second).dir.header.stats[category].num_entries;
+    size += (iter->second).dir.header.stats[category].total_size;
+  }
+  ASSERT_EQ(total_size, size);
+  ASSERT_EQ(num_entries, entries);
 }
 
 void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
@@ -340,7 +348,9 @@ TEST(cls_rgw, index_suggest)
   }
 
   op = mgr.write_op();
-  cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout
+  vector<string> bucket_objs;
+  bucket_objs.push_back(bucket_oid);
+  cls_rgw_bucket_set_tag_timeout(ioctx, bucket_objs, 1, 8); // short tag timeout
   ASSERT_EQ(0, ioctx.operate(bucket_oid, op));
 
   sleep(1);