From f9b280ea8943de38d27a5abd876f8165bd5fef3f Mon Sep 17 00:00:00 2001 From: Guang Yang Date: Tue, 23 Sep 2014 23:14:24 +0000 Subject: [PATCH] Adjust bi log listing to work with multiple bucket shards. Signed-off-by: Guang Yang (yguang@yahoo-inc.com) --- src/cls/rgw/cls_rgw_client.cc | 98 +++++++----------------- src/cls/rgw/cls_rgw_client.h | 136 ++++++++++++++++++++++++++-------- src/rgw/rgw_rados.cc | 91 ++++++++++++++++++++--- src/rgw/rgw_rados.h | 2 + 4 files changed, 214 insertions(+), 113 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index e253474d7bebc..0632b3a7355c0 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,6 +11,9 @@ using namespace librados; +const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; +const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; + /** * This class represents the bucket index object operation callback context. */ @@ -37,15 +40,6 @@ public: } }; -/* - * Callback implementation for AIO request. 
- */ -static void bucket_index_op_completion_cb(void* cb, void* arg) { - BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; - cb_arg->manager->do_completion(cb_arg->id); - cb_arg->put(); -} - void BucketIndexAioManager::do_completion(int id) { Mutex::Locker l(lock); @@ -95,12 +89,7 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, librados::ObjectWriteOperation op; op.create(true); op.exec("rgw", "bucket_init_index", in); - BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); - AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, &op); - if (r >= 0) - manager->add_pending(arg->id, c); - return r; + return manager->aio_operate(io_ctx, oid, &op); } static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, @@ -111,13 +100,7 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, ::encode(call, in); ObjectWriteOperation op; op.exec("rgw", "bucket_set_tag_timeout", in); - BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); - AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, &op); - if (r >= 0) { - manager->add_pending(arg->id, c); - } - return r; + return manager->aio_operate(io_ctx, oid, &op); } int CLSRGWIssueBucketIndexInit::issue_op() @@ -184,13 +167,7 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx, librados::ObjectReadOperation op; op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); - - BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); - AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, &op, NULL); - if (r >= 0) - manager->add_pending(arg->id, c); - return r; + return manager->aio_operate(io_ctx, oid, &op); } int 
CLSRGWIssueBucketList::issue_op() @@ -198,19 +175,32 @@ int CLSRGWIssueBucketList::issue_op() return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second); } +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, + const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, + struct cls_rgw_bi_log_list_ret *pdata) { + bufferlist in; + cls_rgw_bi_log_list_op call; + call.marker = marker_mgr.get(oid, ""); + call.max = max; + ::encode(call, in); + + librados::ObjectReadOperation op; + op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBILogList::issue_op() +{ + return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second); +} + static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, struct rgw_cls_check_index_ret *pdata) { bufferlist in; librados::ObjectReadOperation op; op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx( pdata, NULL)); - BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); - AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, &op, NULL); - if (r >= 0) { - manager->add_pending(arg->id, c); - } - return r; + return manager->aio_operate(io_ctx, oid, &op); } int CLSRGWIssueBucketCheck::issue_op() @@ -223,13 +213,7 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, bufferlist in; librados::ObjectWriteOperation op; op.exec("rgw", "bucket_rebuild_index", in); - BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); - AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, &op); - if (r >= 0) { - manager->add_pending(arg->id, 
c); - } - return r; + return manager->aio_operate(io_ctx, oid, &op); } int CLSRGWIssueBucketRebuild::issue_op() @@ -291,34 +275,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB return 0; } -int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, - list& entries, bool *truncated) -{ - bufferlist in, out; - cls_rgw_bi_log_list_op call; - call.marker = marker; - call.max = max; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out); - if (r < 0) - return r; - - cls_rgw_bi_log_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - - entries = ret.entries; - - if (truncated) - *truncated = ret.truncated; - - return r; -} - int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker) { do { diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index ebd005fba335d..e64d884caa99d 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -2,11 +2,26 @@ #define CEPH_CLS_RGW_CLIENT_H #include "include/types.h" +#include "include/str_list.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" #include "cls_rgw_ops.h" #include "common/RefCountedObj.h" +// Forward declaration +class BucketIndexAioManager; + +/* + * Bucket index AIO request argument, this is used to pass an argument + * to callback. + */ +struct BucketIndexAioArg : public RefCountedObject { + BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : + id(_id), manager(_manager) {} + int id; + BucketIndexAioManager* manager; +}; + /* * This class manages AIO completions. This class is not completely thread-safe, * methods like *get_next* is not thread-safe and is expected to be called from @@ -19,12 +34,21 @@ private: int next; Mutex lock; Cond cond; -public: /* - * Create a new instance. + * Callback implementation for AIO request. 
*/ - BucketIndexAioManager() : pendings(), completions(), next(0), - lock("BucketIndexAioManager::lock"), cond() {} + static void bucket_index_op_completion_cb(void* cb, void* arg) { + BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; + cb_arg->manager->do_completion(cb_arg->id); + cb_arg->put(); + } + + /* + * Get next request ID. This method is not thread-safe. + * + * Return next request ID. + */ + int get_next() { return next++; } /* * Add a new pending AIO completion instance. @@ -36,18 +60,17 @@ public: Mutex::Locker l(lock); pendings[id] = completion; } - +public: /* - * Do completion for the given AIO request. + * Create a new instance. */ - void do_completion(int id); + BucketIndexAioManager() : pendings(), completions(), next(0), + lock("BucketIndexAioManager::lock"), cond() {} /* - * Get next request ID. This method is not thread-safe. - * - * Return next request ID. + * Do completion for the given AIO request. */ - int get_next() { return next++; } + void do_completion(int id); /* * Wait for AIO completions. @@ -59,17 +82,32 @@ public: * Return false if there is no pending AIO, true otherwise. */ bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code); -}; -/* - * Bucket index AIO request argument, this is used to pass a argument - * to callback. - */ -struct BucketIndexAioArg : public RefCountedObject { - BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : - id(_id), manager(_manager) {} - int id; - BucketIndexAioManager* manager; + /** + * Do aio read operation. 
+ */ + bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) { + BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL); + if (r >= 0) { + add_pending(arg->id, c); + } + return r; + } + + /** + * Do aio write operation. + */ + bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) { + BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op); + if (r >= 0) { + add_pending(arg->id, c); + } + return r; + } }; class RGWGetDirHeader_CB : public RefCountedObject { @@ -82,12 +120,23 @@ class BucketIndexShardsManager { private: // Per shard setting manager, for example, marker. map value_by_shards; - const static char KEY_VALUE_SEPARATOR = '#'; - const static char SHARDS_SEPARATOR = ','; public: - void add_item(const string& shard, const string& value) { + const static string KEY_VALUE_SEPARATOR; + const static string SHARDS_SEPARATOR; + + void add(const string& shard, const string& value) { value_by_shards[shard] = value; } + + const string& get(const string& shard, const string& default_value) { + map::iterator iter = value_by_shards.find(shard); + return (iter == value_by_shards.end() ? 
default_value : iter->second); + } + + bool empty() { + return value_by_shards.empty(); + } + void to_string(string *out) const { if (out) { map::const_iterator iter = value_by_shards.begin(); @@ -98,15 +147,34 @@ public: for (; iter != value_by_shards.end(); ++iter) { if (out->length()) { // Not the first item, append a separator first - out->append(1, SHARDS_SEPARATOR); + out->append(SHARDS_SEPARATOR); } out->append(iter->first); - out->append(1, KEY_VALUE_SEPARATOR); + out->append(KEY_VALUE_SEPARATOR); out->append(iter->second); } } } } + + int from_string(const string& composed_marker, bool has_shards, const string& oid) { + value_by_shards.clear(); + if (!has_shards) { + add(oid, composed_marker); + } else { + list shards; + get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards); + list::const_iterator iter = shards.begin(); + for (; iter != shards.end(); ++iter) { + size_t pos = iter->find(KEY_VALUE_SEPARATOR); + if (pos == string::npos) + return -EINVAL; + string name = iter->substr(0, pos); + value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1); + } + } + return 0; + } }; /* bucket index */ @@ -223,6 +291,18 @@ public: start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {} }; +class CLSRGWIssueBILogList : public CLSRGWConcurrentIO > { + BucketIndexShardsManager& marker_mgr; + uint32_t max; +protected: + int issue_op(); +public: + CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max, + map& bi_log_lists, uint32_t max_aio) : + CLSRGWConcurrentIO >(io_ctx, bi_log_lists, max_aio), + marker_mgr(_marker_mgr), max(_max) {} +}; + /** * Check the bucket index. 
* @@ -266,10 +346,6 @@ void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates); -/* bucket index log */ - -int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, - list& entries, bool *truncated); int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker); /* usage logging */ diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 605b3ffee685b..93c2f9edd238d 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1676,6 +1676,15 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da return 0; } +void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker, + string *marker) { + if (marker) { + *marker = shard_name; + marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR); + marker->append(shard_marker); + } +} + int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx) { int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx); @@ -5479,10 +5488,10 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m for(; iter != headers.end(); ++iter) { accumulate_raw_stats(iter->second, stats); snprintf(buf, sizeof(buf), "%lu", iter->second.ver); - ver_mgr.add_item(iter->first, string(buf)); + ver_mgr.add(iter->first, string(buf)); snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver); - master_ver_mgr.add_item(iter->first, string(buf)); - marker_mgr.add_item(iter->first, iter->second.max_marker); + master_ver_mgr.add(iter->first, string(buf)); + marker_mgr.add(iter->first, iter->second.max_marker); } ver_mgr.to_string(bucket_ver); master_ver_mgr.to_string(master_ver); @@ -6096,22 +6105,80 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter, int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& 
marker, uint32_t max, std::list& result, bool *truncated) { + ldout(cct, 20) << __func__ << bucket << " marker " << marker << " max " << max << dendl; result.clear(); librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map bi_log_lists; + int r = open_bucket_index(bucket, index_ctx, bi_log_lists); if (r < 0) return r; - std::list entries; - int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated); - if (ret < 0) - return ret; + BucketIndexShardsManager marker_mgr; + bool has_shards = (bi_log_lists.size() > 1); + // If there are multiple shards for the bucket index object, the marker + // should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}# + // {shard_marker_2}...', if there is no sharding, the bi_log_list should + // only contain one record, and the key is the bucket index object id. + r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first); + if (r < 0) + return r; + + r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); + if (r < 0) + return r; + + vector::iterator> vcurrents; + vector::iterator> vends; + vector vnames; + if (truncated) { + *truncated = false; + } + map::iterator miter = bi_log_lists.begin(); + for (; miter != bi_log_lists.end(); ++miter) { + vnames.push_back(miter->first); + vcurrents.push_back(miter->second.entries.begin()); + vends.push_back(miter->second.entries.end()); + if (truncated) { + *truncated = (*truncated || miter->second.truncated); + } + } + + bool has_more = true; + while (result.size() < max && has_more) { + has_more = false; + for (size_t i = 0; + result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i]; + ++vcurrents[i], ++i) { + if (vcurrents[i] != vends[i]) { + rgw_bi_log_entry& entry = *(vcurrents[i]); + if (has_shards) { + // Put the shard name as part of the ID, so that caller can easily find out + // the next marker + string tmp_id; + 
build_bucket_index_marker(vnames[i], entry.id, &tmp_id); + entry.id.swap(tmp_id); + } + marker_mgr.add(vnames[i], entry.id); + result.push_back(entry); + has_more = true; + } + } + } - std::list::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - result.push_back(*iter); + for (size_t i = 0; i < vcurrents.size(); ++i) { + if (truncated) { + *truncated = (*truncated || (vcurrents[i] != vends[i])); + } + } + + // Refresh marker, if there are multiple shards, the output will look like + // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...', + // if there is no sharding, the simple marker (without oid) is returned + if (has_shards) { + marker_mgr.to_string(&marker); + } else { + marker = result.rbegin()->id; } return 0; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a8f7c0b6de85c..b501fa450b2b0 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1261,6 +1261,8 @@ class RGWRados template int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, map& bucket_objs); + void build_bucket_index_marker(const string& shard_name, const string& shard_marker, + string *marker); struct GetObjState { librados::IoCtx io_ctx; -- 2.39.5