return ret;
}
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+ const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
bufferlist in;
struct rgw_cls_tag_timeout_op call;
- call.tag_timeout = tag_timeout;
+ call.tag_timeout = timeout;
::encode(call, in);
- o.exec("rgw", "bucket_set_tag_timeout", in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bucket_set_tag_timeout", in);
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
+}
+
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint64_t tag_timeout, uint32_t max_aio)
+{
+ int ret = 0;
+ vector<string>::const_iterator iter = bucket_objs.begin();
+ BucketIndexAioManager manager;
+ for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ if(issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
return ret;
}
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header)
-{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
- if (r < 0)
- return r;
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+ struct rgw_cls_check_index_ret *pdata) {
+ bufferlist in;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+ pdata, NULL));
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
+}
- struct rgw_cls_check_index_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+int cls_rgw_bucket_check_index_op(IoCtx& io_ctx,
+ map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio)
+{
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, struct rgw_cls_check_index_ret>::iterator iter = bucket_objs_ret.begin();
+ for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (existing_header)
- *existing_header = ret.existing_header;
- if (calculated_header)
- *calculated_header = ret.calculated_header;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
+}
- return 0;
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.exec("rgw", "bucket_rebuild_index", in);
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ if (r >= 0) {
+ manager->add_pending(arg->id, c);
+ }
+ return r;
}
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint32_t max_aio)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
- if (r < 0)
- return r;
+ int ret = 0;
+ BucketIndexAioManager manager;
+ vector<string>::const_iterator iter = bucket_objs.begin();
+ for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+ if (ret < 0)
+ break;
+ }
- return 0;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+int cls_rgw_get_dir_header(IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+ uint32_t max_aio)
{
- bufferlist in, out;
- struct rgw_cls_list_op call;
- call.num_entries = 0;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
-
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, rgw_cls_list_ret>::iterator iter = dir_headers.begin();
+ for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (header)
- *header = ret.dir.header;
-
- return r;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+ return ret;
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
};
+class BucketIndexShardsManager {
+private:
+ // Per shard setting manager, for example, marker.
+ map<string, string> value_by_shards;
+ const static char KEY_VALUE_SEPARATOR = '#';
+ const static char SHARDS_SEPARATOR = ',';
+public:
+ void add_item(const string& shard, const string& value) {
+ value_by_shards[shard] = value;
+ }
+ void to_string(string *out) const {
+ if (out) {
+ map<string, string>::const_iterator iter = value_by_shards.begin();
+ // No shards
+ if (value_by_shards.size() == 1) {
+ *out = iter->second;
+ } else {
+ for (; iter != value_by_shards.end(); ++iter) {
+ if (out->length()) {
+ // Not the first item, append a separator first
+ out->append(1, SHARDS_SEPARATOR);
+ }
+ out->append(iter->first);
+ out->append(1, KEY_VALUE_SEPARATOR);
+ out->append(iter->second);
+ }
+ }
+ }
+ }
+};
+
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
const vector<string>& bucket_objs, uint32_t max_aio);
-void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx,
+ const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op);
map<string, struct rgw_cls_list_ret>& list_results,
uint32_t max_aio);
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
+/**
+ * Check the bucket index.
+ *
+ * io_ctx - IO context for rados.
+ * bucket_objs_ret - check result for all shards.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx,
+ map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio);
+int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
+ uint32_t max_aio);
-int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
+int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+ uint32_t max_aio);
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
return r;
map<RGWObjCategory, RGWStorageStats> stats;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
formatter->dump_int("mtime", mtime);
- formatter->dump_int("ver", bucket_ver);
- formatter->dump_int("master_ver", master_ver);
+ formatter->dump_string("ver", bucket_ver);
+ formatter->dump_string("master_ver", master_ver);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
formatter->close_section();
RGWBucketInfo info;
bufferlist bl;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
if (ret < 0)
bucket = bucket_info.bucket;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("id", bucket.bucket_id);
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
- formatter->dump_int("ver", bucket_ver);
- formatter->dump_int("master_ver", master_ver);
+ formatter->dump_string("ver", bucket_ver);
+ formatter->dump_string("master_ver", master_ver);
formatter->dump_int("mtime", mtime);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
{
RGWBucketInfo bucket_info;
- uint64_t bucket_ver;
- uint64_t master_ver;
+ string bucket_ver;
+ string master_ver;
map<RGWObjCategory, RGWStorageStats> bucket_stats;
int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL);
#define RGW_STATELOG_OBJ_PREFIX "statelog."
-
#define dout_subsys ceph_subsys_rgw
void RGWDefaultRegionInfo::dump(Formatter *f) const {
return 0;
}
-static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
+static void accumulate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
{
map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
for (; iter != header.stats.end(); ++iter) {
RGWStorageStats& s = stats[category];
struct rgw_bucket_category_stats& header_stats = iter->second;
s.category = (RGWObjCategory)iter->first;
- s.num_kb = ((header_stats.total_size + 1023) / 1024);
- s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024);
- s.num_objects = header_stats.num_entries;
+ s.num_kb += ((header_stats.total_size + 1023) / 1024);
+ s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024);
+ s.num_objects += header_stats.num_entries;
}
}
map<RGWObjCategory, RGWStorageStats> *calculated_stats)
{
librados::IoCtx index_ctx;
- string oid;
-
- int ret = open_bucket_index(bucket, index_ctx, oid);
+ // key - bucket index object id
+ // value - bucket index check OP returned result with the given bucket index object (shard)
+ map<string, struct rgw_cls_check_index_ret> bucket_objs_ret;
+ int ret = open_bucket_index(bucket, index_ctx, bucket_objs_ret);
if (ret < 0)
return ret;
- rgw_bucket_dir_header existing_header;
- rgw_bucket_dir_header calculated_header;
-
- ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header);
+ ret = cls_rgw_bucket_check_index_op(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio);
if (ret < 0)
return ret;
- translate_raw_stats(existing_header, *existing_stats);
- translate_raw_stats(calculated_header, *calculated_stats);
+ // Aggregate results (from different shards if there is any)
+ map<string, struct rgw_cls_check_index_ret>::iterator iter;
+ for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
+ accumulate_raw_stats(iter->second.existing_header, *existing_stats);
+ accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
+ }
return 0;
}
int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx;
- string oid;
-
- int ret = open_bucket_index(bucket, index_ctx, oid);
- if (ret < 0)
- return ret;
+ vector<string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+ if (r < 0)
+ return r;
- return cls_rgw_bucket_rebuild_index_op(index_ctx, oid);
+ return cls_rgw_bucket_rebuild_index_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
}
return 0;
}
-int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
- string *max_marker)
+int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
{
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+ map<string, rgw_bucket_dir_header> headers;
+ int r = cls_bucket_head(bucket, headers);
if (r < 0)
return r;
- stats.clear();
-
- translate_raw_stats(header, stats);
-
- *bucket_ver = header.ver;
- *master_ver = header.master_ver;
-
- if (max_marker)
- *max_marker = header.max_marker;
-
+ map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
+ BucketIndexShardsManager ver_mgr;
+ BucketIndexShardsManager master_ver_mgr;
+ BucketIndexShardsManager marker_mgr;
+ char buf[64];
+ for(; iter != headers.end(); ++iter) {
+ accumulate_raw_stats(iter->second, stats);
+ snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
+ ver_mgr.add_item(iter->first, string(buf));
+ snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
+ master_ver_mgr.add_item(iter->first, string(buf));
+ marker_mgr.add_item(iter->first, iter->second.max_marker);
+ }
+ ver_mgr.to_string(bucket_ver);
+ master_ver_mgr.to_string(master_ver);
+ marker_mgr.to_string(max_marker);
return 0;
}
class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
RGWGetBucketStats_CB *cb;
+ uint32_t pendings;
+ map<RGWObjCategory, RGWStorageStats> stats;
+ int ret_code;
+ bool should_cb;
+ Mutex lock;
public:
- RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+ RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
+ : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
+ lock("RGWGetBucketStatsContext") {}
+
void handle_response(int r, rgw_bucket_dir_header& header) {
- map<RGWObjCategory, RGWStorageStats> stats;
+ Mutex::Locker l(lock);
+ if (should_cb) {
+ if ( r >= 0) {
+ accumulate_raw_stats(header, stats);
+ } else {
+ ret_code = r;
+ }
- if (r >= 0) {
- translate_raw_stats(header, stats);
- cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+ // Are we all done?
+ if (--pendings == 0) {
+ if (!ret_code) {
+ cb->set_response(&stats);
+ }
+ cb->handle_response(ret_code);
+ cb->put();
+ }
}
+ }
- cb->handle_response(r);
-
- cb->put();
+ void unset_cb() {
+ Mutex::Locker l(lock);
+ should_cb = false;
}
};
int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
{
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
- int r = cls_bucket_head_async(bucket, get_ctx);
+ RGWBucketInfo binfo;
+ int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+ if (r < 0)
+ return r;
+
+ int num_aio = 0;
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards);
+ assert(get_ctx);
+ r = cls_bucket_head_async(bucket, get_ctx, &num_aio);
+ get_ctx->put();
if (r < 0) {
ctx->put();
- delete get_ctx;
- return r;
+ if (num_aio) {
+ get_ctx->unset_cb();
+ }
}
-
- return 0;
+ return r;
}
class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
RGWBucketEnt& ent = iter->second;
rgw_bucket& bucket = ent.bucket;
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+ map<string, rgw_bucket_dir_header> headers;
+ int r = cls_bucket_head(bucket, headers);
if (r < 0)
return r;
- ent.count = 0;
- ent.size = 0;
-
- RGWObjCategory category = main_category;
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.find((uint8_t)category);
- if (iter != header.stats.end()) {
- struct rgw_bucket_category_stats& stats = iter->second;
- ent.count = stats.num_entries;
- ent.size = stats.total_size;
- ent.size_rounded = stats.total_size_rounded;
+ map<string, rgw_bucket_dir_header>::iterator hiter = headers.begin();
+ for (; hiter != headers.end(); ++hiter) {
+ RGWObjCategory category = main_category;
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = (hiter->second.stats).find((uint8_t)category);
+ if (iter != hiter->second.stats.end()) {
+ struct rgw_bucket_category_stats& stats = iter->second;
+ ent.count += stats.num_entries;
+ ent.size += stats.total_size;
+ ent.size_rounded += stats.total_size_rounded;
+ }
}
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
{
librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
+ vector<string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- ObjectWriteOperation o;
- cls_rgw_bucket_set_tag_timeout(o, timeout);
-
- r = index_ctx.operate(oid, &o);
-
- return r;
+ return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio);
}
int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
return 0;
}
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
+int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers)
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<string, struct rgw_cls_list_ret> list_results;
+ int r = open_bucket_index(bucket, index_ctx, list_results);
if (r < 0)
return r;
- r = cls_rgw_get_dir_header(index_ctx, oid, &header);
+ r = cls_rgw_get_dir_header(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio);
if (r < 0)
return r;
+ map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ for(; iter != list_results.end(); ++iter) {
+ headers[iter->first] = iter->second.dir.header;
+ }
return 0;
}
-int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio)
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
- if (r < 0)
- return r;
-
- r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
+ vector<string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- return 0;
+ vector<string>::iterator iter = bucket_objs.begin();
+ for (; iter != bucket_objs.end(); ++iter) {
+ r = cls_rgw_get_dir_header_async(index_ctx, *iter, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
+ if (r < 0) {
+ ctx->put();
+ break;
+ } else {
+ (*num_aio)++;
+ }
+ }
+ return r;
}
int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
{
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+ map<string, struct rgw_bucket_dir_header> headers;
+ int r = cls_bucket_head(bucket, headers);
if (r < 0) {
ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl;
return r;
bucket.convert(&entry.bucket);
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
- for (; iter != header.stats.end(); ++iter) {
- struct rgw_bucket_category_stats& header_stats = iter->second;
- entry.size += header_stats.total_size;
- entry.size_rounded += header_stats.total_size_rounded;
- entry.count += header_stats.num_entries;
+ map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
+ for (; hiter != headers.end(); ++hiter) {
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
+ for (; iter != hiter->second.stats.end(); ++iter) {
+ struct rgw_bucket_category_stats& header_stats = iter->second;
+ entry.size += header_stats.total_size;
+ entry.size_rounded += header_stats.total_size_rounded;
+ entry.count += header_stats.num_entries;
+ }
}
list<cls_user_bucket_entry> entries;
class RGWGetBucketStats_CB : public RefCountedObject {
protected:
rgw_bucket bucket;
- uint64_t bucket_ver;
- uint64_t master_ver;
map<RGWObjCategory, RGWStorageStats> *stats;
- string max_marker;
public:
RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
virtual ~RGWGetBucketStats_CB() {}
virtual void handle_response(int r) = 0;
- virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
- map<RGWObjCategory, RGWStorageStats> *_stats,
- const string &_max_marker) {
- bucket_ver = _bucket_ver;
- master_ver = _master_ver;
+ virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) {
stats = _stats;
- max_marker = _max_marker;
}
};
template<typename T>
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<string, T>& bucket_objs);
+
struct GetObjState {
librados::IoCtx io_ctx;
bool sent_data;
}
int decode_policy(bufferlist& bl, ACLOwner *owner);
- int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
- string *max_marker);
+ int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
int get_user_stats(const string& user, RGWStorageStats& stats);
int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
bool (*force_check_filter)(const string& name) = NULL);
- int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
- int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
+ int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers);
+ int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
RGWModifyOp op, rgw_obj& oid, string& tag);
int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
};
class RGWOp_BILog_Info : public RGWRESTOp {
- uint64_t bucket_ver;
- uint64_t master_ver;
+ string bucket_ver;
+ string master_ver;
string max_marker;
public:
- RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {}
+ RGWOp_BILog_Info() : bucket_ver(), master_ver() {}
~RGWOp_BILog_Info() {}
int check_caps(RGWUserCaps& caps) {
#include "include/types.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "cls/rgw/cls_rgw_ops.h"
#include "gtest/gtest.h"
#include "test/librados/test.h"
#include <errno.h>
#include <string>
#include <vector>
+#include <map>
using namespace librados;
void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size)
{
- rgw_bucket_dir_header header;
- ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header));
-
- rgw_bucket_category_stats& stats = header.stats[category];
- ASSERT_EQ(total_size, stats.total_size);
- ASSERT_EQ(num_entries, stats.num_entries);
+ map<string, struct rgw_cls_list_ret> results;
+ ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, results, 8));
+
+ uint64_t entries = 0;
+ uint64_t size = 0;
+ map<string, struct rgw_cls_list_ret>::iterator iter = results.begin();
+ for (; iter != results.end(); ++iter) {
+ entries += (iter->second).dir.header.stats[category].num_entries;
+ size += (iter->second).dir.header.stats[category].total_size;
+ }
+ ASSERT_EQ(total_size, size);
+ ASSERT_EQ(num_entries, entries);
}
void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
}
op = mgr.write_op();
- cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout
+ vector<string> bucket_objs;
+ bucket_objs.push_back(bucket_oid);
+ cls_rgw_bucket_set_tag_timeout(ioctx, bucket_objs, 1, 8); // short tag timeout
ASSERT_EQ(0, ioctx.operate(bucket_oid, op));
sleep(1);