return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBucketIndexInit::issue_op()
+int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_index_init_op(io_ctx, *iter, &manager);
+ return issue_bucket_index_init_op(io_ctx, oid, &manager);
}
void CLSRGWIssueBucketIndexInit::cleanup()
{
// Do best effort removal
- for (vector<string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
- io_ctx.remove(*citer);
+ for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+ io_ctx.remove(citer->second);
}
}
-int CLSRGWIssueSetTagTimeout::issue_op()
+int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBucketList::issue_op()
+int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+ return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]);
}
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
- const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
struct cls_rgw_bi_log_list_ret *pdata) {
bufferlist in;
cls_rgw_bi_log_list_op call;
- call.marker = marker_mgr.get(oid, "");
+ call.marker = marker_mgr.get(shard_id, "");
call.max = max;
::encode(call, in);
return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBILogList::issue_op()
+int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
{
- return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
+ return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
}
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx,
- string& oid, BucketIndexShardsManager& start_marker_mgr,
- BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+ BucketIndexShardsManager& start_marker_mgr,
+ BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
bufferlist in;
cls_rgw_bi_log_trim_op call;
- call.start_marker = start_marker_mgr.get(oid, "");
- call.end_marker = end_marker_mgr.get(oid, "");
+ call.start_marker = start_marker_mgr.get(shard_id, "");
+ call.end_marker = end_marker_mgr.get(shard_id, "");
::encode(call, in);
ObjectWriteOperation op;
op.exec("rgw", "bi_log_trim", in);
return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBILogTrim::issue_op()
+int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
{
- return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager);
+ return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
}
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBucketCheck::issue_op()
+int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+ return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
}
static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
return manager->aio_operate(io_ctx, oid, &op);
}
-int CLSRGWIssueBucketRebuild::issue_op()
+int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+ return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec("rgw", "dir_suggest_changes", updates);
}
-int CLSRGWIssueGetDirHeader::issue_op()
+int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+ return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]);
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
class BucketIndexShardsManager {
private:
// Per shard setting manager, for example, marker.
- map<string, string> value_by_shards;
+ map<int, string> value_by_shards;
public:
const static string KEY_VALUE_SEPARATOR;
const static string SHARDS_SEPARATOR;
- void add(const string& shard, const string& value) {
+ void add(int shard, const string& value) {
value_by_shards[shard] = value;
}
- const string& get(const string& shard, const string& default_value) {
- map<string, string>::iterator iter = value_by_shards.find(shard);
+ const string& get(int shard, const string& default_value) {
+ map<int, string>::iterator iter = value_by_shards.find(shard);
return (iter == value_by_shards.end() ? default_value : iter->second);
}
void to_string(string *out) const {
if (out) {
- map<string, string>::const_iterator iter = value_by_shards.begin();
+ map<int, string>::const_iterator iter = value_by_shards.begin();
// No shards
if (value_by_shards.size() == 1) {
*out = iter->second;
// Not the first item, append a separator first
out->append(SHARDS_SEPARATOR);
}
- out->append(iter->first);
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%d", iter->first);
+ out->append(buf);
out->append(KEY_VALUE_SEPARATOR);
out->append(iter->second);
}
}
}
- int from_string(const string& composed_marker, bool has_shards, const string& oid) {
+ int from_string(const string& composed_marker, bool has_shards) {
value_by_shards.clear();
if (!has_shards) {
- add(oid, composed_marker);
+ add(0, composed_marker);
} else {
list<string> shards;
get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
list<string>::const_iterator iter = shards.begin();
for (; iter != shards.end(); ++iter) {
size_t pos = iter->find(KEY_VALUE_SEPARATOR);
- if (pos == string::npos)
+ if (pos == string::npos) {
return -EINVAL;
- string name = iter->substr(0, pos);
- value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1);
+ }
+ string shard_str = iter->substr(0, pos);
+ string err;
+ int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ return -EINVAL;
+ }
+ value_by_shards[shard] = iter->substr(pos + 1);
}
}
return 0;
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
-template<class T>
class CLSRGWConcurrentIO {
protected:
librados::IoCtx& io_ctx;
- T& objs_container;
- typename T::iterator iter;
+ map<int, string>& objs_container;
+ map<int, string>::iterator iter;
uint32_t max_aio;
BucketIndexAioManager manager;
- virtual int issue_op() = 0;
+ virtual int issue_op(int shard_id, const string& oid) = 0;
virtual void cleanup() {}
virtual int valid_ret_code() { return 0; }
virtual void reset_container(vector<string>& objs) {}
public:
- CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container,
+ CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
virtual ~CLSRGWConcurrentIO() {}
int ret = 0;
iter = objs_container.begin();
for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
- ret = issue_op();
+ ret = issue_op(iter->first, iter->second);
if (ret < 0)
break;
}
while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
- int issue_ret = issue_op();
+ int issue_ret = issue_op(iter->first, iter->second);
if(issue_ret < 0) {
ret = issue_ret;
break;
}
};
-class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
int valid_ret_code() { return -EEXIST; }
void cleanup();
public:
- CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+ CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
uint32_t _max_aio) :
- CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio) {}
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
};
-class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
uint64_t tag_timeout;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
- CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+ CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
uint32_t _max_aio, uint64_t _tag_timeout) :
- CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio),
- tag_timeout(_tag_timeout) {}
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
};
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
* Return 0 on success, a failure code otherwise.
*/
-class CLSRGWIssueBucketList : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
string start_obj;
string filter_prefix;
uint32_t num_entries;
+ map<int, rgw_cls_list_ret>& result;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
const string& _filter_prefix, uint32_t _num_entries,
- map<string, struct rgw_cls_list_ret>& list_results,
+ map<int, string>& oids,
+ map<int, struct rgw_cls_list_ret>& list_results,
uint32_t max_aio) :
- CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > (io_ctx, list_results, max_aio),
- start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+ start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {}
};
-class CLSRGWIssueBILogList : public CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> > {
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
+ map<int, struct cls_rgw_bi_log_list_ret>& result;
BucketIndexShardsManager& marker_mgr;
uint32_t max;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
- map<string, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
- CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> >(io_ctx, bi_log_lists, max_aio),
+ map<int, string>& oids,
+ map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
marker_mgr(_marker_mgr), max(_max) {}
};
-class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
BucketIndexShardsManager& start_marker_mgr;
BucketIndexShardsManager& end_marker_mgr;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
// Trim until -ENODATA is returned.
int valid_ret_code() { return -ENODATA; }
bool need_multiple_rounds() { return true; }
- void add_object(const string& oid) { objs_container.push_back(oid); }
- void reset_container(vector<string>& objs) {
+ void add_object(int shard, const string& oid) { objs_container[shard] = oid; }
+ void reset_container(map<int, string>& objs) {
objs_container.swap(objs);
iter = objs_container.begin();
objs.clear();
}
public:
CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
- BucketIndexShardsManager& _end_marker_mgr, vector<string>& _bucket_objs, uint32_t max_aio) :
- CLSRGWConcurrentIO<vector<string> >(io_ctx, _bucket_objs, max_aio),
+ BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
+ CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
};
*
* Return 0 on success, a failure code otherwise.
*/
-class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> > {
+class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
+ map<int, struct rgw_cls_check_index_ret>& result;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
- CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+ CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
uint32_t _max_aio) :
- CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> >(ioc, bucket_objs_ret, _max_aio) {}
+ CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
};
-class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
- CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, vector<string>& bucket_objs,
- uint32_t max_aio) : CLSRGWConcurrentIO<vector<string> >(io_ctx, bucket_objs, max_aio) {}
+ CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
+ uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
};
-class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
+ map<int, rgw_cls_list_ret>& result;
protected:
- int issue_op();
+ int issue_op(int shard_id, const string& oid);
public:
- CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+ CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
uint32_t max_aio) :
- CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> >(io_ctx, dir_headers, max_aio) {}
+ CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
};
int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,