return r;
}
-int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
- const vector<string>& bucket_objs, uint32_t max_aio)
-{
- int ret = 0;
- vector<string>::const_iterator iter = bucket_objs.begin();
- BucketIndexAioManager manager;
- // Issue *max_aio* requests, all subsequent requests are issued upon
- // pending request finishing
- for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
- ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
- if (ret < 0)
- break;
- }
-
- int num_completions, r = 0;
- while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
- if (r >= 0 && ret >= 0) {
- for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
- int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
- if(issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- } else if (ret >= 0) {
- ret = r;
- }
- }
-
- if (ret < 0) {
- // Do best effort removal
- vector<string>::const_iterator citer = bucket_objs.begin();
- for(; citer != iter; ++citer) {
- io_ctx.remove(*citer);
- }
- }
-
- return ret;
-}
-
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
bufferlist in;
return r;
}
-int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
- uint64_t tag_timeout, uint32_t max_aio)
-{
+int CLSRGWConcurrentIO::operator()() {
int ret = 0;
vector<string>::const_iterator iter = bucket_objs.begin();
BucketIndexAioManager manager;
for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
- ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ ret = issue_op(*iter);
if (ret < 0)
break;
}
while (manager.wait_for_completions(0, &num_completions, &r)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
- int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+ int issue_ret = issue_op(*iter);
if(issue_ret < 0) {
ret = issue_ret;
break;
ret = r;
}
}
+
+ if (ret < 0) {
+ cleanup();
+ }
return ret;
}
+int CLSRGWIssueBucketIndexInit::issue_op(const string& obj)
+{
+ issued_objs.push_back(obj);
+ return issue_bucket_index_init_op(io_ctx, obj, &manager);
+}
+
+void CLSRGWIssueBucketIndexInit::cleanup()
+{
+ // Do best effort removal
+ for (vector<string>::iterator iter = issued_objs.begin(); iter != issued_objs.end(); ++iter) {
+ io_ctx.remove(*iter);
+ }
+}
+
+int CLSRGWIssueSetTagTimeout::issue_op(const string& obj)
+{
+ return issue_bucket_set_tag_timeout_op(io_ctx, obj, tag_timeout, &manager);
+}
+
void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op)
{
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
-/**
- * Init bucket index objects.
- *
- * io_ctx - IO context for rados.
- * bucket_objs - a lit of bucket index objects.
- * max_io - the maximum number of AIO (for throttling).
- *
- * Reutrn 0 on success, a failure code otherwise.
- */
-int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
- const vector<string>& bucket_objs, uint32_t max_aio);
+class CLSRGWConcurrentIO {
+protected:
+ librados::IoCtx& io_ctx;
+ vector<string>& bucket_objs;
+ uint32_t max_aio;
+ BucketIndexAioManager manager;
+
+ virtual int issue_op(const string& obj) = 0;
+
+ virtual void cleanup() {}
+ virtual int valid_ret_code() { return 0; }
+
+public:
+ CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+ uint32_t _max_aio) : io_ctx(ioc), bucket_objs(_bucket_objs), max_aio(_max_aio) {}
+ virtual ~CLSRGWConcurrentIO() {}
-int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx,
- const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio);
+ int operator()();
+};
+
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
+ vector<string> issued_objs;
+protected:
+ int issue_op(const string& obj);
+ int valid_ret_code() { return -EEXIST; }
+ void cleanup();
+public:
+ CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+ uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+ uint64_t tag_timeout;
+protected:
+ int issue_op(const string& obj);
+public:
+ CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+ uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),
+ tag_timeout(_tag_timeout) {}
+};
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
string& name, string& locator, bool log_op);
vector<string> bucket_objs;
get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs);
- return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
+ return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
/**
if (r < 0)
return r;
- return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio);
+ return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
}
int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,