map<string, bufferlist> keys;
string filter_prefix, end_key;
+ bufferlist start_bl;
+ bool start_key_added = false;
uint32_t i = 0;
string key;
key.append(marker);
start_key = key;
+ int ret = cls_cxx_map_get_val(hctx, start_key, &start_bl);
+ if ((ret < 0) && (ret != -ENOENT)) {
+ return ret;
+ }
} else {
start_key = key_iter;
}
if (ret < 0)
return ret;
+ if ((start_bl.length() > 0) && (!start_key_added)) {
+ keys[start_key] = start_bl;
+ start_key_added = true;
+ }
map<string, bufferlist>::iterator iter = keys.begin();
if (iter == keys.end())
break;
using namespace librados;
-const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
-const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
-
-/**
- * This class represents the bucket index object operation callback context.
- */
-template <typename T>
-class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
-private:
- T *data;
- int *ret_code;
-public:
- ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
- ~ClsBucketIndexOpCtx() {}
- void handle_completion(int r, bufferlist& outbl) {
- if (r >= 0) {
- try {
- bufferlist::iterator iter = outbl.begin();
- ::decode((*data), iter);
- } catch (buffer::error& err) {
- r = -EIO;
- }
- }
- if (ret_code) {
- *ret_code = r;
- }
- }
-};
-
-void BucketIndexAioManager::do_completion(int id) {
- Mutex::Locker l(lock);
-
- map<int, librados::AioCompletion*>::iterator iter = pendings.find(id);
- assert(iter != pendings.end());
- completions[id] = iter->second;
- pendings.erase(iter);
-
- // If the caller needs a list of finished objects, store them
- // for further processing
- map<int, string>::iterator miter = pending_objs.find(id);
- if (miter != pending_objs.end()) {
- completion_objs[id] = miter->second;
- pending_objs.erase(miter);
- }
-
- cond.Signal();
-}
-
-bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
- int *num_completions, int *ret_code, map<int, string> *objs) {
- lock.Lock();
- if (pendings.empty() && completions.empty()) {
- lock.Unlock();
- return false;
- }
-
- if (completions.empty()) {
- // Wait for AIO completion
- cond.Wait(lock);
- }
-
- // Clear the completed AIOs
- map<int, librados::AioCompletion*>::iterator iter = completions.begin();
- for (; iter != completions.end(); ++iter) {
- int r = iter->second->get_return_value();
- if (objs && r == 0) { /* update list of successfully completed objs */
- map<int, string>::iterator liter = completion_objs.find(iter->first);
- if (liter != completion_objs.end()) {
- (*objs)[liter->first] = liter->second;
- }
- }
- if (ret_code && (r < 0 && r != valid_ret_code))
- (*ret_code) = r;
- iter->second->release();
- }
- if (num_completions)
- (*num_completions) = completions.size();
- completions.clear();
- lock.Unlock();
-
- return true;
-}
-
void cls_rgw_bucket_init(ObjectWriteOperation& o)
{
bufferlist in;
o.exec("rgw", "bucket_init_index", in);
}
-static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
- const string& oid, BucketIndexAioManager *manager) {
- bufferlist in;
- librados::ObjectWriteOperation op;
- op.create(true);
- op.exec("rgw", "bucket_init_index", in);
- return manager->aio_operate(io_ctx, oid, &op);
-}
-
-static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
- const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
+{
bufferlist in;
struct rgw_cls_tag_timeout_op call;
- call.tag_timeout = timeout;
+ call.tag_timeout = tag_timeout;
::encode(call, in);
- ObjectWriteOperation op;
- op.exec("rgw", "bucket_set_tag_timeout", in);
- return manager->aio_operate(io_ctx, oid, &op);
-}
-
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_index_init_op(io_ctx, oid, &manager);
-}
-
-void CLSRGWIssueBucketIndexInit::cleanup()
-{
- // Do best effort removal
- for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
- io_ctx.remove(citer->second);
- }
-}
-
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+ o.exec("rgw", "bucket_set_tag_timeout", in);
}
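For illustration, a minimal usage sketch of the restored synchronous helper, assuming a connected librados::IoCtx named io_ctx, the (unsharded) bucket index object name dir_oid, and an arbitrary 120-second timeout:

    // Hypothetical caller: set the tag timeout on the single bucket index object.
    librados::ObjectWriteOperation op;
    cls_rgw_bucket_set_tag_timeout(op, 120);  // encodes rgw_cls_tag_timeout_op into the op
    int r = io_ctx.operate(dir_oid, &op);     // synchronous execution of the class method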
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
o.exec("rgw", "bucket_complete_op", in);
}
-static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
- const string& oid, const string& start_obj, const string& filter_prefix,
- uint32_t num_entries, BucketIndexAioManager *manager,
- struct rgw_cls_list_ret *pdata) {
- bufferlist in;
+
+int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
+ string& filter_prefix, uint32_t num_entries,
+ rgw_bucket_dir *dir, bool *is_truncated)
+{
+ bufferlist in, out;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
+ if (r < 0)
+ return r;
- librados::ObjectReadOperation op;
- op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
- return manager->aio_operate(io_ctx, oid, &op);
-}
-
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]);
-}
+ struct rgw_cls_list_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
- BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
- struct cls_rgw_bi_log_list_ret *pdata) {
- bufferlist in;
- cls_rgw_bi_log_list_op call;
- call.marker = marker_mgr.get(shard_id, "");
- call.max = max;
- ::encode(call, in);
+ if (dir)
+ *dir = ret.dir;
+ if (is_truncated)
+ *is_truncated = ret.is_truncated;
- librados::ObjectReadOperation op;
- op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
- return manager->aio_operate(io_ctx, oid, &op);
+ return r;
}
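A hedged pagination sketch for the restored cls_rgw_list_op, assuming a connected IoCtx io_ctx, a non-const index object name dir_oid, that rgw_bucket_dir keeps its entries in the map member m, and that this runs inside a helper returning int:

    string start_obj, filter_prefix;  // empty marker and prefix: list from the beginning
    bool is_truncated = false;
    do {
      rgw_bucket_dir dir;
      int r = cls_rgw_list_op(io_ctx, dir_oid, start_obj, filter_prefix, 1000, &dir, &is_truncated);
      if (r < 0)
        return r;
      map<string, struct rgw_bucket_dir_entry>::iterator eit = dir.m.begin();
      for (; eit != dir.m.end(); ++eit) {
        start_obj = eit->first;       // resume after the last key returned
        /* consume eit->second here */
      }
    } while (is_truncated);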
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
+ rgw_bucket_dir_header *existing_header,
+ rgw_bucket_dir_header *calculated_header)
{
- return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
-}
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
+ if (r < 0)
+ return r;
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
- BucketIndexShardsManager& start_marker_mgr,
- BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
- bufferlist in;
- cls_rgw_bi_log_trim_op call;
- call.start_marker = start_marker_mgr.get(shard_id, "");
- call.end_marker = end_marker_mgr.get(shard_id, "");
- ::encode(call, in);
- ObjectWriteOperation op;
- op.exec("rgw", "bi_log_trim", in);
- return manager->aio_operate(io_ctx, oid, &op);
-}
+ struct rgw_cls_check_index_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
-{
- return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
-}
+ if (existing_header)
+ *existing_header = ret.existing_header;
+ if (calculated_header)
+ *calculated_header = ret.calculated_header;
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
- struct rgw_cls_check_index_ret *pdata) {
- bufferlist in;
- librados::ObjectReadOperation op;
- op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
- pdata, NULL));
- return manager->aio_operate(io_ctx, oid, &op);
+ return 0;
}
-int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
+int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
{
- return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
-}
-
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
- BucketIndexAioManager *manager) {
- bufferlist in;
- librados::ObjectWriteOperation op;
- op.exec("rgw", "bucket_rebuild_index", in);
- return manager->aio_operate(io_ctx, oid, &op);
-}
+ bufferlist in, out;
+ int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
+ if (r < 0)
+ return r;
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
-{
- return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+ return 0;
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
{
- return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]);
+ bufferlist in, out;
+ struct rgw_cls_list_op call;
+ call.num_entries = 0;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
+ if (r < 0)
+ return r;
+
+ struct rgw_cls_list_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ if (header)
+ *header = ret.dir.header;
+
+ return r;
}
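A short sketch of consuming the header returned by cls_rgw_get_dir_header, using the same per-category stats fields that translate_raw_stats reads below; io_ctx, dir_oid and the int-returning helper are assumed as in the earlier sketches:

    rgw_bucket_dir_header header;
    int r = cls_rgw_get_dir_header(io_ctx, dir_oid, &header);
    if (r < 0)
      return r;
    uint64_t total_bytes = 0, total_entries = 0;
    map<uint8_t, struct rgw_bucket_category_stats>::iterator it = header.stats.begin();
    for (; it != header.stats.end(); ++it) {
      total_bytes += it->second.total_size;      // raw byte count for this category
      total_entries += it->second.num_entries;   // object count for this category
    }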
class GetDirHeaderCompletion : public ObjectOperationCompletion {
return 0;
}
+int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
+ list<rgw_bi_log_entry>& entries, bool *truncated)
+{
+ bufferlist in, out;
+ cls_rgw_bi_log_list_op call;
+ call.marker = marker;
+ call.max = max;
+ ::encode(call, in);
+ int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
+ if (r < 0)
+ return r;
+
+ cls_rgw_bi_log_list_ret ret;
+ try {
+ bufferlist::iterator iter = out.begin();
+ ::decode(ret, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ entries = ret.entries;
+
+ if (truncated)
+ *truncated = ret.truncated;
+
+ return r;
+}
+
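And a sketch of paging through the bucket index log with the restored cls_rgw_bi_log_list, under the same io_ctx/dir_oid assumptions; that rgw_bi_log_entry::id can be fed back as the next marker is an assumption here, not something this hunk shows:

    string marker;                    // empty marker starts at the head of the log
    bool truncated = false;
    do {
      list<rgw_bi_log_entry> entries;
      int r = cls_rgw_bi_log_list(io_ctx, dir_oid, marker, 128, entries, &truncated);
      if (r < 0)
        return r;
      for (list<rgw_bi_log_entry>::iterator it = entries.begin(); it != entries.end(); ++it) {
        marker = it->id;              // assumed: the entry id doubles as the resume marker
        /* process *it here */
      }
    } while (truncated);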
+int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
+{
+ do {
+ int r;
+ bufferlist in, out;
+ cls_rgw_bi_log_trim_op call;
+ call.start_marker = start_marker;
+ call.end_marker = end_marker;
+ ::encode(call, in);
+ r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
+
+ if (r == -ENODATA)
+ break;
+
+ if (r < 0)
+ return r;
+
+ } while (1);
+
+ return 0;
+}
+
int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
#define CEPH_CLS_RGW_CLIENT_H
#include "include/types.h"
-#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
-#include "cls_rgw_ops.h"
#include "common/RefCountedObj.h"
-// Forward declaration
-class BucketIndexAioManager;
-
-/*
- * Bucket index AIO request argument; it is used to pass an argument
- * to the callback.
- */
-struct BucketIndexAioArg : public RefCountedObject {
- BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
- id(_id), manager(_manager) {}
- int id;
- BucketIndexAioManager* manager;
-};
-
-/*
- * This class manages AIO completions. It is not completely thread-safe:
- * methods like *get_next* are not thread-safe and are expected to be called
- * from within a single thread.
- */
-class BucketIndexAioManager {
-private:
- map<int, librados::AioCompletion*> pendings;
- map<int, librados::AioCompletion*> completions;
- map<int, string> pending_objs;
- map<int, string> completion_objs;
- int next;
- Mutex lock;
- Cond cond;
- /*
- * Callback implementation for AIO request.
- */
- static void bucket_index_op_completion_cb(void* cb, void* arg) {
- BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
- cb_arg->manager->do_completion(cb_arg->id);
- cb_arg->put();
- }
-
- /*
- * Get next request ID. This method is not thread-safe.
- *
- * Return next request ID.
- */
- int get_next() { return next++; }
-
- /*
- * Add a new pending AIO completion instance.
- *
- * @param id - the request ID.
- * @param completion - the AIO completion instance.
- * @param oid - the object id associated with the object, if it is NULL, we don't
- * track the object id per callback.
- */
- void add_pending(int id, librados::AioCompletion* completion, const string& oid) {
- pendings[id] = completion;
- pending_objs[id] = oid;
- }
-public:
- /*
- * Create a new instance.
- */
- BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
-
-
- /*
- * Do completion for the given AIO request.
- */
- void do_completion(int id);
-
- /*
- * Wait for AIO completions.
- *
- * valid_ret_code - valid AIO return code.
- * num_completions - number of completions.
- * ret_code - return code of failed AIO.
- * objs - a list of objects whose AIO has finished.
- *
- * Return false if there is no pending AIO, true otherwise.
- */
- bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
- map<int, string> *objs);
-
- /**
- * Do aio read operation.
- */
- bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
- Mutex::Locker l(lock);
- BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
- librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
- if (r >= 0) {
- add_pending(arg->id, c, oid);
- }
- return r;
- }
-
- /**
- * Do aio write operation.
- */
- bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
- Mutex::Locker l(lock);
- BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
- librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
- if (r >= 0) {
- add_pending(arg->id, c, oid);
- }
- return r;
- }
-};
-
class RGWGetDirHeader_CB : public RefCountedObject {
public:
virtual ~RGWGetDirHeader_CB() {}
virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
};
-class BucketIndexShardsManager {
-private:
- // Per-shard setting manager (for example, the per-shard marker).
- map<int, string> value_by_shards;
-public:
- const static string KEY_VALUE_SEPARATOR;
- const static string SHARDS_SEPARATOR;
-
- void add(int shard, const string& value) {
- value_by_shards[shard] = value;
- }
-
- const string& get(int shard, const string& default_value) {
- map<int, string>::iterator iter = value_by_shards.find(shard);
- return (iter == value_by_shards.end() ? default_value : iter->second);
- }
-
- map<int, string>& get() {
- return value_by_shards;
- }
-
- bool empty() {
- return value_by_shards.empty();
- }
-
- void to_string(string *out) const {
- if (!out) {
- return;
- }
- out->clear();
- map<int, string>::const_iterator iter = value_by_shards.begin();
- for (; iter != value_by_shards.end(); ++iter) {
- if (out->length()) {
- // Not the first item, append a separator first
- out->append(SHARDS_SEPARATOR);
- }
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", iter->first);
- out->append(buf);
- out->append(KEY_VALUE_SEPARATOR);
- out->append(iter->second);
- }
- }
-
- static bool is_shards_marker(const string& marker) {
- return marker.find(KEY_VALUE_SEPARATOR) != string::npos;
- }
-
- /*
- * Convert from string. The string can take one of two forms:
- *
- * 1. Single shard, no shard id specified, e.g. 000001.23.1
- *
- * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
- * bucket with no shards.
- *
- * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
- *
- */
- int from_string(const string& composed_marker, int shard_id) {
- value_by_shards.clear();
- vector<string> shards;
- get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
- if (shards.size() > 1 && shard_id >= 0) {
- return -EINVAL;
- }
- vector<string>::const_iterator iter = shards.begin();
- for (; iter != shards.end(); ++iter) {
- size_t pos = iter->find(KEY_VALUE_SEPARATOR);
- if (pos == string::npos) {
- if (!value_by_shards.empty()) {
- return -EINVAL;
- }
- if (shard_id < 0) {
- add(0, *iter);
- } else {
- add(shard_id, *iter);
- }
- return 0;
- }
- string shard_str = iter->substr(0, pos);
- string err;
- int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
- if (!err.empty()) {
- return -EINVAL;
- }
- add(shard, iter->substr(pos + 1));
- }
- return 0;
- }
-};
-
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
-class CLSRGWConcurrentIO {
-protected:
- librados::IoCtx& io_ctx;
- map<int, string>& objs_container;
- map<int, string>::iterator iter;
- uint32_t max_aio;
- BucketIndexAioManager manager;
-
- virtual int issue_op(int shard_id, const string& oid) = 0;
-
- virtual void cleanup() {}
- virtual int valid_ret_code() { return 0; }
- // Return true if multiple rounds of OPs might be needed; this happens when
- // an OP needs to be re-sent until a certain code is returned.
- virtual bool need_multiple_rounds() { return false; }
- // Add a new object to the end of the container.
- virtual void add_object(int shard, const string& oid) {}
- virtual void reset_container(map<int, string>& objs) {}
-
-public:
- CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
- uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
- virtual ~CLSRGWConcurrentIO() {}
-
- int operator()() {
- int ret = 0;
- iter = objs_container.begin();
- for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
- ret = issue_op(iter->first, iter->second);
- if (ret < 0)
- break;
- }
-
- int num_completions, r = 0;
- map<int, string> objs;
- map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
- while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
- if (r >= 0 && ret >= 0) {
- for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
- int issue_ret = issue_op(iter->first, iter->second);
- if(issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- } else if (ret >= 0) {
- ret = r;
- }
- if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
- // For those objects which need another round, use them to reset
- // the container
- reset_container(objs);
- }
- }
-
- if (ret < 0) {
- cleanup();
- }
- return ret;
- }
-};
-
-class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const string& oid);
- int valid_ret_code() { return -EEXIST; }
- void cleanup();
-public:
- CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
-};
-
-class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
- uint64_t tag_timeout;
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
- uint32_t _max_aio, uint64_t _tag_timeout) :
- CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
-};
+void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op);
rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
list<string> *remove_objs, bool log_op);
-/**
- * List the bucket with the starting object and filter prefix.
- * NOTE: this method does a listing request for each bucket index shard identified by
- * the keys of the *list_results* map, which means the map should be populated
- * by the caller with each bucket index object id.
- *
- * io_ctx - IO context for rados.
- * start_obj - marker for the listing.
- * filter_prefix - filter prefix.
- * num_entries - number of entries to request for each object (note the total
- * amount of entries returned depends on the number of shards).
- * list_results - the list results keyed by bucket index object id.
- * max_aio - the maximum number of AIO (for throttling).
- *
- * Return 0 on success, a failure code otherwise.
-*/
-
-class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
- string start_obj;
- string filter_prefix;
- uint32_t num_entries;
- map<int, rgw_cls_list_ret>& result;
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
- const string& _filter_prefix, uint32_t _num_entries,
- map<int, string>& oids,
- map<int, struct rgw_cls_list_ret>& list_results,
- uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio),
- start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {}
-};
-
-class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
- map<int, struct cls_rgw_bi_log_list_ret>& result;
- BucketIndexShardsManager& marker_mgr;
- uint32_t max;
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
- map<int, string>& oids,
- map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
- marker_mgr(_marker_mgr), max(_max) {}
-};
-
-class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
- BucketIndexShardsManager& start_marker_mgr;
- BucketIndexShardsManager& end_marker_mgr;
-protected:
- int issue_op(int shard_id, const string& oid);
- // Trim until -ENODATA is returned.
- int valid_ret_code() { return -ENODATA; }
- bool need_multiple_rounds() { return true; }
- void add_object(int shard, const string& oid) { objs_container[shard] = oid; }
- void reset_container(map<int, string>& objs) {
- objs_container.swap(objs);
- iter = objs_container.begin();
- objs.clear();
- }
-public:
- CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
- BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
- start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
-};
-
-/**
- * Check the bucket index.
- *
- * io_ctx - IO context for rados.
- * bucket_objs_ret - check result for all shards.
- * max_aio - the maximum number of AIO (for throttling).
- *
- * Return 0 on success, a failure code otherwise.
- */
-class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
- map<int, struct rgw_cls_check_index_ret>& result;
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
- uint32_t _max_aio) :
- CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
-};
-
-class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
- uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
-};
-
-class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
- map<int, rgw_cls_list_ret>& result;
-protected:
- int issue_op(int shard_id, const string& oid);
-public:
- CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
- uint32_t max_aio) :
- CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
-};
+int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
+ string& filter_prefix, uint32_t num_entries,
+ rgw_bucket_dir *dir, bool *is_truncated);
+int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
+ rgw_bucket_dir_header *existing_header,
+ rgw_bucket_dir_header *calculated_header);
+int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
+
+int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
+/* bucket index log */
+
+int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
+ list<rgw_bi_log_entry>& entries, bool *truncated);
+int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
+
/* usage logging */
int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024)
-/**
- * override max bucket index shards in zone configuration (if not zero)
- *
- * Represents the number of shards for the bucket index object, a value of zero
- * indicates there is no sharding. By default (no sharding) the name of the object
- * is '.dir.{marker}'; with sharding, the name is '.dir.{marker}.{sharding_id}', where
- * sharding_id is a zero-based value. It is not recommended to set this too large
- * (e.g. in the thousands), as it increases the cost of bucket listing.
- */
-OPTION(rgw_override_bucket_index_max_shards, OPT_U32, 0)
-
-/**
- * Represents the maximum number of pending AIO requests for the bucket index object shards.
- */
-OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
-
OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled
return r;
map<RGWObjCategory, RGWStorageStats> stats;
- string bucket_ver, master_ver;
+ uint64_t bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
formatter->dump_int("mtime", mtime);
- formatter->dump_string("ver", bucket_ver);
- formatter->dump_string("master_ver", master_ver);
+ formatter->dump_int("ver", bucket_ver);
+ formatter->dump_int("master_ver", master_ver);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
formatter->close_section();
do {
list<rgw_bi_log_entry> entries;
- ret = store->list_bi_log_entries(bucket, shard_id, marker, max_entries - count, entries, &truncated);
+ ret = store->list_bi_log_entries(bucket, marker, max_entries - count, entries, &truncated);
if (ret < 0) {
cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- ret = store->trim_bi_log_entries(bucket, shard_id, start_marker, end_marker);
+ ret = store->trim_bi_log_entries(bucket, start_marker, end_marker);
if (ret < 0) {
cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
}
RGWReplicaBucketLogger logger(store);
- ret = logger.get_bounds(bucket, shard_id, bounds);
+ ret = logger.get_bounds(bucket, bounds);
if (ret < 0)
return -ret;
} else { // shouldn't get here
}
RGWReplicaBucketLogger logger(store);
- ret = logger.delete_bound(bucket, shard_id, daemon_id);
+ ret = logger.delete_bound(bucket, daemon_id);
if (ret < 0)
return -ret;
}
return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
}
-int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
-{
- ssize_t pos = bucket_instance.rfind(':');
- if (pos < 0) {
- return -EINVAL;
- }
-
- string first = bucket_instance.substr(0, pos);
- string second = bucket_instance.substr(pos + 1);
-
- if (first.find(':') == string::npos) {
- *shard_id = -1;
- *target_bucket_instance = bucket_instance;
- return 0;
- }
-
- *target_bucket_instance = first;
- string err;
- *shard_id = strict_strtol(second.c_str(), 10, &err);
- if (!err.empty()) {
- return -EINVAL;
- }
-
- return 0;
-}
-
int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
map<string, bufferlist>& attrs,
map<string, bufferlist>* rmattrs,
RGWBucketInfo info;
bufferlist bl;
- string bucket_ver, master_ver;
+ uint64_t bucket_ver, master_ver;
ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
if (ret < 0)
while (is_truncated) {
map<string, RGWObjEnt> result;
- int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
- result, &is_truncated, &marker,
- bucket_object_check_filter);
+ int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result,
+ &is_truncated, &marker,
+ bucket_object_check_filter);
if (r == -ENOENT) {
break;
bucket = bucket_info.bucket;
- string bucket_ver, master_ver;
+ uint64_t bucket_ver, master_ver;
string max_marker;
int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
formatter->dump_string("id", bucket.bucket_id);
formatter->dump_string("marker", bucket.marker);
formatter->dump_string("owner", bucket_info.owner);
- formatter->dump_string("ver", bucket_ver);
- formatter->dump_string("master_ver", master_ver);
+ formatter->dump_int("ver", bucket_ver);
+ formatter->dump_int("master_ver", master_ver);
formatter->dump_int("mtime", mtime);
formatter->dump_string("max_marker", max_marker);
dump_bucket_usage(stats, formatter);
}
-int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
- const string& name = bs.bucket.name;
- int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
- uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
+int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
+ string& name = bucket.name;
+ uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards;
return (int)r;
}
/* we can't keep the bucket name as part of the cls_log_entry, and we need
* it later, so we keep two lists under the map */
- map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
+ map<int, pair<list<string>, list<cls_log_entry> > > m;
lock.Lock();
- map<rgw_bucket_shard, bool> entries;
+ map<string, rgw_bucket> entries;
entries.swap(cur_cycle);
lock.Unlock();
- map<rgw_bucket_shard, bool>::iterator iter;
+ map<string, rgw_bucket>::iterator iter;
string section;
utime_t ut = ceph_clock_now(cct);
for (iter = entries.begin(); iter != entries.end(); ++iter) {
- const rgw_bucket_shard& bs = iter->first;
- const rgw_bucket& bucket = bs.bucket;
- int shard_id = bs.shard_id;
-
- int index = choose_oid(bs);
+ rgw_bucket& bucket = iter->second;
+ int index = choose_oid(bucket);
cls_log_entry entry;
bufferlist bl;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- change.key += buf;
- }
change.timestamp = ut;
::encode(change, bl);
store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
- m[index].first.push_back(bs);
+ m[index].first.push_back(bucket.name);
m[index].second.push_back(entry);
}
- map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
+ map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
for (miter = m.begin(); miter != m.end(); ++miter) {
list<cls_log_entry>& entries = miter->second.second;
utime_t expiration = now;
expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
- list<rgw_bucket_shard>& buckets = miter->second.first;
- list<rgw_bucket_shard>::iterator liter;
+ list<string>& buckets = miter->second.first;
+ list<string>::iterator liter;
for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
update_renewed(*liter, expiration);
}
return 0;
}
-void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
+void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
{
assert(lock.is_locked());
- if (!changes.find(bs, status)) {
+ if (!changes.find(bucket_name, status)) {
status = ChangeStatusPtr(new ChangeStatus);
- changes.add(bs, status);
+ changes.add(bucket_name, status);
}
}
-void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
+void RGWDataChangesLog::register_renew(rgw_bucket& bucket)
{
Mutex::Locker l(lock);
- cur_cycle[bs] = true;
+ cur_cycle[bucket.name] = bucket;
}
-void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration)
+void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
{
Mutex::Locker l(lock);
ChangeStatusPtr status;
- _get_change(bs, status);
+ _get_change(bucket_name, status);
- ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::update_renewed() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
status->cur_expiration = expiration;
}
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
if (!store->need_to_log_data())
return 0;
- rgw_bucket_shard bs(bucket, shard_id);
-
lock.Lock();
ChangeStatusPtr status;
- _get_change(bs, status);
+ _get_change(bucket.name, status);
lock.Unlock();
status->lock->Lock();
- ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+ ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
if (now < status->cur_expiration) {
/* no need to send, recently completed */
status->lock->Unlock();
- register_renew(bs);
+ register_renew(bucket);
return 0;
}
int ret = cond->wait();
cond->put();
if (!ret) {
- register_renew(bs);
+ register_renew(bucket);
}
return ret;
}
status->cond = new RefCountedCond;
status->pending = true;
- string& oid = oids[choose_oid(bs)];
+ string& oid = oids[choose_oid(bucket)];
utime_t expiration;
int ret;
rgw_data_change change;
change.entity_type = ENTITY_TYPE_BUCKET;
change.key = bucket.name + ":" + bucket.bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- change.key += buf;
- }
change.timestamp = now;
::encode(change, bl);
string section;
objv_tracker = bci.info.objv_tracker;
- ret = store->init_bucket_index(bci.info.bucket, bci.info.num_shards);
+ ret = store->init_bucket_index(bci.info.bucket);
if (ret < 0)
return ret;
map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
time_t mtime);
-extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id);
-
extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker);
extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker);
typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
- lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+ lru_map<string, ChangeStatusPtr> changes;
- map<rgw_bucket_shard, bool> cur_cycle;
+ map<string, rgw_bucket> cur_cycle;
- void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
- void register_renew(rgw_bucket_shard& bs);
- void update_renewed(rgw_bucket_shard& bs, utime_t& expiration);
+ void _get_change(string& bucket_name, ChangeStatusPtr& status);
+ void register_renew(rgw_bucket& bucket);
+ void update_renewed(string& bucket_name, utime_t& expiration);
class ChangesRenewThread : public Thread {
CephContext *cct;
~RGWDataChangesLog();
- int choose_oid(const rgw_bucket_shard& bs);
- int add_entry(rgw_bucket& bucket, int shard_id);
+ int choose_oid(rgw_bucket& bucket);
+ int add_entry(rgw_bucket& bucket);
int renew_entries();
int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
list<rgw_data_change>& entries,
PerfCounters *perfcounter = NULL;
-const uint32_t RGWBucketInfo::NUM_SHARDS_BLIND_BUCKET(UINT32_MAX);
-
int rgw_perf_start(CephContext *cct)
{
PerfCountersBuilder plb(cct, cct->_conf->name.to_str(), l_rgw_first, l_rgw_last);
#define ERR_USER_SUSPENDED 2100
#define ERR_INTERNAL_ERROR 2200
-#ifndef UINT32_MAX
-#define UINT32_MAX (4294967295)
-#endif
-
typedef void *RGWAccessHandle;
return out;
}
-struct rgw_bucket_shard {
- rgw_bucket bucket;
- int shard_id;
-
- rgw_bucket_shard() : shard_id(-1) {}
- rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
-
- bool operator<(const rgw_bucket_shard& b) const {
- if (bucket < b.bucket) {
- return true;
- }
- if (b.bucket < bucket) {
- return false;
- }
- return shard_id < b.shard_id;
- }
-};
-
-
struct RGWObjVersionTracker {
obj_version read_version;
obj_version write_version;
struct RGWBucketInfo
{
- enum BIShardsHashType {
- MOD = 0
- };
-
rgw_bucket bucket;
string owner;
uint32_t flags;
obj_version ep_objv; /* entry point object version, for runtime tracking only */
RGWQuotaInfo quota;
- // Represents the number of bucket index object shards:
- // - value of 0 indicates there is no sharding (this is by default before this
- // feature is implemented).
- // - value of UINT32_T::MAX indicates this is a blind bucket.
- uint32_t num_shards;
-
- // Represents the bucket index shard hash type.
- uint8_t bucket_index_shard_hash_type;
-
- // Represents the shard number for blind bucket.
- const static uint32_t NUM_SHARDS_BLIND_BUCKET;
-
void encode(bufferlist& bl) const {
- ENCODE_START(11, 4, bl);
+ ENCODE_START(9, 4, bl);
::encode(bucket, bl);
::encode(owner, bl);
::encode(flags, bl);
::encode(placement_rule, bl);
::encode(has_instance_obj, bl);
::encode(quota, bl);
- ::encode(num_shards, bl);
- ::encode(bucket_index_shard_hash_type, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
::decode(has_instance_obj, bl);
if (struct_v >= 9)
::decode(quota, bl);
- if (struct_v >= 10)
- ::decode(num_shards, bl);
- if (struct_v >= 11)
- ::decode(bucket_index_shard_hash_type, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
void decode_json(JSONObj *obj);
- RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {}
+ RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {}
};
WRITE_CLASS_ENCODER(RGWBucketInfo)
bool in_extra_data; /* in-memory only member, does not serialize */
- // Represents the hash index source for this object once it is set (non-empty)
- std::string index_hash_source;
-
rgw_obj() : in_extra_data(false) {}
rgw_obj(const char *b, const char *o) : in_extra_data(false) {
rgw_bucket _b(b);
return orig_key;
}
- string& get_hash_object() {
- return index_hash_source.empty() ? object : index_hash_source;
- }
/**
* Translate a namespace-mangled object name to the user-facing name
* existing in the given namespace.
encode_json("placement_rule", placement_rule, f);
encode_json("has_instance_obj", has_instance_obj, f);
encode_json("quota", quota, f);
- encode_json("num_shards", num_shards, f);
- encode_json("bi_shard_hash_type", (uint32_t)bucket_index_shard_hash_type, f);
}
void RGWBucketInfo::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("placement_rule", placement_rule, obj);
JSONDecoder::decode_json("has_instance_obj", has_instance_obj, obj);
JSONDecoder::decode_json("quota", quota, obj);
- JSONDecoder::decode_json("num_shards", num_shards, obj);
- uint32_t hash_type;
- JSONDecoder::decode_json("bi_shard_hash_type", hash_type, obj);
- bucket_index_shard_hash_type = (uint8_t)hash_type;
}
void RGWObjEnt::dump(Formatter *f) const
encode_json("endpoints", endpoints, f);
encode_json("log_meta", log_meta, f);
encode_json("log_data", log_data, f);
- encode_json("bucket_index_max_shards", bucket_index_max_shards, f);
}
void RGWZone::decode_json(JSONObj *obj)
JSONDecoder::decode_json("endpoints", endpoints, obj);
JSONDecoder::decode_json("log_meta", log_meta, obj);
JSONDecoder::decode_json("log_data", log_data, obj);
- JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj);
}
void RGWRegionPlacementTarget::dump(Formatter *f) const
string obj_str;
RGWUserInfo bucket_owner_info;
- string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
- if (!bi.empty()) {
- int shard_id;
- ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id);
- if (ret < 0) {
- return ret;
- }
- }
+ s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
s->bucket_acl = new RGWAccessControlPolicy(s->cct);
}
head_obj = manifest_gen.get_cur_obj();
- head_obj.index_hash_source = obj_str;
cur_obj = head_obj;
return 0;
obj.init_ns(s->bucket, tmp_obj_name, mp_ns);
// the meta object will be indexed with 0 size, we c
obj.set_in_extra_data(true);
- obj.index_hash_source = s->object_str;
ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id());
} while (ret == -EEXIST);
}
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
- meta_obj.index_hash_source = s->object_str;
ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL);
if (ret < 0) {
string oid = mp.get_part(obj_iter->second.num);
rgw_obj obj;
obj.init_ns(s->bucket, oid, mp_ns);
- obj.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, obj);
if (ret < 0 && ret != -ENOENT)
return;
RGWObjManifest::obj_iterator oiter;
for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) {
rgw_obj loc = oiter.get_location();
- loc.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, loc);
if (ret < 0 && ret != -ENOENT)
return;
// and also remove the metadata obj
meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
meta_obj.set_in_extra_data(true);
- meta_obj.index_hash_source = s->object_str;
ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
if (ret == -ENOENT) {
ret = -ERR_NO_SUCH_BUCKET;
{
RGWBucketInfo bucket_info;
- string bucket_ver;
- string master_ver;
+ uint64_t bucket_ver;
+ uint64_t master_ver;
map<RGWObjCategory, RGWStorageStats> bucket_stats;
int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL);
#include "rgw_metadata.h"
#include "rgw_bucket.h"
-#include "cls/rgw/cls_rgw_ops.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/refcount/cls_refcount_client.h"
#define dout_subsys ceph_subsys_rgw
-#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
-
using namespace std;
static RGWCache<RGWRados> cached_rados_provider;
#define RGW_STATELOG_OBJ_PREFIX "statelog."
+
#define dout_subsys ceph_subsys_rgw
void RGWDefaultRegionInfo::dump(Formatter *f) const {
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
- bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
- zone_public_config.bucket_index_max_shards);
- if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) {
- bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME;
- ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: "
- << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl;
- }
- ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
-
return ret;
}
return 0;
}
-void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
- string *marker) {
- if (marker) {
- *marker = shard_id_str;
- marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
- marker->append(shard_marker);
- }
-}
-
int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
{
int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
name = prefix + buf;
}
-void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
{
cls_log_add_prepare_entry(entry, ut, section, key, bl);
}
result.push_back(ent);
count++;
}
-
- // Either the back end told us the listing was truncated, or we did not
- // consume all of the items returned relative to what the caller requested.
- truncated = (truncated || eiter != ent_map.end());
}
done:
return 0;
}
-int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
+int RGWRados::init_bucket_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx; // context for new bucket
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
- map<int, string> bucket_objs;
- get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ r = cls_rgw_init_index(index_ctx, op, dir_oid);
+ if (r < 0 && r != -EEXIST)
+ return r;
- return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+ return 0;
}
/**
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
- r = init_bucket_index(bucket, bucket_index_max_shards);
+ r = init_bucket_index(bucket);
if (r < 0)
return r;
info.owner = owner.user_id;
info.region = region_name;
info.placement_rule = selected_placement_rule;
- info.num_shards = bucket_index_max_shards;
- info.bucket_index_shard_hash_type = RGWBucketInfo::MOD;
if (!creation_time)
time(&info.creation_time);
else
/* remove bucket index */
librados::IoCtx index_ctx; // context for new bucket
- map<int, string> bucket_objs;
- int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+ int r = open_bucket_index_ctx(bucket, index_ctx);
if (r < 0)
return r;
- map<int, string>::const_iterator biter;
- for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
- // Do best effort removal
- index_ctx.remove(biter->second);
- }
+ index_ctx.remove(dir_oid);
}
/* ret == -ENOENT here */
}
return 0;
}
-int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
-{
- bucket = _bucket;
-
- if (store->bucket_is_system(bucket)) {
- return 0;
- }
-
- int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
- return ret;
- }
- ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
-
- return 0;
-}
-
-
/**
* Write/overwrite an object to the bucket storage.
* bucket: the bucket to store the object in
index_tag = state->write_tag;
}
- librados::IoCtx index_ctx;
- BucketShard bs(this);
- r = bs.init(bucket, obj);
- if (r < 0) {
- return r;
- }
-
- r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag);
+ r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag);
if (r < 0)
return r;
ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl;
}
- r = complete_update_index(bs, obj, index_tag, poolid, epoch, size,
- ut, etag, content_type, &acl_bl, category, remove_objs);
+ r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size,
+ ut, etag, content_type, &acl_bl, category, remove_objs);
if (r < 0)
goto done_cancel;
return 0;
done_cancel:
- int ret = complete_update_index_cancel(bs, obj, index_tag);
+ int ret = complete_update_index_cancel(bucket, obj.object, index_tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
}
return 0;
}
-int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- string& bucket_oid_base) {
- if (bucket_is_system(bucket))
- return -EINVAL;
-
- int r = open_bucket_index_ctx(bucket, index_ctx);
- if (r < 0)
- return r;
-
- if (bucket.marker.empty()) {
- ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
- return -EIO;
- }
-
- bucket_oid_base = dir_oid_prefix;
- bucket_oid_base.append(bucket.marker);
-
- return 0;
-
-}
-
-int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
- string bucket_oid_base;
- int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
- if (ret < 0)
- return ret;
-
- // Get the bucket info
- RGWBucketInfo binfo;
- ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
- if (ret < 0)
- return ret;
-
- get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs, shard_id);
- if (bucket_instance_ids) {
- get_bucket_instance_ids(binfo, shard_id, bucket_instance_ids);
- }
- return 0;
-}
-
-template<typename T>
-int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- map<int, string>& oids, map<int, T>& bucket_objs,
- int shard_id, map<int, string> *bucket_instance_ids)
-{
- int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids);
- if (ret < 0)
- return ret;
-
- map<int, string>::const_iterator iter = oids.begin();
- for (; iter != oids.end(); ++iter) {
- bucket_objs[iter->first] = T();
- }
- return 0;
-}
-
-int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- const string& obj_key, string *bucket_obj, int *shard_id)
-{
- string bucket_oid_base;
- int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
- if (ret < 0)
- return ret;
-
- // Get the bucket info
- RGWBucketInfo binfo;
- ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
- if (ret < 0)
- return ret;
-
- ret = get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards,
- (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id);
- if (ret < 0) {
- ldout(cct, 10) << "get_bucket_index_object() returned ret=" << ret << dendl;
- }
- return 0;
-}
-
-static void accumulate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
+static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
{
map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
for (; iter != header.stats.end(); ++iter) {
RGWStorageStats& s = stats[category];
struct rgw_bucket_category_stats& header_stats = iter->second;
s.category = (RGWObjCategory)iter->first;
- s.num_kb += ((header_stats.total_size + 1023) / 1024);
- s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024);
- s.num_objects += header_stats.num_entries;
+ s.num_kb = ((header_stats.total_size + 1023) / 1024);
+ s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024);
+ s.num_objects = header_stats.num_entries;
}
}
map<RGWObjCategory, RGWStorageStats> *calculated_stats)
{
librados::IoCtx index_ctx;
- // key - bucket index object id
- // value - bucket index check OP returned result with the given bucket index object (shard)
- map<int, string> oids;
- map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
- int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret);
+ string oid;
+
+ int ret = open_bucket_index(bucket, index_ctx, oid);
if (ret < 0)
return ret;
- ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
+ rgw_bucket_dir_header existing_header;
+ rgw_bucket_dir_header calculated_header;
+
+ ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header);
if (ret < 0)
return ret;
- // Aggregate results (from different shards if there is any)
- map<int, struct rgw_cls_check_index_ret>::iterator iter;
- for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
- accumulate_raw_stats(iter->second.existing_header, *existing_stats);
- accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
- }
+ translate_raw_stats(existing_header, *existing_stats);
+ translate_raw_stats(calculated_header, *calculated_stats);
return 0;
}
int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
{
librados::IoCtx index_ctx;
- map<int, string> bucket_objs;
- int r = open_bucket_index(bucket, index_ctx, bucket_objs);
- if (r < 0)
- return r;
+ string oid;
+
+ int ret = open_bucket_index(bucket, index_ctx, oid);
+ if (ret < 0)
+ return ret;
- return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+ return cls_rgw_bucket_rebuild_index_op(index_ctx, oid);
}
bool ret_not_existed = (state && !state->exists);
- BucketShard bs(this);
- r = bs.init(bucket, obj);
- if (r < 0) {
- return r;
- }
-
string tag;
- r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag);
+ r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag);
if (r < 0)
return r;
int64_t poolid = ref.ioctx.get_id();
if (r >= 0 || r == -ENOENT) {
uint64_t epoch = ref.ioctx.get_last_version();
- r = complete_update_index_del(bs, obj, tag, poolid, epoch);
+ r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch);
} else {
- int ret = complete_update_index_cancel(bs, obj, tag);
+ int ret = complete_update_index_cancel(bucket, obj.object, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl;
}
std::string oid, key;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
- BucketShard bs(this);
- int r = bs.init(bucket, obj);
- if (r < 0) {
- return r;
- }
-
string tag;
- r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0);
+ int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0);
return r;
}
if (!op.size())
return 0;
- librados::IoCtx index_ctx;
- BucketShard bs(this);
- r = bs.init(bucket, obj);
- if (r < 0) {
- ldout(cct, 10) << "bs.init() returned r=" << r << dendl;
- return r;
- }
-
string tag;
if (state) {
- r = prepare_update_index(state, bs, CLS_RGW_OP_ADD, obj, tag);
+ r = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, obj, tag);
if (r < 0)
return r;
}
uint64_t epoch = ref.ioctx.get_last_version();
int64_t poolid = ref.ioctx.get_id();
utime_t mtime = ceph_clock_now(cct);
- r = complete_update_index(bs, obj, tag, poolid, epoch, state->size,
+ r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size,
mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL);
} else {
- int ret = complete_update_index_cancel(bs, obj, tag);
+ int ret = complete_update_index_cancel(bucket, obj.object, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
return r;
}
-int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
+int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
RGWModifyOp op, rgw_obj& obj, string& tag)
{
- if (bucket_is_system(bs.bucket))
+ if (bucket_is_system(bucket))
return 0;
- int ret = data_log->add_entry(bs.bucket, bs.shard_id);
+ int ret = data_log->add_entry(obj.bucket);
if (ret < 0) {
lderr(cct) << "ERROR: failed writing data log" << dendl;
return ret;
append_rand_alpha(cct, tag, tag, 32);
}
}
- ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
+ ret = cls_obj_prepare_op(bucket, op, tag,
+ obj.object, obj.key);
return ret;
}
-int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs)
{
- if (bucket_is_system(bs.bucket))
+ if (bucket_is_system(bucket))
return 0;
RGWObjEnt ent;
- ent.name = oid.object;
+ ent.name = oid;
ent.size = size;
ent.mtime = ut;
ent.etag = etag;
ent.owner_display_name = owner.get_display_name();
ent.content_type = content_type;
- int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
+ int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs);
return ret;
}
string etag;
string content_type;
bufferlist acl_bl;
- BucketShard bs(this);
bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
category == RGW_OBJ_CATEGORY_MULTIMETA);
int64_t poolid = io_ctx.get_id();
int ret;
- ret = bs.init(bucket, dst_obj);
- if (ret < 0) {
- goto done;
- }
-
if (update_index) {
- ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
+ ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag);
if (ret < 0)
goto done;
}
if (update_index) {
if (ret >= 0) {
- ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
+ ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, NULL);
} else {
- int r = complete_update_index_cancel(bs, dst_obj, tag);
+ int r = complete_update_index_cancel(bucket, dst_obj.object, tag);
if (r < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
return 0;
}
-int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
- map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
+int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
+ string *max_marker)
{
- map<string, rgw_bucket_dir_header> headers;
- map<int, string> bucket_instance_ids;
- int r = cls_bucket_head(bucket, headers, &bucket_instance_ids);
+ rgw_bucket_dir_header header;
+ int r = cls_bucket_head(bucket, header);
if (r < 0)
return r;
- assert(headers.size() == bucket_instance_ids.size());
-
- map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
- map<int, string>::iterator viter = bucket_instance_ids.begin();
- BucketIndexShardsManager ver_mgr;
- BucketIndexShardsManager master_ver_mgr;
- BucketIndexShardsManager marker_mgr;
- char buf[64];
- for(; iter != headers.end(); ++iter, ++viter) {
- accumulate_raw_stats(iter->second, stats);
- snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
- ver_mgr.add(viter->first, string(buf));
- snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
- master_ver_mgr.add(viter->first, string(buf));
- marker_mgr.add(viter->first, iter->second.max_marker);
- }
- ver_mgr.to_string(bucket_ver);
- master_ver_mgr.to_string(master_ver);
- marker_mgr.to_string(max_marker);
+ stats.clear();
+
+ translate_raw_stats(header, stats);
+
+ *bucket_ver = header.ver;
+ *master_ver = header.master_ver;
+
+ if (max_marker)
+ *max_marker = header.max_marker;
+
return 0;
}
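// [Editorial sketch -- not part of the patch.] The removed loop above folded one
// rgw_bucket_dir_header per shard into the aggregate stats and serialized the
// per-shard version numbers as '{shard_id}#{value}' pairs joined by commas (the
// combined format that BucketIndexShardsManager::add()/to_string() handle in the
// pre-revert code). Reduced to std types, with illustrative values, that
// serialization step looks roughly like this:

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

static std::string join_shard_values(const std::map<int, uint64_t>& per_shard)
{
  std::string out;
  for (std::map<int, uint64_t>::const_iterator i = per_shard.begin();
       i != per_shard.end(); ++i) {
    char buf[64];
    snprintf(buf, sizeof(buf), "%s%d#%llu", out.empty() ? "" : ",",
             i->first, (unsigned long long)i->second);
    out.append(buf);
  }
  return out;  // e.g. "0#12,1#9" for a two-shard index
}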
class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
RGWGetBucketStats_CB *cb;
- uint32_t pendings;
- map<RGWObjCategory, RGWStorageStats> stats;
- int ret_code;
- bool should_cb;
- Mutex lock;
public:
- RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
- : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
- lock("RGWGetBucketStatsContext") {}
-
+ RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
void handle_response(int r, rgw_bucket_dir_header& header) {
- Mutex::Locker l(lock);
- if (should_cb) {
- if ( r >= 0) {
- accumulate_raw_stats(header, stats);
- } else {
- ret_code = r;
- }
+ map<RGWObjCategory, RGWStorageStats> stats;
- // Are we all done?
- if (--pendings == 0) {
- if (!ret_code) {
- cb->set_response(&stats);
- }
- cb->handle_response(ret_code);
- cb->put();
- }
+ if (r >= 0) {
+ translate_raw_stats(header, stats);
+ cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
}
- }
- void unset_cb() {
- Mutex::Locker l(lock);
- should_cb = false;
+ cb->handle_response(r);
+
+ cb->put();
}
};
int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
{
- RGWBucketInfo binfo;
- int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
- if (r < 0)
- return r;
-
- int num_aio = 0;
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards);
- assert(get_ctx);
- r = cls_bucket_head_async(bucket, get_ctx, &num_aio);
- get_ctx->put();
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
+ int r = cls_bucket_head_async(bucket, get_ctx);
if (r < 0) {
ctx->put();
- if (num_aio) {
- get_ctx->unset_cb();
- }
+ delete get_ctx;
+ return r;
}
- return r;
+
+ return 0;
}
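// [Editorial sketch -- not part of the patch.] The removed RGWGetBucketStatsContext
// above waited for one dir-header completion per shard, accumulating stats under a
// lock and firing the user callback only when its pending count hit zero; the
// post-revert version fires as soon as the single unsharded header arrives. The
// shape of that fan-in, with std::mutex and std::function standing in for Ceph's
// Mutex and the RGW callback types:

#include <functional>
#include <mutex>

class ShardFanIn {
  std::mutex lock;
  int pending;                        // completions still outstanding
  int ret_code;                       // first error seen, if any
  std::function<void(int)> on_done;   // fired once, after the last completion
public:
  ShardFanIn(int num_shards, std::function<void(int)> cb)
    : pending(num_shards), ret_code(0), on_done(cb) {}
  void handle_one(int r) {
    std::lock_guard<std::mutex> l(lock);
    if (r < 0 && ret_code == 0)
      ret_code = r;                   // remember the first failure
    if (--pending == 0)
      on_done(ret_code);              // all shards have answered
  }
};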
class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
time_t *pmtime, map<string, bufferlist> *pattrs)
{
string oid;
- if (bucket.oid.empty()) {
+ if (!bucket.oid.empty()) {
get_bucket_meta_oid(bucket, oid);
} else {
oid = bucket.oid;
RGWBucketEnt& ent = iter->second;
rgw_bucket& bucket = ent.bucket;
- map<string, rgw_bucket_dir_header> headers;
- int r = cls_bucket_head(bucket, headers);
+ rgw_bucket_dir_header header;
+ int r = cls_bucket_head(bucket, header);
if (r < 0)
return r;
- map<string, rgw_bucket_dir_header>::iterator hiter = headers.begin();
- for (; hiter != headers.end(); ++hiter) {
- RGWObjCategory category = main_category;
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = (hiter->second.stats).find((uint8_t)category);
- if (iter != hiter->second.stats.end()) {
- struct rgw_bucket_category_stats& stats = iter->second;
- ent.count += stats.num_entries;
- ent.size += stats.total_size;
- ent.size_rounded += stats.total_size_rounded;
- }
+ ent.count = 0;
+ ent.size = 0;
+
+ RGWObjCategory category = main_category;
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.find((uint8_t)category);
+ if (iter != header.stats.end()) {
+ struct rgw_bucket_category_stats& stats = iter->second;
+ ent.count = stats.num_entries;
+ ent.size = stats.total_size;
+ ent.size_rounded = stats.total_size_rounded;
}
}
return oids.size();
}
-int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max,
+int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
- ldout(cct, 20) << __func__ << ": " << bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
result.clear();
librados::IoCtx index_ctx;
- map<int, string> oids;
- map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
- map<int, string> bucket_instance_ids;
- int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids);
- if (r < 0)
- return r;
-
- BucketIndexShardsManager marker_mgr;
- bool has_shards = (oids.size() > 1 || shard_id >= 0);
- // If there are multiple shards for the bucket index object, the marker
- // has the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#{shard_marker_2}...'.
- // If there is no sharding, the bi_log_list should only contain one record,
- // and the key is the bucket instance id.
- r = marker_mgr.from_string(marker, shard_id);
- if (r < 0)
- return r;
-
- r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- vector<string> shard_ids_str;
- map<int, list<rgw_bi_log_entry>::iterator> vcurrents;
- map<int, list<rgw_bi_log_entry>::iterator> vends;
- if (truncated) {
- *truncated = false;
- }
- map<int, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
- for (; miter != bi_log_lists.end(); ++miter) {
- int shard_id = miter->first;
- vcurrents[shard_id] = miter->second.entries.begin();
- vends[shard_id] = miter->second.entries.end();
- if (truncated) {
- *truncated = (*truncated || miter->second.truncated);
- }
- }
-
- size_t total = 0;
- bool has_more = true;
- map<int, list<rgw_bi_log_entry>::iterator>::iterator viter;
- map<int, list<rgw_bi_log_entry>::iterator>::iterator eiter;
- while (total < max && has_more) {
- has_more = false;
-
- viter = vcurrents.begin();
- eiter = vends.begin();
-
- for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) {
- assert (eiter != vends.end());
-
- int shard_id = viter->first;
- list<rgw_bi_log_entry>::iterator& liter = viter->second;
-
- if (liter == eiter->second){
- continue;
- }
- rgw_bi_log_entry& entry = *(liter);
- if (has_shards) {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard_id);
- string tmp_id;
- build_bucket_index_marker(buf, entry.id, &tmp_id);
- entry.id.swap(tmp_id);
- }
- marker_mgr.add(shard_id, entry.id);
- result.push_back(entry);
- total++;
- has_more = true;
- ++liter;
- }
- }
-
- if (truncated) {
- for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) {
- assert (eiter != vends.end());
- *truncated = (*truncated || (viter->second != eiter->second));
- }
- }
+ std::list<rgw_bi_log_entry> entries;
+ int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
+ if (ret < 0)
+ return ret;
- // Refresh the marker. If there are multiple shards, the output looks like
- // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...';
- // if there is no sharding, the plain marker (without oid) is returned.
- if (has_shards) {
- marker_mgr.to_string(&marker);
- } else {
- if (!result.empty()) {
- marker = result.rbegin()->id;
- }
+ std::list<rgw_bi_log_entry>::iterator iter;
+ for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ result.push_back(*iter);
}
return 0;
}
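// [Editorial sketch -- not part of the patch.] The removed code above kept one
// marker per shard and round-tripped them through the combined form described in
// the removed comment, '{shard_id}#{shard_marker}' entries joined by ','; the
// post-revert path keeps a single plain marker. The real parsing lives in
// BucketIndexShardsManager::from_string(); a std-only illustration of the split:

#include <cerrno>
#include <cstdlib>
#include <map>
#include <string>

static int split_shard_markers(const std::string& composed,
                               std::map<int, std::string>& out)
{
  size_t pos = 0;
  while (pos < composed.size()) {
    size_t comma = composed.find(',', pos);
    std::string item = composed.substr(pos,
        comma == std::string::npos ? std::string::npos : comma - pos);
    size_t hash = item.find('#');
    if (hash == std::string::npos)
      return -EINVAL;                          // malformed entry, no '#'
    int shard_id = atoi(item.substr(0, hash).c_str());
    out[shard_id] = item.substr(hash + 1);
    if (comma == std::string::npos)
      break;
    pos = comma + 1;
  }
  return 0;
}
// e.g. "0#00000010.10.3,1#00000008.8.2" -> { {0, "00000010.10.3"}, {1, "00000008.8.2"} }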
-int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker)
+int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker)
{
librados::IoCtx index_ctx;
- map<int, string> bucket_objs;
- int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id);
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- BucketIndexShardsManager start_marker_mgr;
- r = start_marker_mgr.from_string(start_marker, shard_id);
- if (r < 0)
- return r;
- BucketIndexShardsManager end_marker_mgr;
- r = end_marker_mgr.from_string(end_marker, shard_id);
- if (r < 0)
- return r;
+ int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
+ if (ret < 0)
+ return ret;
- return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ return 0;
}
int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
return r;
}
-int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
string& name, string& locator)
{
+ librados::IoCtx index_ctx;
+ string oid;
+
+ int r = open_bucket_index(bucket, index_ctx, oid);
+ if (r < 0)
+ return r;
+
ObjectWriteOperation o;
cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
- int ret = bs.index_ctx.operate(bs.bucket_obj, &o);
- return ret;
+ r = index_ctx.operate(oid, &o);
+ return r;
}
-int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
list<string> *remove_objs)
{
+ librados::IoCtx index_ctx;
+ string oid;
+
+ int r = open_bucket_index(bucket, index_ctx, oid);
+ if (r < 0)
+ return r;
+
ObjectWriteOperation o;
rgw_bucket_dir_entry_meta dir_meta;
dir_meta.size = ent.size;
cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
+ r = index_ctx.aio_operate(oid, c, &o);
c->release();
- return ret;
+ return r;
}
-int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_obj)
+ list<string> *remove_objs)
{
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs);
}
-int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
int64_t pool, uint64_t epoch,
string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
-int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name)
+int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+ return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
{
librados::IoCtx index_ctx;
- map<int, string> bucket_objs;
- int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+ string oid;
+
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
+ ObjectWriteOperation o;
+ cls_rgw_bucket_set_tag_timeout(o, timeout);
+
+ r = index_ctx.operate(oid, &o);
+
+ return r;
}
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
- uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
- string *last_entry, bool (*force_check_filter)(const string& name))
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
+ uint32_t num, map<string, RGWObjEnt>& m,
+ bool *is_truncated, string *last_entry,
+ bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
+ ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
librados::IoCtx index_ctx;
- // key   - oid (one per shard, if there are any)
- // value - list result for the corresponding oid (shard), filled in by the AIO callback
- map<int, string> oids;
- map<int, struct rgw_cls_list_ret> list_results;
- int r = open_bucket_index(bucket, index_ctx, oids);
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ struct rgw_bucket_dir dir;
+ r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated);
if (r < 0)
return r;
- // Create a list of iterators that are used to iterate each shard
- vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
- vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
- vector<string> vnames(list_results.size());
- map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
- *is_truncated = false;
- for (; iter != list_results.end(); ++iter) {
- vcurrents.push_back(iter->second.dir.m.begin());
- vends.push_back(iter->second.dir.m.end());
- vnames.push_back(oids[iter->first]);
- *is_truncated = (*is_truncated || iter->second.is_truncated);
- }
-
- // Create a map to track the next candidate entry from each shard, if the entry
- // from a specified shard is selected/erased, the next entry from that shard will
- // be inserted for next round selection
- map<string, size_t> candidates;
- for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i]) {
- candidates[vcurrents[i]->second.name] = i;
- }
- }
-
- map<string, bufferlist> updates;
- uint32_t count = 0;
- while (count < num_entries && !candidates.empty()) {
- // Select the next one
- int pos = candidates.begin()->second;
- struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
+ map<string, struct rgw_bucket_dir_entry>::iterator miter;
+ bufferlist updates;
+ for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
+ RGWObjEnt e;
+ rgw_bucket_dir_entry& dirent = miter->second;
// fill it in with initial values; we may correct later
- RGWObjEnt e;
e.name = dirent.name;
e.size = dirent.meta.size;
e.mtime = dirent.meta.mtime;
e.content_type = dirent.meta.content_type;
e.tag = dirent.tag;
+ /* oh, that shouldn't happen! */
+ if (e.name.empty()) {
+ ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
+ continue;
+ }
+
bool force_check = force_check_filter && force_check_filter(dirent.name);
+
if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
- if (r < 0 && r != -ENOENT) {
+ r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
+ if (r < 0) {
+ if (r == -ENOENT)
+ continue;
+ else
return r;
}
}
- if (r >= 0) {
- m[e.name] = e;
- ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
- ++count;
- }
-
- // Refresh the candidates map
- candidates.erase(candidates.begin());
- ++vcurrents[pos];
- if (vcurrents[pos] != vends[pos]) {
- candidates[vcurrents[pos]->second.name] = pos;
- }
+ m[e.name] = e;
+ ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
}
- // Suggest updates if there is any
- map<string, bufferlist>::iterator miter = updates.begin();
- for (; miter != updates.end(); ++miter) {
- if (miter->second.length()) {
- ObjectWriteOperation o;
- cls_rgw_suggest_changes(o, miter->second);
- // we don't care if we lose suggested updates, send them off blindly
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- index_ctx.aio_operate(miter->first, c, &o);
- c->release();
- }
+ if (dir.m.size()) {
+ *last_entry = dir.m.rbegin()->first;
}
- // Check if all the returned entries are consumed or not
- for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i])
- *is_truncated = true;
+ if (updates.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, updates);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ r = index_ctx.aio_operate(oid, c, &o);
+ c->release();
}
- if (m.size())
- *last_entry = m.rbegin()->first;
-
- return 0;
+ return m.size();
}
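// [Editorial sketch -- not part of the patch.] The removed cls_bucket_list body
// merged the per-shard listing results in name order by tracking the next entry
// from each shard in a 'candidates' map; the post-revert body simply walks the
// single rgw_bucket_dir. The same merge, simplified to sorted vectors of names
// (object names are unique across shards, since each object maps to one shard):

#include <cstddef>
#include <map>
#include <string>
#include <vector>

static std::vector<std::string> merge_shard_listings(
    const std::vector<std::vector<std::string> >& shards, size_t max)
{
  std::vector<size_t> cursor(shards.size(), 0);
  std::map<std::string, size_t> candidates;     // next name -> shard index
  for (size_t i = 0; i < shards.size(); ++i)
    if (!shards[i].empty())
      candidates[shards[i][0]] = i;

  std::vector<std::string> out;
  while (out.size() < max && !candidates.empty()) {
    std::map<std::string, size_t>::iterator first = candidates.begin();
    size_t shard = first->second;
    out.push_back(first->first);                // smallest remaining name
    candidates.erase(first);
    if (++cursor[shard] < shards[shard].size()) // refill from that shard
      candidates[shards[shard][cursor[shard]]] = shard;
  }
  return out;
}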
int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
return 0;
}
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
+int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
{
librados::IoCtx index_ctx;
- map<int, string> oids;
- map<int, struct rgw_cls_list_ret> list_results;
- int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids);
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
+ r = cls_rgw_get_dir_header(index_ctx, oid, &header);
if (r < 0)
return r;
- map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
- for(; iter != list_results.end(); ++iter) {
- headers[oids[iter->first]] = iter->second.dir.header;
- }
return 0;
}
-int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
{
librados::IoCtx index_ctx;
- map<int, string> bucket_objs;
- int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+ string oid;
+ int r = open_bucket_index(bucket, index_ctx, oid);
if (r < 0)
return r;
- map<int, string>::iterator iter = bucket_objs.begin();
- for (; iter != bucket_objs.end(); ++iter) {
- r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
- if (r < 0) {
- ctx->put();
- break;
- } else {
- (*num_aio)++;
- }
- }
- return r;
+ r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
+ if (r < 0)
+ return r;
+
+ return 0;
}
int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
{
- map<string, struct rgw_bucket_dir_header> headers;
- int r = cls_bucket_head(bucket, headers);
+ rgw_bucket_dir_header header;
+ int r = cls_bucket_head(bucket, header);
if (r < 0) {
ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl;
return r;
bucket.convert(&entry.bucket);
- map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
- for (; hiter != headers.end(); ++hiter) {
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
- for (; iter != hiter->second.stats.end(); ++iter) {
- struct rgw_bucket_category_stats& header_stats = iter->second;
- entry.size += header_stats.total_size;
- entry.size_rounded += header_stats.total_size_rounded;
- entry.count += header_stats.num_entries;
- }
+ map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
+ for (; iter != header.stats.end(); ++iter) {
+ struct rgw_bucket_category_stats& header_stats = iter->second;
+ entry.size += header_stats.total_size;
+ entry.size_rounded += header_stats.total_size_rounded;
+ entry.count += header_stats.num_entries;
}
list<cls_user_bucket_entry> entries;
return 0;
}
-void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
- uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
-{
- if (!num_shards) {
- bucket_objects[0] = bucket_oid_base;
- } else {
- char buf[bucket_oid_base.size() + 32];
- if (shard_id < 0) {
- for (uint32_t i = 0; i < num_shards; ++i) {
- snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i);
- bucket_objects[i] = buf;
- }
- } else {
- if ((uint32_t)shard_id > num_shards) {
- return;
- }
- snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
- bucket_objects[shard_id] = buf;
- }
- }
-}
-
-void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result)
-{
- rgw_bucket& bucket = bucket_info.bucket;
- string plain_id = bucket.name + ":" + bucket.bucket_id;
- if (!bucket_info.num_shards) {
- (*result)[0] = plain_id;
- } else {
- char buf[16];
- if (shard_id < 0) {
- for (uint32_t i = 0; i < bucket_info.num_shards; ++i) {
- snprintf(buf, sizeof(buf), ":%d", i);
- (*result)[i] = plain_id + buf;
- }
- } else {
- if ((uint32_t)shard_id > bucket_info.num_shards) {
- return;
- }
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- (*result)[shard_id] = plain_id + buf;
- }
- }
-}
-
-int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
- uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
-{
- int r = 0;
- switch (hash_type) {
- case RGWBucketInfo::MOD:
- if (!num_shards) {
- // By default with no sharding, we use the bucket oid as itself
- (*bucket_obj) = bucket_oid_base;
- if (shard_id) {
- *shard_id = -1;
- }
- } else {
- uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
- uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
- char buf[bucket_oid_base.size() + 32];
- snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
- (*bucket_obj) = buf;
- if (shard_id) {
- *shard_id = (int)sid;
- }
- }
- break;
- default:
- r = -ENOTSUP;
- }
- return r;
-}
-
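// [Editorial sketch -- not part of the patch.] The removed helpers above name an
// index-shard object '{bucket_oid_base}.{shard_id}' and pick the shard for an
// object key by hashing the key, then reducing modulo a prime and the shard
// count; with zero shards the base oid is used directly. std::hash stands in for
// ceph_str_hash_linux, and the prime below is illustrative (the tree uses
// MAX_BUCKET_INDEX_SHARDS_PRIME):

#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>

static const uint32_t ILLUSTRATIVE_SHARDS_PRIME = 7877;

static std::string bucket_index_shard_oid(const std::string& oid_base,
                                          uint32_t num_shards,
                                          const std::string& obj_key)
{
  if (!num_shards)
    return oid_base;                             // unsharded: the base oid itself
  uint32_t h = (uint32_t)std::hash<std::string>()(obj_key);   // stand-in hash
  uint32_t sid = (h ^ ((h & 0xFF) << 24)) % ILLUSTRATIVE_SHARDS_PRIME % num_shards;
  char buf[32];
  snprintf(buf, sizeof(buf), ".%u", sid);
  return oid_base + buf;                         // e.g. ".dir.<marker>.5"
}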
int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge)
{
bool log_meta;
bool log_data;
-/**
- * Represents the number of shards for the bucket index object; a value of zero
- * indicates there is no sharding. By default (no sharding) the name of the object
- * is '.dir.{marker}'; with sharding, the name is '.dir.{marker}.{shard_id}', where
- * shard_id is a zero-based value. Setting a very large value (e.g. thousands) is
- * not recommended, as it increases the cost of bucket listing.
- */
- uint32_t bucket_index_max_shards;
-
- RGWZone() : log_meta(false), log_data(false), bucket_index_max_shards(0) {}
+ RGWZone() : log_meta(false), log_data(false) {}
void encode(bufferlist& bl) const {
- ENCODE_START(3, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(name, bl);
::encode(endpoints, bl);
::encode(log_meta, bl);
::encode(log_data, bl);
- ::encode(bucket_index_max_shards, bl);
ENCODE_FINISH(bl);
}
::decode(log_meta, bl);
::decode(log_data, bl);
}
- if (struct_v >= 3) {
- ::decode(bucket_index_max_shards, bl);
- }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
class RGWGetBucketStats_CB : public RefCountedObject {
protected:
rgw_bucket bucket;
+ uint64_t bucket_ver;
+ uint64_t master_ver;
map<RGWObjCategory, RGWStorageStats> *stats;
+ string max_marker;
public:
RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
virtual ~RGWGetBucketStats_CB() {}
virtual void handle_response(int r) = 0;
- virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) {
+ virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
+ map<RGWObjCategory, RGWStorageStats> *_stats,
+ const string &_max_marker) {
+ bucket_ver = _bucket_ver;
+ master_ver = _master_ver;
stats = _stats;
+ max_marker = _max_marker;
}
};
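// [Editorial sketch -- not part of the patch.] get_bucket_stats_async() ends up
// calling set_response() and then handle_response() on an object derived from the
// class above. A hypothetical consumer that just copies the results out (every
// name below except the base class and its members is illustrative):

class MyBucketStatsCB : public RGWGetBucketStats_CB {
  int result;
  map<RGWObjCategory, RGWStorageStats> copied_stats;
public:
  MyBucketStatsCB(rgw_bucket& b) : RGWGetBucketStats_CB(b), result(0) {}
  void handle_response(int r) {
    result = r;
    if (r >= 0 && stats)        // stats, bucket_ver, master_ver and max_marker
      copied_stats = *stats;    // were filled in by set_response() before this call
  }
};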
int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid);
- int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- string& bucket_oid_base);
- int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- const string& obj_key, string *bucket_obj, int *shard_id);
- int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
- template<typename T>
- int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- map<int, string>& oids, map<int, T>& bucket_objs,
- int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
- void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
- string *marker);
-
- void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
struct GetObjState {
librados::IoCtx io_ctx;
Mutex bucket_id_lock;
- // This field represents the number of bucket index object shards
- uint32_t bucket_index_max_shards;
-
int get_obj_ioctx(const rgw_obj& obj, librados::IoCtx *ioctx);
int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false);
uint64_t max_bucket_id;
gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL), watch_handles(NULL),
watch_initialized(false),
- bucket_id_lock("rados_bucket_id"),
- bucket_index_max_shards(0),
- max_bucket_id(0),
+ bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
cct(NULL), rados(NULL),
pools_initialized(false),
quota_handler(NULL),
* create a bucket with name bucket and the given list of attrs
* returns 0 on success, -ERR# otherwise.
*/
- virtual int init_bucket_index(rgw_bucket& bucket, int num_shards);
+ virtual int init_bucket_index(rgw_bucket& bucket);
int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule,
const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule);
int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket);
}
int decode_policy(bufferlist& bl, ACLOwner *owner);
- int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
- map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
+ int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
+ string *max_marker);
int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
int get_user_stats(const string& user, RGWStorageStats& stats);
int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
map<string, bufferlist> *pattrs, bool create_entry_point);
- struct BucketShard {
- RGWRados *store;
- rgw_bucket bucket;
- int shard_id;
- librados::IoCtx index_ctx;
- string bucket_obj;
-
- BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
- int init(rgw_bucket& _bucket, rgw_obj& obj);
- };
-
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
- int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+ int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+ string& name, string& locator);
+ int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
- int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
- int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
+ int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+ int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
+ int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
- int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
- map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
- bool (*force_check_filter)(const string& name) = NULL);
- int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
- int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
-
- int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
+ int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
+ map<string, RGWObjEnt>& m, bool *is_truncated,
+ string *last_entry, bool (*force_check_filter)(const string& name) = NULL);
+ int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
+ int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
+ int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
RGWModifyOp op, rgw_obj& oid, string& tag);
- int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+ int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs);
- int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
- if (bucket_is_system(bucket_shard.bucket))
+ int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) {
+ if (bucket_is_system(bucket))
return 0;
- return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object);
+ return cls_obj_complete_del(bucket, tag, pool, epoch, oid);
}
- int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) {
- if (bucket_is_system(bucket_shard.bucket))
+ int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) {
+ if (bucket_is_system(bucket))
return 0;
- return cls_obj_complete_cancel(bucket_shard, tag, oid.object);
+ return cls_obj_complete_cancel(bucket, tag, oid);
}
- int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
- int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
+ int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
+ int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker);
int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name);
void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
- void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
+ void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
int time_log_add(const string& oid, list<cls_log_entry>& entries);
int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
}
private:
- /**
- * This is a helper method, it generates a list of bucket index objects with the given
- * bucket base oid and number of shards.
- *
- * bucket_oid_base [in] - base name of the bucket index object;
- * num_shards [in] - number of bucket index object shards.
- * bucket_objs [out] - filled by this method, a list of bucket index objects.
- */
- void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
- map<int, string>& bucket_objs, int shard_id = -1);
-
- /**
- * Get the bucket index object with the given base bucket index object and object key,
- * and the number of bucket index shards.
- *
- * bucket_oid_base [in] - bucket object base name.
- * obj_key [in] - object key.
- * num_shards [in] - number of bucket index shards.
- * hash_type [in] - type of hash to find the shard ID.
- * bucket_obj [out] - the bucket index object for the given object.
- *
- * Return 0 on success, a failure code otherwise.
- */
- int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
- uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
-
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
/**
#include "rgw_replica_log.h"
#include "cls/replica_log/cls_replica_log_client.h"
-#include "cls/rgw/cls_rgw_client.h"
#include "rgw_rados.h"
-#define dout_subsys ceph_subsys_rgw
void RGWReplicaBounds::dump(Formatter *f) const
{
prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
prefix.append(".");
}
-
-string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id)
-{
- string s = prefix + bucket.name;
-
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ".%d", shard_id);
- s += buf;
- }
- return s;
-}
-
-int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
- const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries)
-{
- if (shard_id >= 0 ||
- !BucketIndexShardsManager::is_shards_marker(marker)) {
- return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id), pool,
- daemon_id, marker, time, entries);
- }
-
- BucketIndexShardsManager sm;
- int ret = sm.from_string(marker, shard_id);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
- return ret;
- }
-
- map<int, string>& vals = sm.get();
-
- ret = 0;
-
- map<int, string>::iterator iter;
- for (iter = vals.begin(); iter != vals.end(); ++iter) {
- ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first), pool,
- daemon_id, iter->second, time, entries);
- if (r < 0) {
- ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
- ret = r;
- }
- }
-
- return ret;
-}
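// [Editorial sketch -- not part of the patch.] The removed code above expanded a
// combined shard marker and wrote one bound per index shard, to an object named
// '{prefix}{bucket.name}.{shard_id}'; after the revert a single '{prefix}{bucket.name}'
// object is used, as the inlined update_bound()/delete_bound() below show. The
// removed naming rule in isolation (the prefix value in the example is illustrative):

#include <cstdio>
#include <string>

static std::string replica_log_oid(const std::string& prefix,
                                   const std::string& bucket_name,
                                   int shard_id)
{
  std::string s = prefix + bucket_name;
  if (shard_id >= 0) {
    char buf[16];
    snprintf(buf, sizeof(buf), ".%d", shard_id);
    s += buf;
  }
  return s;   // e.g. "replicalog.mybucket.3", or "replicalog.mybucket" when unsharded
}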
class RGWReplicaBucketLogger : private RGWReplicaLogger {
string pool;
string prefix;
-
- string obj_name(const rgw_bucket& bucket, int shard_id);
public:
RGWReplicaBucketLogger(RGWRados *_store);
- int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
+ int update_bound(const rgw_bucket& bucket, const string& daemon_id,
const string& marker, const utime_t& time,
- const list<RGWReplicaItemMarker> *entries);
- int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id) {
- return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id), pool,
+ const list<RGWReplicaItemMarker> *entries) {
+ return RGWReplicaLogger::update_bound(prefix+bucket.name, pool,
+ daemon_id, marker, time, entries);
+ }
+ int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
+ return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool,
daemon_id);
}
- int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
- return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id), pool,
+ int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) {
+ return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool,
bounds);
}
};
return;
}
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- return;
- }
-
if (!bucket_instance.empty()) {
http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
send_response();
do {
list<rgw_bi_log_entry> entries;
- int ret = store->list_bi_log_entries(bucket_info.bucket, shard_id,
+ int ret = store->list_bi_log_entries(bucket_info.bucket,
marker, max_entries - count,
entries, &truncated);
if (ret < 0) {
http_ret = -EINVAL;
return;
}
-
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- return;
- }
-
if (!bucket_instance.empty()) {
http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
if (http_ret < 0) {
return;
}
}
- http_ret = store->trim_bi_log_entries(bucket_info.bucket, shard_id, start_marker, end_marker);
+ http_ret = store->trim_bi_log_entries(bucket_info.bucket, start_marker, end_marker);
if (http_ret < 0) {
dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
}
};
class RGWOp_BILog_Info : public RGWRESTOp {
- string bucket_ver;
- string master_ver;
+ uint64_t bucket_ver;
+ uint64_t master_ver;
string max_marker;
public:
- RGWOp_BILog_Info() : bucket_ver(), master_ver() {}
+ RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {}
~RGWOp_BILog_Info() {}
int check_caps(RGWUserCaps& caps) {
return;
}
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
return;
}
- http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers);
+ http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers);
}
void RGWOp_BILog_GetBounds::execute() {
return;
}
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
RGWReplicaBucketLogger rl(store);
- http_ret = rl.get_bounds(bucket, shard_id, bounds);
+ http_ret = rl.get_bounds(bucket, bounds);
}
void RGWOp_BILog_GetBounds::send_response() {
return;
}
- int shard_id;
- http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
- if (http_ret < 0) {
- dout(5) << "failed to parse bucket instance" << dendl;
- return;
- }
-
rgw_bucket bucket;
if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0)
return;
RGWReplicaBucketLogger rl(store);
- http_ret = rl.delete_bound(bucket, shard_id, daemon_id);
+ http_ret = rl.delete_bound(bucket, daemon_id);
}
RGWOp *RGWHandler_ReplicaLog::op_get() {
int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
const char *content_type = NULL;
+ int orig_ret = ret;
map<string, string> response_attrs;
map<string, string>::iterator riter;
}
}
- set_req_state_err(s, (partial_content && !ret) ? STATUS_PARTIAL_CONTENT : ret);
+ if (partial_content && !ret)
+ ret = -STATUS_PARTIAL_CONTENT;
+
+ if (ret)
+ set_req_state_err(s, ret);
dump_errno(s);
for (riter = response_attrs.begin(); riter != response_attrs.end(); ++riter) {
sent_header = true;
send_data:
- if (get_data && !ret) {
+ if (get_data && !orig_ret) {
int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
if (r < 0)
return r;
if WITH_RADOSGW
ceph_test_cls_rgw_SOURCES = test/cls_rgw/test_cls_rgw.cc
ceph_test_cls_rgw_LDADD = \
- $(LIBRADOS) $(CRYPTO_LIBS) libcls_rgw_client.la \
- $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
+ $(LIBRADOS) libcls_rgw_client.la \
+ $(LIBCOMMON) $(UNITTEST_LDADD) $(RADOS_TEST_LDADD)
ceph_test_cls_rgw_CXXFLAGS = $(UNITTEST_CXXFLAGS)
bin_DEBUGPROGRAMS += ceph_test_cls_rgw
endif # WITH_RADOSGW
#include "include/types.h"
#include "cls/rgw/cls_rgw_client.h"
-#include "cls/rgw/cls_rgw_ops.h"
#include "gtest/gtest.h"
#include "test/librados/test.h"
#include <errno.h>
#include <string>
#include <vector>
-#include <map>
using namespace librados;
void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size)
{
- map<int, struct rgw_cls_list_ret> results;
- map<int, string> oids;
- oids[0] = oid;
- ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
-
- uint64_t entries = 0;
- uint64_t size = 0;
- map<int, struct rgw_cls_list_ret>::iterator iter = results.begin();
- for (; iter != results.end(); ++iter) {
- entries += (iter->second).dir.header.stats[category].num_entries;
- size += (iter->second).dir.header.stats[category].total_size;
- }
- ASSERT_EQ(total_size, size);
- ASSERT_EQ(num_entries, entries);
+ rgw_bucket_dir_header header;
+ ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header));
+
+ rgw_bucket_category_stats& stats = header.stats[category];
+ ASSERT_EQ(total_size, stats.total_size);
+ ASSERT_EQ(num_entries, stats.num_entries);
}
void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
cls_rgw_encode_suggestion(suggest_op, dirent, updates);
}
- map<int, string> bucket_objs;
- bucket_objs[0] = bucket_oid;
- int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)();
- ASSERT_EQ(0, r);
+ op = mgr.write_op();
+ cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout
+ ASSERT_EQ(0, ioctx.operate(bucket_oid, op));
sleep(1);