From: Yehuda Sadeh Date: Thu, 22 Jan 2015 01:30:32 +0000 (-0800) Subject: Revert "Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next"" X-Git-Tag: v0.93~222 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6f44f7a0a9847e419ce2783164633efa71218380;p=ceph.git Revert "Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next"" Following a merge of next to master, the feature got reverted (because it was reverted on next). Undoing. This reverts commit 6613358ddc5339c8e33c409387fd6044db0b6f26. --- diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index eb4a4232d189..6198d62810ba 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -814,8 +814,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke map keys; string filter_prefix, end_key; - bufferlist start_bl; - bool start_key_added = false; uint32_t i = 0; string key; @@ -829,10 +827,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke key.append(marker); start_key = key; - int ret = cls_cxx_map_get_val(hctx, start_key, &start_bl); - if ((ret < 0) && (ret != -ENOENT)) { - return ret; - } } else { start_key = key_iter; } @@ -856,10 +850,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke if (ret < 0) return ret; - if ((start_bl.length() > 0) && (!start_key_added)) { - keys[start_key] = start_bl; - start_key_added = true; - } map::iterator iter = keys.begin(); if (iter == keys.end()) break; diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index c13c1a1559c6..545b36bcff56 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,19 +11,131 @@ using namespace librados; +const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; +const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; + +/** + * This class represents the bucket index object operation callback context. + */ +template +class ClsBucketIndexOpCtx : public ObjectOperationCompletion { +private: + T *data; + int *ret_code; +public: + ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); } + ~ClsBucketIndexOpCtx() {} + void handle_completion(int r, bufferlist& outbl) { + if (r >= 0) { + try { + bufferlist::iterator iter = outbl.begin(); + ::decode((*data), iter); + } catch (buffer::error& err) { + r = -EIO; + } + } + if (ret_code) { + *ret_code = r; + } + } +}; + +void BucketIndexAioManager::do_completion(int id) { + Mutex::Locker l(lock); + + map::iterator iter = pendings.find(id); + assert(iter != pendings.end()); + completions[id] = iter->second; + pendings.erase(iter); + + // If the caller needs a list of finished objects, store them + // for further processing + map::iterator miter = pending_objs.find(id); + if (miter != pending_objs.end()) { + completion_objs[id] = miter->second; + pending_objs.erase(miter); + } + + cond.Signal(); +} + +bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, + int *num_completions, int *ret_code, map *objs) { + lock.Lock(); + if (pendings.empty() && completions.empty()) { + lock.Unlock(); + return false; + } + + if (completions.empty()) { + // Wait for AIO completion + cond.Wait(lock); + } + + // Clear the completed AIOs + map::iterator iter = completions.begin(); + for (; iter != completions.end(); ++iter) { + int r = iter->second->get_return_value(); + if (objs && r == 0) { /* update list of successfully completed objs */ + map::iterator liter = completion_objs.find(iter->first); + if (liter != completion_objs.end()) { + (*objs)[liter->first] = liter->second; + } + } + if (ret_code && (r < 0 && r != valid_ret_code)) + (*ret_code) = r; + iter->second->release(); + } + if (num_completions) + (*num_completions) = completions.size(); + completions.clear(); + lock.Unlock(); + + return true; +} + void cls_rgw_bucket_init(ObjectWriteOperation& o) { bufferlist in; o.exec("rgw", "bucket_init_index", in); } -void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) -{ +static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, + const string& oid, BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.create(true); + op.exec("rgw", "bucket_init_index", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, + const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { bufferlist in; struct rgw_cls_tag_timeout_op call; - call.tag_timeout = tag_timeout; + call.tag_timeout = timeout; ::encode(call, in); - o.exec("rgw", "bucket_set_tag_timeout", in); + ObjectWriteOperation op; + op.exec("rgw", "bucket_set_tag_timeout", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_index_init_op(io_ctx, oid, &manager); +} + +void CLSRGWIssueBucketIndexInit::cleanup() +{ + // Do best effort removal + for (map::iterator citer = objs_container.begin(); citer != iter; ++citer) { + io_ctx.remove(citer->second); + } +} + +int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -59,70 +171,89 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& o.exec("rgw", "bucket_complete_op", in); } - -int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated) -{ - bufferlist in, out; +static bool issue_bucket_list_op(librados::IoCtx& io_ctx, + const string& oid, const string& start_obj, const string& filter_prefix, + uint32_t num_entries, BucketIndexAioManager *manager, + struct rgw_cls_list_ret *pdata) { + bufferlist in; struct rgw_cls_list_op call; call.start_obj = start_obj; call.filter_prefix = filter_prefix; call.num_entries = num_entries; ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); +} - if (dir) - *dir = ret.dir; - if (is_truncated) - *is_truncated = ret.is_truncated; +int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]); +} - return r; +static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, + struct cls_rgw_bi_log_list_ret *pdata) { + bufferlist in; + cls_rgw_bi_log_list_op call; + call.marker = marker_mgr.get(shard_id, ""); + call.max = max; + ::encode(call, in); + + librados::ObjectReadOperation op; + op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); } -int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header) +int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); - if (r < 0) - return r; + return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); +} - struct rgw_cls_check_index_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } +static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, + BucketIndexShardsManager& start_marker_mgr, + BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { + bufferlist in; + cls_rgw_bi_log_trim_op call; + call.start_marker = start_marker_mgr.get(shard_id, ""); + call.end_marker = end_marker_mgr.get(shard_id, ""); + ::encode(call, in); + ObjectWriteOperation op; + op.exec("rgw", "bi_log_trim", in); + return manager->aio_operate(io_ctx, oid, &op); +} - if (existing_header) - *existing_header = ret.existing_header; - if (calculated_header) - *calculated_header = ret.calculated_header; +int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) +{ + return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); +} - return 0; +static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, + struct rgw_cls_check_index_ret *pdata) { + bufferlist in; + librados::ObjectReadOperation op; + op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx( + pdata, NULL)); + return manager->aio_operate(io_ctx, oid, &op); } -int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) +int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); - if (r < 0) - return r; + return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); +} - return 0; +static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, + BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.exec("rgw", "bucket_rebuild_index", in); + return manager->aio_operate(io_ctx, oid, &op); +} + +int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) +{ + return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -136,28 +267,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) +int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) { - bufferlist in, out; - struct rgw_cls_list_op call; - call.num_entries = 0; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); - if (r < 0) - return r; - - struct rgw_cls_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - - if (header) - *header = ret.dir.header; - - return r; + return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]); } class GetDirHeaderCompletion : public ObjectOperationCompletion { @@ -198,56 +310,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB return 0; } -int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, - list& entries, bool *truncated) -{ - bufferlist in, out; - cls_rgw_bi_log_list_op call; - call.marker = marker; - call.max = max; - ::encode(call, in); - int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out); - if (r < 0) - return r; - - cls_rgw_bi_log_list_ret ret; - try { - bufferlist::iterator iter = out.begin(); - ::decode(ret, iter); - } catch (buffer::error& err) { - return -EIO; - } - - entries = ret.entries; - - if (truncated) - *truncated = ret.truncated; - - return r; -} - -int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker) -{ - do { - int r; - bufferlist in, out; - cls_rgw_bi_log_trim_op call; - call.start_marker = start_marker; - call.end_marker = end_marker; - ::encode(call, in); - r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out); - - if (r == -ENODATA) - break; - - if (r < 0) - return r; - - } while (1); - - return 0; -} - int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, string& read_iter, map& usage, diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index c6b5b757fa84..79de35825eff 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -2,20 +2,305 @@ #define CEPH_CLS_RGW_CLIENT_H #include "include/types.h" +#include "include/str_list.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" +#include "cls_rgw_ops.h" #include "common/RefCountedObj.h" +// Forward declaration +class BucketIndexAioManager; + +/* + * Bucket index AIO request argument, this is used to pass a argument + * to callback. + */ +struct BucketIndexAioArg : public RefCountedObject { + BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : + id(_id), manager(_manager) {} + int id; + BucketIndexAioManager* manager; +}; + +/* + * This class manages AIO completions. This class is not completely thread-safe, + * methods like *get_next* is not thread-safe and is expected to be called from + * within one thread. + */ +class BucketIndexAioManager { +private: + map pendings; + map completions; + map pending_objs; + map completion_objs; + int next; + Mutex lock; + Cond cond; + /* + * Callback implementation for AIO request. + */ + static void bucket_index_op_completion_cb(void* cb, void* arg) { + BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; + cb_arg->manager->do_completion(cb_arg->id); + cb_arg->put(); + } + + /* + * Get next request ID. This method is not thread-safe. + * + * Return next request ID. + */ + int get_next() { return next++; } + + /* + * Add a new pending AIO completion instance. + * + * @param id - the request ID. + * @param completion - the AIO completion instance. + * @param oid - the object id associated with the object, if it is NULL, we don't + * track the object id per callback. + */ + void add_pending(int id, librados::AioCompletion* completion, const string& oid) { + pendings[id] = completion; + pending_objs[id] = oid; + } +public: + /* + * Create a new instance. + */ + BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {} + + + /* + * Do completion for the given AIO request. + */ + void do_completion(int id); + + /* + * Wait for AIO completions. + * + * valid_ret_code - valid AIO return code. + * num_completions - number of completions. + * ret_code - return code of failed AIO. + * objs - a list of objects that has been finished the AIO. + * + * Return false if there is no pending AIO, true otherwise. + */ + bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code, + map *objs); + + /** + * Do aio read operation. + */ + bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) { + Mutex::Locker l(lock); + BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL); + if (r >= 0) { + add_pending(arg->id, c, oid); + } + return r; + } + + /** + * Do aio write operation. + */ + bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) { + Mutex::Locker l(lock); + BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); + librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op); + if (r >= 0) { + add_pending(arg->id, c, oid); + } + return r; + } +}; + class RGWGetDirHeader_CB : public RefCountedObject { public: virtual ~RGWGetDirHeader_CB() {} virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; }; +class BucketIndexShardsManager { +private: + // Per shard setting manager, for example, marker. + map value_by_shards; +public: + const static string KEY_VALUE_SEPARATOR; + const static string SHARDS_SEPARATOR; + + void add(int shard, const string& value) { + value_by_shards[shard] = value; + } + + const string& get(int shard, const string& default_value) { + map::iterator iter = value_by_shards.find(shard); + return (iter == value_by_shards.end() ? default_value : iter->second); + } + + map& get() { + return value_by_shards; + } + + bool empty() { + return value_by_shards.empty(); + } + + void to_string(string *out) const { + if (!out) { + return; + } + out->clear(); + map::const_iterator iter = value_by_shards.begin(); + for (; iter != value_by_shards.end(); ++iter) { + if (out->length()) { + // Not the first item, append a separator first + out->append(SHARDS_SEPARATOR); + } + char buf[16]; + snprintf(buf, sizeof(buf), "%d", iter->first); + out->append(buf); + out->append(KEY_VALUE_SEPARATOR); + out->append(iter->second); + } + } + + static bool is_shards_marker(const string& marker) { + return marker.find(KEY_VALUE_SEPARATOR) != string::npos; + } + + /* + * convert from string. There are two options of how the string looks like: + * + * 1. Single shard, no shard id specified, e.g. 000001.23.1 + * + * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a + * bucket with no shards. + * + * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2 + * + */ + int from_string(const string& composed_marker, int shard_id) { + value_by_shards.clear(); + vector shards; + get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards); + if (shards.size() > 1 && shard_id >= 0) { + return -EINVAL; + } + vector::const_iterator iter = shards.begin(); + for (; iter != shards.end(); ++iter) { + size_t pos = iter->find(KEY_VALUE_SEPARATOR); + if (pos == string::npos) { + if (!value_by_shards.empty()) { + return -EINVAL; + } + if (shard_id < 0) { + add(0, *iter); + } else { + add(shard_id, *iter); + } + return 0; + } + string shard_str = iter->substr(0, pos); + string err; + int shard = (int)strict_strtol(shard_str.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + add(shard, iter->substr(pos + 1)); + } + return 0; + } +}; + /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); -void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); +class CLSRGWConcurrentIO { +protected: + librados::IoCtx& io_ctx; + map& objs_container; + map::iterator iter; + uint32_t max_aio; + BucketIndexAioManager manager; + + virtual int issue_op(int shard_id, const string& oid) = 0; + + virtual void cleanup() {} + virtual int valid_ret_code() { return 0; } + // Return true if multiple rounds of OPs might be needed, this happens when + // OP needs to be re-send until a certain code is returned. + virtual bool need_multiple_rounds() { return false; } + // Add a new object to the end of the container. + virtual void add_object(int shard, const string& oid) {} + virtual void reset_container(map& objs) {} + +public: + CLSRGWConcurrentIO(librados::IoCtx& ioc, map& _objs_container, + uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {} + virtual ~CLSRGWConcurrentIO() {} + + int operator()() { + int ret = 0; + iter = objs_container.begin(); + for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { + ret = issue_op(iter->first, iter->second); + if (ret < 0) + break; + } + + int num_completions, r = 0; + map objs; + map *pobjs = (need_multiple_rounds() ? &objs : NULL); + while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) { + if (r >= 0 && ret >= 0) { + for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) { + int issue_ret = issue_op(iter->first, iter->second); + if(issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) { + // For those objects which need another round, use them to reset + // the container + reset_container(objs); + } + } + + if (ret < 0) { + cleanup(); + } + return ret; + } +}; + +class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { +protected: + int issue_op(int shard_id, const string& oid); + int valid_ret_code() { return -EEXIST; } + void cleanup(); +public: + CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map& _bucket_objs, + uint32_t _max_aio) : + CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} +}; + +class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO { + uint64_t tag_timeout; +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map& _bucket_objs, + uint32_t _max_aio, uint64_t _tag_timeout) : + CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {} +}; void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op); @@ -24,28 +309,118 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta, list *remove_objs, bool log_op); -int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj, - string& filter_prefix, uint32_t num_entries, - rgw_bucket_dir *dir, bool *is_truncated); +/** + * List the bucket with the starting object and filter prefix. + * NOTE: this method do listing requests for each bucket index shards identified by + * the keys of the *list_results* map, which means the map should be popludated + * by the caller to fill with each bucket index object id. + * + * io_ctx - IO context for rados. + * start_obj - marker for the listing. + * filter_prefix - filter prefix. + * num_entries - number of entries to request for each object (note the total + * amount of entries returned depends on the number of shardings). + * list_results - the list results keyed by bucket index object id. + * max_aio - the maximum number of AIO (for throttling). + * + * Return 0 on success, a failure code otherwise. +*/ + +class CLSRGWIssueBucketList : public CLSRGWConcurrentIO { + string start_obj; + string filter_prefix; + uint32_t num_entries; + map& result; +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj, + const string& _filter_prefix, uint32_t _num_entries, + map& oids, + map& list_results, + uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, oids, max_aio), + start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {} +}; + +class CLSRGWIssueBILogList : public CLSRGWConcurrentIO { + map& result; + BucketIndexShardsManager& marker_mgr; + uint32_t max; +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max, + map& oids, + map& bi_log_lists, uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists), + marker_mgr(_marker_mgr), max(_max) {} +}; + +class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO { + BucketIndexShardsManager& start_marker_mgr; + BucketIndexShardsManager& end_marker_mgr; +protected: + int issue_op(int shard_id, const string& oid); + // Trim until -ENODATA is returned. + int valid_ret_code() { return -ENODATA; } + bool need_multiple_rounds() { return true; } + void add_object(int shard, const string& oid) { objs_container[shard] = oid; } + void reset_container(map& objs) { + objs_container.swap(objs); + iter = objs_container.begin(); + objs.clear(); + } +public: + CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr, + BucketIndexShardsManager& _end_marker_mgr, map& _bucket_objs, uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio), + start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {} +}; + +/** + * Check the bucket index. + * + * io_ctx - IO context for rados. + * bucket_objs_ret - check result for all shards. + * max_aio - the maximum number of AIO (for throttling). + * + * Return 0 on success, a failure code otherwise. + */ +class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /* >*/ { + map& result; +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map& oids, map& bucket_objs_ret, + uint32_t _max_aio) : + CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {} +}; + +class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO { +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map& bucket_objs, + uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {} +}; + +class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO { + map& result; +protected: + int issue_op(int shard_id, const string& oid); +public: + CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map& oids, map& dir_headers, + uint32_t max_aio) : + CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {} +}; -int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, - rgw_bucket_dir_header *existing_header, - rgw_bucket_dir_header *calculated_header); -int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); - -int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates); -/* bucket index log */ - -int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, - list& entries, bool *truncated); -int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker); - /* usage logging */ int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, diff --git a/src/common/config_opts.h b/src/common/config_opts.h index bb627cea2cbf..bd053b1d372d 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -883,6 +883,22 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024) +/** + * override max bucket index shards in zone configuration (if not zero) + * + * Represents the number of shards for the bucket index object, a value of zero + * indicates there is no sharding. By default (no sharding, the name of the object + * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}', + * sharding_id is zero-based value. It is not recommended to set a too large value + * (e.g. thousand) as it increases the cost for bucket listing. + */ +OPTION(rgw_override_bucket_index_max_shards, OPT_U32, 0) + +/** + * Represents the maximum AIO pending requests for the bucket index object shards. + */ +OPTION(rgw_bucket_index_max_aio, OPT_U32, 8) + OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id") OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin") OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 2c775ca1e25f..03b51a59685e 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -519,7 +519,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) return r; map stats; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -535,8 +535,8 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); formatter->dump_int("mtime", mtime); - formatter->dump_int("ver", bucket_ver); - formatter->dump_int("master_ver", master_ver); + formatter->dump_string("ver", bucket_ver); + formatter->dump_string("master_ver", master_ver); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); formatter->close_section(); @@ -2350,7 +2350,7 @@ next: do { list entries; - ret = store->list_bi_log_entries(bucket, marker, max_entries - count, entries, &truncated); + ret = store->list_bi_log_entries(bucket, shard_id, marker, max_entries - count, entries, &truncated); if (ret < 0) { cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl; return -ret; @@ -2382,7 +2382,7 @@ next: cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - ret = store->trim_bi_log_entries(bucket, start_marker, end_marker); + ret = store->trim_bi_log_entries(bucket, shard_id, start_marker, end_marker); if (ret < 0) { cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl; return -ret; @@ -2565,7 +2565,7 @@ next: } RGWReplicaBucketLogger logger(store); - ret = logger.get_bounds(bucket, bounds); + ret = logger.get_bounds(bucket, shard_id, bounds); if (ret < 0) return -ret; } else { // shouldn't get here @@ -2616,7 +2616,7 @@ next: } RGWReplicaBucketLogger logger(store); - ret = logger.delete_bound(bucket, daemon_id); + ret = logger.delete_bound(bucket, shard_id, daemon_id); if (ret < 0) return -ret; } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 4afe1ae10192..48abc4d72e66 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -233,6 +233,32 @@ int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersi return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker); } +int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id) +{ + ssize_t pos = bucket_instance.rfind(':'); + if (pos < 0) { + return -EINVAL; + } + + string first = bucket_instance.substr(0, pos); + string second = bucket_instance.substr(pos + 1); + + if (first.find(':') == string::npos) { + *shard_id = -1; + *target_bucket_instance = bucket_instance; + return 0; + } + + *target_bucket_instance = first; + string err; + *shard_id = strict_strtol(second.c_str(), 10, &err); + if (!err.empty()) { + return -EINVAL; + } + + return 0; +} + int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info, map& attrs, map* rmattrs, @@ -358,7 +384,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b RGWBucketInfo info; bufferlist bl; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL); if (ret < 0) @@ -729,9 +755,9 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state, while (is_truncated) { map result; - int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result, - &is_truncated, &marker, - bucket_object_check_filter); + int r = store->cls_bucket_list(bucket, marker, prefix, 1000, + result, &is_truncated, &marker, + bucket_object_check_filter); if (r == -ENOENT) { break; @@ -957,7 +983,7 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f bucket = bucket_info.bucket; - uint64_t bucket_ver, master_ver; + string bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -972,8 +998,8 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); - formatter->dump_int("ver", bucket_ver); - formatter->dump_int("master_ver", master_ver); + formatter->dump_string("ver", bucket_ver); + formatter->dump_string("master_ver", master_ver); formatter->dump_int("mtime", mtime); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); @@ -1076,9 +1102,10 @@ void rgw_data_change::dump(Formatter *f) const } -int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { - string& name = bucket.name; - uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards; +int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { + const string& name = bs.bucket.name; + int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); + uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards; return (int)r; } @@ -1090,19 +1117,22 @@ int RGWDataChangesLog::renew_entries() /* we can't keep the bucket name as part of the cls_log_entry, and we need * it later, so we keep two lists under the map */ - map, list > > m; + map, list > > m; lock.Lock(); - map entries; + map entries; entries.swap(cur_cycle); lock.Unlock(); - map::iterator iter; + map::iterator iter; string section; utime_t ut = ceph_clock_now(cct); for (iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_bucket& bucket = iter->second; - int index = choose_oid(bucket); + const rgw_bucket_shard& bs = iter->first; + const rgw_bucket& bucket = bs.bucket; + int shard_id = bs.shard_id; + + int index = choose_oid(bs); cls_log_entry entry; @@ -1110,16 +1140,21 @@ int RGWDataChangesLog::renew_entries() bufferlist bl; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + change.key += buf; + } change.timestamp = ut; ::encode(change, bl); store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); - m[index].first.push_back(bucket.name); + m[index].first.push_back(bs); m[index].second.push_back(entry); } - map, list > >::iterator miter; + map, list > >::iterator miter; for (miter = m.begin(); miter != m.end(); ++miter) { list& entries = miter->second.second; @@ -1136,8 +1171,8 @@ int RGWDataChangesLog::renew_entries() utime_t expiration = now; expiration += utime_t(cct->_conf->rgw_data_log_window, 0); - list& buckets = miter->second.first; - list::iterator liter; + list& buckets = miter->second.first; + list::iterator liter; for (liter = buckets.begin(); liter != buckets.end(); ++liter) { update_renewed(*liter, expiration); } @@ -1146,39 +1181,41 @@ int RGWDataChangesLog::renew_entries() return 0; } -void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status) +void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status) { assert(lock.is_locked()); - if (!changes.find(bucket_name, status)) { + if (!changes.find(bs, status)) { status = ChangeStatusPtr(new ChangeStatus); - changes.add(bucket_name, status); + changes.add(bs, status); } } -void RGWDataChangesLog::register_renew(rgw_bucket& bucket) +void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs) { Mutex::Locker l(lock); - cur_cycle[bucket.name] = bucket; + cur_cycle[bs] = true; } -void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration) +void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration) { Mutex::Locker l(lock); ChangeStatusPtr status; - _get_change(bucket_name, status); + _get_change(bs, status); - ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl; status->cur_expiration = expiration; } -int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { +int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { if (!store->need_to_log_data()) return 0; + rgw_bucket_shard bs(bucket, shard_id); + lock.Lock(); ChangeStatusPtr status; - _get_change(bucket.name, status); + _get_change(bs, status); lock.Unlock(); @@ -1186,13 +1223,13 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->lock->Lock(); - ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; if (now < status->cur_expiration) { /* no need to send, recently completed */ status->lock->Unlock(); - register_renew(bucket); + register_renew(bs); return 0; } @@ -1209,7 +1246,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { int ret = cond->wait(); cond->put(); if (!ret) { - register_renew(bucket); + register_renew(bs); } return ret; } @@ -1217,7 +1254,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->cond = new RefCountedCond; status->pending = true; - string& oid = oids[choose_oid(bucket)]; + string& oid = oids[choose_oid(bs)]; utime_t expiration; int ret; @@ -1234,6 +1271,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { rgw_data_change change; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + change.key += buf; + } change.timestamp = now; ::encode(change, bl); string section; @@ -1686,7 +1728,7 @@ public: objv_tracker = bci.info.objv_tracker; - ret = store->init_bucket_index(bci.info.bucket); + ret = store->init_bucket_index(bci.info.bucket, bci.info.num_shards); if (ret < 0) return ret; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 3bdd68c057d0..d0c2f4b18493 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -32,6 +32,8 @@ extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferli map *pattrs, RGWObjVersionTracker *objv_tracker, time_t mtime); +extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id); + extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker); extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker); @@ -314,13 +316,13 @@ class RGWDataChangesLog { typedef ceph::shared_ptr ChangeStatusPtr; - lru_map changes; + lru_map changes; - map cur_cycle; + map cur_cycle; - void _get_change(string& bucket_name, ChangeStatusPtr& status); - void register_renew(rgw_bucket& bucket); - void update_renewed(string& bucket_name, utime_t& expiration); + void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); + void register_renew(rgw_bucket_shard& bs); + void update_renewed(rgw_bucket_shard& bs, utime_t& expiration); class ChangesRenewThread : public Thread { CephContext *cct; @@ -362,8 +364,8 @@ public: ~RGWDataChangesLog(); - int choose_oid(rgw_bucket& bucket); - int add_entry(rgw_bucket& bucket); + int choose_oid(const rgw_bucket_shard& bs); + int add_entry(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, list& entries, diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index dfe3361a8399..a36d89de2a06 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -26,6 +26,8 @@ PerfCounters *perfcounter = NULL; +const uint32_t RGWBucketInfo::NUM_SHARDS_BLIND_BUCKET(UINT32_MAX); + int rgw_perf_start(CephContext *cct) { PerfCountersBuilder plb(cct, cct->_conf->name.to_str(), l_rgw_first, l_rgw_last); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index d9175e8dc93e..a4c9b41ef167 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -140,6 +140,10 @@ using ceph::crypto::MD5; #define ERR_USER_SUSPENDED 2100 #define ERR_INTERNAL_ERROR 2200 +#ifndef UINT32_MAX +#define UINT32_MAX (4294967295) +#endif + typedef void *RGWAccessHandle; @@ -677,6 +681,25 @@ inline ostream& operator<<(ostream& out, const rgw_bucket &b) { return out; } +struct rgw_bucket_shard { + rgw_bucket bucket; + int shard_id; + + rgw_bucket_shard() : shard_id(-1) {} + rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {} + + bool operator<(const rgw_bucket_shard& b) const { + if (bucket < b.bucket) { + return true; + } + if (b.bucket < bucket) { + return false; + } + return shard_id < b.shard_id; + } +}; + + struct RGWObjVersionTracker { obj_version read_version; obj_version write_version; @@ -721,6 +744,10 @@ enum RGWBucketFlags { struct RGWBucketInfo { + enum BIShardsHashType { + MOD = 0 + }; + rgw_bucket bucket; string owner; uint32_t flags; @@ -732,8 +759,20 @@ struct RGWBucketInfo obj_version ep_objv; /* entry point object version, for runtime tracking only */ RGWQuotaInfo quota; + // Represents the number of bucket index object shards: + // - value of 0 indicates there is no sharding (this is by default before this + // feature is implemented). + // - value of UINT32_T::MAX indicates this is a blind bucket. + uint32_t num_shards; + + // Represents the bucket index shard hash type. + uint8_t bucket_index_shard_hash_type; + + // Represents the shard number for blind bucket. + const static uint32_t NUM_SHARDS_BLIND_BUCKET; + void encode(bufferlist& bl) const { - ENCODE_START(9, 4, bl); + ENCODE_START(11, 4, bl); ::encode(bucket, bl); ::encode(owner, bl); ::encode(flags, bl); @@ -743,6 +782,8 @@ struct RGWBucketInfo ::encode(placement_rule, bl); ::encode(has_instance_obj, bl); ::encode(quota, bl); + ::encode(num_shards, bl); + ::encode(bucket_index_shard_hash_type, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { @@ -765,6 +806,10 @@ struct RGWBucketInfo ::decode(has_instance_obj, bl); if (struct_v >= 9) ::decode(quota, bl); + if (struct_v >= 10) + ::decode(num_shards, bl); + if (struct_v >= 11) + ::decode(bucket_index_shard_hash_type, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -772,7 +817,7 @@ struct RGWBucketInfo void decode_json(JSONObj *obj); - RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {} + RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {} }; WRITE_CLASS_ENCODER(RGWBucketInfo) @@ -1022,6 +1067,9 @@ public: bool in_extra_data; /* in-memory only member, does not serialize */ + // Represents the hash index source for this object once it is set (non-empty) + std::string index_hash_source; + rgw_obj() : in_extra_data(false) {} rgw_obj(const char *b, const char *o) : in_extra_data(false) { rgw_bucket _b(b); @@ -1120,6 +1168,9 @@ public: return orig_key; } + string& get_hash_object() { + return index_hash_source.empty() ? object : index_hash_source; + } /** * Translate a namespace-mangled object name to the user-facing name * existing in the given namespace. diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index cd731b78a592..f4ce380d20cd 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -545,6 +545,8 @@ void RGWBucketInfo::dump(Formatter *f) const encode_json("placement_rule", placement_rule, f); encode_json("has_instance_obj", has_instance_obj, f); encode_json("quota", quota, f); + encode_json("num_shards", num_shards, f); + encode_json("bi_shard_hash_type", (uint32_t)bucket_index_shard_hash_type, f); } void RGWBucketInfo::decode_json(JSONObj *obj) { @@ -556,6 +558,10 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { JSONDecoder::decode_json("placement_rule", placement_rule, obj); JSONDecoder::decode_json("has_instance_obj", has_instance_obj, obj); JSONDecoder::decode_json("quota", quota, obj); + JSONDecoder::decode_json("num_shards", num_shards, obj); + uint32_t hash_type; + JSONDecoder::decode_json("bi_shard_hash_type", hash_type, obj); + bucket_index_shard_hash_type = (uint8_t)hash_type; } void RGWObjEnt::dump(Formatter *f) const @@ -658,6 +664,7 @@ void RGWZone::dump(Formatter *f) const encode_json("endpoints", endpoints, f); encode_json("log_meta", log_meta, f); encode_json("log_data", log_data, f); + encode_json("bucket_index_max_shards", bucket_index_max_shards, f); } void RGWZone::decode_json(JSONObj *obj) @@ -666,6 +673,7 @@ void RGWZone::decode_json(JSONObj *obj) JSONDecoder::decode_json("endpoints", endpoints, obj); JSONDecoder::decode_json("log_meta", log_meta, obj); JSONDecoder::decode_json("log_data", log_data, obj); + JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj); } void RGWRegionPlacementTarget::dump(Formatter *f) const diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 11bb93d2d57a..679b70bd14c2 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -323,7 +323,14 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu string obj_str; RGWUserInfo bucket_owner_info; - s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance"); + string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance"); + if (!bi.empty()) { + int shard_id; + ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id); + if (ret < 0) { + return ret; + } + } s->bucket_acl = new RGWAccessControlPolicy(s->cct); @@ -1454,6 +1461,7 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string } head_obj = manifest_gen.get_cur_obj(); + head_obj.index_hash_source = obj_str; cur_obj = head_obj; return 0; @@ -2531,6 +2539,7 @@ void RGWInitMultipart::execute() obj.init_ns(s->bucket, tmp_obj_name, mp_ns); // the meta object will be indexed with 0 size, we c obj.set_in_extra_data(true); + obj.index_hash_source = s->object_str; ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id()); } while (ret == -EEXIST); } @@ -2739,6 +2748,7 @@ void RGWCompleteMultipart::execute() meta_obj.init_ns(s->bucket, meta_oid, mp_ns); meta_obj.set_in_extra_data(true); + meta_obj.index_hash_source = s->object_str; ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL); if (ret < 0) { @@ -2890,6 +2900,7 @@ void RGWAbortMultipart::execute() string oid = mp.get_part(obj_iter->second.num); rgw_obj obj; obj.init_ns(s->bucket, oid, mp_ns); + obj.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, obj); if (ret < 0 && ret != -ENOENT) return; @@ -2898,6 +2909,7 @@ void RGWAbortMultipart::execute() RGWObjManifest::obj_iterator oiter; for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) { rgw_obj loc = oiter.get_location(); + loc.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, loc); if (ret < 0 && ret != -ENOENT) return; @@ -2909,6 +2921,7 @@ void RGWAbortMultipart::execute() // and also remove the metadata obj meta_obj.init_ns(s->bucket, meta_oid, mp_ns); meta_obj.set_in_extra_data(true); + meta_obj.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, meta_obj); if (ret == -ENOENT) { ret = -ERR_NO_SUCH_BUCKET; diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index a48ce69890bd..910da2fffb7c 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -318,8 +318,8 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket { RGWBucketInfo bucket_info; - uint64_t bucket_ver; - uint64_t master_ver; + string bucket_ver; + string master_ver; map bucket_stats; int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index bb110142f491..164363f83034 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -19,6 +19,7 @@ #include "rgw_metadata.h" #include "rgw_bucket.h" +#include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" #include "cls/refcount/cls_refcount_client.h" @@ -48,6 +49,8 @@ using namespace librados; #define dout_subsys ceph_subsys_rgw +#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877 + using namespace std; static RGWCache cached_rados_provider; @@ -77,7 +80,6 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; #define RGW_STATELOG_OBJ_PREFIX "statelog." - #define dout_subsys ceph_subsys_rgw void RGWDefaultRegionInfo::dump(Formatter *f) const { @@ -1451,6 +1453,15 @@ int RGWRados::init_complete() quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads); + bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards : + zone_public_config.bucket_index_max_shards); + if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) { + bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME; + ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: " + << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl; + } + ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl; + return ret; } @@ -1672,6 +1683,15 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da return 0; } +void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, + string *marker) { + if (marker) { + *marker = shard_id_str; + marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR); + marker->append(shard_marker); + } +} + int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx) { int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx); @@ -1994,7 +2014,7 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin name = prefix + buf; } -void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl) +void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl) { cls_log_add_prepare_entry(entry, ut, section, key, bl); } @@ -2322,6 +2342,10 @@ int RGWRados::list_objects(rgw_bucket& bucket, int max, string& prefix, string& result.push_back(ent); count++; } + + // Either the back-end telling us truncated, or we don't consume all + // items returned per the amount caller request + truncated = (truncated || eiter != ent_map.end()); } done: @@ -2358,7 +2382,7 @@ int RGWRados::create_pool(rgw_bucket& bucket) return 0; } -int RGWRados::init_bucket_index(rgw_bucket& bucket) +int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards) { librados::IoCtx index_ctx; // context for new bucket @@ -2369,13 +2393,10 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket) string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - librados::ObjectWriteOperation op; - op.create(true); - r = cls_rgw_init_index(index_ctx, op, dir_oid); - if (r < 0 && r != -EEXIST) - return r; + map bucket_objs; + get_bucket_index_objects(dir_oid, num_shards, bucket_objs); - return 0; + return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } /** @@ -2426,7 +2447,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - r = init_bucket_index(bucket); + r = init_bucket_index(bucket, bucket_index_max_shards); if (r < 0) return r; @@ -2442,6 +2463,8 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, info.owner = owner.user_id; info.region = region_name; info.placement_rule = selected_placement_rule; + info.num_shards = bucket_index_max_shards; + info.bucket_index_shard_hash_type = RGWBucketInfo::MOD; if (!creation_time) time(&info.creation_time); else @@ -2470,11 +2493,16 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, /* remove bucket index */ librados::IoCtx index_ctx; // context for new bucket - int r = open_bucket_index_ctx(bucket, index_ctx); + map bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - index_ctx.remove(dir_oid); + map::const_iterator biter; + for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) { + // Do best effort removal + index_ctx.remove(biter->second); + } } /* ret == -ENOENT here */ } @@ -2807,6 +2835,25 @@ int RGWRados::get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bu return 0; } +int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj) +{ + bucket = _bucket; + + if (store->bucket_is_system(bucket)) { + return 0; + } + + int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl; + return ret; + } + ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl; + + return 0; +} + + /** * Write/overwrite an object to the bucket storage. * bucket: the bucket to store the object in @@ -2928,7 +2975,14 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, index_tag = state->write_tag; } - r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag); + librados::IoCtx index_ctx; + BucketShard bs(this); + r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + + r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag); if (r < 0) return r; @@ -2950,8 +3004,8 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl; } - r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size, - ut, etag, content_type, &acl_bl, category, remove_objs); + r = complete_update_index(bs, obj, index_tag, poolid, epoch, size, + ut, etag, content_type, &acl_bl, category, remove_objs); if (r < 0) goto done_cancel; @@ -2967,7 +3021,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, return 0; done_cancel: - int ret = complete_update_index_cancel(bucket, obj.object, index_tag); + int ret = complete_update_index_cancel(bs, obj, index_tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl; } @@ -3765,7 +3819,86 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, return 0; } -static void translate_raw_stats(rgw_bucket_dir_header& header, map& stats) +int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, + string& bucket_oid_base) { + if (bucket_is_system(bucket)) + return -EINVAL; + + int r = open_bucket_index_ctx(bucket, index_ctx); + if (r < 0) + return r; + + if (bucket.marker.empty()) { + ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl; + return -EIO; + } + + bucket_oid_base = dir_oid_prefix; + bucket_oid_base.append(bucket.marker); + + return 0; + +} + +int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& bucket_objs, int shard_id, map *bucket_instance_ids) { + string bucket_oid_base; + int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); + if (ret < 0) + return ret; + + // Get the bucket info + RGWBucketInfo binfo; + ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); + if (ret < 0) + return ret; + + get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs, shard_id); + if (bucket_instance_ids) { + get_bucket_instance_ids(binfo, shard_id, bucket_instance_ids); + } + return 0; +} + +template +int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& oids, map& bucket_objs, + int shard_id, map *bucket_instance_ids) +{ + int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids); + if (ret < 0) + return ret; + + map::const_iterator iter = oids.begin(); + for (; iter != oids.end(); ++iter) { + bucket_objs[iter->first] = T(); + } + return 0; +} + +int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, + const string& obj_key, string *bucket_obj, int *shard_id) +{ + string bucket_oid_base; + int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); + if (ret < 0) + return ret; + + // Get the bucket info + RGWBucketInfo binfo; + ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); + if (ret < 0) + return ret; + + ret = get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards, + (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id); + if (ret < 0) { + ldout(cct, 10) << "get_bucket_index_object() returned ret=" << ret << dendl; + } + return 0; +} + +static void accumulate_raw_stats(rgw_bucket_dir_header& header, map& stats) { map::iterator iter = header.stats.begin(); for (; iter != header.stats.end(); ++iter) { @@ -3773,9 +3906,9 @@ static void translate_raw_stats(rgw_bucket_dir_header& header, mapsecond; s.category = (RGWObjCategory)iter->first; - s.num_kb = ((header_stats.total_size + 1023) / 1024); - s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024); - s.num_objects = header_stats.num_entries; + s.num_kb += ((header_stats.total_size + 1023) / 1024); + s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024); + s.num_objects += header_stats.num_entries; } } @@ -3784,21 +3917,24 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, map *calculated_stats) { librados::IoCtx index_ctx; - string oid; - - int ret = open_bucket_index(bucket, index_ctx, oid); + // key - bucket index object id + // value - bucket index check OP returned result with the given bucket index object (shard) + map oids; + map bucket_objs_ret; + int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret); if (ret < 0) return ret; - rgw_bucket_dir_header existing_header; - rgw_bucket_dir_header calculated_header; - - ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header); + ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); if (ret < 0) return ret; - translate_raw_stats(existing_header, *existing_stats); - translate_raw_stats(calculated_header, *calculated_stats); + // Aggregate results (from different shards if there is any) + map::iterator iter; + for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) { + accumulate_raw_stats(iter->second.existing_header, *existing_stats); + accumulate_raw_stats(iter->second.calculated_header, *calculated_stats); + } return 0; } @@ -3806,13 +3942,12 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, int RGWRados::bucket_rebuild_index(rgw_bucket& bucket) { librados::IoCtx index_ctx; - string oid; - - int ret = open_bucket_index(bucket, index_ctx, oid); - if (ret < 0) - return ret; + map bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); + if (r < 0) + return r; - return cls_rgw_bucket_rebuild_index_op(index_ctx, oid); + return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); } @@ -3874,8 +4009,14 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob bool ret_not_existed = (state && !state->exists); + BucketShard bs(this); + r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + string tag; - r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag); + r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag); if (r < 0) return r; @@ -3890,9 +4031,9 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob int64_t poolid = ref.ioctx.get_id(); if (r >= 0 || r == -ENOENT) { uint64_t epoch = ref.ioctx.get_last_version(); - r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch); + r = complete_update_index_del(bs, obj, tag, poolid, epoch); } else { - int ret = complete_update_index_cancel(bucket, obj.object, tag); + int ret = complete_update_index_cancel(bs, obj, tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl; } @@ -3950,8 +4091,14 @@ int RGWRados::delete_obj_index(rgw_obj& obj) std::string oid, key; get_obj_bucket_and_oid_key(obj, bucket, oid, key); + BucketShard bs(this); + int r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + string tag; - int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0); + r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0); return r; } @@ -4287,9 +4434,17 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, if (!op.size()) return 0; + librados::IoCtx index_ctx; + BucketShard bs(this); + r = bs.init(bucket, obj); + if (r < 0) { + ldout(cct, 10) << "bs.init() returned r=" << r << dendl; + return r; + } + string tag; if (state) { - r = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, obj, tag); + r = prepare_update_index(state, bs, CLS_RGW_OP_ADD, obj, tag); if (r < 0) return r; } @@ -4305,10 +4460,10 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, uint64_t epoch = ref.ioctx.get_last_version(); int64_t poolid = ref.ioctx.get_id(); utime_t mtime = ceph_clock_now(cct); - r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size, + r = complete_update_index(bs, obj, tag, poolid, epoch, state->size, mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL); } else { - int ret = complete_update_index_cancel(bucket, obj.object, tag); + int ret = complete_update_index_cancel(bs, obj, tag); if (ret < 0) { ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl; } @@ -4504,13 +4659,13 @@ done_err: return r; } -int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, +int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs, RGWModifyOp op, rgw_obj& obj, string& tag) { - if (bucket_is_system(bucket)) + if (bucket_is_system(bs.bucket)) return 0; - int ret = data_log->add_entry(obj.bucket); + int ret = data_log->add_entry(bs.bucket, bs.shard_id); if (ret < 0) { lderr(cct) << "ERROR: failed writing data log" << dendl; return ret; @@ -4527,21 +4682,20 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, append_rand_alpha(cct, tag, tag, 32); } } - ret = cls_obj_prepare_op(bucket, op, tag, - obj.object, obj.key); + ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key); return ret; } -int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, +int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs) { - if (bucket_is_system(bucket)) + if (bucket_is_system(bs.bucket)) return 0; RGWObjEnt ent; - ent.name = oid; + ent.name = oid.object; ent.size = size; ent.mtime = ut; ent.etag = etag; @@ -4556,7 +4710,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag ent.owner_display_name = owner.get_display_name(); ent.content_type = content_type; - int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs); + int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs); return ret; } @@ -4580,6 +4734,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, string etag; string content_type; bufferlist acl_bl; + BucketShard bs(this); bool update_index = (category == RGW_OBJ_CATEGORY_MAIN || category == RGW_OBJ_CATEGORY_MULTIMETA); @@ -4660,8 +4815,13 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, int64_t poolid = io_ctx.get_id(); int ret; + ret = bs.init(bucket, dst_obj); + if (ret < 0) { + goto done; + } + if (update_index) { - ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag); + ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag); if (ret < 0) goto done; } @@ -4675,10 +4835,10 @@ done: if (update_index) { if (ret >= 0) { - ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size, + ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size, ut, etag, content_type, &acl_bl, category, NULL); } else { - int r = complete_update_index_cancel(bucket, dst_obj.object, tag); + int r = complete_update_index_cancel(bs, dst_obj, tag); if (r < 0) { ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl; } @@ -5377,57 +5537,95 @@ int RGWRados::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, return 0; } -int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, - string *max_marker) +int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, + map& stats, string *max_marker) { - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + map bucket_instance_ids; + int r = cls_bucket_head(bucket, headers, &bucket_instance_ids); if (r < 0) return r; - stats.clear(); - - translate_raw_stats(header, stats); - - *bucket_ver = header.ver; - *master_ver = header.master_ver; - - if (max_marker) - *max_marker = header.max_marker; - + assert(headers.size() == bucket_instance_ids.size()); + + map::iterator iter = headers.begin(); + map::iterator viter = bucket_instance_ids.begin(); + BucketIndexShardsManager ver_mgr; + BucketIndexShardsManager master_ver_mgr; + BucketIndexShardsManager marker_mgr; + char buf[64]; + for(; iter != headers.end(); ++iter, ++viter) { + accumulate_raw_stats(iter->second, stats); + snprintf(buf, sizeof(buf), "%lu", iter->second.ver); + ver_mgr.add(viter->first, string(buf)); + snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver); + master_ver_mgr.add(viter->first, string(buf)); + marker_mgr.add(viter->first, iter->second.max_marker); + } + ver_mgr.to_string(bucket_ver); + master_ver_mgr.to_string(master_ver); + marker_mgr.to_string(max_marker); return 0; } class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { RGWGetBucketStats_CB *cb; + uint32_t pendings; + map stats; + int ret_code; + bool should_cb; + Mutex lock; public: - RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {} + RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings) + : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true), + lock("RGWGetBucketStatsContext") {} + void handle_response(int r, rgw_bucket_dir_header& header) { - map stats; + Mutex::Locker l(lock); + if (should_cb) { + if ( r >= 0) { + accumulate_raw_stats(header, stats); + } else { + ret_code = r; + } - if (r >= 0) { - translate_raw_stats(header, stats); - cb->set_response(header.ver, header.master_ver, &stats, header.max_marker); + // Are we all done? + if (--pendings == 0) { + if (!ret_code) { + cb->set_response(&stats); + } + cb->handle_response(ret_code); + cb->put(); + } } + } - cb->handle_response(r); - - cb->put(); + void unset_cb() { + Mutex::Locker l(lock); + should_cb = false; } }; int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx) { - RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx); - int r = cls_bucket_head_async(bucket, get_ctx); + RGWBucketInfo binfo; + int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); + if (r < 0) + return r; + + int num_aio = 0; + RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards); + assert(get_ctx); + r = cls_bucket_head_async(bucket, get_ctx, &num_aio); + get_ctx->put(); if (r < 0) { ctx->put(); - delete get_ctx; - return r; + if (num_aio) { + get_ctx->unset_cb(); + } } - - return 0; + return r; } class RGWGetUserStatsContext : public RGWGetUserHeader_CB { @@ -5521,7 +5719,7 @@ int RGWRados::get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketI time_t *pmtime, map *pattrs) { string oid; - if (!bucket.oid.empty()) { + if (bucket.oid.empty()) { get_bucket_meta_oid(bucket, oid); } else { oid = bucket.oid; @@ -5836,21 +6034,21 @@ int RGWRados::update_containers_stats(map& m) RGWBucketEnt& ent = iter->second; rgw_bucket& bucket = ent.bucket; - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + int r = cls_bucket_head(bucket, headers); if (r < 0) return r; - ent.count = 0; - ent.size = 0; - - RGWObjCategory category = main_category; - map::iterator iter = header.stats.find((uint8_t)category); - if (iter != header.stats.end()) { - struct rgw_bucket_category_stats& stats = iter->second; - ent.count = stats.num_entries; - ent.size = stats.total_size; - ent.size_rounded = stats.total_size_rounded; + map::iterator hiter = headers.begin(); + for (; hiter != headers.end(); ++hiter) { + RGWObjCategory category = main_category; + map::iterator iter = (hiter->second.stats).find((uint8_t)category); + if (iter != hiter->second.stats.end()) { + struct rgw_bucket_category_stats& stats = iter->second; + ent.count += stats.num_entries; + ent.size += stats.total_size; + ent.size_rounded += stats.total_size_rounded; + } } } @@ -5973,43 +6171,125 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter, return oids.size(); } -int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, +int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list& result, bool *truncated) { + ldout(cct, 20) << __func__ << ": " << bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl; result.clear(); librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map oids; + map bi_log_lists; + map bucket_instance_ids; + int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids); if (r < 0) return r; - std::list entries; - int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated); - if (ret < 0) - return ret; + BucketIndexShardsManager marker_mgr; + bool has_shards = (oids.size() > 1 || shard_id >= 0); + // If there are multiple shards for the bucket index object, the marker + // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}# + // {shard_marker_2}...', if there is no sharding, the bi_log_list should + // only contain one record, and the key is the bucket instance id. + r = marker_mgr.from_string(marker, shard_id); + if (r < 0) + return r; + + r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); + if (r < 0) + return r; - std::list::iterator iter; - for (iter = entries.begin(); iter != entries.end(); ++iter) { - result.push_back(*iter); + vector shard_ids_str; + map::iterator> vcurrents; + map::iterator> vends; + if (truncated) { + *truncated = false; + } + map::iterator miter = bi_log_lists.begin(); + for (; miter != bi_log_lists.end(); ++miter) { + int shard_id = miter->first; + vcurrents[shard_id] = miter->second.entries.begin(); + vends[shard_id] = miter->second.entries.end(); + if (truncated) { + *truncated = (*truncated || miter->second.truncated); + } + } + + size_t total = 0; + bool has_more = true; + map::iterator>::iterator viter; + map::iterator>::iterator eiter; + while (total < max && has_more) { + has_more = false; + + viter = vcurrents.begin(); + eiter = vends.begin(); + + for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) { + assert (eiter != vends.end()); + + int shard_id = viter->first; + list::iterator& liter = viter->second; + + if (liter == eiter->second){ + continue; + } + rgw_bi_log_entry& entry = *(liter); + if (has_shards) { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", shard_id); + string tmp_id; + build_bucket_index_marker(buf, entry.id, &tmp_id); + entry.id.swap(tmp_id); + } + marker_mgr.add(shard_id, entry.id); + result.push_back(entry); + total++; + has_more = true; + ++liter; + } + } + + if (truncated) { + for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) { + assert (eiter != vends.end()); + *truncated = (*truncated || (viter->second != eiter->second)); + } + } + + // Refresh marker, if there are multiple shards, the output will look like + // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...', + // if there is no sharding, the simply marker (without oid) is returned + if (has_shards) { + marker_mgr.to_string(&marker); + } else { + if (!result.empty()) { + marker = result.rbegin()->id; + } } return 0; } -int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker) +int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id); if (r < 0) return r; - int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker); - if (ret < 0) - return ret; + BucketIndexShardsManager start_marker_mgr; + r = start_marker_mgr.from_string(start_marker, shard_id); + if (r < 0) + return r; + BucketIndexShardsManager end_marker_mgr; + r = end_marker_mgr.from_string(end_marker, shard_id); + if (r < 0) + return r; - return 0; + return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs, + cct->_conf->rgw_bucket_index_max_aio)(); } int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op) @@ -6048,34 +6328,20 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri return r; } -int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, +int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator) { - librados::IoCtx index_ctx; - string oid; - - int r = open_bucket_index(bucket, index_ctx, oid); - if (r < 0) - return r; - ObjectWriteOperation o; cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data); - r = index_ctx.operate(oid, &o); - return r; + int ret = bs.index_ctx.operate(bs.bucket_obj, &o); + return ret; } -int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, +int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs) { - librados::IoCtx index_ctx; - string oid; - - int r = open_bucket_index(bucket, index_ctx, oid); - if (r < 0) - return r; - ObjectWriteOperation o; rgw_bucket_dir_entry_meta dir_meta; dir_meta.size = ent.size; @@ -6092,77 +6358,97 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& ta cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data); AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - r = index_ctx.aio_operate(oid, c, &o); + int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); c->release(); - return r; + return ret; } -int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, +int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, - list *remove_objs) + list *remove_obj) { - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs); + return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj); } -int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, +int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); + return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); } -int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name) +int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); + return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); } int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout) { librados::IoCtx index_ctx; - string oid; - - int r = open_bucket_index(bucket, index_ctx, oid); + map bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - ObjectWriteOperation o; - cls_rgw_bucket_set_tag_timeout(o, timeout); - - r = index_ctx.operate(oid, &o); - - return r; + return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)(); } -int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, - uint32_t num, map& m, - bool *is_truncated, string *last_entry, - bool (*force_check_filter)(const string& name)) +int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, + uint32_t num_entries, map& m, bool *is_truncated, + string *last_entry, bool (*force_check_filter)(const string& name)) { - ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl; + ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl; librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + // key - oid (for different shards if there is any) + // value - list result for the corresponding oid (shard), it is filled by the AIO callback + map oids; + map list_results; + int r = open_bucket_index(bucket, index_ctx, oids); if (r < 0) return r; - struct rgw_bucket_dir dir; - r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated); + r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; - map::iterator miter; - bufferlist updates; - for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) { - RGWObjEnt e; - rgw_bucket_dir_entry& dirent = miter->second; + // Create a list of iterators that are used to iterate each shard + vector::iterator> vcurrents(list_results.size()); + vector::iterator> vends(list_results.size()); + vector vnames(list_results.size()); + map::iterator iter = list_results.begin(); + *is_truncated = false; + for (; iter != list_results.end(); ++iter) { + vcurrents.push_back(iter->second.dir.m.begin()); + vends.push_back(iter->second.dir.m.end()); + vnames.push_back(oids[iter->first]); + *is_truncated = (*is_truncated || iter->second.is_truncated); + } + + // Create a map to track the next candidate entry from each shard, if the entry + // from a specified shard is selected/erased, the next entry from that shard will + // be inserted for next round selection + map candidates; + for (size_t i = 0; i < vcurrents.size(); ++i) { + if (vcurrents[i] != vends[i]) { + candidates[vcurrents[i]->second.name] = i; + } + } + + map updates; + uint32_t count = 0; + while (count < num_entries && !candidates.empty()) { + // Select the next one + int pos = candidates.begin()->second; + struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second; // fill it in with initial values; we may correct later + RGWObjEnt e; e.name = dirent.name; e.size = dirent.meta.size; e.mtime = dirent.meta.mtime; @@ -6172,44 +6458,53 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, e.content_type = dirent.meta.content_type; e.tag = dirent.tag; - /* oh, that shouldn't happen! */ - if (e.name.empty()) { - ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl; - continue; - } - bool force_check = force_check_filter && force_check_filter(dirent.name); - if (!dirent.exists || !dirent.pending_map.empty() || force_check) { /* there are uncommitted ops. We need to check the current state, * and if the tags are old we need to do cleanup as well. */ librados::IoCtx sub_ctx; sub_ctx.dup(index_ctx); - r = check_disk_state(sub_ctx, bucket, dirent, e, updates); - if (r < 0) { - if (r == -ENOENT) - continue; - else + r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]); + if (r < 0 && r != -ENOENT) { return r; } } - m[e.name] = e; - ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl; + if (r >= 0) { + m[e.name] = e; + ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl; + ++count; + } + + // Refresh the candidates map + candidates.erase(candidates.begin()); + ++vcurrents[pos]; + if (vcurrents[pos] != vends[pos]) { + candidates[vcurrents[pos]->second.name] = pos; + } } - if (dir.m.size()) { - *last_entry = dir.m.rbegin()->first; + // Suggest updates if there is any + map::iterator miter = updates.begin(); + for (; miter != updates.end(); ++miter) { + if (miter->second.length()) { + ObjectWriteOperation o; + cls_rgw_suggest_changes(o, miter->second); + // we don't care if we lose suggested updates, send them off blindly + AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + index_ctx.aio_operate(miter->first, c, &o); + c->release(); + } } - if (updates.length()) { - ObjectWriteOperation o; - cls_rgw_suggest_changes(o, updates); - // we don't care if we lose suggested updates, send them off blindly - AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - r = index_ctx.aio_operate(oid, c, &o); - c->release(); + // Check if all the returned entries are consumed or not + for (size_t i = 0; i < vcurrents.size(); ++i) { + if (vcurrents[i] != vends[i]) + *is_truncated = true; } - return m.size(); + if (m.size()) + *last_entry = m.rbegin()->first; + + return 0; } int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info) @@ -6402,34 +6697,45 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx, return 0; } -int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header) +int RGWRados::cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map oids; + map list_results; + int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids); if (r < 0) return r; - r = cls_rgw_get_dir_header(index_ctx, oid, &header); + r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) return r; + map::iterator iter = list_results.begin(); + for(; iter != list_results.end(); ++iter) { + headers[oids[iter->first]] = iter->second.dir.header; + } return 0; } -int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx) +int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio) { librados::IoCtx index_ctx; - string oid; - int r = open_bucket_index(bucket, index_ctx, oid); + map bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx); - if (r < 0) - return r; - - return 0; + map::iterator iter = bucket_objs.begin(); + for (; iter != bucket_objs.end(); ++iter) { + r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast(ctx->get())); + if (r < 0) { + ctx->put(); + break; + } else { + (*num_aio)++; + } + } + return r; } int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header) @@ -6480,8 +6786,8 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) { - rgw_bucket_dir_header header; - int r = cls_bucket_head(bucket, header); + map headers; + int r = cls_bucket_head(bucket, headers); if (r < 0) { ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl; return r; @@ -6491,12 +6797,15 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) bucket.convert(&entry.bucket); - map::iterator iter = header.stats.begin(); - for (; iter != header.stats.end(); ++iter) { - struct rgw_bucket_category_stats& header_stats = iter->second; - entry.size += header_stats.total_size; - entry.size_rounded += header_stats.total_size_rounded; - entry.count += header_stats.num_entries; + map::iterator hiter = headers.begin(); + for (; hiter != headers.end(); ++hiter) { + map::iterator iter = hiter->second.stats.begin(); + for (; iter != hiter->second.stats.end(); ++iter) { + struct rgw_bucket_category_stats& header_stats = iter->second; + entry.size += header_stats.total_size; + entry.size_rounded += header_stats.total_size_rounded; + entry.count += header_stats.num_entries; + } } list entries; @@ -6715,6 +7024,81 @@ int RGWRados::remove_temp_objects(string date, string time) return 0; } +void RGWRados::get_bucket_index_objects(const string& bucket_oid_base, + uint32_t num_shards, map& bucket_objects, int shard_id) +{ + if (!num_shards) { + bucket_objects[0] = bucket_oid_base; + } else { + char buf[bucket_oid_base.size() + 32]; + if (shard_id < 0) { + for (uint32_t i = 0; i < num_shards; ++i) { + snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i); + bucket_objects[i] = buf; + } + } else { + if ((uint32_t)shard_id > num_shards) { + return; + } + snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id); + bucket_objects[shard_id] = buf; + } + } +} + +void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result) +{ + rgw_bucket& bucket = bucket_info.bucket; + string plain_id = bucket.name + ":" + bucket.bucket_id; + if (!bucket_info.num_shards) { + (*result)[0] = plain_id; + } else { + char buf[16]; + if (shard_id < 0) { + for (uint32_t i = 0; i < bucket_info.num_shards; ++i) { + snprintf(buf, sizeof(buf), ":%d", i); + (*result)[i] = plain_id + buf; + } + } else { + if ((uint32_t)shard_id > bucket_info.num_shards) { + return; + } + snprintf(buf, sizeof(buf), ":%d", shard_id); + (*result)[shard_id] = plain_id + buf; + } + } +} + +int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, + uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id) +{ + int r = 0; + switch (hash_type) { + case RGWBucketInfo::MOD: + if (!num_shards) { + // By default with no sharding, we use the bucket oid as itself + (*bucket_obj) = bucket_oid_base; + if (shard_id) { + *shard_id = -1; + } + } else { + uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size()); + uint32_t sid2 = sid ^ ((sid & 0xFF) << 24); + sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards; + char buf[bucket_oid_base.size() + 32]; + snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid); + (*bucket_obj) = buf; + if (shard_id) { + *shard_id = (int)sid; + } + } + break; + default: + r = -ENOTSUP; + } + return r; +} + int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 57338b860163..1071b2f1ae61 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -913,14 +913,24 @@ struct RGWZone { bool log_meta; bool log_data; - RGWZone() : log_meta(false), log_data(false) {} +/** + * Represents the number of shards for the bucket index object, a value of zero + * indicates there is no sharding. By default (no sharding, the name of the object + * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}', + * sharding_id is zero-based value. It is not recommended to set a too large value + * (e.g. thousand) as it increases the cost for bucket listing. + */ + uint32_t bucket_index_max_shards; + + RGWZone() : log_meta(false), log_data(false), bucket_index_max_shards(0) {} void encode(bufferlist& bl) const { - ENCODE_START(2, 1, bl); + ENCODE_START(3, 1, bl); ::encode(name, bl); ::encode(endpoints, bl); ::encode(log_meta, bl); ::encode(log_data, bl); + ::encode(bucket_index_max_shards, bl); ENCODE_FINISH(bl); } @@ -932,6 +942,9 @@ struct RGWZone { ::decode(log_meta, bl); ::decode(log_data, bl); } + if (struct_v >= 3) { + ::decode(bucket_index_max_shards, bl); + } DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -1190,21 +1203,13 @@ public: class RGWGetBucketStats_CB : public RefCountedObject { protected: rgw_bucket bucket; - uint64_t bucket_ver; - uint64_t master_ver; map *stats; - string max_marker; public: RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {} virtual ~RGWGetBucketStats_CB() {} virtual void handle_response(int r) = 0; - virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver, - map *_stats, - const string &_max_marker) { - bucket_ver = _bucket_ver; - master_ver = _master_ver; + virtual void set_response(map *_stats) { stats = _stats; - max_marker = _max_marker; } }; @@ -1261,6 +1266,20 @@ class RGWRados int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid); + int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, + string& bucket_oid_base); + int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, + const string& obj_key, string *bucket_obj, int *shard_id); + int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& bucket_objs, int shard_id = -1, map *bucket_instance_ids = NULL); + template + int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + map& oids, map& bucket_objs, + int shard_id = -1, map *bucket_instance_ids = NULL); + void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, + string *marker); + + void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result); struct GetObjState { librados::IoCtx io_ctx; @@ -1294,6 +1313,9 @@ class RGWRados Mutex bucket_id_lock; + // This field represents the number of bucket index object shards + uint32_t bucket_index_max_shards; + int get_obj_ioctx(const rgw_obj& obj, librados::IoCtx *ioctx); int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false); uint64_t max_bucket_id; @@ -1365,7 +1387,9 @@ public: gc(NULL), use_gc_thread(false), quota_threads(false), num_watchers(0), watchers(NULL), watch_handles(NULL), watch_initialized(false), - bucket_id_lock("rados_bucket_id"), max_bucket_id(0), + bucket_id_lock("rados_bucket_id"), + bucket_index_max_shards(0), + max_bucket_id(0), cct(NULL), rados(NULL), pools_initialized(false), quota_handler(NULL), @@ -1479,7 +1503,7 @@ public: * create a bucket with name bucket and the given list of attrs * returns 0 on success, -ERR# otherwise. */ - virtual int init_bucket_index(rgw_bucket& bucket); + virtual int init_bucket_index(rgw_bucket& bucket, int num_shards); int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule, const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule); int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket); @@ -1811,8 +1835,8 @@ public: } int decode_policy(bufferlist& bl, ACLOwner *owner); - int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, - string *max_marker); + int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, + map& stats, string *max_marker); int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb); int get_user_stats(const string& user, RGWStorageStats& stats); int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb); @@ -1836,39 +1860,50 @@ public: virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv, map *pattrs, bool create_entry_point); + struct BucketShard { + RGWRados *store; + rgw_bucket bucket; + int shard_id; + librados::IoCtx index_ctx; + string bucket_obj; + + BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {} + int init(rgw_bucket& _bucket, rgw_obj& obj); + }; + int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid); - int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, - string& name, string& locator); - int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, + int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator); + int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); - int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); - int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name); - int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name); + int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); + int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name); + int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name); int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout); - int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num, - map& m, bool *is_truncated, - string *last_entry, bool (*force_check_filter)(const string& name) = NULL); - int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); - int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); - int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, + int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, + map& m, bool *is_truncated, string *last_entry, + bool (*force_check_filter)(const string& name) = NULL); + int cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids = NULL); + int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio); + + int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard, RGWModifyOp op, rgw_obj& oid, string& tag); - int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, + int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs); - int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) { - if (bucket_is_system(bucket)) + int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) { + if (bucket_is_system(bucket_shard.bucket)) return 0; - return cls_obj_complete_del(bucket, tag, pool, epoch, oid); + return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object); } - int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) { - if (bucket_is_system(bucket)) + int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) { + if (bucket_is_system(bucket_shard.bucket)) return 0; - return cls_obj_complete_cancel(bucket, tag, oid); + return cls_obj_complete_cancel(bucket_shard, tag, oid.object); } - int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list& result, bool *truncated); - int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker); + int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list& result, bool *truncated); + int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker); int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info); int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, @@ -1877,7 +1912,7 @@ public: void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name); - void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl); + void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_add(const string& oid, list& entries); int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, @@ -1945,6 +1980,32 @@ public: } private: + /** + * This is a helper method, it generates a list of bucket index objects with the given + * bucket base oid and number of shards. + * + * bucket_oid_base [in] - base name of the bucket index object; + * num_shards [in] - number of bucket index object shards. + * bucket_objs [out] - filled by this method, a list of bucket index objects. + */ + void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards, + map& bucket_objs, int shard_id = -1); + + /** + * Get the bucket index object with the given base bucket index object and object key, + * and the number of bucket index shards. + * + * bucket_oid_base [in] - bucket object base name. + * obj_key [in] - object key. + * num_shards [in] - number of bucket index shards. + * hash_type [in] - type of hash to find the shard ID. + * bucket_obj [out] - the bucket index object for the given object. + * + * Return 0 on success, a failure code otherwise. + */ + int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, + uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard); + int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); /** diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc index 961abc2e1880..d72b6af23c30 100644 --- a/src/rgw/rgw_replica_log.cc +++ b/src/rgw/rgw_replica_log.cc @@ -14,8 +14,10 @@ #include "rgw_replica_log.h" #include "cls/replica_log/cls_replica_log_client.h" +#include "cls/rgw/cls_rgw_client.h" #include "rgw_rados.h" +#define dout_subsys ceph_subsys_rgw void RGWReplicaBounds::dump(Formatter *f) const { @@ -132,3 +134,50 @@ RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) : prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix; prefix.append("."); } + +string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id) +{ + string s = prefix + bucket.name; + + if (shard_id >= 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ".%d", shard_id); + s += buf; + } + return s; +} + +int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, + const string& marker, const utime_t& time, + const list *entries) +{ + if (shard_id >= 0 || + !BucketIndexShardsManager::is_shards_marker(marker)) { + return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id), pool, + daemon_id, marker, time, entries); + } + + BucketIndexShardsManager sm; + int ret = sm.from_string(marker, shard_id); + if (ret < 0) { + ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl; + return ret; + } + + map& vals = sm.get(); + + ret = 0; + + map::iterator iter; + for (iter = vals.begin(); iter != vals.end(); ++iter) { + ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; + int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first), pool, + daemon_id, iter->second, time, entries); + if (r < 0) { + ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; + ret = r; + } + } + + return ret; +} diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h index 456b230a6520..a9adc9eedbe6 100644 --- a/src/rgw/rgw_replica_log.h +++ b/src/rgw/rgw_replica_log.h @@ -97,20 +97,19 @@ public: class RGWReplicaBucketLogger : private RGWReplicaLogger { string pool; string prefix; + + string obj_name(const rgw_bucket& bucket, int shard_id); public: RGWReplicaBucketLogger(RGWRados *_store); - int update_bound(const rgw_bucket& bucket, const string& daemon_id, + int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, const string& marker, const utime_t& time, - const list *entries) { - return RGWReplicaLogger::update_bound(prefix+bucket.name, pool, - daemon_id, marker, time, entries); - } - int delete_bound(const rgw_bucket& bucket, const string& daemon_id) { - return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool, + const list *entries); + int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id) { + return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id), pool, daemon_id); } - int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) { - return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool, + int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { + return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id), pool, bounds); } }; diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 9f32fc9ebd3f..1db6cadb0d8e 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -282,6 +282,12 @@ void RGWOp_BILog_List::execute() { return; } + int shard_id; + http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); + if (http_ret < 0) { + return; + } + if (!bucket_instance.empty()) { http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL); if (http_ret < 0) { @@ -307,7 +313,7 @@ void RGWOp_BILog_List::execute() { send_response(); do { list entries; - int ret = store->list_bi_log_entries(bucket_info.bucket, + int ret = store->list_bi_log_entries(bucket_info.bucket, shard_id, marker, max_entries - count, entries, &truncated); if (ret < 0) { @@ -419,6 +425,13 @@ void RGWOp_BILog_Delete::execute() { http_ret = -EINVAL; return; } + + int shard_id; + http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); + if (http_ret < 0) { + return; + } + if (!bucket_instance.empty()) { http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL); if (http_ret < 0) { @@ -432,7 +445,7 @@ void RGWOp_BILog_Delete::execute() { return; } } - http_ret = store->trim_bi_log_entries(bucket_info.bucket, start_marker, end_marker); + http_ret = store->trim_bi_log_entries(bucket_info.bucket, shard_id, start_marker, end_marker); if (http_ret < 0) { dout(5) << "ERROR: trim_bi_log_entries() " << dendl; } diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index ff1bf3466d37..22221d4078c7 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -38,11 +38,11 @@ public: }; class RGWOp_BILog_Info : public RGWRESTOp { - uint64_t bucket_ver; - uint64_t master_ver; + string bucket_ver; + string master_ver; string max_marker; public: - RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {} + RGWOp_BILog_Info() : bucket_ver(), master_ver() {} ~RGWOp_BILog_Info() {} int check_caps(RGWUserCaps& caps) { diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc index e7dd962f0f76..0309a2cabd25 100644 --- a/src/rgw/rgw_rest_replica_log.cc +++ b/src/rgw/rgw_rest_replica_log.cc @@ -181,6 +181,13 @@ void RGWOp_BILog_SetBounds::execute() { return; } + int shard_id; + http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); + if (http_ret < 0) { + dout(5) << "failed to parse bucket instance" << dendl; + return; + } + rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; @@ -194,7 +201,7 @@ void RGWOp_BILog_SetBounds::execute() { return; } - http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers); + http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers); } void RGWOp_BILog_GetBounds::execute() { @@ -206,12 +213,19 @@ void RGWOp_BILog_GetBounds::execute() { return; } + int shard_id; + http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); + if (http_ret < 0) { + dout(5) << "failed to parse bucket instance" << dendl; + return; + } + rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; RGWReplicaBucketLogger rl(store); - http_ret = rl.get_bounds(bucket, bounds); + http_ret = rl.get_bounds(bucket, shard_id, bounds); } void RGWOp_BILog_GetBounds::send_response() { @@ -237,12 +251,19 @@ void RGWOp_BILog_DeleteBounds::execute() { return; } + int shard_id; + http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); + if (http_ret < 0) { + dout(5) << "failed to parse bucket instance" << dendl; + return; + } + rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; RGWReplicaBucketLogger rl(store); - http_ret = rl.delete_bound(bucket, daemon_id); + http_ret = rl.delete_bound(bucket, shard_id, daemon_id); } RGWOp *RGWHandler_ReplicaLog::op_get() { diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index 960cf9c3d02c..20ac37e9990b 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -559,7 +559,6 @@ void RGWCopyObj_ObjStore_SWIFT::send_response() int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { const char *content_type = NULL; - int orig_ret = ret; map response_attrs; map::iterator riter; @@ -601,11 +600,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o } } - if (partial_content && !ret) - ret = -STATUS_PARTIAL_CONTENT; - - if (ret) - set_req_state_err(s, ret); + set_req_state_err(s, (partial_content && !ret) ? STATUS_PARTIAL_CONTENT : ret); dump_errno(s); for (riter = response_attrs.begin(); riter != response_attrs.end(); ++riter) { @@ -619,7 +614,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o sent_header = true; send_data: - if (get_data && !orig_ret) { + if (get_data && !ret) { int r = s->cio->write(bl.c_str() + bl_ofs, bl_len); if (r < 0) return r; diff --git a/src/test/Makefile.am b/src/test/Makefile.am index ca200a840c35..8e1aeaed2f19 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -849,8 +849,8 @@ bin_DEBUGPROGRAMS += ceph_test_cls_hello if WITH_RADOSGW ceph_test_cls_rgw_SOURCES = test/cls_rgw/test_cls_rgw.cc ceph_test_cls_rgw_LDADD = \ - $(LIBRADOS) libcls_rgw_client.la \ - $(LIBCOMMON) $(UNITTEST_LDADD) $(RADOS_TEST_LDADD) + $(LIBRADOS) $(CRYPTO_LIBS) libcls_rgw_client.la \ + $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD) ceph_test_cls_rgw_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_cls_rgw endif # WITH_RADOSGW diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index 44cb30307245..0df7ab2a1874 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -3,6 +3,7 @@ #include "include/types.h" #include "cls/rgw/cls_rgw_client.h" +#include "cls/rgw/cls_rgw_ops.h" #include "gtest/gtest.h" #include "test/librados/test.h" @@ -10,6 +11,7 @@ #include #include #include +#include using namespace librados; @@ -66,12 +68,20 @@ public: void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size) { - rgw_bucket_dir_header header; - ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header)); - - rgw_bucket_category_stats& stats = header.stats[category]; - ASSERT_EQ(total_size, stats.total_size); - ASSERT_EQ(num_entries, stats.num_entries); + map results; + map oids; + oids[0] = oid; + ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)()); + + uint64_t entries = 0; + uint64_t size = 0; + map::iterator iter = results.begin(); + for (; iter != results.end(); ++iter) { + entries += (iter->second).dir.header.stats[category].num_entries; + size += (iter->second).dir.header.stats[category].total_size; + } + ASSERT_EQ(total_size, size); + ASSERT_EQ(num_entries, entries); } void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc) @@ -339,9 +349,10 @@ TEST(cls_rgw, index_suggest) cls_rgw_encode_suggestion(suggest_op, dirent, updates); } - op = mgr.write_op(); - cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout - ASSERT_EQ(0, ioctx.operate(bucket_oid, op)); + map bucket_objs; + bucket_objs[0] = bucket_oid; + int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)(); + ASSERT_EQ(0, r); sleep(1);