using namespace librados;
+/**
+ * This class represents the bucket index object operation callback context.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+ T *data;
+ int *ret_code;
+public:
+ ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+ ~ClsBucketIndexOpCtx() {}
+ void handle_completion(int r, bufferlist& outbl) {
+ if (r >= 0) {
+ try {
+ bufferlist::iterator iter = outbl.begin();
+ ::decode((*data), iter);
+ } catch (buffer::error& err) {
+ r = -EIO;
+ }
+ }
+ if (ret_code) {
+ *ret_code = r;
+ }
+ }
+};
+
/*
* Callback implementation for AIO request.
*/
break;
}
- int num_completions, r;
+ int num_completions, r = 0;
while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
o.exec("rgw", "bucket_complete_op", in);
}
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated)
-{
- bufferlist in, out;
+static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+ const string& oid, const string& start_obj, const string& filter_prefix,
+ uint32_t num_entries, BucketIndexAioManager *manager,
+ struct rgw_cls_list_ret *pdata) {
+ bufferlist in;
struct rgw_cls_list_op call;
call.start_obj = start_obj;
call.filter_prefix = filter_prefix;
call.num_entries = num_entries;
::encode(call, in);
- int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
- if (r < 0)
- return r;
- struct rgw_cls_list_ret ret;
- try {
- bufferlist::iterator iter = out.begin();
- ::decode(ret, iter);
- } catch (buffer::error& err) {
- return -EIO;
+ librados::ObjectReadOperation op;
+ op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ if (r >= 0)
+ manager->add_pending(arg->id, c);
+ return r;
+}
+
+int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
+ const string& filter_prefix, uint32_t num_entries,
+ map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio)
+{
+ int ret = 0;
+ BucketIndexAioManager manager;
+ map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ for (; iter != list_results.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+ if (ret < 0)
+ break;
}
- if (dir)
- *dir = ret.dir;
- if (is_truncated)
- *is_truncated = ret.is_truncated;
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
- return r;
+ return ret;
}
int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
list<string> *remove_objs, bool log_op);
-int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
- string& filter_prefix, uint32_t num_entries,
- rgw_bucket_dir *dir, bool *is_truncated);
+/**
+ * List the bucket with the starting object and filter prefix.
+ * NOTE: this method do listing requests for each bucket index shards identified by
+ * the keys of the *list_results* map, which means the map should be popludated
+ * by the caller to fill with each bucket index object id.
+ *
+ * io_ctx - IO context for rados.
+ * start_obj - marker for the listing.
+ * filter_prefix - filter prefix.
+ * num_entries - number of entries to request for each object (note the total
+ * amount of entries returned depends on the number of shardings).
+ * list_results - the list results keyed by bucket index object id.
+ * max_aio - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+*/
+int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
+ const string& filter_prefix, uint32_t num_entries,
+ map<string, struct rgw_cls_list_ret>& list_results,
+ uint32_t max_aio);
int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
rgw_bucket_dir_header *existing_header,
while (is_truncated) {
map<string, RGWObjEnt> result;
- int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result,
- &is_truncated, &marker,
- bucket_object_check_filter);
+ int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
+ result, &is_truncated, &marker,
+ bucket_object_check_filter);
if (r == -ENOENT) {
break;
#include "rgw_metadata.h"
#include "rgw_bucket.h"
+#include "cls/rgw/cls_rgw_ops.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/refcount/cls_refcount_client.h"
result.push_back(ent);
count++;
}
+
+ // Either the back-end telling us truncated, or we don't consume all
+ // items returned per the amount caller request
+ truncated = (truncated || eiter != ent_map.end());
}
done:
return 0;
}
+template<typename T>
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<string, T>& bucket_objs)
+{
+ vector<string> oids;
+ int ret = open_bucket_index(bucket, index_ctx, oids);
+ if (ret < 0)
+ return ret;
+
+ vector<string>::const_iterator iter = oids.begin();
+ for (; iter != oids.end(); ++iter) {
+ bucket_objs[*iter] = T();
+ }
+ return 0;
+}
+
int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
const string& obj_key, string *bucket_obj)
{
return r;
}
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
- uint32_t num, map<string, RGWObjEnt>& m,
- bool *is_truncated, string *last_entry,
- bool (*force_check_filter)(const string& name))
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
+ uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
+ string *last_entry, bool (*force_check_filter)(const string& name))
{
- ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
+ ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ // key - oid (for different shards if there is any)
+ // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+ map<string, struct rgw_cls_list_ret> list_results;
+ int r = open_bucket_index(bucket, index_ctx, list_results);
if (r < 0)
return r;
- struct rgw_bucket_dir dir;
- r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated);
+ r = cls_rgw_list_op(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio);
if (r < 0)
return r;
- map<string, struct rgw_bucket_dir_entry>::iterator miter;
- bufferlist updates;
- for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
- RGWObjEnt e;
- rgw_bucket_dir_entry& dirent = miter->second;
+ // Create a list of iterators that are used to iterate each shard
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
+ vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
+ vector<string> vnames(list_results.size());
+ map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+ *is_truncated = false;
+ for (; iter != list_results.end(); ++iter) {
+ vcurrents.push_back(iter->second.dir.m.begin());
+ vends.push_back(iter->second.dir.m.end());
+ vnames.push_back(iter->first);
+ *is_truncated = (*is_truncated || iter->second.is_truncated);
+ }
+
+ // Create a map to track the next candidate entry from each shard, if the entry
+ // from a specified shard is selected/erased, the next entry from that shard will
+ // be inserted for next round selection
+ map<string, size_t> candidates;
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i]) {
+ candidates[vcurrents[i]->second.name] = i;
+ }
+ }
+
+ map<string, bufferlist> updates;
+ uint32_t count = 0;
+ while (count < num_entries && !candidates.empty()) {
+ // Select the next one
+ int pos = candidates.begin()->second;
+ struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
// fill it in with initial values; we may correct later
+ RGWObjEnt e;
e.name = dirent.name;
e.size = dirent.meta.size;
e.mtime = dirent.meta.mtime;
e.content_type = dirent.meta.content_type;
e.tag = dirent.tag;
- /* oh, that shouldn't happen! */
- if (e.name.empty()) {
- ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
- continue;
- }
-
bool force_check = force_check_filter && force_check_filter(dirent.name);
-
if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
/* there are uncommitted ops. We need to check the current state,
* and if the tags are old we need to do cleanup as well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
- r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
+ r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
if (r < 0) {
if (r == -ENOENT)
continue;
}
m[e.name] = e;
ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+
+ // Refresh the candidates map
+ candidates.erase(candidates.begin());
+ ++vcurrents[pos];
+ if (vcurrents[pos] != vends[pos]) {
+ candidates[vcurrents[pos]->second.name] = pos;
+ }
+ ++count;
}
- if (dir.m.size()) {
- *last_entry = dir.m.rbegin()->first;
+ // Suggest updates if there is any
+ map<string, bufferlist>::iterator miter = updates.begin();
+ for (; miter != updates.end(); ++miter) {
+ if (miter->second.length()) {
+ ObjectWriteOperation o;
+ cls_rgw_suggest_changes(o, miter->second);
+ // we don't care if we lose suggested updates, send them off blindly
+ AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+ index_ctx.aio_operate(miter->first, c, &o);
+ c->release();
+ }
}
- if (updates.length()) {
- ObjectWriteOperation o;
- cls_rgw_suggest_changes(o, updates);
- // we don't care if we lose suggested updates, send them off blindly
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- r = index_ctx.aio_operate(oid, c, &o);
- c->release();
+ // Check if all the returned entries are consumed or not
+ for (size_t i = 0; i < vcurrents.size(); ++i) {
+ if (vcurrents[i] != vends[i])
+ *is_truncated = true;
}
- return m.size();
+ if (m.size())
+ *last_entry = m.rbegin()->first;
+ return 0;
}
int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
const string& obj_key, string *bucket_obj);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
vector<string>& bucket_objs);
+ template<typename T>
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ map<string, T>& bucket_objs);
struct GetObjState {
librados::IoCtx io_ctx;
bool sent_data;
int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
- int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
- map<string, RGWObjEnt>& m, bool *is_truncated,
- string *last_entry, bool (*force_check_filter)(const string& name) = NULL);
+ int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
+ map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
+ bool (*force_check_filter)(const string& name) = NULL);
int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,