From 0d1f97ff318bcf6ec60c5257b8b501d792d0b16e Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 5 Dec 2014 15:52:26 -0800 Subject: [PATCH] rgw, cls_rgw: keep shard ids with oids Instead of just having the list of oids, keep the shard ids together, so that we can know on which shard the operation happened. Bucket markers are just using the shard numeric id, instead of the bucket instance shard id. This makes it easier to parse the markers appropriately. Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw_client.cc | 8 +-- src/cls/rgw/cls_rgw_client.h | 13 ++-- src/rgw/rgw_rados.cc | 127 ++++++++++++++++++---------------- src/rgw/rgw_rados.h | 13 ++-- 4 files changed, 85 insertions(+), 76 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 51f1d0fbae2fb..1f7ff4fc05e6b 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -52,7 +52,7 @@ void BucketIndexAioManager::do_completion(int id) { // for further processing map::iterator miter = pending_objs.find(id); if (miter != pending_objs.end()) { - completion_objs.push_back(miter->second); + completion_objs[id] = miter->second; pending_objs.erase(miter); } @@ -60,7 +60,7 @@ void BucketIndexAioManager::do_completion(int id) { } bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, - int *num_completions, int *ret_code, vector *objs) { + int *num_completions, int *ret_code, map *objs) { lock.Lock(); if (pendings.empty() && completions.empty()) { lock.Unlock(); @@ -71,11 +71,11 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, // Clear the completed AIOs list::iterator iter = completions.begin(); - list::iterator liter = completion_objs.begin(); + map::iterator liter = completion_objs.begin(); for (; iter != completions.end() && liter != completion_objs.end(); ++iter, ++liter) { int r = (*iter)->get_return_value(); if (objs && r == 0) { - objs->push_back(*liter); + (*objs)[liter->first] = liter->second; } if (ret_code && (r < 0 && r != valid_ret_code)) (*ret_code) = r; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index cd6e852baaac5..efe6d517ed151 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -32,7 +32,7 @@ private: map pendings; list completions; map pending_objs; - list completion_objs; + map completion_objs; int next; Mutex lock; Cond cond; @@ -71,8 +71,7 @@ public: /* * Create a new instance. */ - BucketIndexAioManager() : pendings(), completions(), pending_objs(), completion_objs(), - next(0), lock("BucketIndexAioManager::lock"), cond() {} + BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {} /* @@ -91,7 +90,7 @@ public: * Return false if there is no pending AIO, true otherwise. */ bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code, - vector *objs); + map *objs); /** * Do aio read operation. @@ -214,8 +213,8 @@ protected: // OP needs to be re-send until a certain code is returned. virtual bool need_multiple_rounds() { return false; } // Add a new object to the end of the container. - virtual void add_object(const string& oid) {} - virtual void reset_container(vector& objs) {} + virtual void add_object(int shard, const string& oid) {} + virtual void reset_container(map& objs) {} public: CLSRGWConcurrentIO(librados::IoCtx& ioc, map& _objs_container, @@ -232,7 +231,7 @@ public: } int num_completions, r = 0; - vector objs; + map objs; while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) { diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 27608572c5220..2f18f73142d06 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1676,10 +1676,10 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da return 0; } -void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker, +void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, string *marker) { if (marker) { - *marker = shard_name; + *marker = shard_id_str; marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR); marker->append(shard_marker); } @@ -2386,7 +2386,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket) string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - vector bucket_objs; + map bucket_objs; get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs); return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); @@ -2486,15 +2486,15 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, /* remove bucket index */ librados::IoCtx index_ctx; // context for new bucket - vector bucket_objs; + map bucket_objs; int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - vector::const_iterator biter; + map::const_iterator biter; for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) { // Do best effort removal - index_ctx.remove(*biter); + index_ctx.remove(biter->second); } } /* ret == -ENOENT here */ @@ -3834,7 +3834,7 @@ int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ } int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - vector& bucket_objs, int shard_id, vector *bucket_instance_ids) { + map& bucket_objs, int shard_id, map *bucket_instance_ids) { string bucket_oid_base; int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); if (ret < 0) @@ -3855,16 +3855,16 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, template int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& bucket_objs, int shard_id, vector *bucket_instance_ids) + map& oids, map& bucket_objs, + int shard_id, map *bucket_instance_ids) { - vector oids; int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids); if (ret < 0) return ret; - vector::const_iterator iter = oids.begin(); + map::const_iterator iter = oids.begin(); for (; iter != oids.end(); ++iter) { - bucket_objs[*iter] = T(); + bucket_objs[iter->first] = T(); } return 0; } @@ -3912,17 +3912,18 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, librados::IoCtx index_ctx; // key - bucket index object id // value - bucket index check OP returned result with the given bucket index object (shard) - map bucket_objs_ret; - int ret = open_bucket_index(bucket, index_ctx, bucket_objs_ret); + map oids; + map bucket_objs_ret; + int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret); if (ret < 0) return ret; - ret = CLSRGWIssueBucketCheck(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); + ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); if (ret < 0) return ret; // Aggregate results (from different shards if there is any) - map::iterator iter; + map::iterator iter; for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) { accumulate_raw_stats(iter->second.existing_header, *existing_stats); accumulate_raw_stats(iter->second.calculated_header, *calculated_stats); @@ -3934,7 +3935,7 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, int RGWRados::bucket_rebuild_index(rgw_bucket& bucket) { librados::IoCtx index_ctx; - vector bucket_objs; + map bucket_objs; int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; @@ -5525,7 +5526,7 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m map& stats, string *max_marker) { map headers; - vector bucket_instance_ids; + map bucket_instance_ids; int r = cls_bucket_head(bucket, headers, &bucket_instance_ids); if (r < 0) return r; @@ -5533,7 +5534,7 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m assert(headers.size() == bucket_instance_ids.size()); map::iterator iter = headers.begin(); - vector::iterator viter = bucket_instance_ids.begin(); + map::iterator viter = bucket_instance_ids.begin(); BucketIndexShardsManager ver_mgr; BucketIndexShardsManager master_ver_mgr; BucketIndexShardsManager marker_mgr; @@ -5541,10 +5542,10 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m for(; iter != headers.end(); ++iter, ++viter) { accumulate_raw_stats(iter->second, stats); snprintf(buf, sizeof(buf), "%lu", iter->second.ver); - ver_mgr.add(*viter, string(buf)); + ver_mgr.add(viter->first, string(buf)); snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver); - master_ver_mgr.add(*viter, string(buf)); - marker_mgr.add(*viter, iter->second.max_marker); + master_ver_mgr.add(viter->first, string(buf)); + marker_mgr.add(viter->first, iter->second.max_marker); } ver_mgr.to_string(bucket_ver); master_ver_mgr.to_string(master_ver); @@ -6162,33 +6163,39 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark result.clear(); librados::IoCtx index_ctx; - map bi_log_lists; - vector bucket_instance_ids; - int r = open_bucket_index(bucket, index_ctx, bi_log_lists, shard_id, &bucket_instance_ids); + map oids; + map bi_log_lists; + map bucket_instance_ids; + int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids); if (r < 0) return r; BucketIndexShardsManager marker_mgr; - bool has_shards = (bi_log_lists.size() > 1 || shard_id >= 0); + bool has_shards = (oids.size() > 1 || shard_id >= 0); // If there are multiple shards for the bucket index object, the marker - // should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}# + // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}# // {shard_marker_2}...', if there is no sharding, the bi_log_list should - // only contain one record, and the key is the bucket index object id. - r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first); + // only contain one record, and the key is the bucket instance id. + r = marker_mgr.from_string(marker, has_shards); if (r < 0) return r; - r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); + r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; + vector shard_ids_str; vector::iterator> vcurrents; vector::iterator> vends; if (truncated) { *truncated = false; } - map::iterator miter = bi_log_lists.begin(); + map::iterator miter = bi_log_lists.begin(); for (; miter != bi_log_lists.end(); ++miter) { + int shard_id = miter->first; + char buf[16]; + snprintf(buf, sizeof(buf), "%d", shard_id); + shard_ids_str.push_back(buf); vcurrents.push_back(miter->second.entries.begin()); vends.push_back(miter->second.entries.end()); if (truncated) { @@ -6202,17 +6209,17 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark for (size_t i = 0; result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i]; ++vcurrents[i], ++i) { + string& shard_str = shard_ids_str[i]; if (vcurrents[i] != vends[i]) { rgw_bi_log_entry& entry = *(vcurrents[i]); - string& name = bucket_instance_ids[i]; if (has_shards) { // Put the shard name as part of the ID, so that caller can easy find out // the next marker string tmp_id; - build_bucket_index_marker(name, entry.id, &tmp_id); + build_bucket_index_marker(shard_str, entry.id, &tmp_id); entry.id.swap(tmp_id); } - marker_mgr.add(name, entry.id); + marker_mgr.add(i, entry.id); result.push_back(entry); has_more = true; } @@ -6240,18 +6247,18 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& mark int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker) { librados::IoCtx index_ctx; - vector bucket_objs; + map bucket_objs; int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id); if (r < 0) return r; bool has_shards = bucket_objs.size() > 1 || shard_id >= 0; BucketIndexShardsManager start_marker_mgr; - r = start_marker_mgr.from_string(start_marker, has_shards, bucket_objs.front()); + r = start_marker_mgr.from_string(start_marker, has_shards); if (r < 0) return r; BucketIndexShardsManager end_marker_mgr; - r = end_marker_mgr.from_string(end_marker, has_shards, bucket_objs.front()); + r = end_marker_mgr.from_string(end_marker, has_shards); if (r < 0) return r; @@ -6357,7 +6364,7 @@ int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout) { librados::IoCtx index_ctx; - vector bucket_objs; + map bucket_objs; int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; @@ -6374,12 +6381,13 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str librados::IoCtx index_ctx; // key - oid (for different shards if there is any) // value - list result for the corresponding oid (shard), it is filled by the AIO callback - map list_results; - int r = open_bucket_index(bucket, index_ctx, list_results); + map oids; + map list_results; + int r = open_bucket_index(bucket, index_ctx, oids); if (r < 0) return r; - r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio)(); + r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; @@ -6387,12 +6395,12 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str vector::iterator> vcurrents(list_results.size()); vector::iterator> vends(list_results.size()); vector vnames(list_results.size()); - map::iterator iter = list_results.begin(); + map::iterator iter = list_results.begin(); *is_truncated = false; for (; iter != list_results.end(); ++iter) { vcurrents.push_back(iter->second.dir.m.begin()); vends.push_back(iter->second.dir.m.end()); - vnames.push_back(iter->first); + vnames.push_back(oids[iter->first]); *is_truncated = (*is_truncated || iter->second.is_truncated); } @@ -6663,21 +6671,22 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx, return 0; } -int RGWRados::cls_bucket_head(rgw_bucket& bucket, map& headers, vector *bucket_instance_ids) +int RGWRados::cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids) { librados::IoCtx index_ctx; - map list_results; - int r = open_bucket_index(bucket, index_ctx, list_results, -1, bucket_instance_ids); + map oids; + map list_results; + int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids); if (r < 0) return r; - r = CLSRGWIssueGetDirHeader(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio)(); + r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; - map::iterator iter = list_results.begin(); + map::iterator iter = list_results.begin(); for(; iter != list_results.end(); ++iter) { - headers[iter->first] = iter->second.dir.header; + headers[oids[iter->first]] = iter->second.dir.header; } return 0; } @@ -6685,14 +6694,14 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, map bucket_objs; + map bucket_objs; int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - vector::iterator iter = bucket_objs.begin(); + map::iterator iter = bucket_objs.begin(); for (; iter != bucket_objs.end(); ++iter) { - r = cls_rgw_get_dir_header_async(index_ctx, *iter, static_cast(ctx->get())); + r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast(ctx->get())); if (r < 0) { ctx->put(); break; @@ -6990,46 +6999,46 @@ int RGWRados::remove_temp_objects(string date, string time) } void RGWRados::get_bucket_index_objects(const string& bucket_oid_base, - uint32_t num_shards, vector& bucket_objects, int shard_id) + uint32_t num_shards, map& bucket_objects, int shard_id) { if (!num_shards) { - bucket_objects.push_back(bucket_oid_base); + bucket_objects[0] = bucket_oid_base; } else { char buf[bucket_oid_base.size() + 32]; if (shard_id < 0) { for (uint32_t i = 0; i < num_shards; ++i) { snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i); - bucket_objects.push_back(string(buf)); + bucket_objects[i] = buf; } } else { if ((uint32_t)shard_id > num_shards) { return; } snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id); - bucket_objects.push_back(string(buf)); + bucket_objects[shard_id] = buf; } } } -void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, vector *result) +void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result) { rgw_bucket& bucket = bucket_info.bucket; string plain_id = bucket.name + ":" + bucket.bucket_id; if (!bucket_info.num_shards) { - result->push_back(plain_id); + (*result)[0] = plain_id; } else { char buf[16]; if (shard_id < 0) { for (uint32_t i = 0; i < bucket_info.num_shards; ++i) { snprintf(buf, sizeof(buf), ":%d", i); - result->push_back(plain_id + buf); + (*result)[i] = plain_id + buf; } } else { if ((uint32_t)shard_id > bucket_info.num_shards) { return; } snprintf(buf, sizeof(buf), ":%d", shard_id); - result->push_back(plain_id + buf); + (*result)[shard_id] = plain_id + buf; } } } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index c5376b779bede..5c5a6bf3f3b1d 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1257,14 +1257,15 @@ class RGWRados int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, const string& obj_key, string *bucket_obj, int *shard_id); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - vector& bucket_objs, int shard_id = -1, vector *bucket_instance_ids = NULL); + map& bucket_objs, int shard_id = -1, map *bucket_instance_ids = NULL); template int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& bucket_objs, int shard_id = -1, vector *bucket_instance_ids = NULL); - void build_bucket_index_marker(const string& shard_name, const string& shard_marker, + map& oids, map& bucket_objs, + int shard_id = -1, map *bucket_instance_ids = NULL); + void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, string *marker); - void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, vector *result); + void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result); struct GetObjState { librados::IoCtx io_ctx; @@ -1867,7 +1868,7 @@ public: int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, map& m, bool *is_truncated, string *last_entry, bool (*force_check_filter)(const string& name) = NULL); - int cls_bucket_head(rgw_bucket& bucket, map& headers, vector *bucket_instance_ids = NULL); + int cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids = NULL); int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio); int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard, @@ -1974,7 +1975,7 @@ public: * bucket_objs [out] - filled by this method, a list of bucket index objects. */ void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards, - vector& bucket_objs, int shard_id = -1); + map& bucket_objs, int shard_id = -1); /** * Get the bucket index object with the given base bucket index object and object key, -- 2.39.5