using namespace librados;
+const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
+const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+
/**
* This class represents the bucket index object operation callback context.
*/
}
};
-/*
- * Callback implementation for AIO request.
- */
-static void bucket_index_op_completion_cb(void* cb, void* arg) {
- BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
- cb_arg->manager->do_completion(cb_arg->id);
- cb_arg->put();
-}
-
void BucketIndexAioManager::do_completion(int id) {
Mutex::Locker l(lock);
librados::ObjectWriteOperation op;
op.create(true);
op.exec("rgw", "bucket_init_index", in);
- BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
- AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, &op);
- if (r >= 0)
- manager->add_pending(arg->id, c);
- return r;
+ return manager->aio_operate(io_ctx, oid, &op);
}
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
::encode(call, in);
ObjectWriteOperation op;
op.exec("rgw", "bucket_set_tag_timeout", in);
- BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
- AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, &op);
- if (r >= 0) {
- manager->add_pending(arg->id, c);
- }
- return r;
+ return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketIndexInit::issue_op()
librados::ObjectReadOperation op;
op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
-
- BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
- AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, &op, NULL);
- if (r >= 0)
- manager->add_pending(arg->id, c);
- return r;
+ return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketList::issue_op()
return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
}
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
+ const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+ struct cls_rgw_bi_log_list_ret *pdata) {
+ bufferlist in;
+ cls_rgw_bi_log_list_op call;
+ call.marker = marker_mgr.get(oid, "");
+ call.max = max;
+ ::encode(call, in);
+
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
+ return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBILogList::issue_op()
+{
+ return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
+}
+
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
struct rgw_cls_check_index_ret *pdata) {
bufferlist in;
librados::ObjectReadOperation op;
op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
pdata, NULL));
- BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
- AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, &op, NULL);
- if (r >= 0) {
- manager->add_pending(arg->id, c);
- }
- return r;
+ return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketCheck::issue_op()
bufferlist in;
librados::ObjectWriteOperation op;
op.exec("rgw", "bucket_rebuild_index", in);
- BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
- AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
- int r = io_ctx.aio_operate(oid, c, &op);
- if (r >= 0) {
- manager->add_pending(arg->id, c);
- }
- return r;
+ return manager->aio_operate(io_ctx, oid, &op);
}
int CLSRGWIssueBucketRebuild::issue_op()
return 0;
}
-int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
- list<rgw_bi_log_entry>& entries, bool *truncated)
-{
- bufferlist in, out;
- cls_rgw_bi_log_list_op call;
- call.marker = marker;
- call.max = max;
- ::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
- if (r < 0)
- return r;
-
- cls_rgw_bi_log_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- entries = ret.entries;
-
- if (truncated)
- *truncated = ret.truncated;
-
- return r;
-}
-
int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
{
do {
#define CEPH_CLS_RGW_CLIENT_H
#include "include/types.h"
+#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
#include "cls_rgw_ops.h"
#include "common/RefCountedObj.h"
+// Forward declaration
+class BucketIndexAioManager;
+
+/*
+ * Bucket index AIO request argument, this is used to pass a argument
+ * to callback.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+ BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
+ id(_id), manager(_manager) {}
+ int id;
+ BucketIndexAioManager* manager;
+};
+
/*
* This class manages AIO completions. This class is not completely thread-safe,
* methods like *get_next* is not thread-safe and is expected to be called from
int next;
Mutex lock;
Cond cond;
-public:
/*
- * Create a new instance.
+ * Callback implementation for AIO request.
*/
- BucketIndexAioManager() : pendings(), completions(), next(0),
- lock("BucketIndexAioManager::lock"), cond() {}
+ static void bucket_index_op_completion_cb(void* cb, void* arg) {
+ BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
+ cb_arg->manager->do_completion(cb_arg->id);
+ cb_arg->put();
+ }
+
+ /*
+ * Get next request ID. This method is not thread-safe.
+ *
+ * Return next request ID.
+ */
+ int get_next() { return next++; }
/*
* Add a new pending AIO completion instance.
Mutex::Locker l(lock);
pendings[id] = completion;
}
-
+public:
/*
- * Do completion for the given AIO request.
+ * Create a new instance.
*/
- void do_completion(int id);
+ BucketIndexAioManager() : pendings(), completions(), next(0),
+ lock("BucketIndexAioManager::lock"), cond() {}
/*
- * Get next request ID. This method is not thread-safe.
- *
- * Return next request ID.
+ * Do completion for the given AIO request.
*/
- int get_next() { return next++; }
+ void do_completion(int id);
/*
* Wait for AIO completions.
* Return false if there is no pending AIO, true otherwise.
*/
bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
-};
-/*
- * Bucket index AIO request argument, this is used to pass a argument
- * to callback.
- */
-struct BucketIndexAioArg : public RefCountedObject {
- BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
- id(_id), manager(_manager) {}
- int id;
- BucketIndexAioManager* manager;
+ /**
+ * Do aio read operation.
+ */
+ bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
+ BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+ librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
+ if (r >= 0) {
+ add_pending(arg->id, c);
+ }
+ return r;
+ }
+
+ /**
+ * Do aio write operation.
+ */
+ bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
+ BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+ librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
+ if (r >= 0) {
+ add_pending(arg->id, c);
+ }
+ return r;
+ }
};
class RGWGetDirHeader_CB : public RefCountedObject {
private:
// Per shard setting manager, for example, marker.
map<string, string> value_by_shards;
- const static char KEY_VALUE_SEPARATOR = '#';
- const static char SHARDS_SEPARATOR = ',';
public:
- void add_item(const string& shard, const string& value) {
+ const static string KEY_VALUE_SEPARATOR;
+ const static string SHARDS_SEPARATOR;
+
+ void add(const string& shard, const string& value) {
value_by_shards[shard] = value;
}
+
+ const string& get(const string& shard, const string& default_value) {
+ map<string, string>::iterator iter = value_by_shards.find(shard);
+ return (iter == value_by_shards.end() ? default_value : iter->second);
+ }
+
+ bool empty() {
+ return value_by_shards.empty();
+ }
+
void to_string(string *out) const {
if (out) {
map<string, string>::const_iterator iter = value_by_shards.begin();
for (; iter != value_by_shards.end(); ++iter) {
if (out->length()) {
// Not the first item, append a separator first
- out->append(1, SHARDS_SEPARATOR);
+ out->append(SHARDS_SEPARATOR);
}
out->append(iter->first);
- out->append(1, KEY_VALUE_SEPARATOR);
+ out->append(KEY_VALUE_SEPARATOR);
out->append(iter->second);
}
}
}
}
+
+ int from_string(const string& composed_marker, bool has_shards, const string& oid) {
+ value_by_shards.clear();
+ if (!has_shards) {
+ add(oid, composed_marker);
+ } else {
+ list<string> shards;
+ get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
+ list<string>::const_iterator iter = shards.begin();
+ for (; iter != shards.end(); ++iter) {
+ size_t pos = iter->find(KEY_VALUE_SEPARATOR);
+ if (pos == string::npos)
+ return -EINVAL;
+ string name = iter->substr(0, pos);
+ value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1);
+ }
+ }
+ return 0;
+ }
};
/* bucket index */
start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
};
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> > {
+ BucketIndexShardsManager& marker_mgr;
+ uint32_t max;
+protected:
+ int issue_op();
+public:
+ CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
+ map<string, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+ CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> >(io_ctx, bi_log_lists, max_aio),
+ marker_mgr(_marker_mgr), max(_max) {}
+};
+
/**
* Check the bucket index.
*
void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
-/* bucket index log */
-
-int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
- list<rgw_bi_log_entry>& entries, bool *truncated);
int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
/* usage logging */
return 0;
}
+void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker,
+ string *marker) {
+ if (marker) {
+ *marker = shard_name;
+ marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
+ marker->append(shard_marker);
+ }
+}
+
int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
{
int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
for(; iter != headers.end(); ++iter) {
accumulate_raw_stats(iter->second, stats);
snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
- ver_mgr.add_item(iter->first, string(buf));
+ ver_mgr.add(iter->first, string(buf));
snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
- master_ver_mgr.add_item(iter->first, string(buf));
- marker_mgr.add_item(iter->first, iter->second.max_marker);
+ master_ver_mgr.add(iter->first, string(buf));
+ marker_mgr.add(iter->first, iter->second.max_marker);
}
ver_mgr.to_string(bucket_ver);
master_ver_mgr.to_string(master_ver);
int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
+ ldout(cct, 20) << __func__ << bucket << " marker " << marker << " max " << max << dendl;
result.clear();
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ map<string, cls_rgw_bi_log_list_ret> bi_log_lists;
+ int r = open_bucket_index(bucket, index_ctx, bi_log_lists);
if (r < 0)
return r;
- std::list<rgw_bi_log_entry> entries;
- int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
- if (ret < 0)
- return ret;
+ BucketIndexShardsManager marker_mgr;
+ bool has_shards = (bi_log_lists.size() > 1);
+ // If there are multiple shards for the bucket index object, the marker
+ // should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#
+ // {shard_marker_2}...', if there is no sharding, the bi_log_list should
+ // only contain one record, and the key is the bucket index object id.
+ r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first);
+ if (r < 0)
+ return r;
+
+ r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+ if (r < 0)
+ return r;
+
+ vector<list<rgw_bi_log_entry>::iterator> vcurrents;
+ vector<list<rgw_bi_log_entry>::iterator> vends;
+ vector<string> vnames;
+ if (truncated) {
+ *truncated = false;
+ }
+ map<string, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
+ for (; miter != bi_log_lists.end(); ++miter) {
+ vnames.push_back(miter->first);
+ vcurrents.push_back(miter->second.entries.begin());
+ vends.push_back(miter->second.entries.end());
+ if (truncated) {
+ *truncated = (*truncated || miter->second.truncated);
+ }
+ }
+
+ bool has_more = true;
+ while (result.size() < max && has_more) {
+ has_more = false;
+ for (size_t i = 0;
+ result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i];
+ ++vcurrents[i], ++i) {
+ if (vcurrents[i] != vends[i]) {
+ rgw_bi_log_entry& entry = *(vcurrents[i]);
+ if (has_shards) {
+ // Put the shard name as part of the ID, so that caller can easy find out
+ // the next marker
+ string tmp_id;
+ build_bucket_index_marker(vnames[i], entry.id, &tmp_id);
+ entry.id.swap(tmp_id);
+ }
+ marker_mgr.add(vnames[i], entry.id);
+ result.push_back(entry);
+ has_more = true;
+ }
+ }
+ }
- std::list<rgw_bi_log_entry>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- result.push_back(*iter);
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (truncated) {
+ *truncated = (*truncated || (vcurrents[i] != vends[i]));
+ }
+ }
+
+ // Refresh marker, if there are multiple shards, the output will look like
+ // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...',
+ // if there is no sharding, the simply marker (without oid) is returned
+ if (has_shards) {
+ marker_mgr.to_string(&marker);
+ } else {
+ marker = result.rbegin()->id;
}
return 0;
template<typename T>
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
map<string, T>& bucket_objs);
+ void build_bucket_index_marker(const string& shard_name, const string& shard_marker,
+ string *marker);
struct GetObjState {
librados::IoCtx io_ctx;