From: Guang Yang Date: Wed, 24 Sep 2014 06:21:28 +0000 (+0000) Subject: Adjust bi log trim implementation to work with multiple bucket shards. X-Git-Tag: v0.92~12^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9e45a7cd3681dfb12aaa9689572de48eb0973e77;p=ceph.git Adjust bi log trim implementation to work with multiple bucket shards. Signed-off-by: Guang Yang (yguang@yahoo-inc.com) --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 0632b3a7355c..c0c5d7c7731e 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -48,11 +48,19 @@ void BucketIndexAioManager::do_completion(int id) { completions.push_back(iter->second); pendings.erase(iter); + // If the caller needs a list of finished objects, store them + // for further processing + map::iterator miter = pending_objs.find(id); + if (miter != pending_objs.end()) { + completion_objs.push_back(miter->second); + pending_objs.erase(miter); + } + cond.Signal(); } bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, - int *num_completions, int *ret_code) { + int *num_completions, int *ret_code, vector *objs) { lock.Lock(); if (pendings.empty() && completions.empty()) { lock.Unlock(); @@ -63,8 +71,12 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, // Clear the completed AIOs list::iterator iter = completions.begin(); - for (; iter != completions.end(); ++iter) { + list::iterator liter = completion_objs.begin(); + for (; iter != completions.end() && liter != completion_objs.end(); ++iter, ++liter) { int r = (*iter)->get_return_value(); + if (objs && r == 0) { + objs->push_back(*liter); + } if (ret_code && (r < 0 && r != valid_ret_code)) (*ret_code) = r; (*iter)->release(); @@ -194,6 +206,24 @@ int CLSRGWIssueBILogList::issue_op() return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second); } +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, + string& oid, BucketIndexShardsManager& start_marker_mgr, + BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { + bufferlist in; + cls_rgw_bi_log_trim_op call; + call.start_marker = start_marker_mgr.get(oid, ""); + call.end_marker = end_marker_mgr.get(oid, ""); + ::encode(call, in); + ObjectWriteOperation op; + op.exec("rgw", "bi_log_trim", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBILogTrim::issue_op() +{ + return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager); +} + static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, struct rgw_cls_check_index_ret *pdata) { bufferlist in; @@ -275,28 +305,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB return 0; } -int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker) -{ - do { - int r; - bufferlist in, out; - cls_rgw_bi_log_trim_op call; - call.start_marker = start_marker; - call.end_marker = end_marker; - ::encode(call, in); - r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out); - - if (r == -ENODATA) - break; - - if (r < 0) - return r; - - } while (1); - - return 0; -} - int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, string& read_iter, map& usage, diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index e64d884caa99..14a702cc4296 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -31,6 +31,8 @@ class BucketIndexAioManager { private: map pendings; list completions; + map pending_objs; + list completion_objs; int next; Mutex lock; Cond cond; @@ -49,23 +51,29 @@ private: * Return next request ID. */ int get_next() { return next++; } - + /* * Add a new pending AIO completion instance. * * @param id - the request ID. * @param completion - the AIO completion instance. + * @param oid - the object id associated with the object, if it is NULL, we don't + * track the object id per callback. */ - void add_pending(int id, librados::AioCompletion* completion) { + void add_pending(int id, librados::AioCompletion* completion, string *oid = NULL) { Mutex::Locker l(lock); pendings[id] = completion; + if (oid) { + pending_objs[id] = *oid; + } } public: /* * Create a new instance. */ - BucketIndexAioManager() : pendings(), completions(), next(0), - lock("BucketIndexAioManager::lock"), cond() {} + BucketIndexAioManager() : pendings(), completions(), pending_objs(), completion_objs(), + next(0), lock("BucketIndexAioManager::lock"), cond() {} + /* * Do completion for the given AIO request. @@ -78,10 +86,12 @@ public: * valid_ret_code - valid AIO return code. * num_completions - number of completions. * ret_code - return code of failed AIO. + * objs - a list of objects that has been finished the AIO. * * Return false if there is no pending AIO, true otherwise. */ - bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code); + bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code, + vector *objs); /** * Do aio read operation. @@ -193,6 +203,12 @@ protected: virtual void cleanup() {} virtual int valid_ret_code() { return 0; } + // Return true if multiple rounds of OPs might be needed, this happens when + // OP needs to be re-send until a certain code is returned. + virtual bool need_multiple_rounds() { return false; } + // Add a new object to the end of the container. + virtual void add_object(const string& oid) {} + virtual void reset_container(vector& objs) {} public: CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container, @@ -209,7 +225,8 @@ public: } int num_completions, r = 0; - while (manager.wait_for_completions(0, &num_completions, &r)) { + vector objs; + while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) { if (r >= 0 && ret >= 0) { for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) { int issue_ret = issue_op(); @@ -221,6 +238,11 @@ public: } else if (ret >= 0) { ret = r; } + if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) { + // For those objects which need another round, use them to reset + // the container + reset_container(objs); + } } if (ret < 0) { @@ -303,6 +325,27 @@ public: marker_mgr(_marker_mgr), max(_max) {} }; +class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO > { + BucketIndexShardsManager& start_marker_mgr; + BucketIndexShardsManager& end_marker_mgr; +protected: + int issue_op(); + // Trim until -ENODATA is returned. + int valid_ret_code() { return -ENODATA; } + bool need_multiple_rounds() { return true; } + void add_object(const string& oid) { objs_container.push_back(oid); } + void reset_container(vector& objs) { + objs_container.swap(objs); + iter = objs_container.begin(); + objs.clear(); + } +public: + CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr, + BucketIndexShardsManager& _end_marker_mgr, vector& _bucket_objs, uint32_t max_aio) : + CLSRGWConcurrentIO >(io_ctx, _bucket_objs, max_aio), + start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {} +}; + /** * Check the bucket index. * @@ -346,8 +389,6 @@ void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates); -int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker); - /* usage logging */ int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 93c2f9edd238..a90a44212272 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -6187,16 +6187,23 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t m int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + vector bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker); - if (ret < 0) - return ret; + bool has_shards = bucket_objs.size() > 1; + BucketIndexShardsManager start_marker_mgr; + r = start_marker_mgr.from_string(start_marker, has_shards, bucket_objs.front()); + if (r < 0) + return r; + BucketIndexShardsManager end_marker_mgr; + r = end_marker_mgr.from_string(end_marker, has_shards, bucket_objs.front()); + if (r < 0) + return r; - return 0; + return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs, + cct->_conf->rgw_bucket_index_max_aio)(); } int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)