From: Guang Yang Date: Fri, 29 Aug 2014 10:22:50 +0000 (+0000) Subject: Adjust bucket stats/index checking/index rebuild/tag timeout implementation to work... X-Git-Tag: v0.92~12^2~33 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9c5acd67c4cfb28b22662e9ae3a67657cd689080;p=ceph.git Adjust bucket stats/index checking/index rebuild/tag timeout implementation to work with multiple shards. Signed-off-by: Guang Yang (yguang@yahoo-inc.com) --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 0d698e3c8150..ed937bc52eb0 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -143,13 +143,50 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, return ret; } -void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) -{ +static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, + const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { bufferlist in; struct rgw_cls_tag_timeout_op call; - call.tag_timeout = tag_timeout; + call.tag_timeout = timeout; ::encode(call, in); - o.exec("rgw", "bucket_set_tag_timeout", in); + ObjectWriteOperation op; + op.exec("rgw", "bucket_set_tag_timeout", in); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; +} + +int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector& bucket_objs, + uint64_t tag_timeout, uint32_t max_aio) +{ + int ret = 0; + vector::const_iterator iter = bucket_objs.begin(); + BucketIndexAioManager manager; + for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + if (ret < 0) + break; + } + + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { + int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager); + if(issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -238,39 +275,91 @@ int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj, return ret; } -int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header) -{ - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); - if (r < 0) - return r; +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, + struct rgw_cls_check_index_ret *pdata) { + bufferlist in; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx( + pdata, NULL)); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; +} - struct rgw_cls_check_index_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; +int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, + map& bucket_objs_ret, uint32_t max_aio) +{ + int ret = 0; + BucketIndexAioManager manager; + map::iterator iter = bucket_objs_ret.begin(); + for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); + if (ret < 0) + break; } - if (existing_header) - *existing_header = ret.existing_header; - if (calculated_header) - *calculated_header = ret.calculated_header; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) { + int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; +} - return 0; +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.exec("rgw", "bucket_rebuild_index", in); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op); + if (r >= 0) { + manager->add_pending(arg->id, c); + } + return r; } -int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) +int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector& bucket_objs, + uint32_t max_aio) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); - if (r < 0) - return r; + int ret = 0; + BucketIndexAioManager manager; + vector::const_iterator iter = bucket_objs.begin(); + for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); + if (ret < 0) + break; + } - return 0; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { + int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -284,28 +373,33 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) +int cls_rgw_get_dir_header(IoCtx& io_ctx, map& dir_headers, + uint32_t max_aio) { - bufferlist in, out; - struct rgw_cls_list_op call; - call.num_entries = 0; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; + int ret = 0; + BucketIndexAioManager manager; + map::iterator iter = dir_headers.begin(); + for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); + if (ret < 0) + break; } - if (header) - *header = ret.dir.header; - - return r; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) { + int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + return ret; } class GetDirHeaderCompletion : public ObjectOperationCompletion { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index a49b6422e7e8..b28425e1fba6 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -77,6 +77,37 @@ public: virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; }; +class BucketIndexShardsManager { +private: + // Per shard setting manager, for example, marker. + map value_by_shards; + const static char KEY_VALUE_SEPARATOR = '#'; + const static char SHARDS_SEPARATOR = ','; +public: + void add_item(const string& shard, const string& value) { + value_by_shards[shard] = value; + } + void to_string(string *out) const { + if (out) { + map::const_iterator iter = value_by_shards.begin(); + // No shards + if (value_by_shards.size() == 1) { + *out = iter->second; + } else { + for (; iter != value_by_shards.end(); ++iter) { + if (out->length()) { + // Not the first item, append a separator first + out->append(1, SHARDS_SEPARATOR); + } + out->append(iter->first); + out->append(1, KEY_VALUE_SEPARATOR); + out->append(iter->second); + } + } + } + } +}; + /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); @@ -92,7 +123,8 @@ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx, const vector& bucket_objs, uint32_t max_aio); -void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); +int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, + const vector& bucket_objs, uint64_t tag_timeout, uint32_t max_aio); void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op); @@ -122,12 +154,22 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj, map& list_results, uint32_t max_aio); -int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header); -int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); +/** + * Check the bucket index. + * + * io_ctx - IO context for rados. + * bucket_objs_ret - check result for all shards. + * max_aio - the maximum number of AIO (for throttling). + * + * Return 0 on success, a failure code otherwise. + */ +int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, + map& bucket_objs_ret, uint32_t max_aio); +int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector& bucket_objs, + uint32_t max_aio); -int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); +int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map& dir_headers, + uint32_t max_aio); int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 2c775ca1e25f..9cf4a734c33e 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -519,7 +519,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) return r; map stats; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -535,8 +535,8 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); formatter->dump_int("mtime", mtime); - formatter->dump_int("ver", bucket_ver); - formatter->dump_int("master_ver", master_ver); + formatter->dump_string("ver", bucket_ver); + formatter->dump_string("master_ver", master_ver); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); formatter->close_section(); diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index c09cdc2f26c8..83e451378b4a 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -358,7 +358,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b RGWBucketInfo info; bufferlist bl; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL); if (ret < 0) @@ -957,7 +957,7 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f bucket = bucket_info.bucket; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -972,8 +972,8 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); - formatter->dump_int("ver", bucket_ver); - formatter->dump_int("master_ver", master_ver); + formatter->dump_string("ver", bucket_ver); + formatter->dump_string("master_ver", master_ver); formatter->dump_int("mtime", mtime); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index a48ce69890bd..910da2fffb7c 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -318,8 +318,8 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket { RGWBucketInfo bucket_info; - uint64_t bucket_ver; - uint64_t master_ver; + string bucket_ver; + string master_ver; map bucket_stats; int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 235047e296f5..a5afa49b916e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -80,7 +80,6 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; #define RGW_STATELOG_OBJ_PREFIX "statelog." - #define dout_subsys ceph_subsys_rgw void RGWDefaultRegionInfo::dump(Formatter *f) const { @@ -3851,7 +3850,7 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index return 0; } -static void translate_raw_stats(rgw_bucket_dir_header& header, map& stats) +static void accumulate_raw_stats(rgw_bucket_dir_header& header, map& stats) { map::iterator iter = header.stats.begin(); for (; iter != header.stats.end(); ++iter) { @@ -3859,9 +3858,9 @@ static void translate_raw_stats(rgw_bucket_dir_header& header, mapsecond; s.category = (RGWObjCategory)iter->first; - s.num_kb = ((header_stats.total_size + 1023) / 1024); - s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024); - s.num_objects = header_stats.num_entries; + s.num_kb += ((header_stats.total_size + 1023) / 1024); + s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024); + s.num_objects += header_stats.num_entries; } } @@ -3870,21 +3869,23 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, map *calculated_stats) { librados::IoCtx index_ctx; - string oid; - - int ret = open_bucket_index(bucket, index_ctx, oid); + // key - bucket index object id + // value - bucket index check OP returned result with the given bucket index object (shard) + map bucket_objs_ret; + int ret = open_bucket_index(bucket, index_ctx, bucket_objs_ret); if (ret < 0) return ret; - rgw_bucket_dir_header existing_header; - rgw_bucket_dir_header calculated_header; - - ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header); + ret = cls_rgw_bucket_check_index_op(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio); if (ret < 0) return ret; - translate_raw_stats(existing_header, *existing_stats); - translate_raw_stats(calculated_header, *calculated_stats); + // Aggregate results (from different shards if there is any) + map::iterator iter; + for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) { + accumulate_raw_stats(iter->second.existing_header, *existing_stats); + accumulate_raw_stats(iter->second.calculated_header, *calculated_stats); + } return 0; } @@ -3892,13 +3893,12 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, int RGWRados::bucket_rebuild_index(rgw_bucket& bucket) { librados::IoCtx index_ctx; - string oid; - - int ret = open_bucket_index(bucket, index_ctx, oid); - if (ret < 0) - return ret; + vector bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); + if (r < 0) + return r; - return cls_rgw_bucket_rebuild_index_op(index_ctx, oid); + return cls_rgw_bucket_rebuild_index_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio); } @@ -5463,57 +5463,91 @@ int RGWRados::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, return 0; } -int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, - string *max_marker) +int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, + map& stats, string *max_marker) { - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + int r = cls_bucket_head(bucket, headers); if (r < 0) return r; - stats.clear(); - - translate_raw_stats(header, stats); - - *bucket_ver = header.ver; - *master_ver = header.master_ver; - - if (max_marker) - *max_marker = header.max_marker; - + map::iterator iter = headers.begin(); + BucketIndexShardsManager ver_mgr; + BucketIndexShardsManager master_ver_mgr; + BucketIndexShardsManager marker_mgr; + char buf[64]; + for(; iter != headers.end(); ++iter) { + accumulate_raw_stats(iter->second, stats); + snprintf(buf, sizeof(buf), "%lu", iter->second.ver); + ver_mgr.add_item(iter->first, string(buf)); + snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver); + master_ver_mgr.add_item(iter->first, string(buf)); + marker_mgr.add_item(iter->first, iter->second.max_marker); + } + ver_mgr.to_string(bucket_ver); + master_ver_mgr.to_string(master_ver); + marker_mgr.to_string(max_marker); return 0; } class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { RGWGetBucketStats_CB *cb; + uint32_t pendings; + map stats; + int ret_code; + bool should_cb; + Mutex lock; public: - RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {} + RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings) + : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true), + lock("RGWGetBucketStatsContext") {} + void handle_response(int r, rgw_bucket_dir_header& header) { - map stats; + Mutex::Locker l(lock); + if (should_cb) { + if ( r >= 0) { + accumulate_raw_stats(header, stats); + } else { + ret_code = r; + } - if (r >= 0) { - translate_raw_stats(header, stats); - cb->set_response(header.ver, header.master_ver, &stats, header.max_marker); + // Are we all done? + if (--pendings == 0) { + if (!ret_code) { + cb->set_response(&stats); + } + cb->handle_response(ret_code); + cb->put(); + } } + } - cb->handle_response(r); - - cb->put(); + void unset_cb() { + Mutex::Locker l(lock); + should_cb = false; } }; int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx) { - RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx); - int r = cls_bucket_head_async(bucket, get_ctx); + RGWBucketInfo binfo; + int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); + if (r < 0) + return r; + + int num_aio = 0; + RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards); + assert(get_ctx); + r = cls_bucket_head_async(bucket, get_ctx, &num_aio); + get_ctx->put(); if (r < 0) { ctx->put(); - delete get_ctx; - return r; + if (num_aio) { + get_ctx->unset_cb(); + } } - - return 0; + return r; } class RGWGetUserStatsContext : public RGWGetUserHeader_CB { @@ -5922,21 +5956,21 @@ int RGWRados::update_containers_stats(map& m) RGWBucketEnt& ent = iter->second; rgw_bucket& bucket = ent.bucket; - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + int r = cls_bucket_head(bucket, headers); if (r < 0) return r; - ent.count = 0; - ent.size = 0; - - RGWObjCategory category = main_category; - map::iterator iter = header.stats.find((uint8_t)category); - if (iter != header.stats.end()) { - struct rgw_bucket_category_stats& stats = iter->second; - ent.count = stats.num_entries; - ent.size = stats.total_size; - ent.size_rounded = stats.total_size_rounded; + map::iterator hiter = headers.begin(); + for (; hiter != headers.end(); ++hiter) { + RGWObjCategory category = main_category; + map::iterator iter = (hiter->second.stats).find((uint8_t)category); + if (iter != hiter->second.stats.end()) { + struct rgw_bucket_category_stats& stats = iter->second; + ent.count += stats.num_entries; + ent.size += stats.total_size; + ent.size_rounded += stats.total_size_rounded; + } } } @@ -6210,18 +6244,12 @@ int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& n int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout) { librados::IoCtx index_ctx; - string oid; - - int r = open_bucket_index(bucket, index_ctx, oid); + vector bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - ObjectWriteOperation o; - cls_rgw_bucket_set_tag_timeout(o, timeout); - - r = index_ctx.operate(oid, &o); - - return r; + return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio); } int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, @@ -6522,34 +6550,44 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx, return 0; } -int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header) +int RGWRados::cls_bucket_head(rgw_bucket& bucket, map& headers) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map list_results; + int r = open_bucket_index(bucket, index_ctx, list_results); if (r < 0) return r; - r = cls_rgw_get_dir_header(index_ctx, oid, &header); + r = cls_rgw_get_dir_header(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio); if (r < 0) return r; + map::iterator iter = list_results.begin(); + for(; iter != list_results.end(); ++iter) { + headers[iter->first] = iter->second.dir.header; + } return 0; } -int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx) +int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); - if (r < 0) - return r; - - r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx); + vector bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - return 0; + vector::iterator iter = bucket_objs.begin(); + for (; iter != bucket_objs.end(); ++iter) { + r = cls_rgw_get_dir_header_async(index_ctx, *iter, static_cast(ctx->get())); + if (r < 0) { + ctx->put(); + break; + } else { + (*num_aio)++; + } + } + return r; } int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header) @@ -6600,8 +6638,8 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) { - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + int r = cls_bucket_head(bucket, headers); if (r < 0) { ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl; return r; @@ -6611,12 +6649,15 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) bucket.convert(&entry.bucket); - map::iterator iter = header.stats.begin(); - for (; iter != header.stats.end(); ++iter) { - struct rgw_bucket_category_stats& header_stats = iter->second; - entry.size += header_stats.total_size; - entry.size_rounded += header_stats.total_size_rounded; - entry.count += header_stats.num_entries; + map::iterator hiter = headers.begin(); + for (; hiter != headers.end(); ++hiter) { + map::iterator iter = hiter->second.stats.begin(); + for (; iter != hiter->second.stats.end(); ++iter) { + struct rgw_bucket_category_stats& header_stats = iter->second; + entry.size += header_stats.total_size; + entry.size_rounded += header_stats.total_size_rounded; + entry.count += header_stats.num_entries; + } } list entries; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index cec51c95367d..a8f7c0b6de85 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1189,21 +1189,13 @@ public: class RGWGetBucketStats_CB : public RefCountedObject { protected: rgw_bucket bucket; - uint64_t bucket_ver; - uint64_t master_ver; map *stats; - string max_marker; public: RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {} virtual ~RGWGetBucketStats_CB() {} virtual void handle_response(int r) = 0; - virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver, - map *_stats, - const string &_max_marker) { - bucket_ver = _bucket_ver; - master_ver = _master_ver; + virtual void set_response(map *_stats) { stats = _stats; - max_marker = _max_marker; } }; @@ -1269,6 +1261,7 @@ class RGWRados template int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, map& bucket_objs); + struct GetObjState { librados::IoCtx io_ctx; bool sent_data; @@ -1823,8 +1816,8 @@ public: } int decode_policy(bufferlist& bl, ACLOwner *owner); - int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, - string *max_marker); + int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, + map& stats, string *max_marker); int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb); int get_user_stats(const string& user, RGWStorageStats& stats); int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb); @@ -1860,8 +1853,8 @@ public: int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, map& m, bool *is_truncated, string *last_entry, bool (*force_check_filter)(const string& name) = NULL); - int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); - int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); + int cls_bucket_head(rgw_bucket& bucket, map& headers); + int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio); int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, RGWModifyOp op, rgw_obj& oid, string& tag); int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index ff1bf3466d37..22221d4078c7 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -38,11 +38,11 @@ public: }; class RGWOp_BILog_Info : public RGWRESTOp { - uint64_t bucket_ver; - uint64_t master_ver; + string bucket_ver; + string master_ver; string max_marker; public: - RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {} + RGWOp_BILog_Info() : bucket_ver(), master_ver() {} ~RGWOp_BILog_Info() {} int check_caps(RGWUserCaps& caps) { diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index 44cb30307245..fd77ccd087f4 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -3,6 +3,7 @@ #include "include/types.h" #include "cls/rgw/cls_rgw_client.h" +#include "cls/rgw/cls_rgw_ops.h" #include "gtest/gtest.h" #include "test/librados/test.h" @@ -10,6 +11,7 @@ #include #include #include +#include using namespace librados; @@ -66,12 +68,18 @@ public: void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size) { - rgw_bucket_dir_header header; - ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header)); - - rgw_bucket_category_stats& stats = header.stats[category]; - ASSERT_EQ(total_size, stats.total_size); - ASSERT_EQ(num_entries, stats.num_entries); + map results; + ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, results, 8)); + + uint64_t entries = 0; + uint64_t size = 0; + map::iterator iter = results.begin(); + for (; iter != results.end(); ++iter) { + entries += (iter->second).dir.header.stats[category].num_entries; + size += (iter->second).dir.header.stats[category].total_size; + } + ASSERT_EQ(total_size, size); + ASSERT_EQ(num_entries, entries); } void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc) @@ -340,7 +348,9 @@ TEST(cls_rgw, index_suggest) } op = mgr.write_op(); - cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout + vector bucket_objs; + bucket_objs.push_back(bucket_oid); + cls_rgw_bucket_set_tag_timeout(ioctx, bucket_objs, 1, 8); // short tag timeout ASSERT_EQ(0, ioctx.operate(bucket_oid, op)); sleep(1);