]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls_rgw: clean up CLSRGWConcurrentIO
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 5 Dec 2014 22:10:50 +0000 (14:10 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:26 +0000 (19:21 -0800)
Class is no longer a template, and keeps a map of oids by shard_id. Call
issue_op() using both shard_id and oids. Shard id is used for mapping
the results in the derived classes.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h

index c0c5d7c7731e7a2a079cd33be7fc22d613e74c77..51f1d0fbae2fb26e83d2b2a5dcb846c102c36842 100644 (file)
@@ -115,22 +115,22 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBucketIndexInit::issue_op()
+int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_index_init_op(io_ctx, *iter, &manager);
+  return issue_bucket_index_init_op(io_ctx, oid, &manager);
 }
 
 void CLSRGWIssueBucketIndexInit::cleanup()
 {
   // Do best effort removal
-  for (vector<string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
-    io_ctx.remove(*citer);
+  for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+    io_ctx.remove(citer->second);
   }
 }
 
-int CLSRGWIssueSetTagTimeout::issue_op()
+int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_set_tag_timeout_op(io_ctx, *iter, tag_timeout, &manager);
+  return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
 }
 
 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -182,17 +182,17 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBucketList::issue_op()
+int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+  return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]);
 }
 
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
-    const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                                 BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
     struct cls_rgw_bi_log_list_ret *pdata) {
   bufferlist in;
   cls_rgw_bi_log_list_op call;
-  call.marker = marker_mgr.get(oid, "");
+  call.marker = marker_mgr.get(shard_id, "");
   call.max = max;
   ::encode(call, in);
 
@@ -201,27 +201,27 @@ static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBILogList::issue_op()
+int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
 {
-  return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
+  return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
 }
 
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx,
-    string& oid, BucketIndexShardsManager& start_marker_mgr,
-    BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                              BucketIndexShardsManager& start_marker_mgr,
+                              BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
   bufferlist in;
   cls_rgw_bi_log_trim_op call;
-  call.start_marker = start_marker_mgr.get(oid, "");
-  call.end_marker = end_marker_mgr.get(oid, "");
+  call.start_marker = start_marker_mgr.get(shard_id, "");
+  call.end_marker = end_marker_mgr.get(shard_id, "");
   ::encode(call, in);
   ObjectWriteOperation op;
   op.exec("rgw", "bi_log_trim", in);
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBILogTrim::issue_op()
+int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
 {
-  return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager);
+  return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
@@ -233,9 +233,9 @@ static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, Bucket
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBucketCheck::issue_op()
+int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_check_index_op(io_ctx, iter->first, &manager, &iter->second);
+  return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
 }
 
 static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
@@ -246,9 +246,9 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int CLSRGWIssueBucketRebuild::issue_op()
+int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_rebuild_index_op(io_ctx, *iter, &manager);
+  return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -262,9 +262,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec("rgw", "dir_suggest_changes", updates);
 }
 
-int CLSRGWIssueGetDirHeader::issue_op()
+int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_list_op(io_ctx, iter->first, "", "", 0, &manager, &iter->second);
+  return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
index 14a702cc4296230fe5398f345a59154a1583b1d9..cd6e852baaac509b1baba55d7ac853b5a7bd4087 100644 (file)
@@ -129,17 +129,17 @@ public:
 class BucketIndexShardsManager {
 private:
   // Per shard setting manager, for example, marker.
-  map<string, string> value_by_shards;
+  map<int, string> value_by_shards;
 public:
   const static string KEY_VALUE_SEPARATOR;
   const static string SHARDS_SEPARATOR;
 
-  void add(const string& shard, const string& value) {
+  void add(int shard, const string& value) {
     value_by_shards[shard] = value;
   }
 
-  const string& get(const string& shard, const string& default_value) {
-    map<string, string>::iterator iter = value_by_shards.find(shard);
+  const string& get(int shard, const string& default_value) {
+    map<int, string>::iterator iter = value_by_shards.find(shard);
     return (iter == value_by_shards.end() ? default_value : iter->second);
   }
 
@@ -149,7 +149,7 @@ public:
 
   void to_string(string *out) const {
     if (out) {
-      map<string, string>::const_iterator iter = value_by_shards.begin();
+      map<int, string>::const_iterator iter = value_by_shards.begin();
       // No shards
       if (value_by_shards.size() == 1) {
         *out = iter->second;
@@ -159,7 +159,9 @@ public:
             // Not the first item, append a separator first
             out->append(SHARDS_SEPARATOR);
           }
-          out->append(iter->first);
+          char buf[16];
+          snprintf(buf, sizeof(buf), "%d", iter->first);
+          out->append(buf);
           out->append(KEY_VALUE_SEPARATOR);
           out->append(iter->second);
         }
@@ -167,20 +169,26 @@ public:
     }
   }
 
-  int from_string(const string& composed_marker, bool has_shards, const string& oid) {
+  int from_string(const string& composed_marker, bool has_shards) {
     value_by_shards.clear();
     if (!has_shards) {
-      add(oid, composed_marker);
+      add(0, composed_marker);
     } else {
       list<string> shards;
       get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
       list<string>::const_iterator iter = shards.begin();
       for (; iter != shards.end(); ++iter) {
         size_t pos = iter->find(KEY_VALUE_SEPARATOR);
-        if (pos == string::npos)
+        if (pos == string::npos) {
           return -EINVAL;
-        string name = iter->substr(0, pos);
-        value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1);
+        }
+        string shard_str = iter->substr(0, pos);
+        string err;
+        int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
+        if (!err.empty()) {
+          return -EINVAL;
+        }
+        value_by_shards[shard] = iter->substr(pos + 1);
       }
     }
     return 0;
@@ -190,16 +198,15 @@ public:
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 
-template<class T>
 class CLSRGWConcurrentIO {
 protected:
   librados::IoCtx& io_ctx;
-  T& objs_container;
-  typename T::iterator iter;
+  map<int, string>& objs_container;
+  map<int, string>::iterator iter;
   uint32_t max_aio;
   BucketIndexAioManager manager;
 
-  virtual int issue_op() = 0;
+  virtual int issue_op(int shard_id, const string& oid) = 0;
 
   virtual void cleanup() {}
   virtual int valid_ret_code() { return 0; }
@@ -211,7 +218,7 @@ protected:
   virtual void reset_container(vector<string>& objs) {}
 
 public:
-  CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container,
+  CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
                      uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
   virtual ~CLSRGWConcurrentIO() {}
 
@@ -219,7 +226,7 @@ public:
     int ret = 0;
     iter = objs_container.begin();
     for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
-      ret = issue_op();
+      ret = issue_op(iter->first, iter->second);
       if (ret < 0)
         break;
     }
@@ -229,7 +236,7 @@ public:
     while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) {
       if (r >= 0 && ret >= 0) {
         for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
-          int issue_ret = issue_op();
+          int issue_ret = issue_op(iter->first, iter->second);
           if(issue_ret < 0) {
             ret = issue_ret;
             break;
@@ -252,26 +259,25 @@ public:
   }
 };
 
-class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
   int valid_ret_code() { return -EEXIST; }
   void cleanup();
 public:
-  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
                      uint32_t _max_aio) :
-    CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio) {}
+    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
-class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
   uint64_t tag_timeout;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
-  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, vector<string>& _bucket_objs,
+  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
                      uint32_t _max_aio, uint64_t _tag_timeout) :
