]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw, rgw: create base class for common bucket shard operations
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 19 Sep 2014 21:55:12 +0000 (14:55 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:23 +0000 (19:21 -0800)
Instead of copy pasting the same code all over again, create a base
class for the needed concurrent IO operations.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc

index ed937bc52eb0f8aea354f90eada6104ddd3b2210..f67b914aca8cc88e7d1b7ffbd7d9d320e8f1ba6a 100644 (file)
@@ -103,46 +103,6 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
   return r;
 }
 
-int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
-        const vector<string>& bucket_objs, uint32_t max_aio)
-{
-  int ret = 0;
-  vector<string>::const_iterator iter = bucket_objs.begin();
-  BucketIndexAioManager manager;
-  // Issue *max_aio* requests, all subsequent requests are issued upon
-  // pending request finishing
-  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
-    if (ret < 0)
-      break;
-  }
-
-  int num_completions, r = 0;
-  while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
-    if (r >= 0 && ret >= 0) {
-      for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
-        if(issue_ret < 0) {
-          ret = issue_ret;
-          break;
-        }
-      }
-    } else if (ret >= 0) {
-      ret = r;
-    }
-  }
-
-  if (ret < 0) {
-    // Do best effort removal
-    vector<string>::const_iterator citer = bucket_objs.begin();
-    for(; citer != iter; ++citer) {
-      io_ctx.remove(*citer);
-    }
-  }
-
-  return ret;
-}
-
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
     const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
   bufferlist in;
@@ -160,14 +120,12 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
   return r;
 }
 
-int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
-    uint64_t tag_timeout, uint32_t max_aio)
-{
+int CLSRGWConcurrentIO::operator()() {
   int ret = 0;
   vector<string>::const_iterator iter = bucket_objs.begin();
   BucketIndexAioManager manager;
   for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+    ret = issue_op(*iter);
     if (ret < 0)
       break;
   }
@@ -176,7 +134,7 @@ int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>
   while (manager.wait_for_completions(0, &num_completions, &r)) {
     if (r >= 0 && ret >= 0) {
       for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+        int issue_ret = issue_op(*iter);
         if(issue_ret < 0) {
           ret = issue_ret;
           break;
@@ -186,9 +144,32 @@ int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx, const vector<string>
       ret = r;
     }
   }
+
+  if (ret < 0) {
+    cleanup();
+  }
   return ret;
 }
 
+int CLSRGWIssueBucketIndexInit::issue_op(const string& obj)
+{
+  issued_objs.push_back(obj);
+  return issue_bucket_index_init_op(io_ctx, obj, &manager);
+}
+
+void CLSRGWIssueBucketIndexInit::cleanup()
+{
+  // Do best effort removal
+  for (vector<string>::iterator iter = issued_objs.begin(); iter != issued_objs.end(); ++iter) {
+    io_ctx.remove(*iter);
+  }
+}
+
+int CLSRGWIssueSetTagTimeout::issue_op(const string& obj)
+{
+  return issue_bucket_set_tag_timeout_op(io_ctx, obj, tag_timeout, &manager);
+}
+
 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                string& name, string& locator, bool log_op)
 {
index b28425e1fba6553fe525772411e4421cec2ff331..e60589d7995d2fb0285523edae737a5a11fb09f9 100644 (file)
@@ -111,20 +111,46 @@ public:
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 
-/**
- * Init bucket index objects.
- *
- * io_ctx      - IO context for rados.
- * bucket_objs - a lit of bucket index objects.
- * max_io      - the maximum number of AIO (for throttling).
- *
- * Reutrn 0 on success, a failure code otherwise.
- */
-int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
-        const vector<string>& bucket_objs, uint32_t max_aio);
+class CLSRGWConcurrentIO {
+protected:
+  librados::IoCtx& io_ctx;
+  vector<string>& bucket_objs;
+  uint32_t max_aio;
+  BucketIndexAioManager manager;
+
+  virtual int issue_op(const string& obj) = 0;
+
+  virtual void cleanup() {}
+  virtual int valid_ret_code() { return 0; }
+
+public:
+  CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+                     uint32_t _max_aio) : io_ctx(ioc), bucket_objs(_bucket_objs), max_aio(_max_aio) {}
+  virtual ~CLSRGWConcurrentIO() {}
 
-int cls_rgw_bucket_set_tag_timeout(librados::IoCtx& io_ctx,
-    const vector<string>& bucket_objs, uint64_t tag_timeout, uint32_t max_aio);
+  int operator()();
+};
+
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
+  vector<string> issued_objs;
+protected:
+  int issue_op(const string& obj);
+  int valid_ret_code() { return -EEXIST; }
+  void cleanup();
+public:
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+                     uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+  uint64_t tag_timeout;
+protected:
+  int issue_op(const string& obj);
+public:
+  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+                     uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),
+                                                        tag_timeout(_tag_timeout) {}
+};
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                string& name, string& locator, bool log_op);
index a5afa49b916e00cf19516083854cc4643c909dae..7b1ea8afc0f1042eb45d62a633af1f048eaf845c 100644 (file)
@@ -2380,7 +2380,7 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket)
   vector<string> bucket_objs;
   get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs);
 
-  return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
+  return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 /**
@@ -6249,7 +6249,7 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeou
   if (r < 0)
     return r;
 
-  return cls_rgw_bucket_set_tag_timeout(index_ctx, bucket_objs, timeout, cct->_conf->rgw_bucket_index_max_aio);
+  return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
 }
 
 int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,