]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw, rgw: switch different ops to new concurrent infrastructure
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 19 Sep 2014 22:34:54 +0000 (15:34 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:24 +0000 (19:21 -0800)
Make all the relevant ops use the CLSRGWConcurrentIO infrastructure,
which simplifies things.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc

index b1876a0478f71bf5a9c2a480919fe123be0e9b19..e253474d7bebc4dce1d82a8b3e9ac165df244ec0 100644 (file)
@@ -193,35 +193,9 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
   return r;
 }
 
-int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
-        const string& filter_prefix, uint32_t num_entries,
-        map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio)
+int CLSRGWIssueBucketList::issue_op()
 {
-  int ret = 0;
-  BucketIndexAioManager manager;
-  map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
-  for (; iter != list_results.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
-    if (ret < 0)
-      break;
-  }
-
-  int num_completions, r = 0;
-  while (manager.wait_for_completions(0, &num_completions, &r)) {
-    if (r >= 0 && ret >= 0) {
-      for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
-        if (issue_ret < 0) {
-          ret = issue_ret;
-          break;
-        }
-      }
-    } else if (ret >= 0) {
-      ret = r;
-    }
-  }
-
-  return ret;
+  return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
 }
 
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
@@ -239,33 +213,9 @@ static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, Bucket
   return r;
 }
 
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx,
-    map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio)
+int CLSRGWIssueBucketCheck::issue_op()
 {
-  int ret = 0;
-  BucketIndexAioManager manager;
-  map<string, struct rgw_cls_check_index_ret>::iterator iter = bucket_objs_ret.begin();
-  for (; iter != bucket_objs_ret.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
-    if (ret < 0)
-      break;
-  }
-
-  int num_completions, r = 0;
-  while (manager.wait_for_completions(0, &num_completions, &r)) {
-    if (r >= 0 && ret >= 0) {
-      for (int i = 0; i < num_completions && iter != bucket_objs_ret.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
-        if (issue_ret < 0) {
-          ret = issue_ret;
-          break;
-        }
-      }
-    } else if (ret >= 0) {
-      ret = r;
-    }
-  }
-  return ret;
+  return issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
 }
 
 static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
@@ -282,33 +232,9 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
   return r;
 }
 
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, const vector<string>& bucket_objs,
-    uint32_t max_aio)
+int CLSRGWIssueBucketRebuild::issue_op()
 {
-  int ret = 0;
-  BucketIndexAioManager manager;
-  vector<string>::const_iterator iter = bucket_objs.begin();
-  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
-    if (ret < 0)
-      break;
-  }
-
-  int num_completions, r = 0;
-  while (manager.wait_for_completions(0, &num_completions, &r)) {
-    if (r >= 0 && ret >= 0) {
-      for (int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
-        if (issue_ret < 0) {
-          ret = issue_ret;
-          break;
-        }
-      }
-    } else if (ret >= 0) {
-      ret = r;
-    }
-  }
-  return ret;
+  return issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -322,33 +248,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec("rgw", "dir_suggest_changes", updates);
 }
 