-    CLSRGWConcurrentIO<vector<string> >(ioc, _bucket_objs, _max_aio),
-    tag_timeout(_tag_timeout) {}
+    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
 };
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -298,51 +304,55 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o
  * Return 0 on success, a failure code otherwise.
 */
 
-class CLSRGWIssueBucketList : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
   string start_obj;
   string filter_prefix;
   uint32_t num_entries;
+  map<int, rgw_cls_list_ret>& result;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
   CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
                         const string& _filter_prefix, uint32_t _num_entries,
-                        map<string, struct rgw_cls_list_ret>& list_results,
+                        map<int, string>& oids,
+                        map<int, struct rgw_cls_list_ret>& list_results,
                         uint32_t max_aio) :
-  CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > (io_ctx, list_results, max_aio),
-  start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
+  CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+  start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {}
 };
 
-class CLSRGWIssueBILogList : public CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> > {
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
+  map<int, struct cls_rgw_bi_log_list_ret>& result;
   BucketIndexShardsManager& marker_mgr;
   uint32_t max;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
   CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
-      map<string, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
-    CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> >(io_ctx, bi_log_lists, max_aio),
+                       map<int, string>& oids,
+                       map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
     marker_mgr(_marker_mgr), max(_max) {}
 };
 
-class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
   BucketIndexShardsManager& start_marker_mgr;
   BucketIndexShardsManager& end_marker_mgr;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
   // Trim until -ENODATA is returned.
   int valid_ret_code() { return -ENODATA; }
   bool need_multiple_rounds() { return true; }
-  void add_object(const string& oid) { objs_container.push_back(oid); }
-  void reset_container(vector<string>& objs) {
+  void add_object(int shard, const string& oid) { objs_container[shard] = oid; }
+  void reset_container(map<int, string>& objs) {
     objs_container.swap(objs);
     iter = objs_container.begin();
     objs.clear();
   }
 public:
   CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
-      BucketIndexShardsManager& _end_marker_mgr, vector<string>& _bucket_objs, uint32_t max_aio) :
-    CLSRGWConcurrentIO<vector<string> >(io_ctx, _bucket_objs, max_aio),
+      BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
     start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
 };
 
@@ -355,30 +365,32 @@ public:
  *
  * Return 0 on success, a failure code otherwise.
  */
-class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> > {
+class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
+  map<int, struct rgw_cls_check_index_ret>& result;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
-  CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<string, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+  CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
                      uint32_t _max_aio) :
-    CLSRGWConcurrentIO<map<string, struct rgw_cls_check_index_ret> >(ioc, bucket_objs_ret, _max_aio) {}
+    CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
 };
 
-class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO<vector<string> > {
+class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
-  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, vector<string>& bucket_objs,
-                           uint32_t max_aio) : CLSRGWConcurrentIO<vector<string> >(io_ctx, bucket_objs, max_aio) {}
+  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
+                           uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
 };
 
-class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> > {
+class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
+  map<int, rgw_cls_list_ret>& result;
 protected:
-  int issue_op();
+  int issue_op(int shard_id, const string& oid);
 public:
-  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,
+  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
                           uint32_t max_aio) :
-    CLSRGWConcurrentIO<map<string, rgw_cls_list_ret> >(io_ctx, dir_headers, max_aio) {}
+    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
 };
 
 int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, map<string, rgw_cls_list_ret>& dir_headers,