return r;
}
-int CLSRGWConcurrentIO::operator()() {
- int ret = 0;
- vector<string>::const_iterator iter = bucket_objs.begin();
- BucketIndexAioManager manager;
- for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
- ret = issue_op(*iter);
- if (ret < 0)
- break;
- }
-
- int num_completions, r = 0;
- while (manager.wait_for_completions(0, &num_completions, &r)) {
- if (r >= 0 && ret >= 0) {
- for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
- int issue_ret = issue_op(*iter);
- if(issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- } else if (ret >= 0) {
- ret = r;
- }
- }
-
- if (ret < 0) {
- cleanup();
- }
- return ret;
-}
-
-int CLSRGWIssueBucketIndexInit::issue_op(const string& obj)
+int CLSRGWIssueBucketIndexInit::issue_op()
{
- issued_objs.push_back(obj);
- return issue_bucket_index_init_op(io_ctx, obj, &manager);
+ return issue_bucket_index_init_op(io_ctx, *iter, &manager);
}
void CLSRGWIssueBucketIndexInit::cleanup()
{
// Do best effort removal
- for (vector<string>::iterator iter = issued_objs.begin(); iter != issued_objs.end(); ++iter) {
- io_ctx.remove(*iter);
+ for (vector<string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+ io_ctx.remove(*citer);
}
}
-int CLSRGWIssueSetTagTimeout::issue_op(const string& obj)
+int CLSRGWIssueSetTagTimeout::issue_op()
{
- return issue_bucket_set_tag_timeout_op(io_ctx, obj, tag_timeout, &manager);
+ return issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
}
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
+template<class T>
class CLSRGWConcurrentIO {
protected:
librados::IoCtx& io_ctx;
- vector<string>& bucket_objs;
+ T& objs_container;
+ typename T::const_iterator iter;
uint32_t max_aio;
BucketIndexAioManager manager;
- virtual int issue_op(const string& obj) = 0;
+ virtual int issue_op() = 0;
virtual void cleanup() {}
virtual int valid_ret_code() { return 0; }
public:
- CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _bucket_objs,
- uint32_t _max_aio) : io_ctx(ioc), bucket_objs(_bucket_objs), max_aio(_max_aio) {}
+ CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _objs_container,
+ uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
virtual ~CLSRGWConcurrentIO() {}
- int operator()();
+ int operator()() {
+ int ret = 0;
+ iter = objs_container.begin();
+ for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+ ret = issue_op();
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions, r = 0;
+ while (manager.wait_for_completions(0, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
+ int issue_ret = issue_op();
+ if(issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+
+ if (ret < 0) {
+ cleanup();
+ }
+ return ret;
+ }
};
-class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
- vector<string> issued_objs;
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO<vector<string> > {
protected:
- int issue_op(const string& obj);
+ int issue_op();
int valid_ret_code() { return -EEXIST; }
void cleanup();
public:
uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
};
-class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO<vector<string> > {
uint64_t tag_timeout;
protected:
- int issue_op(const string& obj);
+ int issue_op();
public:
CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),