]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_admin: bucket rehsrading, initial work
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 26 Sep 2016 23:09:34 +0000 (16:09 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 30 Sep 2016 17:16:36 +0000 (10:16 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
Conflicts:
src/rgw/rgw_admin.cc

src/rgw/rgw_admin.cc

index d2c04a8fac6eb9b048441bb1f37b898c2f0e86ea..4eed8ed2b0230394a60f9fe88d55b825734761a7 100644 (file)
@@ -1,4 +1,3 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 // vim: ts=8 sw=2 smarttab
 
 #include <errno.h>
@@ -224,6 +223,7 @@ enum {
   OPT_BUCKET_CHECK,
   OPT_BUCKET_RM,
   OPT_BUCKET_REWRITE,
+  OPT_BUCKET_RESHARD,
   OPT_POLICY,
   OPT_POOL_ADD,
   OPT_POOL_RM,
@@ -367,6 +367,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, bool *need_more)
       return OPT_BUCKET_RM;
     if (strcmp(cmd, "rewrite") == 0)
       return OPT_BUCKET_REWRITE;
+    if (strcmp(cmd, "reshard") == 0)
+      return OPT_BUCKET_RESHARD;
     if (strcmp(cmd, "check") == 0)
       return OPT_BUCKET_CHECK;
   } else if (strcmp(prev_cmd, "log") == 0) {
@@ -633,16 +635,16 @@ public:
 };
 
 static int init_bucket(const string& bucket_name, const string& bucket_id,
-                       RGWBucketInfo& bucket_info, rgw_bucket& bucket)
+                       RGWBucketInfo& bucket_info, rgw_bucket& bucket, map<string, bufferlist> *pattrs = NULL)
 {
   if (!bucket_name.empty()) {
     RGWObjectCtx obj_ctx(store);
     int r;
     if (bucket_id.empty()) {
-      r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL);
+      r = store->get_bucket_info(obj_ctx, bucket_name, bucket_info, NULL, pattrs);
     } else {
       string bucket_instance_id = bucket_name + ":" + bucket_id;
-      r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
+      r = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, pattrs);
     }
     if (r < 0) {
       cerr << "could not get bucket info for bucket=" << bucket_name << std::endl;
@@ -1082,6 +1084,51 @@ int do_check_object_locator(const string& bucket_name, bool fix, bool remove_bad
   return 0;
 }
 
+#define RESHARD_SHARD_WINDOW 64
+
+class BucketReshardShard {
+  RGWRados *store;
+  RGWBucketInfo& bucket_info;
+  int num_shard;
+  RGWRados::BucketShard bs;
+  vector<rgw_cls_bi_entry> entries;
+
+public:
+  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info, int _num_shard) : store(_store), bucket_info(_bucket_info), bs(store) {
+    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
+    bs.init(bucket_info.bucket, num_shard);
+  }
+
+  int add_entry(rgw_cls_bi_entry& entry) {
+    entries.push_back(entry);
+    if (entries.size() >= RESHARD_SHARD_WINDOW) {
+      int ret = flush();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    return 0;
+  }
+  int flush() {
+    if (entries.size() == 0) {
+      return 0;
+    }
+
+    librados::ObjectWriteOperation op;
+    for (vector<rgw_cls_bi_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+      rgw_cls_bi_entry& entry = *iter;
+      store->bi_put(op, bs, entry);
+    }
+    int ret = bs.index_ctx.operate(bs.bucket_obj, &op);
+    if (ret < 0) {
+      std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    entries.clear();
+    return 0;
+  }
+};
+
 
 int main(int argc, char **argv) 
 {
@@ -2286,6 +2333,10 @@ next:
   }
 
   if (opt_cmd == OPT_BI_LIST) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket name not specified" << std::endl;
+      return EINVAL;
+    }
     RGWBucketInfo bucket_info;
     int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket);
     if (ret < 0) {
@@ -2299,25 +2350,36 @@ next:
       max_entries = 1000;
     }
 
+    int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
 
