From 6613358ddc5339c8e33c409387fd6044db0b6f26 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 19 Jan 2015 09:26:00 -0800 Subject: [PATCH] Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next" This reverts commit f79d8f24e9c0bf0d0b37270eba2745a878f2caed, reversing changes made to 896c8899ac28eb0403bfaa20454f3756f3705c51. --- src/cls/rgw/cls_rgw.cc | 10 + src/cls/rgw/cls_rgw_client.cc | 304 +++++------- src/cls/rgw/cls_rgw_client.h | 407 +--------------- src/common/config_opts.h | 16 - src/rgw/rgw_admin.cc | 14 +- src/rgw/rgw_bucket.cc | 110 ++--- src/rgw/rgw_bucket.h | 16 +- src/rgw/rgw_common.cc | 2 - src/rgw/rgw_common.h | 55 +-- src/rgw/rgw_json_enc.cc | 8 - src/rgw/rgw_op.cc | 15 +- src/rgw/rgw_quota.cc | 4 +- src/rgw/rgw_rados.cc | 796 ++++++++----------------------- src/rgw/rgw_rados.h | 135 ++---- src/rgw/rgw_replica_log.cc | 49 -- src/rgw/rgw_replica_log.h | 17 +- src/rgw/rgw_rest_log.cc | 17 +- src/rgw/rgw_rest_log.h | 6 +- src/rgw/rgw_rest_replica_log.cc | 27 +- src/rgw/rgw_rest_swift.cc | 9 +- src/test/Makefile.am | 4 +- src/test/cls_rgw/test_cls_rgw.cc | 29 +- 22 files changed, 478 insertions(+), 1572 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 6198d62810bac..eb4a4232d1892 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -814,6 +814,8 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke map keys; string filter_prefix, end_key; + bufferlist start_bl; + bool start_key_added = false; uint32_t i = 0; string key; @@ -827,6 +829,10 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke key.append(marker); start_key = key; + int ret = cls_cxx_map_get_val(hctx, start_key, &start_bl); + if ((ret < 0) && (ret != -ENOENT)) { + return ret; + } } else { start_key = key_iter; } @@ -850,6 +856,10 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke if (ret < 0) return ret; + if ((start_bl.length() > 0) && (!start_key_added)) { + keys[start_key] = start_bl; + start_key_added = true; + } map::iterator iter = keys.begin(); if (iter == keys.end()) break; diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index 545b36bcff569..c13c1a1559c62 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,131 +11,19 @@ using namespace librados; -const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; -const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; - -/** - * This class represents the bucket index object operation callback context. - */ -template -class ClsBucketIndexOpCtx : public ObjectOperationCompletion { -private: - T *data; - int *ret_code; -public: - ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); } - ~ClsBucketIndexOpCtx() {} - void handle_completion(int r, bufferlist& outbl) { - if (r >= 0) { - try { - bufferlist::iterator iter = outbl.begin(); - ::decode((*data), iter); - } catch (buffer::error& err) { - r = -EIO; - } - } - if (ret_code) { - *ret_code = r; - } - } -}; - -void BucketIndexAioManager::do_completion(int id) { - Mutex::Locker l(lock); - - map::iterator iter = pendings.find(id); - assert(iter != pendings.end()); - completions[id] = iter->second; - pendings.erase(iter); - - // If the caller needs a list of finished objects, store them - // for further processing - map::iterator miter = pending_objs.find(id); - if (miter != pending_objs.end()) { - completion_objs[id] = miter->second; - pending_objs.erase(miter); - } - - cond.Signal(); -} - -bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, - int *num_completions, int *ret_code, map *objs) { - lock.Lock(); - if (pendings.empty() && completions.empty()) { - lock.Unlock(); - return false; - } - - if (completions.empty()) { - // Wait for AIO completion - cond.Wait(lock); - } - - // Clear the completed AIOs - map::iterator iter = completions.begin(); - for (; iter != completions.end(); ++iter) { - int r = iter->second->get_return_value(); - if (objs && r == 0) { /* update list of successfully completed objs */ - map::iterator liter = completion_objs.find(iter->first); - if (liter != completion_objs.end()) { - (*objs)[liter->first] = liter->second; - } - } - if (ret_code && (r < 0 && r != valid_ret_code)) - (*ret_code) = r; - iter->second->release(); - } - if (num_completions) - (*num_completions) = completions.size(); - completions.clear(); - lock.Unlock(); - - return true; -} - void cls_rgw_bucket_init(ObjectWriteOperation& o) { bufferlist in; o.exec("rgw", "bucket_init_index", in); } -static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, - const string& oid, BucketIndexAioManager *manager) { - bufferlist in; - librados::ObjectWriteOperation op; - op.create(true); - op.exec("rgw", "bucket_init_index", in); - return manager->aio_operate(io_ctx, oid, &op); -} - -static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, - const string& oid, uint64_t timeout, BucketIndexAioManager *manager) { +void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) +{ bufferlist in; struct rgw_cls_tag_timeout_op call; - call.tag_timeout = timeout; + call.tag_timeout = tag_timeout; ::encode(call, in); - ObjectWriteOperation op; - op.exec("rgw", "bucket_set_tag_timeout", in); - return manager->aio_operate(io_ctx, oid, &op); -} - -int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid) -{ - return issue_bucket_index_init_op(io_ctx, oid, &manager); -} - -void CLSRGWIssueBucketIndexInit::cleanup() -{ - // Do best effort removal - for (map::iterator citer = objs_container.begin(); citer != iter; ++citer) { - io_ctx.remove(citer->second); - } -} - -int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid) -{ - return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager); + o.exec("rgw", "bucket_set_tag_timeout", in); } void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag, @@ -171,89 +59,70 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string& o.exec("rgw", "bucket_complete_op", in); } -static bool issue_bucket_list_op(librados::IoCtx& io_ctx, - const string& oid, const string& start_obj, const string& filter_prefix, - uint32_t num_entries, BucketIndexAioManager *manager, - struct rgw_cls_list_ret *pdata) { - bufferlist in; + +int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj, + string& filter_prefix, uint32_t num_entries, + rgw_bucket_dir *dir, bool *is_truncated) +{ + bufferlist in, out; struct rgw_cls_list_op call; call.start_obj = start_obj; call.filter_prefix = filter_prefix; call.num_entries = num_entries; ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); + if (r < 0) + return r; - librados::ObjectReadOperation op; - op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); - return manager->aio_operate(io_ctx, oid, &op); -} - -int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid) -{ - return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]); -} + struct rgw_cls_list_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } -static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id, - BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager, - struct cls_rgw_bi_log_list_ret *pdata) { - bufferlist in; - cls_rgw_bi_log_list_op call; - call.marker = marker_mgr.get(shard_id, ""); - call.max = max; - ::encode(call, in); + if (dir) + *dir = ret.dir; + if (is_truncated) + *is_truncated = ret.is_truncated; - librados::ObjectReadOperation op; - op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx(pdata, NULL)); - return manager->aio_operate(io_ctx, oid, &op); + return r; } -int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid) +int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid, + rgw_bucket_dir_header *existing_header, + rgw_bucket_dir_header *calculated_header) { - return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); -} + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out); + if (r < 0) + return r; -static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id, - BucketIndexShardsManager& start_marker_mgr, - BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { - bufferlist in; - cls_rgw_bi_log_trim_op call; - call.start_marker = start_marker_mgr.get(shard_id, ""); - call.end_marker = end_marker_mgr.get(shard_id, ""); - ::encode(call, in); - ObjectWriteOperation op; - op.exec("rgw", "bi_log_trim", in); - return manager->aio_operate(io_ctx, oid, &op); -} + struct rgw_cls_check_index_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } -int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid) -{ - return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); -} + if (existing_header) + *existing_header = ret.existing_header; + if (calculated_header) + *calculated_header = ret.calculated_header; -static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager, - struct rgw_cls_check_index_ret *pdata) { - bufferlist in; - librados::ObjectReadOperation op; - op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx( - pdata, NULL)); - return manager->aio_operate(io_ctx, oid, &op); + return 0; } -int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) +int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid) { - return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]); -} - -static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid, - BucketIndexAioManager *manager) { - bufferlist in; - librados::ObjectWriteOperation op; - op.exec("rgw", "bucket_rebuild_index", in); - return manager->aio_operate(io_ctx, oid, &op); -} + bufferlist in, out; + int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out); + if (r < 0) + return r; -int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid) -{ - return issue_bucket_rebuild_index_op(io_ctx, oid, &manager); + return 0; } void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) @@ -267,9 +136,28 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) o.exec("rgw", "dir_suggest_changes", updates); } -int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid) +int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header) { - return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]); + bufferlist in, out; + struct rgw_cls_list_op call; + call.num_entries = 0; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out); + if (r < 0) + return r; + + struct rgw_cls_list_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + if (header) + *header = ret.dir.header; + + return r; } class GetDirHeaderCompletion : public ObjectOperationCompletion { @@ -310,6 +198,56 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB return 0; } +int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, + list& entries, bool *truncated) +{ + bufferlist in, out; + cls_rgw_bi_log_list_op call; + call.marker = marker; + call.max = max; + ::encode(call, in); + int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out); + if (r < 0) + return r; + + cls_rgw_bi_log_list_ret ret; + try { + bufferlist::iterator iter = out.begin(); + ::decode(ret, iter); + } catch (buffer::error& err) { + return -EIO; + } + + entries = ret.entries; + + if (truncated) + *truncated = ret.truncated; + + return r; +} + +int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker) +{ + do { + int r; + bufferlist in, out; + cls_rgw_bi_log_trim_op call; + call.start_marker = start_marker; + call.end_marker = end_marker; + ::encode(call, in); + r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out); + + if (r == -ENODATA) + break; + + if (r < 0) + return r; + + } while (1); + + return 0; +} + int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, string& read_iter, map& usage, diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 79de35825eff1..c6b5b757fa843 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -2,305 +2,20 @@ #define CEPH_CLS_RGW_CLIENT_H #include "include/types.h" -#include "include/str_list.h" #include "include/rados/librados.hpp" #include "cls_rgw_types.h" -#include "cls_rgw_ops.h" #include "common/RefCountedObj.h" -// Forward declaration -class BucketIndexAioManager; - -/* - * Bucket index AIO request argument, this is used to pass a argument - * to callback. - */ -struct BucketIndexAioArg : public RefCountedObject { - BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : - id(_id), manager(_manager) {} - int id; - BucketIndexAioManager* manager; -}; - -/* - * This class manages AIO completions. This class is not completely thread-safe, - * methods like *get_next* is not thread-safe and is expected to be called from - * within one thread. - */ -class BucketIndexAioManager { -private: - map pendings; - map completions; - map pending_objs; - map completion_objs; - int next; - Mutex lock; - Cond cond; - /* - * Callback implementation for AIO request. - */ - static void bucket_index_op_completion_cb(void* cb, void* arg) { - BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; - cb_arg->manager->do_completion(cb_arg->id); - cb_arg->put(); - } - - /* - * Get next request ID. This method is not thread-safe. - * - * Return next request ID. - */ - int get_next() { return next++; } - - /* - * Add a new pending AIO completion instance. - * - * @param id - the request ID. - * @param completion - the AIO completion instance. - * @param oid - the object id associated with the object, if it is NULL, we don't - * track the object id per callback. - */ - void add_pending(int id, librados::AioCompletion* completion, const string& oid) { - pendings[id] = completion; - pending_objs[id] = oid; - } -public: - /* - * Create a new instance. - */ - BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {} - - - /* - * Do completion for the given AIO request. - */ - void do_completion(int id); - - /* - * Wait for AIO completions. - * - * valid_ret_code - valid AIO return code. - * num_completions - number of completions. - * ret_code - return code of failed AIO. - * objs - a list of objects that has been finished the AIO. - * - * Return false if there is no pending AIO, true otherwise. - */ - bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code, - map *objs); - - /** - * Do aio read operation. - */ - bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) { - Mutex::Locker l(lock); - BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); - librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL); - if (r >= 0) { - add_pending(arg->id, c, oid); - } - return r; - } - - /** - * Do aio write operation. - */ - bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) { - Mutex::Locker l(lock); - BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this); - librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); - int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op); - if (r >= 0) { - add_pending(arg->id, c, oid); - } - return r; - } -}; - class RGWGetDirHeader_CB : public RefCountedObject { public: virtual ~RGWGetDirHeader_CB() {} virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0; }; -class BucketIndexShardsManager { -private: - // Per shard setting manager, for example, marker. - map value_by_shards; -public: - const static string KEY_VALUE_SEPARATOR; - const static string SHARDS_SEPARATOR; - - void add(int shard, const string& value) { - value_by_shards[shard] = value; - } - - const string& get(int shard, const string& default_value) { - map::iterator iter = value_by_shards.find(shard); - return (iter == value_by_shards.end() ? default_value : iter->second); - } - - map& get() { - return value_by_shards; - } - - bool empty() { - return value_by_shards.empty(); - } - - void to_string(string *out) const { - if (!out) { - return; - } - out->clear(); - map::const_iterator iter = value_by_shards.begin(); - for (; iter != value_by_shards.end(); ++iter) { - if (out->length()) { - // Not the first item, append a separator first - out->append(SHARDS_SEPARATOR); - } - char buf[16]; - snprintf(buf, sizeof(buf), "%d", iter->first); - out->append(buf); - out->append(KEY_VALUE_SEPARATOR); - out->append(iter->second); - } - } - - static bool is_shards_marker(const string& marker) { - return marker.find(KEY_VALUE_SEPARATOR) != string::npos; - } - - /* - * convert from string. There are two options of how the string looks like: - * - * 1. Single shard, no shard id specified, e.g. 000001.23.1 - * - * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a - * bucket with no shards. - * - * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2 - * - */ - int from_string(const string& composed_marker, int shard_id) { - value_by_shards.clear(); - vector shards; - get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards); - if (shards.size() > 1 && shard_id >= 0) { - return -EINVAL; - } - vector::const_iterator iter = shards.begin(); - for (; iter != shards.end(); ++iter) { - size_t pos = iter->find(KEY_VALUE_SEPARATOR); - if (pos == string::npos) { - if (!value_by_shards.empty()) { - return -EINVAL; - } - if (shard_id < 0) { - add(0, *iter); - } else { - add(shard_id, *iter); - } - return 0; - } - string shard_str = iter->substr(0, pos); - string err; - int shard = (int)strict_strtol(shard_str.c_str(), 10, &err); - if (!err.empty()) { - return -EINVAL; - } - add(shard, iter->substr(pos + 1)); - } - return 0; - } -}; - /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); -class CLSRGWConcurrentIO { -protected: - librados::IoCtx& io_ctx; - map& objs_container; - map::iterator iter; - uint32_t max_aio; - BucketIndexAioManager manager; - - virtual int issue_op(int shard_id, const string& oid) = 0; - - virtual void cleanup() {} - virtual int valid_ret_code() { return 0; } - // Return true if multiple rounds of OPs might be needed, this happens when - // OP needs to be re-send until a certain code is returned. - virtual bool need_multiple_rounds() { return false; } - // Add a new object to the end of the container. - virtual void add_object(int shard, const string& oid) {} - virtual void reset_container(map& objs) {} - -public: - CLSRGWConcurrentIO(librados::IoCtx& ioc, map& _objs_container, - uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {} - virtual ~CLSRGWConcurrentIO() {} - - int operator()() { - int ret = 0; - iter = objs_container.begin(); - for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { - ret = issue_op(iter->first, iter->second); - if (ret < 0) - break; - } - - int num_completions, r = 0; - map objs; - map *pobjs = (need_multiple_rounds() ? &objs : NULL); - while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) { - if (r >= 0 && ret >= 0) { - for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) { - int issue_ret = issue_op(iter->first, iter->second); - if(issue_ret < 0) { - ret = issue_ret; - break; - } - } - } else if (ret >= 0) { - ret = r; - } - if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) { - // For those objects which need another round, use them to reset - // the container - reset_container(objs); - } - } - - if (ret < 0) { - cleanup(); - } - return ret; - } -}; - -class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO { -protected: - int issue_op(int shard_id, const string& oid); - int valid_ret_code() { return -EEXIST; } - void cleanup(); -public: - CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map& _bucket_objs, - uint32_t _max_aio) : - CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} -}; - -class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO { - uint64_t tag_timeout; -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map& _bucket_objs, - uint32_t _max_aio, uint64_t _tag_timeout) : - CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {} -}; +void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, string& name, string& locator, bool log_op); @@ -309,118 +24,28 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta, list *remove_objs, bool log_op); -/** - * List the bucket with the starting object and filter prefix. - * NOTE: this method do listing requests for each bucket index shards identified by - * the keys of the *list_results* map, which means the map should be popludated - * by the caller to fill with each bucket index object id. - * - * io_ctx - IO context for rados. - * start_obj - marker for the listing. - * filter_prefix - filter prefix. - * num_entries - number of entries to request for each object (note the total - * amount of entries returned depends on the number of shardings). - * list_results - the list results keyed by bucket index object id. - * max_aio - the maximum number of AIO (for throttling). - * - * Return 0 on success, a failure code otherwise. -*/ - -class CLSRGWIssueBucketList : public CLSRGWConcurrentIO { - string start_obj; - string filter_prefix; - uint32_t num_entries; - map& result; -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj, - const string& _filter_prefix, uint32_t _num_entries, - map& oids, - map& list_results, - uint32_t max_aio) : - CLSRGWConcurrentIO(io_ctx, oids, max_aio), - start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {} -}; - -class CLSRGWIssueBILogList : public CLSRGWConcurrentIO { - map& result; - BucketIndexShardsManager& marker_mgr; - uint32_t max; -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max, - map& oids, - map& bi_log_lists, uint32_t max_aio) : - CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists), - marker_mgr(_marker_mgr), max(_max) {} -}; - -class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO { - BucketIndexShardsManager& start_marker_mgr; - BucketIndexShardsManager& end_marker_mgr; -protected: - int issue_op(int shard_id, const string& oid); - // Trim until -ENODATA is returned. - int valid_ret_code() { return -ENODATA; } - bool need_multiple_rounds() { return true; } - void add_object(int shard, const string& oid) { objs_container[shard] = oid; } - void reset_container(map& objs) { - objs_container.swap(objs); - iter = objs_container.begin(); - objs.clear(); - } -public: - CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr, - BucketIndexShardsManager& _end_marker_mgr, map& _bucket_objs, uint32_t max_aio) : - CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio), - start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {} -}; - -/** - * Check the bucket index. - * - * io_ctx - IO context for rados. - * bucket_objs_ret - check result for all shards. - * max_aio - the maximum number of AIO (for throttling). - * - * Return 0 on success, a failure code otherwise. - */ -class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /* >*/ { - map& result; -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map& oids, map& bucket_objs_ret, - uint32_t _max_aio) : - CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {} -}; - -class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO { -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map& bucket_objs, - uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {} -}; - -class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO { - map& result; -protected: - int issue_op(int shard_id, const string& oid); -public: - CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map& oids, map& dir_headers, - uint32_t max_aio) : - CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {} -}; +int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj, + string& filter_prefix, uint32_t num_entries, + rgw_bucket_dir *dir, bool *is_truncated); +int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid, + rgw_bucket_dir_header *existing_header, + rgw_bucket_dir_header *calculated_header); +int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid); + +int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header); int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates); void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates); +/* bucket index log */ + +int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, + list& entries, bool *truncated); +int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker); + /* usage logging */ int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 03a3d71d63608..08739c417205e 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -853,22 +853,6 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024) -/** - * override max bucket index shards in zone configuration (if not zero) - * - * Represents the number of shards for the bucket index object, a value of zero - * indicates there is no sharding. By default (no sharding, the name of the object - * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}', - * sharding_id is zero-based value. It is not recommended to set a too large value - * (e.g. thousand) as it increases the cost for bucket listing. - */ -OPTION(rgw_override_bucket_index_max_shards, OPT_U32, 0) - -/** - * Represents the maximum AIO pending requests for the bucket index object shards. - */ -OPTION(rgw_bucket_index_max_aio, OPT_U32, 8) - OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id") OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin") OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 03b51a59685ee..2c775ca1e25f2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -519,7 +519,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) return r; map stats; - string bucket_ver, master_ver; + uint64_t bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -535,8 +535,8 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter) formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); formatter->dump_int("mtime", mtime); - formatter->dump_string("ver", bucket_ver); - formatter->dump_string("master_ver", master_ver); + formatter->dump_int("ver", bucket_ver); + formatter->dump_int("master_ver", master_ver); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); formatter->close_section(); @@ -2350,7 +2350,7 @@ next: do { list entries; - ret = store->list_bi_log_entries(bucket, shard_id, marker, max_entries - count, entries, &truncated); + ret = store->list_bi_log_entries(bucket, marker, max_entries - count, entries, &truncated); if (ret < 0) { cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl; return -ret; @@ -2382,7 +2382,7 @@ next: cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; return -ret; } - ret = store->trim_bi_log_entries(bucket, shard_id, start_marker, end_marker); + ret = store->trim_bi_log_entries(bucket, start_marker, end_marker); if (ret < 0) { cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl; return -ret; @@ -2565,7 +2565,7 @@ next: } RGWReplicaBucketLogger logger(store); - ret = logger.get_bounds(bucket, shard_id, bounds); + ret = logger.get_bounds(bucket, bounds); if (ret < 0) return -ret; } else { // shouldn't get here @@ -2616,7 +2616,7 @@ next: } RGWReplicaBucketLogger logger(store); - ret = logger.delete_bound(bucket, shard_id, daemon_id); + ret = logger.delete_bound(bucket, daemon_id); if (ret < 0) return -ret; } diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 48abc4d72e66f..4afe1ae10192a 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -233,32 +233,6 @@ int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersi return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker); } -int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id) -{ - ssize_t pos = bucket_instance.rfind(':'); - if (pos < 0) { - return -EINVAL; - } - - string first = bucket_instance.substr(0, pos); - string second = bucket_instance.substr(pos + 1); - - if (first.find(':') == string::npos) { - *shard_id = -1; - *target_bucket_instance = bucket_instance; - return 0; - } - - *target_bucket_instance = first; - string err; - *shard_id = strict_strtol(second.c_str(), 10, &err); - if (!err.empty()) { - return -EINVAL; - } - - return 0; -} - int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info, map& attrs, map* rmattrs, @@ -384,7 +358,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b RGWBucketInfo info; bufferlist bl; - string bucket_ver, master_ver; + uint64_t bucket_ver, master_ver; ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL); if (ret < 0) @@ -755,9 +729,9 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state, while (is_truncated) { map result; - int r = store->cls_bucket_list(bucket, marker, prefix, 1000, - result, &is_truncated, &marker, - bucket_object_check_filter); + int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result, + &is_truncated, &marker, + bucket_object_check_filter); if (r == -ENOENT) { break; @@ -983,7 +957,7 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f bucket = bucket_info.bucket; - string bucket_ver, master_ver; + uint64_t bucket_ver, master_ver; string max_marker; int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker); if (ret < 0) { @@ -998,8 +972,8 @@ static int bucket_stats(RGWRados *store, std::string& bucket_name, Formatter *f formatter->dump_string("id", bucket.bucket_id); formatter->dump_string("marker", bucket.marker); formatter->dump_string("owner", bucket_info.owner); - formatter->dump_string("ver", bucket_ver); - formatter->dump_string("master_ver", master_ver); + formatter->dump_int("ver", bucket_ver); + formatter->dump_int("master_ver", master_ver); formatter->dump_int("mtime", mtime); formatter->dump_string("max_marker", max_marker); dump_bucket_usage(stats, formatter); @@ -1102,10 +1076,9 @@ void rgw_data_change::dump(Formatter *f) const } -int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { - const string& name = bs.bucket.name; - int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0); - uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards; +int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { + string& name = bucket.name; + uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards; return (int)r; } @@ -1117,22 +1090,19 @@ int RGWDataChangesLog::renew_entries() /* we can't keep the bucket name as part of the cls_log_entry, and we need * it later, so we keep two lists under the map */ - map, list > > m; + map, list > > m; lock.Lock(); - map entries; + map entries; entries.swap(cur_cycle); lock.Unlock(); - map::iterator iter; + map::iterator iter; string section; utime_t ut = ceph_clock_now(cct); for (iter = entries.begin(); iter != entries.end(); ++iter) { - const rgw_bucket_shard& bs = iter->first; - const rgw_bucket& bucket = bs.bucket; - int shard_id = bs.shard_id; - - int index = choose_oid(bs); + rgw_bucket& bucket = iter->second; + int index = choose_oid(bucket); cls_log_entry entry; @@ -1140,21 +1110,16 @@ int RGWDataChangesLog::renew_entries() bufferlist bl; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - change.key += buf; - } change.timestamp = ut; ::encode(change, bl); store->time_log_prepare_entry(entry, ut, section, bucket.name, bl); - m[index].first.push_back(bs); + m[index].first.push_back(bucket.name); m[index].second.push_back(entry); } - map, list > >::iterator miter; + map, list > >::iterator miter; for (miter = m.begin(); miter != m.end(); ++miter) { list& entries = miter->second.second; @@ -1171,8 +1136,8 @@ int RGWDataChangesLog::renew_entries() utime_t expiration = now; expiration += utime_t(cct->_conf->rgw_data_log_window, 0); - list& buckets = miter->second.first; - list::iterator liter; + list& buckets = miter->second.first; + list::iterator liter; for (liter = buckets.begin(); liter != buckets.end(); ++liter) { update_renewed(*liter, expiration); } @@ -1181,41 +1146,39 @@ int RGWDataChangesLog::renew_entries() return 0; } -void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status) +void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status) { assert(lock.is_locked()); - if (!changes.find(bs, status)) { + if (!changes.find(bucket_name, status)) { status = ChangeStatusPtr(new ChangeStatus); - changes.add(bs, status); + changes.add(bucket_name, status); } } -void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs) +void RGWDataChangesLog::register_renew(rgw_bucket& bucket) { Mutex::Locker l(lock); - cur_cycle[bs] = true; + cur_cycle[bucket.name] = bucket; } -void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration) +void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration) { Mutex::Locker l(lock); ChangeStatusPtr status; - _get_change(bs, status); + _get_change(bucket_name, status); - ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl; status->cur_expiration = expiration; } -int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { +int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { if (!store->need_to_log_data()) return 0; - rgw_bucket_shard bs(bucket, shard_id); - lock.Lock(); ChangeStatusPtr status; - _get_change(bs, status); + _get_change(bucket.name, status); lock.Unlock(); @@ -1223,13 +1186,13 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { status->lock->Lock(); - ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; + ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; if (now < status->cur_expiration) { /* no need to send, recently completed */ status->lock->Unlock(); - register_renew(bs); + register_renew(bucket); return 0; } @@ -1246,7 +1209,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { int ret = cond->wait(); cond->put(); if (!ret) { - register_renew(bs); + register_renew(bucket); } return ret; } @@ -1254,7 +1217,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { status->cond = new RefCountedCond; status->pending = true; - string& oid = oids[choose_oid(bs)]; + string& oid = oids[choose_oid(bucket)]; utime_t expiration; int ret; @@ -1271,11 +1234,6 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { rgw_data_change change; change.entity_type = ENTITY_TYPE_BUCKET; change.key = bucket.name + ":" + bucket.bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - change.key += buf; - } change.timestamp = now; ::encode(change, bl); string section; @@ -1728,7 +1686,7 @@ public: objv_tracker = bci.info.objv_tracker; - ret = store->init_bucket_index(bci.info.bucket, bci.info.num_shards); + ret = store->init_bucket_index(bci.info.bucket); if (ret < 0) return ret; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index d0c2f4b184938..3bdd68c057d0d 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -32,8 +32,6 @@ extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferli map *pattrs, RGWObjVersionTracker *objv_tracker, time_t mtime); -extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id); - extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker); extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker); @@ -316,13 +314,13 @@ class RGWDataChangesLog { typedef ceph::shared_ptr ChangeStatusPtr; - lru_map changes; + lru_map changes; - map cur_cycle; + map cur_cycle; - void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status); - void register_renew(rgw_bucket_shard& bs); - void update_renewed(rgw_bucket_shard& bs, utime_t& expiration); + void _get_change(string& bucket_name, ChangeStatusPtr& status); + void register_renew(rgw_bucket& bucket); + void update_renewed(string& bucket_name, utime_t& expiration); class ChangesRenewThread : public Thread { CephContext *cct; @@ -364,8 +362,8 @@ public: ~RGWDataChangesLog(); - int choose_oid(const rgw_bucket_shard& bs); - int add_entry(rgw_bucket& bucket, int shard_id); + int choose_oid(rgw_bucket& bucket); + int add_entry(rgw_bucket& bucket); int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, list& entries, diff --git a/src/rgw/rgw_common.cc b/src/rgw/rgw_common.cc index a36d89de2a068..dfe3361a83997 100644 --- a/src/rgw/rgw_common.cc +++ b/src/rgw/rgw_common.cc @@ -26,8 +26,6 @@ PerfCounters *perfcounter = NULL; -const uint32_t RGWBucketInfo::NUM_SHARDS_BLIND_BUCKET(UINT32_MAX); - int rgw_perf_start(CephContext *cct) { PerfCountersBuilder plb(cct, cct->_conf->name.to_str(), l_rgw_first, l_rgw_last); diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index a4c9b41ef167b..d9175e8dc93e1 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -140,10 +140,6 @@ using ceph::crypto::MD5; #define ERR_USER_SUSPENDED 2100 #define ERR_INTERNAL_ERROR 2200 -#ifndef UINT32_MAX -#define UINT32_MAX (4294967295) -#endif - typedef void *RGWAccessHandle; @@ -681,25 +677,6 @@ inline ostream& operator<<(ostream& out, const rgw_bucket &b) { return out; } -struct rgw_bucket_shard { - rgw_bucket bucket; - int shard_id; - - rgw_bucket_shard() : shard_id(-1) {} - rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {} - - bool operator<(const rgw_bucket_shard& b) const { - if (bucket < b.bucket) { - return true; - } - if (b.bucket < bucket) { - return false; - } - return shard_id < b.shard_id; - } -}; - - struct RGWObjVersionTracker { obj_version read_version; obj_version write_version; @@ -744,10 +721,6 @@ enum RGWBucketFlags { struct RGWBucketInfo { - enum BIShardsHashType { - MOD = 0 - }; - rgw_bucket bucket; string owner; uint32_t flags; @@ -759,20 +732,8 @@ struct RGWBucketInfo obj_version ep_objv; /* entry point object version, for runtime tracking only */ RGWQuotaInfo quota; - // Represents the number of bucket index object shards: - // - value of 0 indicates there is no sharding (this is by default before this - // feature is implemented). - // - value of UINT32_T::MAX indicates this is a blind bucket. - uint32_t num_shards; - - // Represents the bucket index shard hash type. - uint8_t bucket_index_shard_hash_type; - - // Represents the shard number for blind bucket. - const static uint32_t NUM_SHARDS_BLIND_BUCKET; - void encode(bufferlist& bl) const { - ENCODE_START(11, 4, bl); + ENCODE_START(9, 4, bl); ::encode(bucket, bl); ::encode(owner, bl); ::encode(flags, bl); @@ -782,8 +743,6 @@ struct RGWBucketInfo ::encode(placement_rule, bl); ::encode(has_instance_obj, bl); ::encode(quota, bl); - ::encode(num_shards, bl); - ::encode(bucket_index_shard_hash_type, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { @@ -806,10 +765,6 @@ struct RGWBucketInfo ::decode(has_instance_obj, bl); if (struct_v >= 9) ::decode(quota, bl); - if (struct_v >= 10) - ::decode(num_shards, bl); - if (struct_v >= 11) - ::decode(bucket_index_shard_hash_type, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -817,7 +772,7 @@ struct RGWBucketInfo void decode_json(JSONObj *obj); - RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {} + RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {} }; WRITE_CLASS_ENCODER(RGWBucketInfo) @@ -1067,9 +1022,6 @@ public: bool in_extra_data; /* in-memory only member, does not serialize */ - // Represents the hash index source for this object once it is set (non-empty) - std::string index_hash_source; - rgw_obj() : in_extra_data(false) {} rgw_obj(const char *b, const char *o) : in_extra_data(false) { rgw_bucket _b(b); @@ -1168,9 +1120,6 @@ public: return orig_key; } - string& get_hash_object() { - return index_hash_source.empty() ? object : index_hash_source; - } /** * Translate a namespace-mangled object name to the user-facing name * existing in the given namespace. diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index f4ce380d20cdc..cd731b78a5920 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -545,8 +545,6 @@ void RGWBucketInfo::dump(Formatter *f) const encode_json("placement_rule", placement_rule, f); encode_json("has_instance_obj", has_instance_obj, f); encode_json("quota", quota, f); - encode_json("num_shards", num_shards, f); - encode_json("bi_shard_hash_type", (uint32_t)bucket_index_shard_hash_type, f); } void RGWBucketInfo::decode_json(JSONObj *obj) { @@ -558,10 +556,6 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { JSONDecoder::decode_json("placement_rule", placement_rule, obj); JSONDecoder::decode_json("has_instance_obj", has_instance_obj, obj); JSONDecoder::decode_json("quota", quota, obj); - JSONDecoder::decode_json("num_shards", num_shards, obj); - uint32_t hash_type; - JSONDecoder::decode_json("bi_shard_hash_type", hash_type, obj); - bucket_index_shard_hash_type = (uint8_t)hash_type; } void RGWObjEnt::dump(Formatter *f) const @@ -664,7 +658,6 @@ void RGWZone::dump(Formatter *f) const encode_json("endpoints", endpoints, f); encode_json("log_meta", log_meta, f); encode_json("log_data", log_data, f); - encode_json("bucket_index_max_shards", bucket_index_max_shards, f); } void RGWZone::decode_json(JSONObj *obj) @@ -673,7 +666,6 @@ void RGWZone::decode_json(JSONObj *obj) JSONDecoder::decode_json("endpoints", endpoints, obj); JSONDecoder::decode_json("log_meta", log_meta, obj); JSONDecoder::decode_json("log_data", log_data, obj); - JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj); } void RGWRegionPlacementTarget::dump(Formatter *f) const diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 679b70bd14c23..11bb93d2d57a0 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -323,14 +323,7 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu string obj_str; RGWUserInfo bucket_owner_info; - string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance"); - if (!bi.empty()) { - int shard_id; - ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id); - if (ret < 0) { - return ret; - } - } + s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance"); s->bucket_acl = new RGWAccessControlPolicy(s->cct); @@ -1461,7 +1454,6 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string } head_obj = manifest_gen.get_cur_obj(); - head_obj.index_hash_source = obj_str; cur_obj = head_obj; return 0; @@ -2539,7 +2531,6 @@ void RGWInitMultipart::execute() obj.init_ns(s->bucket, tmp_obj_name, mp_ns); // the meta object will be indexed with 0 size, we c obj.set_in_extra_data(true); - obj.index_hash_source = s->object_str; ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id()); } while (ret == -EEXIST); } @@ -2748,7 +2739,6 @@ void RGWCompleteMultipart::execute() meta_obj.init_ns(s->bucket, meta_oid, mp_ns); meta_obj.set_in_extra_data(true); - meta_obj.index_hash_source = s->object_str; ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL); if (ret < 0) { @@ -2900,7 +2890,6 @@ void RGWAbortMultipart::execute() string oid = mp.get_part(obj_iter->second.num); rgw_obj obj; obj.init_ns(s->bucket, oid, mp_ns); - obj.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, obj); if (ret < 0 && ret != -ENOENT) return; @@ -2909,7 +2898,6 @@ void RGWAbortMultipart::execute() RGWObjManifest::obj_iterator oiter; for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) { rgw_obj loc = oiter.get_location(); - loc.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, loc); if (ret < 0 && ret != -ENOENT) return; @@ -2921,7 +2909,6 @@ void RGWAbortMultipart::execute() // and also remove the metadata obj meta_obj.init_ns(s->bucket, meta_oid, mp_ns); meta_obj.set_in_extra_data(true); - meta_obj.index_hash_source = s->object_str; ret = store->delete_obj(s->obj_ctx, owner, meta_obj); if (ret == -ENOENT) { ret = -ERR_NO_SUCH_BUCKET; diff --git a/src/rgw/rgw_quota.cc b/src/rgw/rgw_quota.cc index 910da2fffb7c2..a48ce69890bdf 100644 --- a/src/rgw/rgw_quota.cc +++ b/src/rgw/rgw_quota.cc @@ -318,8 +318,8 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket { RGWBucketInfo bucket_info; - string bucket_ver; - string master_ver; + uint64_t bucket_ver; + uint64_t master_ver; map bucket_stats; int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 164363f83034a..bb110142f4912 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -19,7 +19,6 @@ #include "rgw_metadata.h" #include "rgw_bucket.h" -#include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/rgw/cls_rgw_client.h" #include "cls/refcount/cls_refcount_client.h" @@ -49,8 +48,6 @@ using namespace librados; #define dout_subsys ceph_subsys_rgw -#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877 - using namespace std; static RGWCache cached_rados_provider; @@ -80,6 +77,7 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN; #define RGW_STATELOG_OBJ_PREFIX "statelog." + #define dout_subsys ceph_subsys_rgw void RGWDefaultRegionInfo::dump(Formatter *f) const { @@ -1453,15 +1451,6 @@ int RGWRados::init_complete() quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads); - bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards : - zone_public_config.bucket_index_max_shards); - if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) { - bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME; - ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: " - << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl; - } - ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl; - return ret; } @@ -1683,15 +1672,6 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da return 0; } -void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, - string *marker) { - if (marker) { - *marker = shard_id_str; - marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR); - marker->append(shard_marker); - } -} - int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx) { int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx); @@ -2014,7 +1994,7 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin name = prefix + buf; } -void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl) +void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl) { cls_log_add_prepare_entry(entry, ut, section, key, bl); } @@ -2342,10 +2322,6 @@ int RGWRados::list_objects(rgw_bucket& bucket, int max, string& prefix, string& result.push_back(ent); count++; } - - // Either the back-end telling us truncated, or we don't consume all - // items returned per the amount caller request - truncated = (truncated || eiter != ent_map.end()); } done: @@ -2382,7 +2358,7 @@ int RGWRados::create_pool(rgw_bucket& bucket) return 0; } -int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards) +int RGWRados::init_bucket_index(rgw_bucket& bucket) { librados::IoCtx index_ctx; // context for new bucket @@ -2393,10 +2369,13 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards) string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - map bucket_objs; - get_bucket_index_objects(dir_oid, num_shards, bucket_objs); + librados::ObjectWriteOperation op; + op.create(true); + r = cls_rgw_init_index(index_ctx, op, dir_oid); + if (r < 0 && r != -EEXIST) + return r; - return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); + return 0; } /** @@ -2447,7 +2426,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - r = init_bucket_index(bucket, bucket_index_max_shards); + r = init_bucket_index(bucket); if (r < 0) return r; @@ -2463,8 +2442,6 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, info.owner = owner.user_id; info.region = region_name; info.placement_rule = selected_placement_rule; - info.num_shards = bucket_index_max_shards; - info.bucket_index_shard_hash_type = RGWBucketInfo::MOD; if (!creation_time) time(&info.creation_time); else @@ -2493,16 +2470,11 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, /* remove bucket index */ librados::IoCtx index_ctx; // context for new bucket - map bucket_objs; - int r = open_bucket_index(bucket, index_ctx, bucket_objs); + int r = open_bucket_index_ctx(bucket, index_ctx); if (r < 0) return r; - map::const_iterator biter; - for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) { - // Do best effort removal - index_ctx.remove(biter->second); - } + index_ctx.remove(dir_oid); } /* ret == -ENOENT here */ } @@ -2835,25 +2807,6 @@ int RGWRados::get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bu return 0; } -int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj) -{ - bucket = _bucket; - - if (store->bucket_is_system(bucket)) { - return 0; - } - - int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl; - return ret; - } - ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl; - - return 0; -} - - /** * Write/overwrite an object to the bucket storage. * bucket: the bucket to store the object in @@ -2975,14 +2928,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, index_tag = state->write_tag; } - librados::IoCtx index_ctx; - BucketShard bs(this); - r = bs.init(bucket, obj); - if (r < 0) { - return r; - } - - r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag); + r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag); if (r < 0) return r; @@ -3004,8 +2950,8 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl; } - r = complete_update_index(bs, obj, index_tag, poolid, epoch, size, - ut, etag, content_type, &acl_bl, category, remove_objs); + r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size, + ut, etag, content_type, &acl_bl, category, remove_objs); if (r < 0) goto done_cancel; @@ -3021,7 +2967,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, return 0; done_cancel: - int ret = complete_update_index_cancel(bs, obj, index_tag); + int ret = complete_update_index_cancel(bucket, obj.object, index_tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl; } @@ -3819,86 +3765,7 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, return 0; } -int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, - string& bucket_oid_base) { - if (bucket_is_system(bucket)) - return -EINVAL; - - int r = open_bucket_index_ctx(bucket, index_ctx); - if (r < 0) - return r; - - if (bucket.marker.empty()) { - ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl; - return -EIO; - } - - bucket_oid_base = dir_oid_prefix; - bucket_oid_base.append(bucket.marker); - - return 0; - -} - -int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& bucket_objs, int shard_id, map *bucket_instance_ids) { - string bucket_oid_base; - int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); - if (ret < 0) - return ret; - - // Get the bucket info - RGWBucketInfo binfo; - ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); - if (ret < 0) - return ret; - - get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs, shard_id); - if (bucket_instance_ids) { - get_bucket_instance_ids(binfo, shard_id, bucket_instance_ids); - } - return 0; -} - -template -int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& oids, map& bucket_objs, - int shard_id, map *bucket_instance_ids) -{ - int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids); - if (ret < 0) - return ret; - - map::const_iterator iter = oids.begin(); - for (; iter != oids.end(); ++iter) { - bucket_objs[iter->first] = T(); - } - return 0; -} - -int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, - const string& obj_key, string *bucket_obj, int *shard_id) -{ - string bucket_oid_base; - int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); - if (ret < 0) - return ret; - - // Get the bucket info - RGWBucketInfo binfo; - ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); - if (ret < 0) - return ret; - - ret = get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards, - (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id); - if (ret < 0) { - ldout(cct, 10) << "get_bucket_index_object() returned ret=" << ret << dendl; - } - return 0; -} - -static void accumulate_raw_stats(rgw_bucket_dir_header& header, map& stats) +static void translate_raw_stats(rgw_bucket_dir_header& header, map& stats) { map::iterator iter = header.stats.begin(); for (; iter != header.stats.end(); ++iter) { @@ -3906,9 +3773,9 @@ static void accumulate_raw_stats(rgw_bucket_dir_header& header, mapsecond; s.category = (RGWObjCategory)iter->first; - s.num_kb += ((header_stats.total_size + 1023) / 1024); - s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024); - s.num_objects += header_stats.num_entries; + s.num_kb = ((header_stats.total_size + 1023) / 1024); + s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024); + s.num_objects = header_stats.num_entries; } } @@ -3917,24 +3784,21 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, map *calculated_stats) { librados::IoCtx index_ctx; - // key - bucket index object id - // value - bucket index check OP returned result with the given bucket index object (shard) - map oids; - map bucket_objs_ret; - int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret); + string oid; + + int ret = open_bucket_index(bucket, index_ctx, oid); if (ret < 0) return ret; - ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)(); + rgw_bucket_dir_header existing_header; + rgw_bucket_dir_header calculated_header; + + ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header); if (ret < 0) return ret; - // Aggregate results (from different shards if there is any) - map::iterator iter; - for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) { - accumulate_raw_stats(iter->second.existing_header, *existing_stats); - accumulate_raw_stats(iter->second.calculated_header, *calculated_stats); - } + translate_raw_stats(existing_header, *existing_stats); + translate_raw_stats(calculated_header, *calculated_stats); return 0; } @@ -3942,12 +3806,13 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket, int RGWRados::bucket_rebuild_index(rgw_bucket& bucket) { librados::IoCtx index_ctx; - map bucket_objs; - int r = open_bucket_index(bucket, index_ctx, bucket_objs); - if (r < 0) - return r; + string oid; + + int ret = open_bucket_index(bucket, index_ctx, oid); + if (ret < 0) + return ret; - return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)(); + return cls_rgw_bucket_rebuild_index_op(index_ctx, oid); } @@ -4009,14 +3874,8 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob bool ret_not_existed = (state && !state->exists); - BucketShard bs(this); - r = bs.init(bucket, obj); - if (r < 0) { - return r; - } - string tag; - r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag); + r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag); if (r < 0) return r; @@ -4031,9 +3890,9 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob int64_t poolid = ref.ioctx.get_id(); if (r >= 0 || r == -ENOENT) { uint64_t epoch = ref.ioctx.get_last_version(); - r = complete_update_index_del(bs, obj, tag, poolid, epoch); + r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch); } else { - int ret = complete_update_index_cancel(bs, obj, tag); + int ret = complete_update_index_cancel(bucket, obj.object, tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl; } @@ -4091,14 +3950,8 @@ int RGWRados::delete_obj_index(rgw_obj& obj) std::string oid, key; get_obj_bucket_and_oid_key(obj, bucket, oid, key); - BucketShard bs(this); - int r = bs.init(bucket, obj); - if (r < 0) { - return r; - } - string tag; - r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0); + int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0); return r; } @@ -4434,17 +4287,9 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, if (!op.size()) return 0; - librados::IoCtx index_ctx; - BucketShard bs(this); - r = bs.init(bucket, obj); - if (r < 0) { - ldout(cct, 10) << "bs.init() returned r=" << r << dendl; - return r; - } - string tag; if (state) { - r = prepare_update_index(state, bs, CLS_RGW_OP_ADD, obj, tag); + r = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, obj, tag); if (r < 0) return r; } @@ -4460,10 +4305,10 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj, uint64_t epoch = ref.ioctx.get_last_version(); int64_t poolid = ref.ioctx.get_id(); utime_t mtime = ceph_clock_now(cct); - r = complete_update_index(bs, obj, tag, poolid, epoch, state->size, + r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size, mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL); } else { - int ret = complete_update_index_cancel(bs, obj, tag); + int ret = complete_update_index_cancel(bucket, obj.object, tag); if (ret < 0) { ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl; } @@ -4659,13 +4504,13 @@ done_err: return r; } -int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs, +int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, RGWModifyOp op, rgw_obj& obj, string& tag) { - if (bucket_is_system(bs.bucket)) + if (bucket_is_system(bucket)) return 0; - int ret = data_log->add_entry(bs.bucket, bs.shard_id); + int ret = data_log->add_entry(obj.bucket); if (ret < 0) { lderr(cct) << "ERROR: failed writing data log" << dendl; return ret; @@ -4682,20 +4527,21 @@ int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs, append_rand_alpha(cct, tag, tag, 32); } } - ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key); + ret = cls_obj_prepare_op(bucket, op, tag, + obj.object, obj.key); return ret; } -int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, +int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs) { - if (bucket_is_system(bs.bucket)) + if (bucket_is_system(bucket)) return 0; RGWObjEnt ent; - ent.name = oid.object; + ent.name = oid; ent.size = size; ent.mtime = ut; ent.etag = etag; @@ -4710,7 +4556,7 @@ int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, ent.owner_display_name = owner.get_display_name(); ent.content_type = content_type; - int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs); + int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs); return ret; } @@ -4734,7 +4580,6 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, string etag; string content_type; bufferlist acl_bl; - BucketShard bs(this); bool update_index = (category == RGW_OBJ_CATEGORY_MAIN || category == RGW_OBJ_CATEGORY_MULTIMETA); @@ -4815,13 +4660,8 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, int64_t poolid = io_ctx.get_id(); int ret; - ret = bs.init(bucket, dst_obj); - if (ret < 0) { - goto done; - } - if (update_index) { - ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag); + ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag); if (ret < 0) goto done; } @@ -4835,10 +4675,10 @@ done: if (update_index) { if (ret >= 0) { - ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size, + ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size, ut, etag, content_type, &acl_bl, category, NULL); } else { - int r = complete_update_index_cancel(bs, dst_obj, tag); + int r = complete_update_index_cancel(bucket, dst_obj.object, tag); if (r < 0) { ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl; } @@ -5537,95 +5377,57 @@ int RGWRados::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime, return 0; } -int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, - map& stats, string *max_marker) +int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, + string *max_marker) { - map headers; - map bucket_instance_ids; - int r = cls_bucket_head(bucket, headers, &bucket_instance_ids); + rgw_bucket_dir_header header; + int r = cls_bucket_head(bucket, header); if (r < 0) return r; - assert(headers.size() == bucket_instance_ids.size()); - - map::iterator iter = headers.begin(); - map::iterator viter = bucket_instance_ids.begin(); - BucketIndexShardsManager ver_mgr; - BucketIndexShardsManager master_ver_mgr; - BucketIndexShardsManager marker_mgr; - char buf[64]; - for(; iter != headers.end(); ++iter, ++viter) { - accumulate_raw_stats(iter->second, stats); - snprintf(buf, sizeof(buf), "%lu", iter->second.ver); - ver_mgr.add(viter->first, string(buf)); - snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver); - master_ver_mgr.add(viter->first, string(buf)); - marker_mgr.add(viter->first, iter->second.max_marker); - } - ver_mgr.to_string(bucket_ver); - master_ver_mgr.to_string(master_ver); - marker_mgr.to_string(max_marker); + stats.clear(); + + translate_raw_stats(header, stats); + + *bucket_ver = header.ver; + *master_ver = header.master_ver; + + if (max_marker) + *max_marker = header.max_marker; + return 0; } class RGWGetBucketStatsContext : public RGWGetDirHeader_CB { RGWGetBucketStats_CB *cb; - uint32_t pendings; - map stats; - int ret_code; - bool should_cb; - Mutex lock; public: - RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings) - : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true), - lock("RGWGetBucketStatsContext") {} - + RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {} void handle_response(int r, rgw_bucket_dir_header& header) { - Mutex::Locker l(lock); - if (should_cb) { - if ( r >= 0) { - accumulate_raw_stats(header, stats); - } else { - ret_code = r; - } + map stats; - // Are we all done? - if (--pendings == 0) { - if (!ret_code) { - cb->set_response(&stats); - } - cb->handle_response(ret_code); - cb->put(); - } + if (r >= 0) { + translate_raw_stats(header, stats); + cb->set_response(header.ver, header.master_ver, &stats, header.max_marker); } - } - void unset_cb() { - Mutex::Locker l(lock); - should_cb = false; + cb->handle_response(r); + + cb->put(); } }; int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx) { - RGWBucketInfo binfo; - int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); - if (r < 0) - return r; - - int num_aio = 0; - RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards); - assert(get_ctx); - r = cls_bucket_head_async(bucket, get_ctx, &num_aio); - get_ctx->put(); + RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx); + int r = cls_bucket_head_async(bucket, get_ctx); if (r < 0) { ctx->put(); - if (num_aio) { - get_ctx->unset_cb(); - } + delete get_ctx; + return r; } - return r; + + return 0; } class RGWGetUserStatsContext : public RGWGetUserHeader_CB { @@ -5719,7 +5521,7 @@ int RGWRados::get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketI time_t *pmtime, map *pattrs) { string oid; - if (bucket.oid.empty()) { + if (!bucket.oid.empty()) { get_bucket_meta_oid(bucket, oid); } else { oid = bucket.oid; @@ -6034,21 +5836,21 @@ int RGWRados::update_containers_stats(map& m) RGWBucketEnt& ent = iter->second; rgw_bucket& bucket = ent.bucket; - map headers; - int r = cls_bucket_head(bucket, headers); + rgw_bucket_dir_header header; + int r = cls_bucket_head(bucket, header); if (r < 0) return r; - map::iterator hiter = headers.begin(); - for (; hiter != headers.end(); ++hiter) { - RGWObjCategory category = main_category; - map::iterator iter = (hiter->second.stats).find((uint8_t)category); - if (iter != hiter->second.stats.end()) { - struct rgw_bucket_category_stats& stats = iter->second; - ent.count += stats.num_entries; - ent.size += stats.total_size; - ent.size_rounded += stats.total_size_rounded; - } + ent.count = 0; + ent.size = 0; + + RGWObjCategory category = main_category; + map::iterator iter = header.stats.find((uint8_t)category); + if (iter != header.stats.end()) { + struct rgw_bucket_category_stats& stats = iter->second; + ent.count = stats.num_entries; + ent.size = stats.total_size; + ent.size_rounded = stats.total_size_rounded; } } @@ -6171,125 +5973,43 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter, return oids.size(); } -int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, +int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list& result, bool *truncated) { - ldout(cct, 20) << __func__ << ": " << bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl; result.clear(); librados::IoCtx index_ctx; - map oids; - map bi_log_lists; - map bucket_instance_ids; - int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids); - if (r < 0) - return r; - - BucketIndexShardsManager marker_mgr; - bool has_shards = (oids.size() > 1 || shard_id >= 0); - // If there are multiple shards for the bucket index object, the marker - // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}# - // {shard_marker_2}...', if there is no sharding, the bi_log_list should - // only contain one record, and the key is the bucket instance id. - r = marker_mgr.from_string(marker, shard_id); - if (r < 0) - return r; - - r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)(); + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - vector shard_ids_str; - map::iterator> vcurrents; - map::iterator> vends; - if (truncated) { - *truncated = false; - } - map::iterator miter = bi_log_lists.begin(); - for (; miter != bi_log_lists.end(); ++miter) { - int shard_id = miter->first; - vcurrents[shard_id] = miter->second.entries.begin(); - vends[shard_id] = miter->second.entries.end(); - if (truncated) { - *truncated = (*truncated || miter->second.truncated); - } - } - - size_t total = 0; - bool has_more = true; - map::iterator>::iterator viter; - map::iterator>::iterator eiter; - while (total < max && has_more) { - has_more = false; - - viter = vcurrents.begin(); - eiter = vends.begin(); - - for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) { - assert (eiter != vends.end()); - - int shard_id = viter->first; - list::iterator& liter = viter->second; - - if (liter == eiter->second){ - continue; - } - rgw_bi_log_entry& entry = *(liter); - if (has_shards) { - char buf[16]; - snprintf(buf, sizeof(buf), "%d", shard_id); - string tmp_id; - build_bucket_index_marker(buf, entry.id, &tmp_id); - entry.id.swap(tmp_id); - } - marker_mgr.add(shard_id, entry.id); - result.push_back(entry); - total++; - has_more = true; - ++liter; - } - } - - if (truncated) { - for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) { - assert (eiter != vends.end()); - *truncated = (*truncated || (viter->second != eiter->second)); - } - } + std::list entries; + int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated); + if (ret < 0) + return ret; - // Refresh marker, if there are multiple shards, the output will look like - // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...', - // if there is no sharding, the simply marker (without oid) is returned - if (has_shards) { - marker_mgr.to_string(&marker); - } else { - if (!result.empty()) { - marker = result.rbegin()->id; - } + std::list::iterator iter; + for (iter = entries.begin(); iter != entries.end(); ++iter) { + result.push_back(*iter); } return 0; } -int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker) +int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker) { librados::IoCtx index_ctx; - map bucket_objs; - int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id); + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - BucketIndexShardsManager start_marker_mgr; - r = start_marker_mgr.from_string(start_marker, shard_id); - if (r < 0) - return r; - BucketIndexShardsManager end_marker_mgr; - r = end_marker_mgr.from_string(end_marker, shard_id); - if (r < 0) - return r; + int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker); + if (ret < 0) + return ret; - return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs, - cct->_conf->rgw_bucket_index_max_aio)(); + return 0; } int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op) @@ -6328,20 +6048,34 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri return r; } -int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, +int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, string& name, string& locator) { + librados::IoCtx index_ctx; + string oid; + + int r = open_bucket_index(bucket, index_ctx, oid); + if (r < 0) + return r; + ObjectWriteOperation o; cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data); - int ret = bs.index_ctx.operate(bs.bucket_obj, &o); - return ret; + r = index_ctx.operate(oid, &o); + return r; } -int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, +int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs) { + librados::IoCtx index_ctx; + string oid; + + int r = open_bucket_index(bucket, index_ctx, oid); + if (r < 0) + return r; + ObjectWriteOperation o; rgw_bucket_dir_entry_meta dir_meta; dir_meta.size = ent.size; @@ -6358,97 +6092,77 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data); AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); + r = index_ctx.aio_operate(oid, c, &o); c->release(); - return ret; + return r; } -int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag, +int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, - list *remove_obj) + list *remove_objs) { - return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj); + return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs); } -int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, +int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); + return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); } -int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name) +int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); + return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); } int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout) { librados::IoCtx index_ctx; - map bucket_objs; - int r = open_bucket_index(bucket, index_ctx, bucket_objs); + string oid; + + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)(); + ObjectWriteOperation o; + cls_rgw_bucket_set_tag_timeout(o, timeout); + + r = index_ctx.operate(oid, &o); + + return r; } -int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, - uint32_t num_entries, map& m, bool *is_truncated, - string *last_entry, bool (*force_check_filter)(const string& name)) +int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, + uint32_t num, map& m, + bool *is_truncated, string *last_entry, + bool (*force_check_filter)(const string& name)) { - ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl; + ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl; librados::IoCtx index_ctx; - // key - oid (for different shards if there is any) - // value - list result for the corresponding oid (shard), it is filled by the AIO callback - map oids; - map list_results; - int r = open_bucket_index(bucket, index_ctx, oids); + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); + struct rgw_bucket_dir dir; + r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated); if (r < 0) return r; - // Create a list of iterators that are used to iterate each shard - vector::iterator> vcurrents(list_results.size()); - vector::iterator> vends(list_results.size()); - vector vnames(list_results.size()); - map::iterator iter = list_results.begin(); - *is_truncated = false; - for (; iter != list_results.end(); ++iter) { - vcurrents.push_back(iter->second.dir.m.begin()); - vends.push_back(iter->second.dir.m.end()); - vnames.push_back(oids[iter->first]); - *is_truncated = (*is_truncated || iter->second.is_truncated); - } - - // Create a map to track the next candidate entry from each shard, if the entry - // from a specified shard is selected/erased, the next entry from that shard will - // be inserted for next round selection - map candidates; - for (size_t i = 0; i < vcurrents.size(); ++i) { - if (vcurrents[i] != vends[i]) { - candidates[vcurrents[i]->second.name] = i; - } - } - - map updates; - uint32_t count = 0; - while (count < num_entries && !candidates.empty()) { - // Select the next one - int pos = candidates.begin()->second; - struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second; + map::iterator miter; + bufferlist updates; + for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) { + RGWObjEnt e; + rgw_bucket_dir_entry& dirent = miter->second; // fill it in with initial values; we may correct later - RGWObjEnt e; e.name = dirent.name; e.size = dirent.meta.size; e.mtime = dirent.meta.mtime; @@ -6458,53 +6172,44 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str e.content_type = dirent.meta.content_type; e.tag = dirent.tag; + /* oh, that shouldn't happen! */ + if (e.name.empty()) { + ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl; + continue; + } + bool force_check = force_check_filter && force_check_filter(dirent.name); + if (!dirent.exists || !dirent.pending_map.empty() || force_check) { /* there are uncommitted ops. We need to check the current state, * and if the tags are old we need to do cleanup as well. */ librados::IoCtx sub_ctx; sub_ctx.dup(index_ctx); - r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]); - if (r < 0 && r != -ENOENT) { + r = check_disk_state(sub_ctx, bucket, dirent, e, updates); + if (r < 0) { + if (r == -ENOENT) + continue; + else return r; } } - if (r >= 0) { - m[e.name] = e; - ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl; - ++count; - } - - // Refresh the candidates map - candidates.erase(candidates.begin()); - ++vcurrents[pos]; - if (vcurrents[pos] != vends[pos]) { - candidates[vcurrents[pos]->second.name] = pos; - } + m[e.name] = e; + ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl; } - // Suggest updates if there is any - map::iterator miter = updates.begin(); - for (; miter != updates.end(); ++miter) { - if (miter->second.length()) { - ObjectWriteOperation o; - cls_rgw_suggest_changes(o, miter->second); - // we don't care if we lose suggested updates, send them off blindly - AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - index_ctx.aio_operate(miter->first, c, &o); - c->release(); - } + if (dir.m.size()) { + *last_entry = dir.m.rbegin()->first; } - // Check if all the returned entries are consumed or not - for (size_t i = 0; i < vcurrents.size(); ++i) { - if (vcurrents[i] != vends[i]) - *is_truncated = true; + if (updates.length()) { + ObjectWriteOperation o; + cls_rgw_suggest_changes(o, updates); + // we don't care if we lose suggested updates, send them off blindly + AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); + r = index_ctx.aio_operate(oid, c, &o); + c->release(); } - if (m.size()) - *last_entry = m.rbegin()->first; - - return 0; + return m.size(); } int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info) @@ -6697,45 +6402,34 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx, return 0; } -int RGWRados::cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids) +int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header) { librados::IoCtx index_ctx; - map oids; - map list_results; - int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids); + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)(); + r = cls_rgw_get_dir_header(index_ctx, oid, &header); if (r < 0) return r; - map::iterator iter = list_results.begin(); - for(; iter != list_results.end(); ++iter) { - headers[oids[iter->first]] = iter->second.dir.header; - } return 0; } -int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio) +int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx) { librados::IoCtx index_ctx; - map bucket_objs; - int r = open_bucket_index(bucket, index_ctx, bucket_objs); + string oid; + int r = open_bucket_index(bucket, index_ctx, oid); if (r < 0) return r; - map::iterator iter = bucket_objs.begin(); - for (; iter != bucket_objs.end(); ++iter) { - r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast(ctx->get())); - if (r < 0) { - ctx->put(); - break; - } else { - (*num_aio)++; - } - } - return r; + r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx); + if (r < 0) + return r; + + return 0; } int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header) @@ -6786,8 +6480,8 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) { - map headers; - int r = cls_bucket_head(bucket, headers); + rgw_bucket_dir_header header; + int r = cls_bucket_head(bucket, header); if (r < 0) { ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl; return r; @@ -6797,15 +6491,12 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket) bucket.convert(&entry.bucket); - map::iterator hiter = headers.begin(); - for (; hiter != headers.end(); ++hiter) { - map::iterator iter = hiter->second.stats.begin(); - for (; iter != hiter->second.stats.end(); ++iter) { - struct rgw_bucket_category_stats& header_stats = iter->second; - entry.size += header_stats.total_size; - entry.size_rounded += header_stats.total_size_rounded; - entry.count += header_stats.num_entries; - } + map::iterator iter = header.stats.begin(); + for (; iter != header.stats.end(); ++iter) { + struct rgw_bucket_category_stats& header_stats = iter->second; + entry.size += header_stats.total_size; + entry.size_rounded += header_stats.total_size_rounded; + entry.count += header_stats.num_entries; } list entries; @@ -7024,81 +6715,6 @@ int RGWRados::remove_temp_objects(string date, string time) return 0; } -void RGWRados::get_bucket_index_objects(const string& bucket_oid_base, - uint32_t num_shards, map& bucket_objects, int shard_id) -{ - if (!num_shards) { - bucket_objects[0] = bucket_oid_base; - } else { - char buf[bucket_oid_base.size() + 32]; - if (shard_id < 0) { - for (uint32_t i = 0; i < num_shards; ++i) { - snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i); - bucket_objects[i] = buf; - } - } else { - if ((uint32_t)shard_id > num_shards) { - return; - } - snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id); - bucket_objects[shard_id] = buf; - } - } -} - -void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result) -{ - rgw_bucket& bucket = bucket_info.bucket; - string plain_id = bucket.name + ":" + bucket.bucket_id; - if (!bucket_info.num_shards) { - (*result)[0] = plain_id; - } else { - char buf[16]; - if (shard_id < 0) { - for (uint32_t i = 0; i < bucket_info.num_shards; ++i) { - snprintf(buf, sizeof(buf), ":%d", i); - (*result)[i] = plain_id + buf; - } - } else { - if ((uint32_t)shard_id > bucket_info.num_shards) { - return; - } - snprintf(buf, sizeof(buf), ":%d", shard_id); - (*result)[shard_id] = plain_id + buf; - } - } -} - -int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, - uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id) -{ - int r = 0; - switch (hash_type) { - case RGWBucketInfo::MOD: - if (!num_shards) { - // By default with no sharding, we use the bucket oid as itself - (*bucket_obj) = bucket_oid_base; - if (shard_id) { - *shard_id = -1; - } - } else { - uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size()); - uint32_t sid2 = sid ^ ((sid & 0xFF) << 24); - sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards; - char buf[bucket_oid_base.size() + 32]; - snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid); - (*bucket_obj) = buf; - if (shard_id) { - *shard_id = (int)sid; - } - } - break; - default: - r = -ENOTSUP; - } - return r; -} - int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 1071b2f1ae619..57338b8601630 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -913,24 +913,14 @@ struct RGWZone { bool log_meta; bool log_data; -/** - * Represents the number of shards for the bucket index object, a value of zero - * indicates there is no sharding. By default (no sharding, the name of the object - * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}', - * sharding_id is zero-based value. It is not recommended to set a too large value - * (e.g. thousand) as it increases the cost for bucket listing. - */ - uint32_t bucket_index_max_shards; - - RGWZone() : log_meta(false), log_data(false), bucket_index_max_shards(0) {} + RGWZone() : log_meta(false), log_data(false) {} void encode(bufferlist& bl) const { - ENCODE_START(3, 1, bl); + ENCODE_START(2, 1, bl); ::encode(name, bl); ::encode(endpoints, bl); ::encode(log_meta, bl); ::encode(log_data, bl); - ::encode(bucket_index_max_shards, bl); ENCODE_FINISH(bl); } @@ -942,9 +932,6 @@ struct RGWZone { ::decode(log_meta, bl); ::decode(log_data, bl); } - if (struct_v >= 3) { - ::decode(bucket_index_max_shards, bl); - } DECODE_FINISH(bl); } void dump(Formatter *f) const; @@ -1203,13 +1190,21 @@ public: class RGWGetBucketStats_CB : public RefCountedObject { protected: rgw_bucket bucket; + uint64_t bucket_ver; + uint64_t master_ver; map *stats; + string max_marker; public: RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {} virtual ~RGWGetBucketStats_CB() {} virtual void handle_response(int r) = 0; - virtual void set_response(map *_stats) { + virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver, + map *_stats, + const string &_max_marker) { + bucket_ver = _bucket_ver; + master_ver = _master_ver; stats = _stats; + max_marker = _max_marker; } }; @@ -1266,20 +1261,6 @@ class RGWRados int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid); - int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, - string& bucket_oid_base); - int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, - const string& obj_key, string *bucket_obj, int *shard_id); - int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& bucket_objs, int shard_id = -1, map *bucket_instance_ids = NULL); - template - int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, - map& oids, map& bucket_objs, - int shard_id = -1, map *bucket_instance_ids = NULL); - void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker, - string *marker); - - void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map *result); struct GetObjState { librados::IoCtx io_ctx; @@ -1313,9 +1294,6 @@ class RGWRados Mutex bucket_id_lock; - // This field represents the number of bucket index object shards - uint32_t bucket_index_max_shards; - int get_obj_ioctx(const rgw_obj& obj, librados::IoCtx *ioctx); int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false); uint64_t max_bucket_id; @@ -1387,9 +1365,7 @@ public: gc(NULL), use_gc_thread(false), quota_threads(false), num_watchers(0), watchers(NULL), watch_handles(NULL), watch_initialized(false), - bucket_id_lock("rados_bucket_id"), - bucket_index_max_shards(0), - max_bucket_id(0), + bucket_id_lock("rados_bucket_id"), max_bucket_id(0), cct(NULL), rados(NULL), pools_initialized(false), quota_handler(NULL), @@ -1503,7 +1479,7 @@ public: * create a bucket with name bucket and the given list of attrs * returns 0 on success, -ERR# otherwise. */ - virtual int init_bucket_index(rgw_bucket& bucket, int num_shards); + virtual int init_bucket_index(rgw_bucket& bucket); int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule, const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule); int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket); @@ -1835,8 +1811,8 @@ public: } int decode_policy(bufferlist& bl, ACLOwner *owner); - int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver, - map& stats, string *max_marker); + int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map& stats, + string *max_marker); int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb); int get_user_stats(const string& user, RGWStorageStats& stats); int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb); @@ -1860,50 +1836,39 @@ public: virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv, map *pattrs, bool create_entry_point); - struct BucketShard { - RGWRados *store; - rgw_bucket bucket; - int shard_id; - librados::IoCtx index_ctx; - string bucket_obj; - - BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {} - int init(rgw_bucket& _bucket, rgw_obj& obj); - }; - int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid); - int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator); - int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, + int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, + string& name, string& locator); + int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); - int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); - int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name); - int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name); + int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); + int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name); + int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name); int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout); - int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, - map& m, bool *is_truncated, string *last_entry, - bool (*force_check_filter)(const string& name) = NULL); - int cls_bucket_head(rgw_bucket& bucket, map& headers, map *bucket_instance_ids = NULL); - int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio); - - int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard, + int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num, + map& m, bool *is_truncated, + string *last_entry, bool (*force_check_filter)(const string& name) = NULL); + int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header); + int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx); + int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, RGWModifyOp op, rgw_obj& oid, string& tag); - int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, + int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs); - int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) { - if (bucket_is_system(bucket_shard.bucket)) + int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) { + if (bucket_is_system(bucket)) return 0; - return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object); + return cls_obj_complete_del(bucket, tag, pool, epoch, oid); } - int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) { - if (bucket_is_system(bucket_shard.bucket)) + int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) { + if (bucket_is_system(bucket)) return 0; - return cls_obj_complete_cancel(bucket_shard, tag, oid.object); + return cls_obj_complete_cancel(bucket, tag, oid); } - int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list& result, bool *truncated); - int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker); + int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list& result, bool *truncated); + int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker); int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info); int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, @@ -1912,7 +1877,7 @@ public: void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name); - void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl); + void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl); int time_log_add(const string& oid, list& entries); int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, @@ -1980,32 +1945,6 @@ public: } private: - /** - * This is a helper method, it generates a list of bucket index objects with the given - * bucket base oid and number of shards. - * - * bucket_oid_base [in] - base name of the bucket index object; - * num_shards [in] - number of bucket index object shards. - * bucket_objs [out] - filled by this method, a list of bucket index objects. - */ - void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards, - map& bucket_objs, int shard_id = -1); - - /** - * Get the bucket index object with the given base bucket index object and object key, - * and the number of bucket index shards. - * - * bucket_oid_base [in] - bucket object base name. - * obj_key [in] - object key. - * num_shards [in] - number of bucket index shards. - * hash_type [in] - type of hash to find the shard ID. - * bucket_obj [out] - the bucket index object for the given object. - * - * Return 0 on success, a failure code otherwise. - */ - int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, - uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard); - int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); /** diff --git a/src/rgw/rgw_replica_log.cc b/src/rgw/rgw_replica_log.cc index d72b6af23c305..961abc2e18806 100644 --- a/src/rgw/rgw_replica_log.cc +++ b/src/rgw/rgw_replica_log.cc @@ -14,10 +14,8 @@ #include "rgw_replica_log.h" #include "cls/replica_log/cls_replica_log_client.h" -#include "cls/rgw/cls_rgw_client.h" #include "rgw_rados.h" -#define dout_subsys ceph_subsys_rgw void RGWReplicaBounds::dump(Formatter *f) const { @@ -134,50 +132,3 @@ RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) : prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix; prefix.append("."); } - -string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id) -{ - string s = prefix + bucket.name; - - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ".%d", shard_id); - s += buf; - } - return s; -} - -int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, - const string& marker, const utime_t& time, - const list *entries) -{ - if (shard_id >= 0 || - !BucketIndexShardsManager::is_shards_marker(marker)) { - return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id), pool, - daemon_id, marker, time, entries); - } - - BucketIndexShardsManager sm; - int ret = sm.from_string(marker, shard_id); - if (ret < 0) { - ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl; - return ret; - } - - map& vals = sm.get(); - - ret = 0; - - map::iterator iter; - for (iter = vals.begin(); iter != vals.end(); ++iter) { - ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; - int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first), pool, - daemon_id, iter->second, time, entries); - if (r < 0) { - ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl; - ret = r; - } - } - - return ret; -} diff --git a/src/rgw/rgw_replica_log.h b/src/rgw/rgw_replica_log.h index a9adc9eedbe6f..456b230a6520e 100644 --- a/src/rgw/rgw_replica_log.h +++ b/src/rgw/rgw_replica_log.h @@ -97,19 +97,20 @@ public: class RGWReplicaBucketLogger : private RGWReplicaLogger { string pool; string prefix; - - string obj_name(const rgw_bucket& bucket, int shard_id); public: RGWReplicaBucketLogger(RGWRados *_store); - int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, + int update_bound(const rgw_bucket& bucket, const string& daemon_id, const string& marker, const utime_t& time, - const list *entries); - int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id) { - return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id), pool, + const list *entries) { + return RGWReplicaLogger::update_bound(prefix+bucket.name, pool, + daemon_id, marker, time, entries); + } + int delete_bound(const rgw_bucket& bucket, const string& daemon_id) { + return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool, daemon_id); } - int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { - return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id), pool, + int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) { + return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool, bounds); } }; diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 1db6cadb0d8e0..9f32fc9ebd3f9 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -282,12 +282,6 @@ void RGWOp_BILog_List::execute() { return; } - int shard_id; - http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); - if (http_ret < 0) { - return; - } - if (!bucket_instance.empty()) { http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL); if (http_ret < 0) { @@ -313,7 +307,7 @@ void RGWOp_BILog_List::execute() { send_response(); do { list entries; - int ret = store->list_bi_log_entries(bucket_info.bucket, shard_id, + int ret = store->list_bi_log_entries(bucket_info.bucket, marker, max_entries - count, entries, &truncated); if (ret < 0) { @@ -425,13 +419,6 @@ void RGWOp_BILog_Delete::execute() { http_ret = -EINVAL; return; } - - int shard_id; - http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); - if (http_ret < 0) { - return; - } - if (!bucket_instance.empty()) { http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL); if (http_ret < 0) { @@ -445,7 +432,7 @@ void RGWOp_BILog_Delete::execute() { return; } } - http_ret = store->trim_bi_log_entries(bucket_info.bucket, shard_id, start_marker, end_marker); + http_ret = store->trim_bi_log_entries(bucket_info.bucket, start_marker, end_marker); if (http_ret < 0) { dout(5) << "ERROR: trim_bi_log_entries() " << dendl; } diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index 22221d4078c7d..ff1bf3466d378 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -38,11 +38,11 @@ public: }; class RGWOp_BILog_Info : public RGWRESTOp { - string bucket_ver; - string master_ver; + uint64_t bucket_ver; + uint64_t master_ver; string max_marker; public: - RGWOp_BILog_Info() : bucket_ver(), master_ver() {} + RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {} ~RGWOp_BILog_Info() {} int check_caps(RGWUserCaps& caps) { diff --git a/src/rgw/rgw_rest_replica_log.cc b/src/rgw/rgw_rest_replica_log.cc index 0309a2cabd25b..e7dd962f0f764 100644 --- a/src/rgw/rgw_rest_replica_log.cc +++ b/src/rgw/rgw_rest_replica_log.cc @@ -181,13 +181,6 @@ void RGWOp_BILog_SetBounds::execute() { return; } - int shard_id; - http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); - if (http_ret < 0) { - dout(5) << "failed to parse bucket instance" << dendl; - return; - } - rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; @@ -201,7 +194,7 @@ void RGWOp_BILog_SetBounds::execute() { return; } - http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers); + http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers); } void RGWOp_BILog_GetBounds::execute() { @@ -213,19 +206,12 @@ void RGWOp_BILog_GetBounds::execute() { return; } - int shard_id; - http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); - if (http_ret < 0) { - dout(5) << "failed to parse bucket instance" << dendl; - return; - } - rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; RGWReplicaBucketLogger rl(store); - http_ret = rl.get_bounds(bucket, shard_id, bounds); + http_ret = rl.get_bounds(bucket, bounds); } void RGWOp_BILog_GetBounds::send_response() { @@ -251,19 +237,12 @@ void RGWOp_BILog_DeleteBounds::execute() { return; } - int shard_id; - http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id); - if (http_ret < 0) { - dout(5) << "failed to parse bucket instance" << dendl; - return; - } - rgw_bucket bucket; if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) return; RGWReplicaBucketLogger rl(store); - http_ret = rl.delete_bound(bucket, shard_id, daemon_id); + http_ret = rl.delete_bound(bucket, daemon_id); } RGWOp *RGWHandler_ReplicaLog::op_get() { diff --git a/src/rgw/rgw_rest_swift.cc b/src/rgw/rgw_rest_swift.cc index 20ac37e9990b5..960cf9c3d02c0 100644 --- a/src/rgw/rgw_rest_swift.cc +++ b/src/rgw/rgw_rest_swift.cc @@ -559,6 +559,7 @@ void RGWCopyObj_ObjStore_SWIFT::send_response() int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) { const char *content_type = NULL; + int orig_ret = ret; map response_attrs; map::iterator riter; @@ -600,7 +601,11 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o } } - set_req_state_err(s, (partial_content && !ret) ? STATUS_PARTIAL_CONTENT : ret); + if (partial_content && !ret) + ret = -STATUS_PARTIAL_CONTENT; + + if (ret) + set_req_state_err(s, ret); dump_errno(s); for (riter = response_attrs.begin(); riter != response_attrs.end(); ++riter) { @@ -614,7 +619,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o sent_header = true; send_data: - if (get_data && !ret) { + if (get_data && !orig_ret) { int r = s->cio->write(bl.c_str() + bl_ofs, bl_len); if (r < 0) return r; diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 9d8c8c21209c8..03ba231b66483 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -808,8 +808,8 @@ bin_DEBUGPROGRAMS += ceph_test_cls_hello if WITH_RADOSGW ceph_test_cls_rgw_SOURCES = test/cls_rgw/test_cls_rgw.cc ceph_test_cls_rgw_LDADD = \ - $(LIBRADOS) $(CRYPTO_LIBS) libcls_rgw_client.la \ - $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD) + $(LIBRADOS) libcls_rgw_client.la \ + $(LIBCOMMON) $(UNITTEST_LDADD) $(RADOS_TEST_LDADD) ceph_test_cls_rgw_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_cls_rgw endif # WITH_RADOSGW diff --git a/src/test/cls_rgw/test_cls_rgw.cc b/src/test/cls_rgw/test_cls_rgw.cc index 0df7ab2a18743..44cb30307245d 100644 --- a/src/test/cls_rgw/test_cls_rgw.cc +++ b/src/test/cls_rgw/test_cls_rgw.cc @@ -3,7 +3,6 @@ #include "include/types.h" #include "cls/rgw/cls_rgw_client.h" -#include "cls/rgw/cls_rgw_ops.h" #include "gtest/gtest.h" #include "test/librados/test.h" @@ -11,7 +10,6 @@ #include #include #include -#include using namespace librados; @@ -68,20 +66,12 @@ public: void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size) { - map results; - map oids; - oids[0] = oid; - ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)()); - - uint64_t entries = 0; - uint64_t size = 0; - map::iterator iter = results.begin(); - for (; iter != results.end(); ++iter) { - entries += (iter->second).dir.header.stats[category].num_entries; - size += (iter->second).dir.header.stats[category].total_size; - } - ASSERT_EQ(total_size, size); - ASSERT_EQ(num_entries, entries); + rgw_bucket_dir_header header; + ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header)); + + rgw_bucket_category_stats& stats = header.stats[category]; + ASSERT_EQ(total_size, stats.total_size); + ASSERT_EQ(num_entries, stats.num_entries); } void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc) @@ -349,10 +339,9 @@ TEST(cls_rgw, index_suggest) cls_rgw_encode_suggestion(suggest_op, dirent, updates); } - map bucket_objs; - bucket_objs[0] = bucket_oid; - int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)(); - ASSERT_EQ(0, r); + op = mgr.write_op(); + cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout + ASSERT_EQ(0, ioctx.operate(bucket_oid, op)); sleep(1); -- 2.39.5