map<string, bufferlist> keys;
string filter_prefix, end_key;
- bufferlist start_bl;
- bool start_key_added = false;
uint32_t i = 0;
string key;
key.append(marker);
start_key = key;
- int ret = cls_cxx_map_get_val(hctx, start_key, &start_bl);
- if ((ret < 0) && (ret != -ENOENT)) {
- return ret;
- }
} else {
start_key = key_iter;
}
if (ret < 0)
return ret;
- if ((start_bl.length() > 0) && (!start_key_added)) {
- keys[start_key] = start_bl;
- start_key_added = true;
- }
map<string, bufferlist>::iterator iter = keys.begin();
if (iter == keys.end())
break;
using namespace librados;
+// Separates a shard id from its value inside a composed per-shard string.
+const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
+// Separates the per-shard items inside a composed per-shard string.
+const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+
+/**
+ * Completion context for a bucket index object operation: decodes the
+ * operation's output buffer into a caller-provided result object.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+  T *data;        // destination of the decoded reply; asserted non-NULL
+  int *ret_code;  // optional destination of the final return code
+public:
+  ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+  ~ClsBucketIndexOpCtx() {}
+
+  /*
+   * r     - return code of the operation.
+   * outbl - output buffer of the operation; decoded only when r >= 0.
+   *
+   * A decode failure replaces the return code with -EIO.
+   */
+  void handle_completion(int r, bufferlist& outbl) {
+    int result = r;
+    if (result >= 0) {
+      try {
+        bufferlist::iterator p = outbl.begin();
+        ::decode(*data, p);
+      } catch (buffer::error&) {
+        result = -EIO;
+      }
+    }
+    if (ret_code != NULL) {
+      *ret_code = result;
+    }
+  }
+};
+
+/*
+ * Move the AIO request identified by id from the pending set to the
+ * completed set, then signal the condition variable.
+ */
+void BucketIndexAioManager::do_completion(int id) {
+  Mutex::Locker guard(lock);
+
+  map<int, librados::AioCompletion*>::iterator pending = pendings.find(id);
+  assert(pending != pendings.end());
+  librados::AioCompletion *c = pending->second;
+  pendings.erase(pending);
+  completions[id] = c;
+
+  // Carry the object name along as well, if one was recorded for this request
+  map<int, string>::iterator obj = pending_objs.find(id);
+  if (obj != pending_objs.end()) {
+    completion_objs[id] = obj->second;
+    pending_objs.erase(obj);
+  }
+
+  cond.Signal();
+}
+
+/*
+ * Wait until at least one AIO has completed, then harvest every completed AIO.
+ *
+ * valid_ret_code  - a negative return code that is not treated as a failure.
+ * num_completions - if non-NULL, receives the number of AIOs harvested.
+ * ret_code        - if non-NULL, receives the return code of a harvested AIO
+ *                   that failed with a code other than valid_ret_code; it is
+ *                   left untouched when nothing failed.
+ * objs            - if non-NULL, receives (request id -> object name) for
+ *                   every harvested AIO whose return code is 0.
+ *
+ * Return false if there is neither a pending nor a completed AIO, true otherwise.
+ */
+bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
+    int *num_completions, int *ret_code, map<int, string> *objs) {
+  Mutex::Locker l(lock);
+  if (pendings.empty() && completions.empty()) {
+    return false;
+  }
+
+  // Re-check the predicate after every wakeup so that we never go on with
+  // nothing to harvest. pendings is non-empty here, so do_completion() will
+  // eventually fill completions and signal.
+  while (completions.empty()) {
+    cond.Wait(lock);
+  }
+
+  // Harvest the completed AIOs
+  map<int, librados::AioCompletion*>::iterator iter = completions.begin();
+  for (; iter != completions.end(); ++iter) {
+    int r = iter->second->get_return_value();
+    if (objs && r == 0) { /* update list of successfully completed objs */
+      map<int, string>::iterator liter = completion_objs.find(iter->first);
+      if (liter != completion_objs.end()) {
+        (*objs)[liter->first] = liter->second;
+      }
+    }
+    if (ret_code && (r < 0 && r != valid_ret_code))
+      (*ret_code) = r;
+    iter->second->release();
+  }
+  if (num_completions)
+    (*num_completions) = completions.size();
+  completions.clear();
+  // The object names belong to the completions just released; drop them too
+  // so they do not pile up for the lifetime of the manager.
+  completion_objs.clear();
+
+  return true;
+}
+
void cls_rgw_bucket_init(ObjectWriteOperation& o)
{
bufferlist in;
o.exec("rgw", "bucket_init_index", in);
}
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+/*
+ * Queue an asynchronous "bucket_init_index" call on oid, preceded by
+ * op.create(true).
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+    const string& oid, BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  op.exec("rgw", "bucket_init_index", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+/*
+ * Queue an asynchronous "bucket_set_tag_timeout" call on oid carrying timeout.
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+ const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
bufferlist in;
struct rgw_cls_tag_timeout_op call;
- call.tag_timeout = tag_timeout;
+ call.tag_timeout = timeout;
::encode(call, in);
- o.exec("rgw", "bucket_set_tag_timeout", in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bucket_set_tag_timeout", in);
+ return manager->aio_operate(io_ctx, oid, &op);
+}
+
+/* Queue the index initialization call for one shard object. */
+int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+{
+  const int r = issue_bucket_index_init_op(io_ctx, oid, &manager);
+  return r;
+}
+
+/*
+ * Best-effort removal of every object in [objs_container.begin(), iter);
+ * the result of each individual remove is ignored.
+ */
+void CLSRGWIssueBucketIndexInit::cleanup()
+{
+  map<int, string>::iterator p = objs_container.begin();
+  while (p != iter) {
+    io_ctx.remove(p->second);
+    ++p;
+  }
+}
+
+/* Queue the tag timeout update for one shard object. */
+int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+{
+ const int r = issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+ return r;
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
o.exec("rgw", "bucket_complete_op", in);
}
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated)
-{
- bufferlist in, out;
+/*
+ * Queue an asynchronous "bucket_list" call on oid.
+ *
+ * start_obj / filter_prefix / num_entries are copied into the request; the
+ * reply is decoded into *pdata by a ClsBucketIndexOpCtx completion (created
+ * without a return code destination).
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bucket_list_op(librados::IoCtx& io_ctx,
+ const string& oid, const string& start_obj, const string& filter_prefix,
+ uint32_t num_entries, BucketIndexAioManager *manager,
+ struct rgw_cls_list_ret *pdata) {
+ bufferlist in;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
+}
- if (dir)
- *dir = ret.dir;
- if (is_truncated)
- *is_truncated = ret.is_truncated;
+/* Queue the listing call for one shard; the reply lands in result[shard_id]. */
+int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+{
+  struct rgw_cls_list_ret *shard_result = &result[shard_id];
+  return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix,
+                              num_entries, &manager, shard_result);
+}
- return r;
+/*
+ * Queue an asynchronous "bi_log_list" call on oid.
+ *
+ * The marker is looked up in marker_mgr by shard_id (empty when none is
+ * recorded); the reply is decoded into *pdata.
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+ struct cls_rgw_bi_log_list_ret *pdata) {
+ bufferlist in;
+ cls_rgw_bi_log_list_op call;
+ call.marker = marker_mgr.get(shard_id, "");
+ call.max = max;
+ ::encode(call, in);
+
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
}
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header)
+/* Queue the bucket index log listing for one shard; the reply lands in result[shard_id]. */
+int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
- if (r < 0)
- return r;
+ return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
+}
- struct rgw_cls_check_index_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
+/*
+ * Queue an asynchronous "bi_log_trim" call on oid.
+ *
+ * The start and end markers are looked up by shard_id in the two marker
+ * managers (empty when none is recorded).
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+    BucketIndexShardsManager& start_marker_mgr,
+    BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+  bufferlist in;
+  cls_rgw_bi_log_trim_op call;
+  call.start_marker = start_marker_mgr.get(shard_id, "");
+  call.end_marker = end_marker_mgr.get(shard_id, "");
+  ::encode(call, in);
+  ObjectWriteOperation op;
+  op.exec("rgw", "bi_log_trim", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
- if (existing_header)
- *existing_header = ret.existing_header;
- if (calculated_header)
- *calculated_header = ret.calculated_header;
+/* Queue the bucket index log trim for one shard object. */
+int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+{
+  const int r = issue_bi_log_trim(io_ctx, oid, shard_id,
+                                  start_marker_mgr, end_marker_mgr, &manager);
+  return r;
+}
- return 0;
+/*
+ * Queue an asynchronous "bucket_check_index" call on oid; the reply is
+ * decoded into *pdata.
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+ struct rgw_cls_check_index_ret *pdata) {
+ bufferlist in;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+ pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
}
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+/* Queue the index check for one shard; the reply lands in result[shard_id]. */
+int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
{
- bufferlist in, out;
- int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
- if (r < 0)
- return r;
+ return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+}
- return 0;
+/*
+ * Queue an asynchronous "bucket_rebuild_index" call on oid.
+ *
+ * Returns the value of manager->aio_operate(). Declared int rather than bool
+ * because issue_op() hands this value to a caller that tests it for < 0.
+ */
+static int issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+    BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.exec("rgw", "bucket_rebuild_index", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+/* Queue the index rebuild for one shard object. */
+int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+{
+ const int r = issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+ return r;
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+/*
+ * Queue a "bucket_list" call with an empty marker, an empty prefix and zero
+ * entries for one shard; the reply lands in result[shard_id].
+ */
+int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
{
- bufferlist in, out;
- struct rgw_cls_list_op call;
- call.num_entries = 0;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
-
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- if (header)
- *header = ret.dir.header;
-
- return r;
+ return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]);
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
return 0;
}
-int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
- list<rgw_bi_log_entry>& entries, bool *truncated)
-{
- bufferlist in, out;
- cls_rgw_bi_log_list_op call;
- call.marker = marker;
- call.max = max;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
- if (r < 0)
- return r;
-
- cls_rgw_bi_log_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- entries = ret.entries;
-
- if (truncated)
- *truncated = ret.truncated;
-
- return r;
-}
-
-int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
-{
- do {
- int r;
- bufferlist in, out;
- cls_rgw_bi_log_trim_op call;
- call.start_marker = start_marker;
- call.end_marker = end_marker;
- ::encode(call, in);
- r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
-
- if (r == -ENODATA)
- break;
-
- if (r < 0)
- return r;
-
- } while (1);
-
- return 0;
-}
-
int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
#define CEPH_CLS_RGW_CLIENT_H
#include "include/types.h"
+#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
+#include "cls_rgw_ops.h"
#include "common/RefCountedObj.h"
+// Forward declaration
+class BucketIndexAioManager;
+
+/*
+ * Bucket index AIO request argument: carries the request ID and the issuing
+ * manager to the completion callback.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+  int id;                          // request ID
+  BucketIndexAioManager* manager;  // manager that issued the request
+
+  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
+    id(_id), manager(_manager) {}
+};
+
+/*
+ * This class manages AIO completions. This class is not completely thread-safe:
+ * methods like *get_next* are not thread-safe and are expected to be called
+ * from within one thread.
+ */
+class BucketIndexAioManager {
+private:
+  // Requests that were submitted and have not completed yet, by request ID.
+  map<int, librados::AioCompletion*> pendings;
+  // Requests that completed and were not harvested yet, by request ID.
+  map<int, librados::AioCompletion*> completions;
+  // Object names of the pending / completed requests, by request ID.
+  map<int, string> pending_objs;
+  map<int, string> completion_objs;
+  int next;
+  Mutex lock;
+  Cond cond;
+  /*
+   * Callback implementation for AIO request.
+   */
+  static void bucket_index_op_completion_cb(void* cb, void* arg) {
+    BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
+    cb_arg->manager->do_completion(cb_arg->id);
+    cb_arg->put();
+  }
+
+  /*
+   * Get next request ID. This method is not thread-safe.
+   *
+   * Return next request ID.
+   */
+  int get_next() { return next++; }
+
+  /*
+   * Add a new pending AIO completion instance.
+   *
+   * @param id - the request ID.
+   * @param completion - the AIO completion instance.
+   * @param oid - the id of the object the request operates on; it is
+   *              recorded together with the completion.
+   */
+  void add_pending(int id, librados::AioCompletion* completion, const string& oid) {
+    pendings[id] = completion;
+    pending_objs[id] = oid;
+  }
+
+  /*
+   * Finish a submission attempt: track the request when it was accepted
+   * (r >= 0); otherwise drop the completion and the callback argument, as
+   * nothing else would release them for a request that was never queued.
+   *
+   * Return r unchanged.
+   */
+  int track_or_release(int r, BucketIndexAioArg *arg, librados::AioCompletion *c,
+                       const string& oid) {
+    if (r >= 0) {
+      add_pending(arg->id, c, oid);
+    } else {
+      c->release();
+      arg->put();
+    }
+    return r;
+  }
+public:
+  /*
+   * Create a new instance.
+   */
+  BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
+
+
+  /*
+   * Do completion for the given AIO request.
+   */
+  void do_completion(int id);
+
+  /*
+   * Wait for AIO completions.
+   *
+   * valid_ret_code - valid AIO return code.
+   * num_completions - number of completions.
+   * ret_code - return code of failed AIO.
+   * objs - a list of objects that has been finished the AIO.
+   *
+   * Return false if there is no pending AIO, true otherwise.
+   */
+  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
+                            map<int, string> *objs);
+
+  /**
+   * Do aio read operation.
+   *
+   * Return the result of io_ctx.aio_operate(): negative on failure. The
+   * return type is int (not bool) so that the error code survives.
+   */
+  int aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
+    Mutex::Locker l(lock);
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, op, NULL);
+    return track_or_release(r, arg, c, oid);
+  }
+
+  /**
+   * Do aio write operation.
+   *
+   * Return the result of io_ctx.aio_operate(): negative on failure. The
+   * return type is int (not bool) so that the error code survives.
+   */
+  int aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
+    Mutex::Locker l(lock);
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, op);
+    return track_or_release(r, arg, c, oid);
+  }
+};
+
class RGWGetDirHeader_CB : public RefCountedObject {
public:
virtual ~RGWGetDirHeader_CB() {}
virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
};
+/*
+ * Keeps one string value (for example, a marker) per bucket index shard and
+ * converts the whole set to and from a single composed string of the form
+ * <shard><KEY_VALUE_SEPARATOR><value>[<SHARDS_SEPARATOR>...].
+ */
+class BucketIndexShardsManager {
+private:
+  // Per shard setting manager, for example, marker.
+  map<int, string> value_by_shards;
+public:
+  const static string KEY_VALUE_SEPARATOR;
+  const static string SHARDS_SEPARATOR;
+
+  /* Set (or overwrite) the value of the given shard. */
+  void add(int shard, const string& value) {
+    value_by_shards[shard] = value;
+  }
+
+  /*
+   * Return the value of the given shard, or default_value when the shard has
+   * none. The result may be a reference to default_value itself, so it must
+   * not be kept beyond the lifetime of that argument.
+   */
+  const string& get(int shard, const string& default_value) const {
+    map<int, string>::const_iterator iter = value_by_shards.find(shard);
+    return (iter == value_by_shards.end() ? default_value : iter->second);
+  }
+
+  /* Direct access to the underlying shard -> value map. */
+  map<int, string>& get() {
+    return value_by_shards;
+  }
+
+  bool empty() const {
+    return value_by_shards.empty();
+  }
+
+  /*
+   * Write the composed string into *out (previous content is discarded).
+   * Does nothing when out is NULL.
+   */
+  void to_string(string *out) const {
+    if (!out) {
+      return;
+    }
+    out->clear();
+    map<int, string>::const_iterator iter = value_by_shards.begin();
+    for (; iter != value_by_shards.end(); ++iter) {
+      if (out->length()) {
+        // Not the first item, append a separator first
+        out->append(SHARDS_SEPARATOR);
+      }
+      char buf[16];
+      snprintf(buf, sizeof(buf), "%d", iter->first);
+      out->append(buf);
+      out->append(KEY_VALUE_SEPARATOR);
+      out->append(iter->second);
+    }
+  }
+
+  /* Return true if marker contains a shard id / value separator. */
+  static bool is_shards_marker(const string& marker) {
+    return marker.find(KEY_VALUE_SEPARATOR) != string::npos;
+  }
+
+  /*
+   * convert from string. There are two options of how the string looks like:
+   *
+   * 1. Single shard, no shard id specified, e.g. 000001.23.1
+   *
+   *    for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
+   *    bucket with no shards.
+   *
+   * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
+   *
+   * Any previously held values are discarded first.
+   *
+   * Return 0 on success, -EINVAL when the string mixes both forms, names
+   * several shards although shard_id >= 0 was passed, or holds a shard id
+   * that is not a number.
+   */
+  int from_string(const string& composed_marker, int shard_id) {
+    value_by_shards.clear();
+    vector<string> shards;
+    get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
+    if (shards.size() > 1 && shard_id >= 0) {
+      return -EINVAL;
+    }
+    vector<string>::const_iterator iter = shards.begin();
+    for (; iter != shards.end(); ++iter) {
+      size_t pos = iter->find(KEY_VALUE_SEPARATOR);
+      if (pos == string::npos) {
+        if (!value_by_shards.empty()) {
+          return -EINVAL;
+        }
+        if (shard_id < 0) {
+          add(0, *iter);
+        } else {
+          add(shard_id, *iter);
+        }
+        return 0;
+      }
+      string shard_str = iter->substr(0, pos);
+      string err;
+      int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
+      if (!err.empty()) {
+        return -EINVAL;
+      }
+      // Skip the whole separator, whatever its length
+      add(shard, iter->substr(pos + KEY_VALUE_SEPARATOR.length()));
+    }
+    return 0;
+  }
+};
+
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
-void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+/*
+ * Base class for running one operation against every object of a
+ * (shard id -> object id) container while keeping a bounded number of AIO
+ * requests in flight. Subclasses provide issue_op() and may override the
+ * other hooks.
+ */
+class CLSRGWConcurrentIO {
+protected:
+  librados::IoCtx& io_ctx;
+  map<int, string>& objs_container;
+  // Next object to issue an operation for; only meaningful inside operator().
+  map<int, string>::iterator iter;
+  uint32_t max_aio;
+  BucketIndexAioManager manager;
+
+  // Issue the operation for one object; a negative return value stops the
+  // issuing of further operations.
+  virtual int issue_op(int shard_id, const string& oid) = 0;
+
+  // Called by operator() right before it returns a negative value.
+  virtual void cleanup() {}
+  // A negative completion code that must not be treated as a failure.
+  virtual int valid_ret_code() { return 0; }
+  // Return true if multiple rounds of OPs might be needed, this happens when
+  // OP needs to be re-send until a certain code is returned.
+  virtual bool need_multiple_rounds() { return false; }
+  // Add a new object to the end of the container.
+  virtual void add_object(int shard, const string& oid) {}
+  virtual void reset_container(map<int, string>& objs) {}
+
+public:
+  CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
+                     uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
+  virtual ~CLSRGWConcurrentIO() {}
+
+  /*
+   * Issue the operation for every object of the container and wait for all
+   * of them to complete.
+   *
+   * Return 0 on success, otherwise a negative code coming either from
+   * issue_op() or from a completed AIO.
+   */
+  int operator()() {
+    int ret = 0;
+    // Count the first batch with a local budget: max_aio itself stays intact,
+    // and a configured value of 0 still lets one request in flight instead of
+    // issuing nothing and reporting success.
+    uint32_t budget = (max_aio > 0 ? max_aio : 1);
+    iter = objs_container.begin();
+    for (; iter != objs_container.end() && budget > 0; ++iter, --budget) {
+      ret = issue_op(iter->first, iter->second);
+      if (ret < 0)
+        break;
+    }
+
+    int num_completions = 0, r = 0;
+    map<int, string> objs;
+    map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
+    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
+      if (r >= 0 && ret >= 0) {
+        // Refill: one new request per harvested completion
+        for (int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
+          int issue_ret = issue_op(iter->first, iter->second);
+          if (issue_ret < 0) {
+            ret = issue_ret;
+            break;
+          }
+        }
+      } else if (ret >= 0) {
+        ret = r;
+      }
+      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
+        // For those objects which need another round, use them to reset
+        // the container
+        reset_container(objs);
+      }
+    }
+
+    if (ret < 0) {
+      cleanup();
+    }
+    return ret;
+  }
+};
+
+/*
+ * Concurrent initialization of bucket index objects. -EEXIST is accepted as
+ * a valid completion code.
+ */
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+  virtual int valid_ret_code() { return -EEXIST; }
+  virtual void cleanup();
+
+public:
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+                             map<int, string>& _bucket_objs,
+                             uint32_t _max_aio)
+    : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
+/* Concurrent update of the tag timeout on bucket index objects. */
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+private:
+  uint64_t tag_timeout;  // value sent with every request
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc,
+                           map<int, string>& _bucket_objs,
+                           uint32_t _max_aio,
+                           uint64_t _tag_timeout)
+    : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),
+      tag_timeout(_tag_timeout) {}
+};
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op);
rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
list<string> *remove_objs, bool log_op);
-int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated);
+/**
+ * List the bucket with the starting object and filter prefix.
+ * NOTE: this method does listing requests for each bucket index shard identified by
+ * the keys of the *list_results* map, which means the map should be populated
+ * by the caller to fill with each bucket index object id.
+ *
+ * io_ctx - IO context for rados.
+ * start_obj - marker for the listing.
+ * filter_prefix - filter prefix.
+ * num_entries - number of entries to request for each object (note the total
+ * amount of entries returned depends on the number of shardings).
+ * list_results - the list results keyed by bucket index object id.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+*/
+
+/* Concurrent listing of bucket index objects; one result per shard id. */
+class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
+private:
+  string start_obj;
+  string filter_prefix;
+  uint32_t num_entries;
+  map<int, rgw_cls_list_ret>& result;  // filled in by shard id
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
+                        const string& _start_obj,
+                        const string& _filter_prefix,
+                        uint32_t _num_entries,
+                        map<int, string>& oids,
+                        map<int, struct rgw_cls_list_ret>& list_results,
+                        uint32_t max_aio)
+    : CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+      start_obj(_start_obj),
+      filter_prefix(_filter_prefix),
+      num_entries(_num_entries),
+      result(list_results) {}
+};
+
+/* Concurrent listing of the bucket index log; one result per shard id. */
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
+private:
+  map<int, struct cls_rgw_bi_log_list_ret>& result;  // filled in by shard id
+  BucketIndexShardsManager& marker_mgr;              // per-shard markers
+  uint32_t max;
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueBILogList(librados::IoCtx& io_ctx,
+                       BucketIndexShardsManager& _marker_mgr,
+                       uint32_t _max,
+                       map<int, string>& oids,
+                       map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists,
+                       uint32_t max_aio)
+    : CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+      result(bi_log_lists),
+      marker_mgr(_marker_mgr),
+      max(_max) {}
+};
+
+/*
+ * Concurrent trimming of the bucket index log. -ENODATA is accepted as a
+ * valid completion code and multiple rounds are requested.
+ */
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
+private:
+  BucketIndexShardsManager& start_marker_mgr;
+  BucketIndexShardsManager& end_marker_mgr;
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+  // Trim until -ENODATA is returned.
+  virtual int valid_ret_code() { return -ENODATA; }
+  virtual bool need_multiple_rounds() { return true; }
+  virtual void add_object(int shard, const string& oid) {
+    objs_container[shard] = oid;
+  }
+  // Make objs the new work list, restart from its beginning and leave objs
+  // empty.
+  virtual void reset_container(map<int, string>& objs) {
+    objs.swap(objs_container);
+    iter = objs_container.begin();
+    objs.clear();
+  }
+
+public:
+  CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx,
+                       BucketIndexShardsManager& _start_marker_mgr,
+                       BucketIndexShardsManager& _end_marker_mgr,
+                       map<int, string>& _bucket_objs,
+                       uint32_t max_aio)
+    : CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
+      start_marker_mgr(_start_marker_mgr),
+      end_marker_mgr(_end_marker_mgr) {}
+};
+
+/**
+ * Check the bucket index.
+ *
+ * ioc - IO context for rados.
+ * oids - bucket index object ids, keyed by shard id.
+ * bucket_objs_ret - check result for all shards, keyed by shard id.
+ * _max_aio - the maximum number of AIO (for throttling).
+ *
+ * Invoking the object returns 0 on success, a failure code otherwise.
+ */
+class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO {
+private:
+  map<int, struct rgw_cls_check_index_ret>& result;
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueBucketCheck(librados::IoCtx& ioc,
+                         map<int, string>& oids,
+                         map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+                         uint32_t _max_aio)
+    : CLSRGWConcurrentIO(ioc, oids, _max_aio),
+      result(bucket_objs_ret) {}
+};
+
+/* Concurrent rebuild of bucket index objects. */
+class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx,
+                           map<int, string>& bucket_objs,
+                           uint32_t max_aio)
+    : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
+};
+
+/* Concurrent fetch of the listing reply of every shard; one result per shard id. */
+class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
+private:
+  map<int, rgw_cls_list_ret>& result;
+
+protected:
+  virtual int issue_op(int shard_id, const string& oid);
+
+public:
+  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx,
+                          map<int, string>& oids,
+                          map<int, rgw_cls_list_ret>& dir_headers,
+                          uint32_t max_aio)
+    : CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+      result(dir_headers) {}
+};
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
- rgw_bucket_dir_header *existing_header,
- rgw_bucket_dir_header *calculated_header);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
-
-int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
-/* bucket index log */
-
-int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
- list<rgw_bi_log_entry>& entries, bool *truncated);
-int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
-
/* usage logging */
int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024)
+/**
+ * override max bucket index shards in zone configuration (if not zero)
+ *
+ * Represents the number of shards for the bucket index object, a value of zero
+ * indicates there is no sharding. By default (no sharding), the name of the object
+ * is '.dir.{marker}', with sharding, the name is '.dir.{marker}.{sharding_id}',
+ * sharding_id is zero-based value. It is not recommended to set a too large value
+ * (e.g. thousand) as it increases the cost for bucket listing.
+ */
+OPTION(rgw_override_bucket_index_max_shards, OPT_U32, 0)
+
+/**
+ * Represents the maximum AIO pending requests for the bucket index object shards.
+ */
+OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
+
OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled
return r;
map<RGWObjCategory, RGWStorageStats> stats;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
formatter->dump_int("mtime", mtime);
- formatter->dump_int("ver", bucket_ver);
- formatter->dump_int("master_ver", master_ver);
+ formatter->dump_string("ver", bucket_ver);
+ formatter->dump_string("master_ver", master_ver);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
formatter->close_section();
do {
list<rgw_bi_log_entry> entries;
- ret = store->list_bi_log_entries(bucket, marker, max_entries - count, entries, &truncated);
+ ret = store->list_bi_log_entries(bucket, shard_id, marker, max_entries - count, entries, &truncated);
if (ret < 0) {
cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- ret = store->trim_bi_log_entries(bucket, start_marker, end_marker);
+ ret = store->trim_bi_log_entries(bucket, shard_id, start_marker, end_marker);
if (ret < 0) {
cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
RGWReplicaBucketLogger logger(store);
- ret = logger.get_bounds(bucket, bounds);
+ ret = logger.get_bounds(bucket, shard_id, bounds);
if (ret < 0)
return -ret;
} else { // shouldn't get here
}
RGWReplicaBucketLogger logger(store);
- ret = logger.delete_bound(bucket, daemon_id);
+ ret = logger.delete_bound(bucket, shard_id, daemon_id);
if (ret < 0)
return -ret;
}
return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
}
+/*
+ * Split a bucket instance key into the bucket instance and an optional
+ * shard id.
+ *
+ * bucket_instance        - the key; it must contain at least one ':'.
+ * target_bucket_instance - receives the key without the shard suffix.
+ * shard_id               - receives the shard id, or -1 when the key has
+ *                          no shard suffix (i.e. it has a single ':').
+ *
+ * Return 0 on success. Return -EINVAL when the key has no ':' or when the
+ * text after the last ':' is not a number; the outputs are left untouched
+ * in that case.
+ */
+int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
+{
+  const size_t pos = bucket_instance.rfind(':');
+  if (pos == string::npos) {
+    return -EINVAL;
+  }
+
+  const string first = bucket_instance.substr(0, pos);
+  if (first.find(':') == string::npos) {
+    // Only one ':' in the key: there is no shard suffix to strip
+    *shard_id = -1;
+    *target_bucket_instance = bucket_instance;
+    return 0;
+  }
+
+  const string second = bucket_instance.substr(pos + 1);
+  string err;
+  const int parsed_shard = strict_strtol(second.c_str(), 10, &err);
+  if (!err.empty()) {
+    return -EINVAL;
+  }
+
+  // Publish the results only once the whole key is known to be valid
+  *target_bucket_instance = first;
+  *shard_id = parsed_shard;
+  return 0;
+}
+
int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
map<string, bufferlist>& attrs,
map<string, bufferlist>* rmattrs,
RGWBucketInfo info;
bufferlist bl;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
if (ret < 0)
while (is_truncated) {
map<string, RGWObjEnt> result;
- int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result,
- &is_truncated, &marker,
- bucket_object_check_filter);
+ int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
+ result, &is_truncated, &marker,
+ bucket_object_check_filter);
if (r == -ENOENT) {
break;
bucket = bucket_info.bucket;
- uint64_t bucket_ver, master_ver;
+ string bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("id", bucket.bucket_id);
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
- formatter->dump_int("ver", bucket_ver);
- formatter->dump_int("master_ver", master_ver);
+ formatter->dump_string("ver", bucket_ver);
+ formatter->dump_string("master_ver", master_ver);
formatter->dump_int("mtime", mtime);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
}
-int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
- string& name = bucket.name;
- uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards;
+/*
+ * Pick the index of the data log object for a bucket shard: the hash of the
+ * bucket name, shifted by the shard id when that is positive, modulo
+ * num_shards.
+ */
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+ const string& name = bs.bucket.name;
+ int shard_shift = 0;
+ if (bs.shard_id > 0) {
+ shard_shift = bs.shard_id;
+ }
+ uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
return (int)r;
}
/* we can't keep the bucket name as part of the cls_log_entry, and we need
* it later, so we keep two lists under the map */
- map<int, pair<list<string>, list<cls_log_entry> > > m;
+ map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
lock.Lock();
- map<string, rgw_bucket> entries;
+ map<rgw_bucket_shard, bool> entries;
entries.swap(cur_cycle);
lock.Unlock();
- map<string, rgw_bucket>::iterator iter;
+ map<rgw_bucket_shard, bool>::iterator iter;
string section;
utime_t ut = ceph_clock_now(cct);
for (iter = entries.begin(); iter != entries.end(); ++iter) {
- rgw_bucket& bucket = iter->second;
- int index = choose_oid(bucket);
+ const rgw_bucket_shard& bs = iter->first;
+ const rgw_bucket& bucket = bs.bucket;
+ int shard_id = bs.shard_id;
+
+ int index = choose_oid(bs);
cls_log_entry entry;
bufferlist bl;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
+ if (shard_id >= 0) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", shard_id);
+ change.key += buf;
+ }
change.timestamp = ut;
::encode(change, bl);
store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
- m[index].first.push_back(bucket.name);
+ m[index].first.push_back(bs);
m[index].second.push_back(entry);
}
- map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
+ map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
for (miter = m.begin(); miter != m.end(); ++miter) {
list<cls_log_entry>& entries = miter->second.second;
utime_t expiration = now;
expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
- list<string>& buckets = miter->second.first;
- list<string>::iterator liter;
+ list<rgw_bucket_shard>& buckets = miter->second.first;
+ list<rgw_bucket_shard>::iterator liter;
for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
update_renewed(*liter, expiration);
}
return 0;
}
-void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
+/*
+ * Look up the change status of a bucket shard in changes, creating and
+ * registering a new one when none is found. The caller must hold lock.
+ */
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
{
assert(lock.is_locked());
- if (!changes.find(bucket_name, status)) {
+ if (!changes.find(bs, status)) {
status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bucket_name, status);
+ changes.add(bs, status);
}
}
-void RGWDataChangesLog::register_renew(rgw_bucket& bucket)
+/* Record the bucket shard in cur_cycle, under lock. */
+void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
{
Mutex::Locker l(lock);
- cur_cycle[bucket.name] = bucket;
+ cur_cycle[bs] = true;
}
-void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
+/* Set the current expiration of the bucket shard's change status, under lock. */
+void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration)
{
Mutex::Locker l(lock);
ChangeStatusPtr status;
- _get_change(bucket_name, status);
+ _get_change(bs, status);
- ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
status->cur_expiration = expiration;
}
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
if (!store->need_to_log_data())
return 0;
+ rgw_bucket_shard bs(bucket, shard_id);
+
lock.Lock();
ChangeStatusPtr status;
- _get_change(bucket.name, status);
+ _get_change(bs, status);
lock.Unlock();
status->lock->Lock();
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
status->lock->Unlock();
- register_renew(bucket);
+ register_renew(bs);
return 0;
}
int ret = cond->wait();
cond->put();
if (!ret) {
- register_renew(bucket);
+ register_renew(bs);
}
return ret;
}
status->cond = new RefCountedCond;
status->pending = true;
- string& oid = oids[choose_oid(bucket)];
+ string& oid = oids[choose_oid(bs)];
utime_t expiration;
int ret;
rgw_data_change change;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
+ if (shard_id >= 0) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), ":%d", shard_id);
+ change.key += buf;
+ }
change.timestamp = now;
::encode(change, bl);
string section;
objv_tracker = bci.info.objv_tracker;
- ret = store->init_bucket_index(bci.info.bucket);
+ ret = store->init_bucket_index(bci.info.bucket, bci.info.num_shards);
if (ret < 0)
return ret;
map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
time_t mtime);
+extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id);
+
extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker);
extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker);
typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
- lru_map<string, ChangeStatusPtr> changes;
+ lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
- map<string, rgw_bucket> cur_cycle;
+ map<rgw_bucket_shard, bool> cur_cycle;
- void _get_change(string& bucket_name, ChangeStatusPtr& status);
- void register_renew(rgw_bucket& bucket);
- void update_renewed(string& bucket_name, utime_t& expiration);
+ void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+ void register_renew(rgw_bucket_shard& bs);
+ void update_renewed(rgw_bucket_shard& bs, utime_t& expiration);
class ChangesRenewThread : public Thread {
CephContext *cct;
~RGWDataChangesLog();
- int choose_oid(rgw_bucket& bucket);
- int add_entry(rgw_bucket& bucket);
+ int choose_oid(const rgw_bucket_shard& bs);
+ int add_entry(rgw_bucket& bucket, int shard_id);
int renew_entries();
int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
list<rgw_data_change>& entries,
PerfCounters *perfcounter = NULL;
+const uint32_t RGWBucketInfo::NUM_SHARDS_BLIND_BUCKET(UINT32_MAX);
+
int rgw_perf_start(CephContext *cct)
{
PerfCountersBuilder plb(cct, cct->_conf->name.to_str(), l_rgw_first, l_rgw_last);
#define ERR_USER_SUSPENDED 2100
#define ERR_INTERNAL_ERROR 2200
+#ifndef UINT32_MAX
+#define UINT32_MAX (4294967295)
+#endif
+
typedef void *RGWAccessHandle;
return out;
}
+/**
+ * Identifies one shard of a bucket's index: a bucket plus a shard id.
+ *
+ * A default-constructed value has shard_id == -1. RGWDataChangesLog::add_entry()
+ * only appends the shard id to the data-log key when it is >= 0.
+ *
+ * The type is ordered so it can be used as a key in ordered containers
+ * (it keys the data changes log's `changes` and `cur_cycle` maps).
+ */
+struct rgw_bucket_shard {
+  rgw_bucket bucket;
+  int shard_id;
+
+  rgw_bucket_shard() : shard_id(-1) {}
+  // The bucket is only copied, so accept it by const reference; this also
+  // allows construction from const buckets and temporaries.
+  rgw_bucket_shard(const rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
+
+  // Lexicographic order: compare by bucket first, then by shard id.
+  bool operator<(const rgw_bucket_shard& b) const {
+    if (bucket < b.bucket) {
+      return true;
+    }
+    if (b.bucket < bucket) {
+      return false;
+    }
+    return shard_id < b.shard_id;
+  }
+};
+
+
struct RGWObjVersionTracker {
obj_version read_version;
obj_version write_version;
struct RGWBucketInfo
{
+ enum BIShardsHashType {
+ MOD = 0
+ };
+
rgw_bucket bucket;
string owner;
uint32_t flags;
obj_version ep_objv; /* entry point object version, for runtime tracking only */
RGWQuotaInfo quota;
+ // Represents the number of bucket index object shards:
+ // - value of 0 indicates there is no sharding (this is the default for
+ // bucket info written before this feature was implemented).
+ // - value of UINT32_MAX (NUM_SHARDS_BLIND_BUCKET) indicates this is a blind bucket.
+ uint32_t num_shards;
+
+ // Represents the bucket index shard hash type.
+ uint8_t bucket_index_shard_hash_type;
+
+ // Represents the shard number for blind bucket.
+ const static uint32_t NUM_SHARDS_BLIND_BUCKET;
+
void encode(bufferlist& bl) const {
- ENCODE_START(9, 4, bl);
+ ENCODE_START(11, 4, bl);
::encode(bucket, bl);
::encode(owner, bl);
::encode(flags, bl);
::encode(placement_rule, bl);
::encode(has_instance_obj, bl);
::encode(quota, bl);
+ ::encode(num_shards, bl);
+ ::encode(bucket_index_shard_hash_type, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
::decode(has_instance_obj, bl);
if (struct_v >= 9)
::decode(quota, bl);
+ if (struct_v >= 10)
+ ::decode(num_shards, bl);
+ if (struct_v >= 11)
+ ::decode(bucket_index_shard_hash_type, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
void decode_json(JSONObj *obj);
- RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {}
+ RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {}
};
WRITE_CLASS_ENCODER(RGWBucketInfo)
bool in_extra_data; /* in-memory only member, does not serialize */
+ // Represents the hash index source for this object once it is set (non-empty)
+ std::string index_hash_source;
+
rgw_obj() : in_extra_data(false) {}
rgw_obj(const char *b, const char *o) : in_extra_data(false) {
rgw_bucket _b(b);
return orig_key;
}
+ string& get_hash_object() {
+ return index_hash_source.empty() ? object : index_hash_source;
+ }
/**
* Translate a namespace-mangled object name to the user-facing name
* existing in the given namespace.
encode_json("placement_rule", placement_rule, f);
encode_json("has_instance_obj", has_instance_obj, f);
encode_json("quota", quota, f);
+ encode_json("num_shards", num_shards, f);
+ encode_json("bi_shard_hash_type", (uint32_t)bucket_index_shard_hash_type, f);
}
void RGWBucketInfo::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("placement_rule", placement_rule, obj);
JSONDecoder::decode_json("has_instance_obj", has_instance_obj, obj);
JSONDecoder::decode_json("quota", quota, obj);
+ JSONDecoder::decode_json("num_shards", num_shards, obj);
+ uint32_t hash_type;
+ JSONDecoder::decode_json("bi_shard_hash_type", hash_type, obj);
+ bucket_index_shard_hash_type = (uint8_t)hash_type;
}
void RGWObjEnt::dump(Formatter *f) const
encode_json("endpoints", endpoints, f);
encode_json("log_meta", log_meta, f);
encode_json("log_data", log_data, f);
+ encode_json("bucket_index_max_shards", bucket_index_max_shards, f);
}
void RGWZone::decode_json(JSONObj *obj)
JSONDecoder::decode_json("endpoints", endpoints, obj);
JSONDecoder::decode_json("log_meta", log_meta, obj);
JSONDecoder::decode_json("log_data", log_data, obj);
+ JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj);
}
void RGWRegionPlacementTarget::dump(Formatter *f) const
string obj_str;
RGWUserInfo bucket_owner_info;
- s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+ string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+ if (!bi.empty()) {
+ int shard_id;
+ ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id);
+ if (ret < 0) {
+ return ret;
+ }
+ }
s->bucket_acl = new RGWAccessControlPolicy(s->cct);
}
head_obj = manifest_gen.get_cur_obj();
+ head_obj.index_hash_source = obj_str;
cur_obj = head_obj;
return 0;
obj.init_ns(s->bucket, tmp_obj_name, mp_ns);
// the meta object will be indexed with 0 size, we c
obj.set_in_extra_data(true);
+ obj.index_hash_source = s->object_str;
ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id());
} while (ret == -EEXIST);
}
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
+ meta_obj.index_hash_source = s->object_str;
ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL);
if (ret < 0) {
string oid = mp.get_part(obj_iter->second.num);
rgw_obj obj;
obj.init_ns(s->bucket, oid, mp_ns);
+ obj.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, obj);
if (ret < 0 && ret != -ENOENT)
return;
RGWObjManifest::obj_iterator oiter;
for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) {
rgw_obj loc = oiter.get_location();
+ loc.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, loc);
if (ret < 0 && ret != -ENOENT)
return;
// and also remove the metadata obj
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
+ meta_obj.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
if (ret == -ENOENT) {
ret = -ERR_NO_SUCH_BUCKET;
{
RGWBucketInfo bucket_info;
- uint64_t bucket_ver;
- uint64_t master_ver;
+ string bucket_ver;
+ string master_ver;
map<RGWObjCategory, RGWStorageStats> bucket_stats;
int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL);
#include "rgw_metadata.h"
#include "rgw_bucket.h"
+#include "cls/rgw/cls_rgw_ops.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/refcount/cls_refcount_client.h"
#define dout_subsys ceph_subsys_rgw
+#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
+
using namespace std;
static RGWCache<RGWRados> cached_rados_provider;
#define RGW_STATELOG_OBJ_PREFIX "statelog."
-
#define dout_subsys ceph_subsys_rgw
void RGWDefaultRegionInfo::dump(Formatter *f) const {
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
+ bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
+ zone_public_config.bucket_index_max_shards);
+ if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) {
+ bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME;
+ ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: "
+ << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl;
+ }
+ ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
+
return ret;
}
return 0;
}
+/**
+ * Compose a per-shard marker of the form
+ * "<shard_id_str><KEY_VALUE_SEPARATOR><shard_marker>" into *marker.
+ *
+ * Nothing is written when marker is NULL.
+ */
+void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
+    string *marker) {
+  if (!marker) {
+    return;
+  }
+
+  marker->assign(shard_id_str);
+  marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
+  marker->append(shard_marker);
+}
+
int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
{
int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
name = prefix + buf;
}
-void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
{
cls_log_add_prepare_entry(entry, ut, section, key, bl);
}
result.push_back(ent);
count++;
}
+
+ // Report truncation if either the back-end told us the listing was
+ // truncated, or we did not consume all of the items it returned because
+ // the amount requested by the caller was reached.
+ truncated = (truncated || eiter != ent_map.end());
}
done:
return 0;
}
-int RGWRados::init_bucket_index(rgw_bucket& bucket)
+int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
{
librados::IoCtx index_ctx; // context for new bucket
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
- librados::ObjectWriteOperation op;
- op.create(true);
- r = cls_rgw_init_index(index_ctx, op, dir_oid);
- if (r < 0 && r != -EEXIST)
- return r;
+ map<int, string> bucket_objs;
+ get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
- return 0;
+ return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
/**
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
- r = init_bucket_index(bucket);
+ r = init_bucket_index(bucket, bucket_index_max_shards);
if (r < 0)
return r;
info.owner = owner.user_id;
info.region = region_name;
info.placement_rule = selected_placement_rule;
+ info.num_shards = bucket_index_max_shards;
+ info.bucket_index_shard_hash_type = RGWBucketInfo::MOD;
if (!creation_time)
time(&info.creation_time);
else
/* remove bucket index */
librados::IoCtx index_ctx; // context for new bucket
- int r = open_bucket_index_ctx(bucket, index_ctx);
+ map<int, string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- index_ctx.remove(dir_oid);
+ map<int, string>::const_iterator biter;
+ for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
+ // Do best effort removal
+ index_ctx.remove(biter->second);
+ }
}
/* ret == -ENOENT here */
}
return 0;
}
+/**
+ * Bind this BucketShard to the index shard that covers obj.
+ *
+ * Records _bucket and, for non-system buckets, asks the store to open the
+ * index shard selected by obj.get_hash_object(), which fills in index_ctx,
+ * bucket_obj and shard_id.
+ *
+ * Returns 0 on success (also for system buckets, for which nothing is
+ * opened), or the negative error from open_bucket_index_shard().
+ */
+int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
+{
+  bucket = _bucket;
+
+  // No index shard is opened for system buckets.
+  const bool is_system = store->bucket_is_system(bucket);
+  if (!is_system) {
+    int r = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << r << dendl;
+      return r;
+    }
+    ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+  }
+
+  return 0;
+}
+
+
/**
* Write/overwrite an object to the bucket storage.
* bucket: the bucket to store the object in
index_tag = state->write_tag;
}
- r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag);
+ librados::IoCtx index_ctx;
+ BucketShard bs(this);
+ r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag);
if (r < 0)
return r;
ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl;
}
- r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size,
- ut, etag, content_type, &acl_bl, category, remove_objs);
+ r = complete_update_index(bs, obj, index_tag, poolid, epoch, size,
+ ut, etag, content_type, &acl_bl, category, remove_objs);
if (r < 0)
goto done_cancel;
return 0;
done_cancel:
- int ret = complete_update_index_cancel(bucket, obj.object, index_tag);
+ int ret = complete_update_index_cancel(bs, obj, index_tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
}
return 0;
}
-static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
+/**
+ * Open the index pool context for a bucket and compute the base name of its
+ * index object(s): dir_oid_prefix followed by the bucket marker.
+ *
+ * Returns -EINVAL for system buckets, the error from open_bucket_index_ctx()
+ * if the pool context cannot be opened, -EIO if the bucket has an empty
+ * marker, and 0 on success.
+ */
+int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    string& bucket_oid_base) {
+  if (bucket_is_system(bucket)) {
+    return -EINVAL;
+  }
+
+  const int rc = open_bucket_index_ctx(bucket, index_ctx);
+  if (rc < 0) {
+    return rc;
+  }
+
+  // Without a marker there is no way to name the index object.
+  if (bucket.marker.empty()) {
+    ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+    return -EIO;
+  }
+
+  bucket_oid_base.assign(dir_oid_prefix);
+  bucket_oid_base.append(bucket.marker);
+  return 0;
+}
+
+/**
+ * Open the bucket index pool and enumerate the bucket's index objects.
+ *
+ * bucket_objs is filled by get_bucket_index_objects() using the shard count
+ * read from the bucket instance info and the requested shard_id. When
+ * bucket_instance_ids is non-NULL it is filled by get_bucket_instance_ids().
+ *
+ * Returns 0 on success, or the negative error from open_bucket_index_base()
+ * or get_bucket_instance_info().
+ */
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
+  string oid_base;
+  int r = open_bucket_index_base(bucket, index_ctx, oid_base);
+  if (r < 0) {
+    return r;
+  }
+
+  // The instance info carries the number of index shards.
+  RGWBucketInfo info;
+  r = get_bucket_instance_info(NULL, bucket, info, NULL, NULL);
+  if (r < 0) {
+    return r;
+  }
+
+  get_bucket_index_objects(oid_base, info.num_shards, bucket_objs, shard_id);
+  if (bucket_instance_ids != NULL) {
+    get_bucket_instance_ids(info, shard_id, bucket_instance_ids);
+  }
+  return 0;
+}
+
+/**
+ * Variant of open_bucket_index() that additionally prepares a per-shard
+ * result map: for every index object id placed in oids, bucket_objs gets a
+ * default-constructed T under the same key.
+ *
+ * Returns 0 on success, or the negative error from open_bucket_index().
+ */
+template<typename T>
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    map<int, string>& oids, map<int, T>& bucket_objs,
+    int shard_id, map<int, string> *bucket_instance_ids)
+{
+  const int r = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids);
+  if (r < 0) {
+    return r;
+  }
+
+  for (map<int, string>::const_iterator it = oids.begin(); it != oids.end(); ++it) {
+    bucket_objs[it->first] = T();
+  }
+  return 0;
+}
+
+/**
+ * Open the bucket index pool and resolve the index shard object that holds
+ * the entry for obj_key.
+ *
+ * On success *bucket_obj and *shard_id are filled in by
+ * get_bucket_index_object(), using the shard count and hash type read from
+ * the bucket instance info.
+ *
+ * Returns 0 on success, or the negative error from open_bucket_index_base(),
+ * get_bucket_instance_info() or get_bucket_index_object().
+ */
+int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    const string& obj_key, string *bucket_obj, int *shard_id)
+{
+  string bucket_oid_base;
+  int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
+  if (ret < 0)
+    return ret;
+
+  // Get the bucket info
+  RGWBucketInfo binfo;
+  ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  if (ret < 0)
+    return ret;
+
+  ret = get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards,
+        (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id);
+  if (ret < 0) {
+    ldout(cct, 10) << "get_bucket_index_object() returned ret=" << ret << dendl;
+    // Propagate the failure: the caller must not go on to use an
+    // unresolved bucket_obj/shard_id as if the lookup had succeeded.
+    return ret;
+  }
+  return 0;
+}
+
+static void accumulate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
{
map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
for (; iter != header.stats.end(); ++iter) {
RGWStorageStats& s = stats[category];
struct rgw_bucket_category_stats& header_stats = iter->second;
s.category = (RGWObjCategory)iter->first;
- s.num_kb = ((header_stats.total_size + 1023) / 1024);
- s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024);
- s.num_objects = header_stats.num_entries;
+ s.num_kb += ((header_stats.total_size + 1023) / 1024);
+ s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024);
+ s.num_objects += header_stats.num_entries;
}
}
map<RGWObjCategory, RGWStorageStats> *calculated_stats)
{
librados::IoCtx index_ctx;
- string oid;
-
- int ret = open_bucket_index(bucket, index_ctx, oid);
+ // key - bucket index object id
+ // value - bucket index check OP returned result with the given bucket index object (shard)
+ map<int, string> oids;
+ map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
+ int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret);
if (ret < 0)
return ret;
- rgw_bucket_dir_header existing_header;
- rgw_bucket_dir_header calculated_header;
-
- ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header);
+ ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
if (ret < 0)
return ret;
- translate_raw_stats(existing_header, *existing_stats);
- translate_raw_stats(calculated_header, *calculated_stats);
+ // Aggregate results (from different shards if there is any)
+ map<int, struct rgw_cls_check_index_ret>::iterator iter;
+ for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
+ accumulate_raw_stats(iter->second.existing_header, *existing_stats);
+ accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
+ }
return 0;
}
int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx;
- string oid;
-
- int ret = open_bucket_index(bucket, index_ctx, oid);
- if (ret < 0)
- return ret;
+ map<int, string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+ if (r < 0)
+ return r;
- return cls_rgw_bucket_rebuild_index_op(index_ctx, oid);
+ return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
bool ret_not_existed = (state && !state->exists);
+ BucketShard bs(this);
+ r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
string tag;
- r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag);
+ r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag);
if (r < 0)
return r;
int64_t poolid = ref.ioctx.get_id();
if (r >= 0 || r == -ENOENT) {
uint64_t epoch = ref.ioctx.get_last_version();
- r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch);
+ r = complete_update_index_del(bs, obj, tag, poolid, epoch);
} else {
- int ret = complete_update_index_cancel(bucket, obj.object, tag);
+ int ret = complete_update_index_cancel(bs, obj, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl;
}
std::string oid, key;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+ BucketShard bs(this);
+ int r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
string tag;
- int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0);
+ r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0);
return r;
}
if (!op.size())
return 0;
+ librados::IoCtx index_ctx;
+ BucketShard bs(this);
+ r = bs.init(bucket, obj);
+ if (r < 0) {
+ ldout(cct, 10) << "bs.init() returned r=" << r << dendl;
+ return r;
+ }
+
string tag;
if (state) {
- r = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, obj, tag);
+ r = prepare_update_index(state, bs, CLS_RGW_OP_ADD, obj, tag);
if (r < 0)
return r;
}
uint64_t epoch = ref.ioctx.get_last_version();
int64_t poolid = ref.ioctx.get_id();
utime_t mtime = ceph_clock_now(cct);
- r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size,
+ r = complete_update_index(bs, obj, tag, poolid, epoch, state->size,
mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL);
} else {
- int ret = complete_update_index_cancel(bucket, obj.object, tag);
+ int ret = complete_update_index_cancel(bs, obj, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
return r;
}
-int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
RGWModifyOp op, rgw_obj& obj, string& tag)
{
- if (bucket_is_system(bucket))
+ if (bucket_is_system(bs.bucket))
return 0;
- int ret = data_log->add_entry(obj.bucket);
+ int ret = data_log->add_entry(bs.bucket, bs.shard_id);
if (ret < 0) {
lderr(cct) << "ERROR: failed writing data log" << dendl;
return ret;
append_rand_alpha(cct, tag, tag, 32);
}
}
- ret = cls_obj_prepare_op(bucket, op, tag,
- obj.object, obj.key);
+ ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
return ret;
}
-int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs)
{
- if (bucket_is_system(bucket))
+ if (bucket_is_system(bs.bucket))
return 0;
RGWObjEnt ent;
- ent.name = oid;
+ ent.name = oid.object;
ent.size = size;
ent.mtime = ut;
ent.etag = etag;
ent.owner_display_name = owner.get_display_name();
ent.content_type = content_type;
- int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs);
+ int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
return ret;
}
string etag;
string content_type;
bufferlist acl_bl;
+ BucketShard bs(this);
bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
category == RGW_OBJ_CATEGORY_MULTIMETA);
int64_t poolid = io_ctx.get_id();
int ret;
+ ret = bs.init(bucket, dst_obj);
+ if (ret < 0) {
+ goto done;
+ }
+
if (update_index) {
- ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag);
+ ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
if (ret < 0)
goto done;
}
if (update_index) {
if (ret >= 0) {
- ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size,
+ ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, NULL);
} else {
- int r = complete_update_index_cancel(bucket, dst_obj.object, tag);
+ int r = complete_update_index_cancel(bs, dst_obj, tag);
if (r < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
return 0;
}
-int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
- string *max_marker)
+/**
+ * Gather stats, versions and max markers across all index shards of a bucket.
+ *
+ * Each shard header is folded into stats via accumulate_raw_stats(); stats is
+ * added to, not cleared first. bucket_ver, master_ver and max_marker are
+ * produced by BucketIndexShardsManager::to_string() from the per-shard values.
+ *
+ * Returns 0 on success or the negative error from cls_bucket_head().
+ */
+int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+    map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
{
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+  map<string, rgw_bucket_dir_header> headers;
+  map<int, string> bucket_instance_ids;
+  int r = cls_bucket_head(bucket, headers, &bucket_instance_ids);
if (r < 0)
return r;
- stats.clear();
-
- translate_raw_stats(header, stats);
-
- *bucket_ver = header.ver;
- *master_ver = header.master_ver;
-
- if (max_marker)
- *max_marker = header.max_marker;
-
+  // The two maps are walked in lock step below, so they must be the same size.
+  assert(headers.size() == bucket_instance_ids.size());
+
+  map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
+  map<int, string>::iterator viter = bucket_instance_ids.begin();
+  BucketIndexShardsManager ver_mgr;
+  BucketIndexShardsManager master_ver_mgr;
+  BucketIndexShardsManager marker_mgr;
+  char buf[64];
+  for(; iter != headers.end(); ++iter, ++viter) {
+    accumulate_raw_stats(iter->second, stats);
+    // The version counters are 64-bit; format them as unsigned long long so
+    // the conversion is correct even where unsigned long is 32 bits wide.
+    snprintf(buf, sizeof(buf), "%llu", (unsigned long long)iter->second.ver);
+    ver_mgr.add(viter->first, string(buf));
+    snprintf(buf, sizeof(buf), "%llu", (unsigned long long)iter->second.master_ver);
+    master_ver_mgr.add(viter->first, string(buf));
+    marker_mgr.add(viter->first, iter->second.max_marker);
+  }
+  ver_mgr.to_string(bucket_ver);
+  master_ver_mgr.to_string(master_ver);
+  marker_mgr.to_string(max_marker);
return 0;
}
class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
RGWGetBucketStats_CB *cb;
+ uint32_t pendings;
+ map<RGWObjCategory, RGWStorageStats> stats;
+ int ret_code;
+ bool should_cb;
+ Mutex lock;
public:
- RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+ RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
+ : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
+ lock("RGWGetBucketStatsContext") {}
+
void handle_response(int r, rgw_bucket_dir_header& header) {
- map<RGWObjCategory, RGWStorageStats> stats;
+ Mutex::Locker l(lock);
+ if (should_cb) {
+ if ( r >= 0) {
+ accumulate_raw_stats(header, stats);
+ } else {
+ ret_code = r;
+ }
- if (r >= 0) {
- translate_raw_stats(header, stats);
- cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+ // Are we all done?
+ if (--pendings == 0) {
+ if (!ret_code) {
+ cb->set_response(&stats);
+ }
+ cb->handle_response(ret_code);
+ cb->put();
+ }
}
+ }
- cb->handle_response(r);
-
- cb->put();
+ void unset_cb() {
+ Mutex::Locker l(lock);
+ should_cb = false;
}
};
int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
{
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
- int r = cls_bucket_head_async(bucket, get_ctx);
+  // Asynchronously gather the bucket's stats from its index shard(s); the
+  // result is delivered to ctx once the last outstanding response arrives.
+  // Returns the result of get_bucket_instance_info()/cls_bucket_head_async().
+  RGWBucketInfo binfo;
+  int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  // NOTE(review): this early return does not call ctx->put(), unlike the
+  // failure path below -- confirm who owns the ctx reference here.
+  if (r < 0)
+    return r;
+
+  // num_shards is 0 for an unsharded bucket. The context decrements its
+  // unsigned pending counter once per response and fires the callback when it
+  // reaches 0, so seeding it with 0 would wrap around and never call back.
+  // Presumably an unsharded bucket still yields one response -- expect one.
+  const uint32_t expected_responses = binfo.num_shards ? binfo.num_shards : 1;
+
+  int num_aio = 0;
+  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, expected_responses);
+  assert(get_ctx);
+  r = cls_bucket_head_async(bucket, get_ctx, &num_aio);
if (r < 0) {
ctx->put();
- delete get_ctx;
- return r;
+    // Some requests may already be in flight; make sure they do not invoke
+    // the callback we have just released.
+    if (num_aio) {
+      get_ctx->unset_cb();
+    }
}
-
- return 0;
+  // Drop our own reference only after the last use of get_ctx above, so the
+  // object cannot be destroyed before unset_cb() is called on it.
+  get_ctx->put();
+  return r;
}
class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
time_t *pmtime, map<string, bufferlist> *pattrs)
{
string oid;
- if (!bucket.oid.empty()) {
+ if (bucket.oid.empty()) {
get_bucket_meta_oid(bucket, oid);
} else {
oid = bucket.oid;
RGWBucketEnt& ent = iter->second;
rgw_bucket& bucket = ent.bucket;
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+ map<string, rgw_bucket_dir_header> headers;
+ int r = cls_bucket_head(bucket, headers);
if (r < 0)
return r;
- ent.count = 0;
- ent.size = 0;
-
- RGWObjCategory category = main_category;
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.find((uint8_t)category);
- if (iter != header.stats.end()) {
- struct rgw_bucket_category_stats& stats = iter->second;
- ent.count = stats.num_entries;
- ent.size = stats.total_size;
- ent.size_rounded = stats.total_size_rounded;
+ map<string, rgw_bucket_dir_header>::iterator hiter = headers.begin();
+ for (; hiter != headers.end(); ++hiter) {
+ RGWObjCategory category = main_category;
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = (hiter->second.stats).find((uint8_t)category);
+ if (iter != hiter->second.stats.end()) {
+ struct rgw_bucket_category_stats& stats = iter->second;
+ ent.count += stats.num_entries;
+ ent.size += stats.total_size;
+ ent.size_rounded += stats.total_size_rounded;
+ }
}
}
return oids.size();
}
-int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
+int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
+ ldout(cct, 20) << __func__ << ": " << bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
result.clear();
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<int, string> oids;
+ map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
+ map<int, string> bucket_instance_ids;
+ int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids);
if (r < 0)
return r;
- std::list<rgw_bi_log_entry> entries;
- int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
- if (ret < 0)
- return ret;
+ BucketIndexShardsManager marker_mgr;
+ bool has_shards = (oids.size() > 1 || shard_id >= 0);
+ // If there are multiple shards for the bucket index object, the marker
+ // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
+ // {shard_marker_2}...', if there is no sharding, the bi_log_list should
+ // only contain one record, and the key is the bucket instance id.
+ r = marker_mgr.from_string(marker, shard_id);
+ if (r < 0)
+ return r;
+
+ r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+ if (r < 0)
+ return r;
- std::list<rgw_bi_log_entry>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- result.push_back(*iter);
+ vector<string> shard_ids_str;
+ map<int, list<rgw_bi_log_entry>::iterator> vcurrents;
+ map<int, list<rgw_bi_log_entry>::iterator> vends;
+ if (truncated) {
+ *truncated = false;
+ }
+ map<int, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
+ for (; miter != bi_log_lists.end(); ++miter) {
+ int shard_id = miter->first;
+ vcurrents[shard_id] = miter->second.entries.begin();
+ vends[shard_id] = miter->second.entries.end();
+ if (truncated) {
+ *truncated = (*truncated || miter->second.truncated);
+ }
+ }
+
+ size_t total = 0;
+ bool has_more = true;
+ map<int, list<rgw_bi_log_entry>::iterator>::iterator viter;
+ map<int, list<rgw_bi_log_entry>::iterator>::iterator eiter;
+ while (total < max && has_more) {
+ has_more = false;
+
+ viter = vcurrents.begin();
+ eiter = vends.begin();
+
+ for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) {
+ assert (eiter != vends.end());
+
+ int shard_id = viter->first;
+ list<rgw_bi_log_entry>::iterator& liter = viter->second;
+
+ if (liter == eiter->second){
+ continue;
+ }
+ rgw_bi_log_entry& entry = *(liter);
+ if (has_shards) {
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+ string tmp_id;
+ build_bucket_index_marker(buf, entry.id, &tmp_id);
+ entry.id.swap(tmp_id);
+ }
+ marker_mgr.add(shard_id, entry.id);
+ result.push_back(entry);
+ total++;
+ has_more = true;
+ ++liter;
+ }
+ }
+
+ if (truncated) {
+ for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) {
+ assert (eiter != vends.end());
+ *truncated = (*truncated || (viter->second != eiter->second));
+ }
+ }
+
+ // Refresh the marker. If the index is sharded (or a specific shard was
+ // requested), the output looks like
+ // '{shard_id_1}#{shard_marker_1},{shard_id_2}#{shard_marker_2}...';
+ // if there is no sharding, the plain marker (without a shard id) is returned.
+ if (has_shards) {
+ marker_mgr.to_string(&marker);
+ } else {
+ if (!result.empty()) {
+ marker = result.rbegin()->id;
+ }
}
return 0;
}
-int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker)
+int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker)
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<int, string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id);
if (r < 0)
return r;
- int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
- if (ret < 0)
- return ret;
+ BucketIndexShardsManager start_marker_mgr;
+ r = start_marker_mgr.from_string(start_marker, shard_id);
+ if (r < 0)
+ return r;
+ BucketIndexShardsManager end_marker_mgr;
+ r = end_marker_mgr.from_string(end_marker, shard_id);
+ if (r < 0)
+ return r;
- return 0;
+ return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
}
int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
return r;
}
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, // builds a bucket "prepare" op and runs it on the shard's index object
string& name, string& locator)
{
- librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
- if (r < 0)
- return r;
-
ObjectWriteOperation o;
cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
- r = index_ctx.operate(oid, &o);
- return r;
+ int ret = bs.index_ctx.operate(bs.bucket_obj, &o); // uses the IoCtx and object name carried by bs; no index lookup happens here
+ return ret;
}
-int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, // builds a bucket "complete" op and submits it asynchronously to the shard's index object
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
list<string> *remove_objs)
{
- librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
- if (r < 0)
- return r;
-
ObjectWriteOperation o;
rgw_bucket_dir_entry_meta dir_meta;
dir_meta.size = ent.size;
cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, c, &o);
+ int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); // ret is the submission result; the completion is released below without being waited on
c->release();
- return r;
+ return ret;
}
-int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag, // convenience wrapper: cls_obj_complete_op() with CLS_RGW_OP_ADD
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_objs)
+ list<string> *remove_obj) // forwarded unchanged as the remove_objs argument of cls_obj_complete_op()
{
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
}
-int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, // convenience wrapper: cls_obj_complete_op() with CLS_RGW_OP_DEL
int64_t pool, uint64_t epoch,
string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); // only ent.name is populated; no category and no remove list are passed
}
-int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name) // completes the pending op identified by tag for object name via cls_obj_complete_op()
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); // NOTE(review): sent as CLS_RGW_OP_ADD with pool id -1 and epoch 0 — presumably how a cancel is signalled; confirm against the cls side
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
{
librados::IoCtx index_ctx;
- string oid;
-
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<int, string> bucket_objs; // index objects to update, filled by open_bucket_index()
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- ObjectWriteOperation o;
- cls_rgw_bucket_set_tag_timeout(o, timeout);
-
- r = index_ctx.operate(oid, &o);
-
- return r;
+ return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)(); // issued over bucket_objs; rgw_bucket_index_max_aio presumably caps concurrent requests — confirm
}
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
- uint32_t num, map<string, RGWObjEnt>& m,
- bool *is_truncated, string *last_entry,
- bool (*force_check_filter)(const string& name))
+/**
+ * List entries of a bucket by querying every bucket index object and merging
+ * the per-object listings in name order.
+ *
+ * bucket [in] - bucket to list.
+ * start [in] - passed to the index listing as the start position.
+ * prefix [in] - passed to the index listing as the name prefix.
+ * num_entries [in] - maximum number of entries added to m by this call.
+ * m [out] - receives the listed entries, keyed by object name.
+ * is_truncated [out] - true if any index object reported truncation or if
+ * not every returned entry was consumed.
+ * last_entry [out] - set to the last key of m when m is not empty.
+ * force_check_filter [in] - optional predicate; entries it selects are always
+ * re-checked through check_disk_state().
+ *
+ * Return 0 on success, a negative error code otherwise.
+ */
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
+ uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
+ string *last_entry, bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
+ ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ // key - oid (for different shards if there is any)
+ // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+ map<int, string> oids;
+ map<int, struct rgw_cls_list_ret> list_results;
+ int r = open_bucket_index(bucket, index_ctx, oids);
if (r < 0)
return r;
- struct rgw_bucket_dir dir;
- r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated);
+ r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
- map<string, struct rgw_bucket_dir_entry>::iterator miter;
- bufferlist updates;
- for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
- RGWObjEnt e;
- rgw_bucket_dir_entry& dirent = miter->second;
+ // Create a list of iterators that are used to iterate each shard
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents;
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends;
+ vector<string> vnames;
+ // Reserve instead of pre-sizing: the push_back() calls below must yield exactly
+ // one slot per shard, not a run of default-constructed elements followed by
+ // the real ones.
+ vcurrents.reserve(list_results.size());
+ vends.reserve(list_results.size());
+ vnames.reserve(list_results.size());
+ map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ *is_truncated = false;
+ for (; iter != list_results.end(); ++iter) {
+ vcurrents.push_back(iter->second.dir.m.begin());
+ vends.push_back(iter->second.dir.m.end());
+ vnames.push_back(oids[iter->first]);
+ *is_truncated = (*is_truncated || iter->second.is_truncated);
+ }
+
+ // Create a map to track the next candidate entry from each shard, if the entry
+ // from a specified shard is selected/erased, the next entry from that shard will
+ // be inserted for next round selection
+ map<string, size_t> candidates;
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i]) {
+ candidates[vcurrents[i]->second.name] = i;
+ }
+ }
+
+ map<string, bufferlist> updates;
+ uint32_t count = 0;
+ while (count < num_entries && !candidates.empty()) {
+ // Select the next one
+ int pos = candidates.begin()->second;
+ struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
// fill it in with initial values; we may correct later
+ RGWObjEnt e;
e.name = dirent.name;
e.size = dirent.meta.size;
e.mtime = dirent.meta.mtime;
e.content_type = dirent.meta.content_type;
e.tag = dirent.tag;
- /* oh, that shouldn't happen! */
- if (e.name.empty()) {
- ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
- continue;
- }
-
bool force_check = force_check_filter && force_check_filter(dirent.name);
-
+ // Start every entry with a clean status: a -ENOENT left over from a previous
+ // entry's check_disk_state() must not cause this entry to be dropped when it
+ // does not need a disk check itself.
+ r = 0;
if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
- if (r < 0) {
- if (r == -ENOENT)
- continue;
- else
+ r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
+ if (r < 0 && r != -ENOENT) {
return r;
}
}
- m[e.name] = e;
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+ // r is -ENOENT here only when check_disk_state() reported the entry as gone;
+ // such an entry is skipped but its shard iterator still advances below
+ if (r >= 0) {
+ m[e.name] = e;
+ ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+ ++count;
+ }
+
+ // Refresh the candidates map
+ candidates.erase(candidates.begin());
+ ++vcurrents[pos];
+ if (vcurrents[pos] != vends[pos]) {
+ candidates[vcurrents[pos]->second.name] = pos;
+ }
}
- if (dir.m.size()) {
- *last_entry = dir.m.rbegin()->first;
+ // Suggest updates if there is any
+ map<string, bufferlist>::iterator miter = updates.begin();
+ for (; miter != updates.end(); ++miter) {
+ if (miter->second.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, miter->second);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ index_ctx.aio_operate(miter->first, c, &o);
+ c->release();
+ }
}
- if (updates.length()) {
- ObjectWriteOperation o;
- cls_rgw_suggest_changes(o, updates);
- // we don't care if we lose suggested updates, send them off blindly
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, c, &o);
- c->release();
+ // Check if all the returned entries are consumed or not
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i])
+ *is_truncated = true;
}
- return m.size();
+ if (m.size())
+ *last_entry = m.rbegin()->first;
+
+ return 0;
}
int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
return 0;
}
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
+int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids) // collects the directory header of every bucket index object into headers, keyed by object name
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<int, string> oids; // index object names; looked up below with the same int key as list_results
+ map<int, struct rgw_cls_list_ret> list_results; // per-object results; only dir.header is read here
+ int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids);
if (r < 0)
return r;
- r = cls_rgw_get_dir_header(index_ctx, oid, &header);
+ r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0)
return r;
+ map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ for(; iter != list_results.end(); ++iter) {
+ headers[oids[iter->first]] = iter->second.dir.header; // key each header by the object name stored under the same int key
+ }
return 0;
}
-int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio) // starts one async dir-header request per bucket index object and counts the started ones in *num_aio
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<int, string> bucket_objs; // index objects to query, filled by open_bucket_index()
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
- if (r < 0)
- return r;
-
- return 0;
+ map<int, string>::iterator iter = bucket_objs.begin();
+ for (; iter != bucket_objs.end(); ++iter) {
+ r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get())); // each request is handed the result of ctx->get(); presumably that takes a reference — confirm
+ if (r < 0) {
+ ctx->put(); // balance the get() made for the request that failed to start
+ break; // stop at the first failure; requests already started are not cancelled here
+ } else {
+ (*num_aio)++; // num_aio is dereferenced unconditionally, so it must not be NULL
+ }
+ }
+ return r; // status of the last call made above
}
int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
{
- rgw_bucket_dir_header header;
- int r = cls_bucket_head(bucket, header);
+ map<string, struct rgw_bucket_dir_header> headers;
+ int r = cls_bucket_head(bucket, headers);
if (r < 0) {
ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl;
return r;
bucket.convert(&entry.bucket);
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
- for (; iter != header.stats.end(); ++iter) {
- struct rgw_bucket_category_stats& header_stats = iter->second;
- entry.size += header_stats.total_size;
- entry.size_rounded += header_stats.total_size_rounded;
- entry.count += header_stats.num_entries;
+ map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
+ for (; hiter != headers.end(); ++hiter) {
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
+ for (; iter != hiter->second.stats.end(); ++iter) {
+ struct rgw_bucket_category_stats& header_stats = iter->second;
+ entry.size += header_stats.total_size;
+ entry.size_rounded += header_stats.total_size_rounded;
+ entry.count += header_stats.num_entries;
+ }
}
list<cls_user_bucket_entry> entries;
return 0;
}
+/**
+ * Fill bucket_objects with bucket index object names derived from
+ * bucket_oid_base.
+ *
+ * - num_shards == 0: a single entry, key 0 -> bucket_oid_base.
+ * - shard_id < 0: one entry per shard, key i -> "<bucket_oid_base>.<i>".
+ * - otherwise: only the entry for shard_id; nothing is added when shard_id is
+ *   outside [0, num_shards).
+ */
+void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
+    uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
+{
+  if (!num_shards) {
+    bucket_objects[0] = bucket_oid_base;
+    return;
+  }
+
+  // Only the ".<shard>" suffix is formatted, so a small fixed buffer is enough
+  // and no variable-length array (a compiler extension in C++) is needed.
+  char suffix[16];
+  if (shard_id < 0) {
+    for (uint32_t i = 0; i < num_shards; ++i) {
+      snprintf(suffix, sizeof(suffix), ".%u", i);
+      bucket_objects[i] = bucket_oid_base + suffix;
+    }
+    return;
+  }
+
+  // Shard ids are zero-based, so num_shards itself is already out of range.
+  if ((uint32_t)shard_id >= num_shards) {
+    return;
+  }
+  snprintf(suffix, sizeof(suffix), ".%d", shard_id);
+  bucket_objects[shard_id] = bucket_oid_base + suffix;
+}
+
+/**
+ * Fill *result with bucket instance ids of the form "<name>:<bucket_id>",
+ * extended with ":<shard>" when the bucket index is sharded.
+ *
+ * - bucket_info.num_shards == 0: a single entry, key 0 -> plain id.
+ * - shard_id < 0: one entry per shard, key i -> "<plain id>:<i>".
+ * - otherwise: only the entry for shard_id; nothing is added when shard_id is
+ *   outside [0, num_shards).
+ */
+void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result)
+{
+  rgw_bucket& bucket = bucket_info.bucket;
+  string plain_id = bucket.name + ":" + bucket.bucket_id;
+  if (!bucket_info.num_shards) {
+    (*result)[0] = plain_id;
+    return;
+  }
+
+  char buf[16];
+  if (shard_id < 0) {
+    for (uint32_t i = 0; i < bucket_info.num_shards; ++i) {
+      snprintf(buf, sizeof(buf), ":%u", i);
+      (*result)[i] = plain_id + buf;
+    }
+    return;
+  }
+
+  // Shard ids are zero-based, so num_shards itself is already out of range.
+  if ((uint32_t)shard_id >= bucket_info.num_shards) {
+    return;
+  }
+  snprintf(buf, sizeof(buf), ":%d", shard_id);
+  (*result)[shard_id] = plain_id + buf;
+}
+
+/**
+ * Pick the bucket index object that holds obj_key.
+ *
+ * For hash type MOD: without sharding *bucket_obj is bucket_oid_base and
+ * *shard_id (when requested) is -1; with sharding the shard is derived from a
+ * hash of obj_key and *bucket_obj is "<bucket_oid_base>.<shard>".
+ *
+ * shard_id may be NULL when the caller does not need the shard number.
+ * Return 0 on success, -ENOTSUP for any other hash type.
+ */
+int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+    uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
+{
+  int r = 0;
+  switch (hash_type) {
+    case RGWBucketInfo::MOD:
+      if (!num_shards) {
+        // By default with no sharding, we use the bucket oid as itself
+        (*bucket_obj) = bucket_oid_base;
+        if (shard_id) {
+          *shard_id = -1;
+        }
+      } else {
+        uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
+        uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+        sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+        // Format only the ".<shard>" suffix: a fixed buffer replaces the
+        // variable-length array, and %u matches the unsigned shard number.
+        char suffix[16];
+        snprintf(suffix, sizeof(suffix), ".%u", sid);
+        (*bucket_obj) = bucket_oid_base + suffix;
+        if (shard_id) {
+          *shard_id = (int)sid;
+        }
+      }
+      break;
+    default:
+      r = -ENOTSUP;
+  }
+  return r;
+}
+
int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge)
{
bool log_meta;
bool log_data;
- RGWZone() : log_meta(false), log_data(false) {}
+/**
+ * Represents the number of shards for the bucket index object; a value of zero
+ * indicates there is no sharding. By default (no sharding), the name of the object
+ * is '.dir.{marker}'; with sharding, the name is '.dir.{marker}.{sharding_id}',
+ * where sharding_id is a zero-based value. It is not recommended to set too large
+ * a value (e.g. a thousand) as it increases the cost of bucket listing.
+ */
+ uint32_t bucket_index_max_shards;
+
+ RGWZone() : log_meta(false), log_data(false), bucket_index_max_shards(0) {}
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
::encode(name, bl);
::encode(endpoints, bl);
::encode(log_meta, bl);
::encode(log_data, bl);
+ ::encode(bucket_index_max_shards, bl);
ENCODE_FINISH(bl);
}
::decode(log_meta, bl);
::decode(log_data, bl);
}
+ if (struct_v >= 3) {
+ ::decode(bucket_index_max_shards, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
class RGWGetBucketStats_CB : public RefCountedObject {
protected:
rgw_bucket bucket;
- uint64_t bucket_ver;
- uint64_t master_ver;
map<RGWObjCategory, RGWStorageStats> *stats;
- string max_marker;
public:
RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
virtual ~RGWGetBucketStats_CB() {}
virtual void handle_response(int r) = 0;
- virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
- map<RGWObjCategory, RGWStorageStats> *_stats,
- const string &_max_marker) {
- bucket_ver = _bucket_ver;
- master_ver = _master_ver;
+ virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) { // records only the stats pointer (the map is not copied); version and marker values are no longer carried
stats = _stats;
- max_marker = _max_marker;
}
};
int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid);
+ int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ string& bucket_oid_base);
+ int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ const string& obj_key, string *bucket_obj, int *shard_id);
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+ template<typename T>
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<int, string>& oids, map<int, T>& bucket_objs,
+ int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+ void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
+ string *marker);
+
+ void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
struct GetObjState {
librados::IoCtx io_ctx;
Mutex bucket_id_lock;
+ // This field represents the number of bucket index object shards
+ uint32_t bucket_index_max_shards;
+
int get_obj_ioctx(const rgw_obj& obj, librados::IoCtx *ioctx);
int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false);
uint64_t max_bucket_id;
gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL), watch_handles(NULL),
watch_initialized(false),
- bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
+ bucket_id_lock("rados_bucket_id"),
+ bucket_index_max_shards(0),
+ max_bucket_id(0),
cct(NULL), rados(NULL),
pools_initialized(false),
quota_handler(NULL),
* create a bucket with name bucket and the given list of attrs
* returns 0 on success, -ERR# otherwise.
*/
- virtual int init_bucket_index(rgw_bucket& bucket);
+ virtual int init_bucket_index(rgw_bucket& bucket, int num_shards);
int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule,
const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule);
int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket);
}
int decode_policy(bufferlist& bl, ACLOwner *owner);
- int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
- string *max_marker);
+ int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
int get_user_stats(const string& user, RGWStorageStats& stats);
int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
map<string, bufferlist> *pattrs, bool create_entry_point);
+ struct BucketShard { // bundles what the cls_obj_* index helpers need to address one bucket index object
+ RGWRados *store;
+ rgw_bucket bucket;
+ int shard_id; // -1 by default (set in the constructor)
+ librados::IoCtx index_ctx; // IoCtx the cls_obj_* helpers issue their operations on
+ string bucket_obj; // object name the cls_obj_* helpers operate on
+
+ BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
+ int init(rgw_bucket& _bucket, rgw_obj& obj); // defined elsewhere; presumably fills index_ctx, bucket_obj and shard_id for obj — confirm
+ };
+
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
- string& name, string& locator);
- int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+ int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
+ int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
- int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
+ int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+ int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
+ int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
- int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
- map<string, RGWObjEnt>& m, bool *is_truncated,
- string *last_entry, bool (*force_check_filter)(const string& name) = NULL);
- int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
- int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
- int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+ int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
+ map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
+ bool (*force_check_filter)(const string& name) = NULL);
+ int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
+ int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
+
+ int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
RGWModifyOp op, rgw_obj& oid, string& tag);
- int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+ int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs);
- int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) {
- if (bucket_is_system(bucket))
+ int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) { // completes an index delete for oid; a no-op returning 0 for system buckets
+ if (bucket_is_system(bucket_shard.bucket))
return 0;
- return cls_obj_complete_del(bucket, tag, pool, epoch, oid);
+ return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object); // only the object name from oid is passed on
}
- int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) {
- if (bucket_is_system(bucket))
+ int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) { // cancels the pending index op identified by tag; a no-op returning 0 for system buckets
+ if (bucket_is_system(bucket_shard.bucket))
return 0;
- return cls_obj_complete_cancel(bucket, tag, oid);
+ return cls_obj_complete_cancel(bucket_shard, tag, oid.object); // only the object name from oid is passed on
}
- int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
- int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker);
+ int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
+ int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name);
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
- void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
+ void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
int time_log_add(const string& oid, list<cls_log_entry>& entries);
int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
}
private:
+ /**
+ * This is a helper method, it generates a list of bucket index objects with the given
+ * bucket base oid and number of shards.
+ *
+ * bucket_oid_base [in] - base name of the bucket index object;
+ * num_shards [in] - number of bucket index object shards.
+ * bucket_objs [out] - filled by this method, a list of bucket index objects.
+ */
+ void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
+ map<int, string>& bucket_objs, int shard_id = -1);
+
+ /**
+ * Get the bucket index object with the given base bucket index object and object key,
+ * and the number of bucket index shards.
+ *
+ * bucket_oid_base [in] - bucket object base name.
+ * obj_key [in] - object key.
+ * num_shards [in] - number of bucket index shards.
+ * hash_type [in] - type of hash to find the shard ID.
+ * bucket_obj [out] - the bucket index object for the given object.
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+ int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+ uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
+
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
/**
#include "rgw_replica_log.h"
#include "cls/replica_log/cls_replica_log_client.h"
+#include "cls/rgw/cls_rgw_client.h"
#include "rgw_rados.h"
+#define dout_subsys ceph_subsys_rgw
void RGWReplicaBounds::dump(Formatter *f) const
{
prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
prefix.append(".");
}
+
+// Build the replica log object name for a bucket: "<prefix><bucket name>",
+// followed by ".<shard_id>" when shard_id is not negative.
+string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id)
+{
+  string name = prefix + bucket.name;
+  if (shard_id < 0) {
+    // no shard requested: the bare name is used
+    return name;
+  }
+
+  char suffix[16];
+  snprintf(suffix, sizeof(suffix), ".%d", shard_id);
+  name += suffix;
+  return name;
+}
+
+// Update the replica log bound for a bucket. With an explicit shard, or a
+// marker that is not a shards marker, a single object is updated with the
+// marker as given. Otherwise the marker is split per shard and every shard's
+// object is updated with its own part; the last failure (if any) is returned.
+int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
+                                         const string& marker, const utime_t& time,
+                                         const list<RGWReplicaItemMarker> *entries)
+{
+  const bool split_per_shard =
+    (shard_id < 0) && BucketIndexShardsManager::is_shards_marker(marker);
+  if (!split_per_shard) {
+    return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id), pool,
+                                          daemon_id, marker, time, entries);
+  }
+
+  BucketIndexShardsManager shards_mgr;
+  int ret = shards_mgr.from_string(marker, shard_id);
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
+    return ret;
+  }
+
+  map<int, string>& shard_markers = shards_mgr.get();
+
+  ret = 0;
+
+  // keep going after a failed shard so the remaining shards are still updated
+  for (map<int, string>::iterator it = shard_markers.begin(); it != shard_markers.end(); ++it) {
+    ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << it->first << " marker=" << marker << dendl;
+    int r = RGWReplicaLogger::update_bound(obj_name(bucket, it->first), pool,
+                                           daemon_id, it->second, time, entries);
+    if (r >= 0) {
+      continue;
+    }
+    ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << it->first << " marker=" << marker << dendl;
+    ret = r;
+  }
+
+  return ret;
+}
class RGWReplicaBucketLogger : private RGWReplicaLogger {
string pool;
string prefix;
+
+ string obj_name(const rgw_bucket& bucket, int shard_id); // log object name for (bucket, shard_id); used by all three methods below
public:
RGWReplicaBucketLogger(RGWRados *_store);
- int update_bound(const rgw_bucket& bucket, const string& daemon_id,
+ int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, // declared only; the body is no longer inline
const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries) {
- return RGWReplicaLogger::update_bound(prefix+bucket.name, pool,
- daemon_id, marker, time, entries);
- }
- int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
- return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool,
+ const list<RGWReplicaItemMarker> *entries);
+ int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id) { // deletes daemon_id's bound on the object named by obj_name(bucket, shard_id)
+ return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id), pool,
daemon_id);
}
- int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) {
- return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool,
+ int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) { // reads the bounds stored on the object named by obj_name(bucket, shard_id)
+ return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id), pool,
bounds);
}
};
return;
}
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ return;
+ }
+
if (!bucket_instance.empty()) {
http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
send_response();
do {
list<rgw_bi_log_entry> entries;
- int ret = store->list_bi_log_entries(bucket_info.bucket,
+ int ret = store->list_bi_log_entries(bucket_info.bucket, shard_id,
marker, max_entries - count,
entries, &truncated);
if (ret < 0) {
http_ret = -EINVAL;
return;
}
+
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ return;
+ }
+
if (!bucket_instance.empty()) {
http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
return;
}
}
- http_ret = store->trim_bi_log_entries(bucket_info.bucket, start_marker, end_marker);
+ http_ret = store->trim_bi_log_entries(bucket_info.bucket, shard_id, start_marker, end_marker);
if (http_ret < 0) {
dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
}
};
class RGWOp_BILog_Info : public RGWRESTOp {
- uint64_t bucket_ver;
- uint64_t master_ver;
+ string bucket_ver;
+ string master_ver;
string max_marker;
public:
- RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {}
+ RGWOp_BILog_Info() : bucket_ver(), master_ver() {}
~RGWOp_BILog_Info() {}
int check_caps(RGWUserCaps& caps) {
return;
}
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ dout(5) << "failed to parse bucket instance" << dendl;
+ return;
+ }
+
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
return;
}
- http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers);
+ http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers);
}
void RGWOp_BILog_GetBounds::execute() {
return;
}
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ dout(5) << "failed to parse bucket instance" << dendl;
+ return;
+ }
+
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
RGWReplicaBucketLogger rl(store);
- http_ret = rl.get_bounds(bucket, bounds);
+ http_ret = rl.get_bounds(bucket, shard_id, bounds);
}
void RGWOp_BILog_GetBounds::send_response() {
return;
}
+ int shard_id;
+ http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+ if (http_ret < 0) {
+ dout(5) << "failed to parse bucket instance" << dendl;
+ return;
+ }
+
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
RGWReplicaBucketLogger rl(store);
- http_ret = rl.delete_bound(bucket, daemon_id);
+ http_ret = rl.delete_bound(bucket, shard_id, daemon_id);
}
RGWOp *RGWHandler_ReplicaLog::op_get() {
int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
const char *content_type = NULL;
- int orig_ret = ret;
map<string, string> response_attrs;
map<string, string>::iterator riter;
}
}
- if (partial_content && !ret)
- ret = -STATUS_PARTIAL_CONTENT;
-
- if (ret)
- set_req_state_err(s, ret);
+ set_req_state_err(s, (partial_content && !ret) ? STATUS_PARTIAL_CONTENT : ret);
dump_errno(s);
for (riter = response_attrs.begin(); riter != response_attrs.end(); ++riter) {
sent_header = true;
send_data:
- if (get_data && !orig_ret) {
+ if (get_data && !ret) {
int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
if (r < 0)
return r;
if WITH_RADOSGW
ceph_test_cls_rgw_SOURCES = test/cls_rgw/test_cls_rgw.cc
ceph_test_cls_rgw_LDADD = \
- $(LIBRADOS) libcls_rgw_client.la \
- $(LIBCOMMON) $(UNITTEST_LDADD) $(RADOS_TEST_LDADD)
+ $(LIBRADOS) $(CRYPTO_LIBS) libcls_rgw_client.la \
+ $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
ceph_test_cls_rgw_CXXFLAGS = $(UNITTEST_CXXFLAGS)
bin_DEBUGPROGRAMS += ceph_test_cls_rgw
endif # WITH_RADOSGW
#include "include/types.h"
#include "cls/rgw/cls_rgw_client.h"
+#include "cls/rgw/cls_rgw_ops.h"
#include "gtest/gtest.h"
#include "test/librados/test.h"
#include <errno.h>
#include <string>
#include <vector>
+#include <map>
using namespace librados;
void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size)
{
- rgw_bucket_dir_header header;
- ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header));
-
- rgw_bucket_category_stats& stats = header.stats[category];
- ASSERT_EQ(total_size, stats.total_size);
- ASSERT_EQ(num_entries, stats.num_entries);
+ map<int, struct rgw_cls_list_ret> results; // filled by CLSRGWIssueGetDirHeader below
+ map<int, string> oids;
+ oids[0] = oid; // the single index object under test, registered under key 0
+ ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
+
+ uint64_t entries = 0; // running sum of num_entries for the category across all results
+ uint64_t size = 0; // running sum of total_size for the category across all results
+ map<int, struct rgw_cls_list_ret>::iterator iter = results.begin();
+ for (; iter != results.end(); ++iter) {
+ entries += (iter->second).dir.header.stats[category].num_entries;
+ size += (iter->second).dir.header.stats[category].total_size;
+ }
+ ASSERT_EQ(total_size, size);
+ ASSERT_EQ(num_entries, entries);
}
void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
cls_rgw_encode_suggestion(suggest_op, dirent, updates);
}
- op = mgr.write_op();
- cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout
- ASSERT_EQ(0, ioctx.operate(bucket_oid, op));
+ map<int, string> bucket_objs;
+ bucket_objs[0] = bucket_oid;
+ int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)();
+ ASSERT_EQ(0, r);
sleep(1);