-    formatter->open_array_section("entries");
-
-    do {
-      entries.clear();
-      ret = store->bi_list(bucket, object, marker, max_entries, &entries, &is_truncated);
+    for (int i = 0; i < max_shards; i++) {
+      RGWRados::BucketShard bs(store);
+      int shard_id = (bucket_info.num_shards > 0  ? i : -1);
+      int ret = bs.init(bucket, shard_id);
       if (ret < 0) {
-        cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+        cerr << "ERROR: bs.init(bucket=" << bucket << ", shard=" << shard_id << "): " << cpp_strerror(-ret) << std::endl;
         return -ret;
       }
 
-      list<rgw_cls_bi_entry>::iterator iter;
-      for (iter = entries.begin(); iter != entries.end(); ++iter) {
-        rgw_cls_bi_entry& entry = *iter;
-        encode_json("entry", entry, formatter);
-        marker = entry.idx;
-      }
-      formatter->flush(cout);
-    } while (is_truncated);
+      formatter->open_array_section("entries");
+
+      do {
+        entries.clear();
+        ret = store->bi_list(bs, marker, max_entries, &entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+
+        list<rgw_cls_bi_entry>::iterator iter;
+        for (iter = entries.begin(); iter != entries.end(); ++iter) {
+          rgw_cls_bi_entry& entry = *iter;
+          encode_json("entry", entry, formatter);
+          marker = entry.idx;
+        }
+        formatter->flush(cout);
+      } while (is_truncated);
+    }
     formatter->close_section();
     formatter->flush(cout);
   }
@@ -2477,6 +2539,107 @@ next:
     formatter->flush(cout);
   }
 
+  if (opt_cmd == OPT_BUCKET_RESHARD) {
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+
+    RGWBucketInfo bucket_info;
+    map<string, bufferlist> attrs;
+    int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket, &attrs);
+    if (ret < 0) {
+      cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+    RGWBucketInfo new_bucket_info(bucket_info);
+    store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+    new_bucket_info.bucket.oid.clear();
+
+    new_bucket_info.num_shards = num_shards;
+    new_bucket_info.objv_tracker.clear();
+
+    ret = store->init_bucket_index(new_bucket_info.bucket, new_bucket_info.num_shards);
+    if (ret < 0) {
+      cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+
+    ret = store->put_bucket_instance_info(new_bucket_info, true, 0, &attrs);
+    if (ret < 0) {
+      cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+      return -ret;
+    }
+    list<rgw_cls_bi_entry> entries;
+
+    if (max_entries < 0) {
+      max_entries = 1000;
+    }
+
+    int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
+    BucketReshardShard *target_shards[num_target_shards];
+    for (int i = 0; i < num_target_shards; ++i) {
+      target_shards[i] = new BucketReshardShard(store, new_bucket_info, i);
+    }
+
+    formatter->open_array_section("entries");
+
+    for (int i = 0; i < num_source_shards; ++i) {
+      bool is_truncated = true;
+      marker.clear();
+      while (is_truncated) {
+        entries.clear();
+        ret = store->bi_list(bucket, i, marker, max_entries, &entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
+          return -ret;
+        }
+
+        list<rgw_cls_bi_entry>::iterator iter;
+        for (iter = entries.begin(); iter != entries.end(); ++iter) {
+          rgw_cls_bi_entry& entry = *iter;
+          encode_json("entry", entry, formatter);
+          marker = entry.idx;
+
+          int target_shard_id;
+          cls_rgw_obj_key cls_key;
+          entry.get_key(&cls_key);
+          rgw_obj_key key(cls_key);
+          rgw_obj obj(new_bucket_info.bucket, key);
+          int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
+          if (ret < 0) {
+            cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl;
+            return ret;
+          }
+
+          int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+          ret = target_shards[shard_index]->add_entry(entry);
+          if (ret < 0) {
+            cerr << "ERROR: target_shards.add_entry(" << key << ") returned error: " << cpp_strerror(-ret) << std::endl;
+            return ret;
+          }
+        }
+
+        formatter->flush(cout);
+      }
+    }
+    formatter->close_section();
+    formatter->flush(cout);
+
+    for (int i = 0; i < num_target_shards; ++i) {
+      int ret = target_shards[i]->flush();
+      if (ret < 0) {
+        cerr << "ERROR: target_shards[" << i << "].flush() returned error: " << cpp_strerror(-ret) << std::endl;
+        return ret;
+      }
+      delete target_shards[i];
+    }
+  }
+
   if (opt_cmd == OPT_OBJECT_UNLINK) {
     RGWBucketInfo bucket_info;
     int ret = init_bucket(bucket_name, bucket_id, bucket_info, bucket);