]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_admin: use aio operations for bucket resharding
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 26 Sep 2016 22:49:37 +0000 (15:49 -0700)
committerRobin H. Johnson <robin.johnson@dreamhost.com>
Thu, 9 Feb 2017 22:36:47 +0000 (14:36 -0800)
also created shards manager to make things slightly cleaner

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 97e7ee9ca213ccf4b8f537e02125bd0c4ef24103)
See: http://tracker.ceph.com/issues/17556
See: https://github.com/ceph/ceph/pull/11368
Signed-off-by: Robin H. Johnson <robin.johnson@dreamhost.com>
src/rgw/rgw_admin.cc

index 09593aa48a0181a20a9c29d14ff4a0ac5e193504..f3cbbebc77bb03fe9705a032bf5df60fb18ca67d 100644 (file)
@@ -2011,6 +2011,7 @@ static void parse_tier_config_param(const string& s, map<string, string>& out)
 }
 
 #define RESHARD_SHARD_WINDOW 64
+#define RESHARD_MAX_AIO 128
 
 class BucketReshardShard {
   RGWRados *store;
@@ -2019,15 +2020,54 @@ class BucketReshardShard {
   RGWRados::BucketShard bs;
   vector<rgw_cls_bi_entry> entries;
   map<uint8_t, rgw_bucket_category_stats> stats;
+  deque<librados::AioCompletion *>& aio_completions;
+
+  int wait_next_completion() {
+    librados::AioCompletion *c = aio_completions.front();
+    aio_completions.pop_front();
+
+    c->wait_for_safe();
+
+    int ret = c->get_return_value();
+    c->release();
+
+    if (ret < 0) {
+      cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    return 0;
+  }
+
+  int get_completion(librados::AioCompletion **c) {
+    if (aio_completions.size() >= RESHARD_MAX_AIO) {
+      int ret = wait_next_completion();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+
+    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+    aio_completions.push_back(*c);
+
+    return 0;
+  }
 
 public:
-  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) {
+  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
+                     int _num_shard,
+                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
+                                                                       aio_completions(_completions) {
     num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
     bs.init(bucket_info.bucket, num_shard);
   }
 
+  int get_num_shard() {
+    return num_shard;
+  }
+
   int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
-                rgw_bucket_category_stats entry_stats) {
+                const rgw_bucket_category_stats& entry_stats) {
     entries.push_back(entry);
     if (account) {
       rgw_bucket_category_stats& target = stats[category];
@@ -2053,7 +2093,13 @@ public:
       store->bi_put(op, bs, entry);
     }
     cls_rgw_bucket_update_stats(op, false, stats);
-    int ret = bs.index_ctx.operate(bs.bucket_obj, &op);
+
+    librados::AioCompletion *c;
+    int ret = get_completion(&c);
+    if (ret < 0) {
+      return ret;
+    }
+    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
     if (ret < 0) {
       std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
       return ret;
@@ -2062,6 +2108,75 @@ public:
     stats.clear();
     return 0;
   }
+
+  int wait_all_aio() {
+    int ret = 0;
+    while (!aio_completions.empty()) {
+      int r = wait_next_completion();
+      if (r < 0) {
+        ret = r;
+      }
+    }
+    return ret;
+  }
+};
+
+class BucketReshardManager {
+  RGWRados *store;
+  RGWBucketInfo& target_bucket_info;
+  deque<librados::AioCompletion *> completions;
+  int num_target_shards;
+  vector<BucketReshardShard *> target_shards;
+
+public:
+  BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
+                                                                                                       num_target_shards(_num_target_shards) {
+    target_shards.resize(num_target_shards);
+    for (int i = 0; i < num_target_shards; ++i) {
+      target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
+    }
+  }
+
+  ~BucketReshardManager() {
+    for (auto& shard : target_shards) {
+      int ret = shard->wait_all_aio();
+      if (ret < 0) {
+        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+      }
+    }
+  }
+
+  int add_entry(int shard_index,
+                rgw_cls_bi_entry& entry, bool account, uint8_t category,
+                const rgw_bucket_category_stats& entry_stats) {
+    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+    if (ret < 0) {
+      cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    return 0;
+  }
+
+  int finish() {
+    int ret = 0;
+    for (auto& shard : target_shards) {
+      int r = shard->flush();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+    }
+    for (auto& shard : target_shards) {
+      int r = shard->wait_all_aio();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+      delete shard;
+    }
+    target_shards.clear();
+    return ret;
+  }
 };
 
 int main(int argc, char **argv)
@@ -4961,10 +5076,8 @@ next:
     }
 
     int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
-    BucketReshardShard *target_shards[num_target_shards];
-    for (int i = 0; i < num_target_shards; ++i) {
-      target_shards[i] = new BucketReshardShard(store, new_bucket_info, i);
-    }
+
+    BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
 
     formatter->open_array_section("entries");
 
@@ -5000,9 +5113,8 @@ next:
 
           int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
 
-          ret = target_shards[shard_index]->add_entry(entry, account, category, stats);
+          ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
           if (ret < 0) {
-            cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl;
             return ret;
           }
         }
@@ -5013,13 +5125,10 @@ next:
     formatter->close_section();
     formatter->flush(cout);
 
-    for (int i = 0; i < num_target_shards; ++i) {
-      int ret = target_shards[i]->flush();
-      if (ret < 0) {
-        cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl;
-        return ret;
-      }
-      delete target_shards[i];
+    ret = target_shards_mgr.finish();
+    if (ret < 0) {
+      cerr << "ERROR: failed to reshard" << std::endl;
+      return EIO;
     }
 
     bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);