From: Guang Yang Date: Mon, 18 Aug 2014 11:46:32 +0000 (+0000) Subject: Adjust bucket listing to work with multiple shards. X-Git-Tag: v0.92~12^2~34 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=56feee792ee6cc083e1dfc74fcb7aa181286df80;p=ceph.git Adjust bucket listing to work with multiple shards. Signed-off-by: Guang Yang (yguang@yahoo-inc.com) --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 47c9dcb8db89..0d698e3c8150 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,6 +11,32 @@ using namespace librados; +/** + * This class represents the bucket index object operation callback context. + */ +template +class ClsBucketIndexOpCtx : public ObjectOperationCompletion { +private: + T *data; + int *ret_code; +public: + ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); } + ~ClsBucketIndexOpCtx() {} + void handle_completion(int r, bufferlist& outbl) { + if (r >= 0) { + try { + bufferlist::iterator iter = outbl.begin(); + ::decode((*data), iter); + } catch (buffer::error& err) { + r = -EIO; + } + } + if (ret_code) { + *ret_code = r; + } + } +}; + /* * Callback implementation for AIO request. */ @@ -91,7 +117,7 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, break; } - int num_completions, r; + int num_completions, r = 0; while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { @@ -159,35 +185,57 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& o.exec("rgw", "bucket_complete_op", in); } - -int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated) -{ - bufferlist in, out; +static bool issue_bucket_list_op(librados::IoCtx& io_ctx, + const string& oid, const string& start_obj, const string& filter_prefix, + uint32_t num_entries, BucketIndexAioManager *manager, + struct rgw_cls_list_ret *pdata) { + bufferlist in; struct rgw_cls_list_op call; call.start_obj = start_obj; call.filter_prefix = filter_prefix; call.num_entries = num_entries; ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); + + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op, NULL); + if (r >= 0) + manager->add_pending(arg->id, c); + return r; +} + +int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj, + const string& filter_prefix, uint32_t num_entries, + map& list_results, uint32_t max_aio) +{ + int ret = 0; + BucketIndexAioManager manager; + map::iterator iter = list_results.begin(); + for (; iter != list_results.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); + if (ret < 0) + break; } - if (dir) - *dir = ret.dir; - if (is_truncated) - *is_truncated = ret.is_truncated; + int num_completions, r = 0; + while (manager.wait_for_completions(0, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) { + int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); + if (issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } - return r; + return ret; } int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 9cce78f6331a..a49b6422e7e8 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -101,9 +101,26 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta, list *remove_objs, bool log_op); -int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated); +/** + * List the bucket with the starting object and filter prefix. + * NOTE: this method do listing requests for each bucket index shards identified by + * the keys of the *list_results* map, which means the map should be popludated + * by the caller to fill with each bucket index object id. + * + * io_ctx - IO context for rados. + * start_obj - marker for the listing. + * filter_prefix - filter prefix. + * num_entries - number of entries to request for each object (note the total + * amount of entries returned depends on the number of shardings). + * list_results - the list results keyed by bucket index object id. + * max_aio - the maximum number of AIO (for throttling). + * + * Return 0 on success, a failure code otherwise. +*/ +int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj, + const string& filter_prefix, uint32_t num_entries, + map& list_results, + uint32_t max_aio); int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *existing_header, diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 4afe1ae10192..c09cdc2f26c8 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -729,9 +729,9 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state, while (is_truncated) { map result; - int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result, - &is_truncated, &marker, - bucket_object_check_filter); + int r = store->cls_bucket_list(bucket, marker, prefix, 1000, + result, &is_truncated, &marker, + bucket_object_check_filter); if (r == -ENOENT) { break; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index f53727bb0a33..235047e296f5 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -19,6 +19,7 @@ #include "rgw_metadata.h" #include "rgw_bucket.h" +#include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" #include "cls/refcount/cls_refcount_client.h" @@ -2326,6 +2327,10 @@ int RGWRados::list_objects(rgw_bucket& bucket, int max, string& prefix, string& result.push_back(ent); count++; } + + // Either the back-end telling us truncated, or we don't consume all + // items returned per the amount caller request + truncated = (truncated || eiter != ent_map.end()); } done: @@ -3811,6 +3816,22 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, return 0; } +template +int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& bucket_objs) +{ + vector oids; + int ret = open_bucket_index(bucket, index_ctx, oids); + if (ret < 0) + return ret; + + vector::const_iterator iter = oids.begin(); + for (; iter != oids.end(); ++iter) { + bucket_objs[*iter] = T(); + } + return 0; +} + int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, const string& obj_key, string *bucket_obj) { @@ -6203,31 +6224,56 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeou return r; } -int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, - uint32_t num, map& m, - bool *is_truncated, string *last_entry, - bool (*force_check_filter)(const string& name)) +int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, + uint32_t num_entries, map& m, bool *is_truncated, + string *last_entry, bool (*force_check_filter)(const string& name)) { - ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl; + ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl; librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + // key - oid (for different shards if there is any) + // value - list result for the corresponding oid (shard), it is filled by the AIO callback + map list_results; + int r = open_bucket_index(bucket, index_ctx, list_results); if (r < 0) return r; - struct rgw_bucket_dir dir; - r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated); + r = cls_rgw_list_op(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio); if (r < 0) return r; - map::iterator miter; - bufferlist updates; - for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) { - RGWObjEnt e; - rgw_bucket_dir_entry& dirent = miter->second; + // Create a list of iterators that are used to iterate each shard + vector::iterator> vcurrents(list_results.size()); + vector::iterator> vends(list_results.size()); + vector vnames(list_results.size()); + map::iterator iter = list_results.begin(); + *is_truncated = false; + for (; iter != list_results.end(); ++iter) { + vcurrents.push_back(iter->second.dir.m.begin()); + vends.push_back(iter->second.dir.m.end()); + vnames.push_back(iter->first); + *is_truncated = (*is_truncated || iter->second.is_truncated); + } + + // Create a map to track the next candidate entry from each shard, if the entry + // from a specified shard is selected/erased, the next entry from that shard will + // be inserted for next round selection + map candidates; + for (size_t i = 0; i < vcurrents.size(); ++i) { + if (vcurrents[i] != vends[i]) { + candidates[vcurrents[i]->second.name] = i; + } + } + + map updates; + uint32_t count = 0; + while (count < num_entries && !candidates.empty()) { + // Select the next one + int pos = candidates.begin()->second; + struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second; // fill it in with initial values; we may correct later + RGWObjEnt e; e.name = dirent.name; e.size = dirent.meta.size; e.mtime = dirent.meta.mtime; @@ -6237,20 +6283,13 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, e.content_type = dirent.meta.content_type; e.tag = dirent.tag; - /* oh, that shouldn't happen! */ - if (e.name.empty()) { - ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl; - continue; - } - bool force_check = force_check_filter && force_check_filter(dirent.name); - if (!dirent.exists || !dirent.pending_map.empty() || force_check) { /* there are uncommitted ops. We need to check the current state, * and if the tags are old we need to do cleanup as well. */ librados::IoCtx sub_ctx; sub_ctx.dup(index_ctx); - r = check_disk_state(sub_ctx, bucket, dirent, e, updates); + r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]); if (r < 0) { if (r == -ENOENT) continue; @@ -6260,21 +6299,37 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, } m[e.name] = e; ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl; + + // Refresh the candidates map + candidates.erase(candidates.begin()); + ++vcurrents[pos]; + if (vcurrents[pos] != vends[pos]) { + candidates[vcurrents[pos]->second.name] = pos; + } + ++count; } - if (dir.m.size()) { - *last_entry = dir.m.rbegin()->first; + // Suggest updates if there is any + map::iterator miter = updates.begin(); + for (; miter != updates.end(); ++miter) { + if (miter->second.length()) { + ObjectWriteOperation o; + cls_rgw_suggest_changes(o, miter->second); + // we don't care if we lose suggested updates, send them off blindly + AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + index_ctx.aio_operate(miter->first, c, &o); + c->release(); + } } - if (updates.length()) { - ObjectWriteOperation o; - cls_rgw_suggest_changes(o, updates); - // we don't care if we lose suggested updates, send them off blindly - AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - r = index_ctx.aio_operate(oid, c, &o); - c->release(); + // Check if all the returned entries are consumed or not + for (size_t i = 0; i < vcurrents.size(); ++i) { + if (vcurrents[i] != vends[i]) + *is_truncated = true; } - return m.size(); + if (m.size()) + *last_entry = m.rbegin()->first; + return 0; } int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 556b4cf685ef..cec51c95367d 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1266,6 +1266,9 @@ class RGWRados const string& obj_key, string *bucket_obj); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, vector& bucket_objs); + template + int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& bucket_objs); struct GetObjState { librados::IoCtx io_ctx; bool sent_data; @@ -1854,9 +1857,9 @@ public: int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name); int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name); int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout); - int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num, - map& m, bool *is_truncated, - string *last_entry, bool (*force_check_filter)(const string& name) = NULL); + int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, + map& m, bool *is_truncated, string *last_entry, + bool (*force_check_filter)(const string& name) = NULL); int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,