-int cls_rgw_get_dir_header(IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
-    uint32_t max_aio)
+int CLSRGWIssueGetDirHeader::issue_op()
 {
-  int ret = 0;
-  BucketIndexAioManager manager;
-  map<string, rgw_cls_list_ret>::iterator iter = dir_headers.begin();
-  for (; iter != dir_headers.end() && max_aio-- > 0; ++iter) {
-    ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
-    if (ret < 0)
-      break;
-  }
-
-  int num_completions, r = 0;
-  while (manager.wait_for_completions(0, &num_completions, &r)) {
-    if (r >= 0 && ret >= 0) {
-      for (int i = 0; i < num_completions && iter != dir_headers.end(); ++i, ++iter) {
-        int issue_ret = issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
-        if (issue_ret < 0) {
-          ret = issue_ret;
-          break;
-        }
-      }
-    } else if (ret >= 0) {
-      ret = r;
-    }
-  }
-  return ret;
+  return issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
index 19c1ab1e2214e7008bd3d610a849786f5a6e360e..ebd005fba335d85dbea4b8e5d00e5b1a6db5d1ec 100644 (file)
@@ -4,6 +4,7 @@
 #include "include/types.h"
 #include "include/rados/librados.hpp"
 #include "cls_rgw_types.h"
+#include "cls_rgw_ops.h"
 #include "common/RefCountedObj.h"
 
 /*
@@ -116,7 +117,7 @@ class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
   T& objs_container;
-  typename T::const_iterator iter;
+  typename T::iterator iter;
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
@@ -126,7 +127,7 @@ protected:
   virtual int valid_ret_code() { return 0; }
 
 public:
-  CLSRGWConcurrentIO(librados::IoCtx& ioc, vector<string>& _objs_container,
+  CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container,
                      uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
   virtual ~CLSRGWConcurrentIO() {}
 
@@ -168,7 +169,8 @@ protected:
   void cleanup();
 public:
   CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector<string>& _bucket_objs,
-                     uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+                     uint32_t _max_aio) :
+    CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio) {}
 };
 
 class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO<vector<string> > {
@@ -177,8 +179,9 @@ protected:
   int issue_op();
 public:
   CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
-                     uint32_t _max_aio, uint64_t _tag_timeout) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio),
-                                                        tag_timeout(_tag_timeout) {}
+                     uint32_t _max_aio, uint64_t _tag_timeout) :
+    CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio),
+    tag_timeout(_tag_timeout) {}
 };
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -204,10 +207,21 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o
  *
  * Return 0 on success, a failure code otherwise.
 */
-int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
-                    const string& filter_prefix, uint32_t num_entries,
-                    map<string, struct rgw_cls_list_ret>& list_results,
-                    uint32_t max_aio);
+
+class CLSRGWIssueBucketList : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+  string start_obj;
+  string filter_prefix;
+  uint32_t num_entries;
+protected:
+  int issue_op();
+public:
+  CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
+                        const string& _filter_prefix, uint32_t _num_entries,
+                        map<string, struct rgw_cls_list_ret>& list_results,
+                        uint32_t max_aio) :
+  CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > (io_ctx, list_results, max_aio),
+  start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
+};
 
 /**
  * Check the bucket index.
@@ -218,11 +232,32 @@ int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
  *
  * Return 0 on success, a failure code otherwise.
  */
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx,
-    map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret, uint32_t max_aio);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, const vector<string>& bucket_objs,
-    uint32_t max_aio);
-  
+class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> > {
+protected:
+  int issue_op();
+public:
+  CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+                     uint32_t _max_aio) :
+    CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> >(ioc, bucket_objs_ret, _max_aio) {}
+};
+
+class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO<vector<string> > {
+protected:
+  int issue_op();
+public:
+  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, vector<string>& bucket_objs,
+                           uint32_t max_aio) : CLSRGWConcurrentIO<vector<string> >(io_ctx, bucket_objs, max_aio) {}
+};
+
+class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+protected:
+  int issue_op();
+public:
+  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+                          uint32_t max_aio) :
+    CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> >(io_ctx, dir_headers, max_aio) {}
+};
+
 int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
     uint32_t max_aio);
 int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
index 7b1ea8afc0f1042eb45d62a633af1f048eaf845c..605b3ffee685bcd53a1beac4a58f6532b2f55eb8 100644 (file)
@@ -3876,7 +3876,7 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
   if (ret < 0)
     return ret;
 
-  ret = cls_rgw_bucket_check_index_op(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio);
+  ret = CLSRGWIssueBucketCheck(index_ctx, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
   if (ret < 0)
     return ret;
 
@@ -3898,7 +3898,7 @@ int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
   if (r < 0)
     return r;
 
-  return cls_rgw_bucket_rebuild_index_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
+  return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 
@@ -6266,7 +6266,7 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const str
   if (r < 0)
     return r;
 
-  r = cls_rgw_list_op(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio);
+  r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0)
     return r;
 
@@ -6558,7 +6558,7 @@ int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_
   if (r < 0)
     return r;
 
-  r = cls_rgw_get_dir_header(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio);
+  r = CLSRGWIssueGetDirHeader(index_ctx, list_results, cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0)
     return r;