]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move resharding code to BucketReshard class
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 9 May 2017 22:19:10 +0000 (15:19 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:43 +0000 (13:17 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rados.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 478f1124c1d9994cc0dd7d3a587a05943fa833e4..a5692985dbdb29ffe47a5dbd42ca95e1350b89e7 100644 (file)
@@ -191,9 +191,9 @@ void _usage()
   cout << "  role-policy get            get the specified inline policy document embedded with the given role\n";
   cout << "  role-policy delete         delete policy attached to a role\n";
   cout << "  reshard add                schedule a resharding of a bucket\n";
-  cout << "  reshard list                  list all bucket resharding or scheduled to be reshared\n";
-  cout << "  reshard execute         execute resharding of  a bucket \n";
-  cout << "  reshard cancel           cancel resharding a bucket\n";
+  cout << "  reshard list               list all bucket resharding or scheduled to be reshared\n";
+  cout << "  reshard execute            execute resharding of  a bucket \n";
+  cout << "  reshard cancel             cancel resharding a bucket\n";
   cout << "options:\n";
   cout << "   --tenant=<tenant>         tenant name\n";
   cout << "   --uid=<id>                user id\n";
@@ -2215,176 +2215,6 @@ static void parse_tier_config_param(const string& s, map<string, string, ltstr_n
   }
 }
 
-#define RESHARD_SHARD_WINDOW 64
-#define RESHARD_MAX_AIO 128
-
-class BucketReshardShard {
-  RGWRados *store;
-  RGWBucketInfo& bucket_info;
-  int num_shard;
-  RGWRados::BucketShard bs;
-  vector<rgw_cls_bi_entry> entries;
-  map<uint8_t, rgw_bucket_category_stats> stats;
-  deque<librados::AioCompletion *>& aio_completions;
-
-  int wait_next_completion() {
-    librados::AioCompletion *c = aio_completions.front();
-    aio_completions.pop_front();
-
-    c->wait_for_safe();
-
-    int ret = c->get_return_value();
-    c->release();
-
-    if (ret < 0) {
-      cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
-      return ret;
-    }
-
-    return 0;
-  }
-
-  int get_completion(librados::AioCompletion **c) {
-    if (aio_completions.size() >= RESHARD_MAX_AIO) {
-      int ret = wait_next_completion();
-      if (ret < 0) {
-        return ret;
-      }
-    }
-
-    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
-    aio_completions.push_back(*c);
-
-    return 0;
-  }
-
-public:
-  BucketReshardShard(RGWRados *_store, RGWBucketInfo& _bucket_info,
-                     int _num_shard,
-                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
-                                                                       aio_completions(_completions) {
-    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
-    bs.init(bucket_info.bucket, num_shard);
-  }
-
-  int get_num_shard() {
-    return num_shard;
-  }
-
-  int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
-                const rgw_bucket_category_stats& entry_stats) {
-    entries.push_back(entry);
-    if (account) {
-      rgw_bucket_category_stats& target = stats[category];
-      target.num_entries += entry_stats.num_entries;
-      target.total_size += entry_stats.total_size;
-      target.total_size_rounded += entry_stats.total_size_rounded;
-    }
-    if (entries.size() >= RESHARD_SHARD_WINDOW) {
-      int ret = flush();
-      if (ret < 0) {
-        return ret;
-      }
-    }
-    return 0;
-  }
-  int flush() {
-    if (entries.size() == 0) {
-      return 0;
-    }
-
-    librados::ObjectWriteOperation op;
-    for (auto& entry : entries) {
-      store->bi_put(op, bs, entry);
-    }
-    cls_rgw_bucket_update_stats(op, false, stats);
-
-    librados::AioCompletion *c;
-    int ret = get_completion(&c);
-    if (ret < 0) {
-      return ret;
-    }
-    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
-    if (ret < 0) {
-      std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
-      return ret;
-    }
-    entries.clear();
-    stats.clear();
-    return 0;
-  }
-
-  int wait_all_aio() {
-    int ret = 0;
-    while (!aio_completions.empty()) {
-      int r = wait_next_completion();
-      if (r < 0) {
-        ret = r;
-      }
-    }
-    return ret;
-  }
-};
-
-class BucketReshardManager {
-  RGWRados *store;
-  RGWBucketInfo& target_bucket_info;
-  deque<librados::AioCompletion *> completions;
-  int num_target_shards;
-  vector<BucketReshardShard *> target_shards;
-
-public:
-  BucketReshardManager(RGWRados *_store, RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
-                                                                                                       num_target_shards(_num_target_shards) {
-    target_shards.resize(num_target_shards);
-    for (int i = 0; i < num_target_shards; ++i) {
-      target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
-    }
-  }
-
-  ~BucketReshardManager() {
-    for (auto& shard : target_shards) {
-      int ret = shard->wait_all_aio();
-      if (ret < 0) {
-        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
-      }
-    }
-  }
-
-  int add_entry(int shard_index,
-                rgw_cls_bi_entry& entry, bool account, uint8_t category,
-                const rgw_bucket_category_stats& entry_stats) {
-    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
-    if (ret < 0) {
-      cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
-      return ret;
-    }
-    return 0;
-  }
-
-  int finish() {
-    int ret = 0;
-    for (auto& shard : target_shards) {
-      int r = shard->flush();
-      if (r < 0) {
-        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
-        ret = r;
-      }
-    }
-    for (auto& shard : target_shards) {
-      int r = shard->wait_all_aio();
-      if (r < 0) {
-        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
-        ret = r;
-      }
-      delete shard;
-    }
-    target_shards.clear();
-    return ret;
-  }
-};
-
-
 int check_reshard_bucket_params(RGWRados *store,
                                const string& bucket_name,
                                const string& tenant,
@@ -2427,152 +2257,6 @@ int check_reshard_bucket_params(RGWRados *store,
   return 0;
 }
 
-int create_new_bucket_instance(RGWRados *store,
-                              int new_num_shards,
-                              const RGWBucketInfo& bucket_info,
-                              map<string, bufferlist>& attrs,
-                              RGWBucketInfo& new_bucket_info)
-{
-
-  store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
-  new_bucket_info.bucket.oid.clear();
-
-  new_bucket_info.num_shards = new_num_shards;
-  new_bucket_info.objv_tracker.clear();
-
-  int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
-  if (ret < 0) {
-    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
-    return -ret;
-  }
-
-  ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
-  if (ret < 0) {
-    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
-    return -ret;
-  }
-
-  return 0;
-}
-
-int reshard_bucket(RGWRados *store,
-                  Formatter *formatter,
-                  int num_shards,
-                  rgw_bucket& bucket,
-                  RGWBucketInfo& bucket_info,
-                  RGWBucketInfo& new_bucket_info,
-                  int max_entries,
-                  RGWBucketAdminOpState& bucket_op,
-                  bool verbose)
-{
-
-  int ret = 0;
-
-  cout << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
-  cout << "***         these will need to be removed manually             ***" << std::endl;
-  cout << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
-  cout << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
-
-  list<rgw_cls_bi_entry> entries;
-
-  if (max_entries < 0) {
-    max_entries = 1000;
-  }
-
-  int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
-
-  BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
-
-  if (verbose) {
-    formatter->open_array_section("entries");
-  }
-
-  uint64_t total_entries = 0;
-
-  if (!verbose) {
-    cout << "total entries:";
-  }
-
-  int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
-  string marker;
-  for (int i = 0; i < num_source_shards; ++i) {
-    bool is_truncated = true;
-    marker.clear();
-    while (is_truncated) {
-      entries.clear();
-      ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
-      if (ret < 0) {
-       cerr << "ERROR: bi_list(): " << cpp_strerror(-ret) << std::endl;
-       return -ret;
-      }
-
-      list<rgw_cls_bi_entry>::iterator iter;
-      for (iter = entries.begin(); iter != entries.end(); ++iter) {
-       rgw_cls_bi_entry& entry = *iter;
-       if (verbose) {
-         formatter->open_object_section("entry");
-
-         encode_json("shard_id", i, formatter);
-         encode_json("num_entry", total_entries, formatter);
-         encode_json("entry", entry, formatter);
-       }
-       total_entries++;
-
-       marker = entry.idx;
-
-       int target_shard_id;
-       cls_rgw_obj_key cls_key;
-       uint8_t category;
-       rgw_bucket_category_stats stats;
-       bool account = entry.get_info(&cls_key, &category, &stats);
-       rgw_obj_key key(cls_key);
-       rgw_obj obj(new_bucket_info.bucket, key);
-       int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
-       if (ret < 0) {
-         cerr << "ERROR: get_target_shard_id() returned ret=" << ret << std::endl;
-         return ret;
-       }
-
-       int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
-
-       ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
-       if (ret < 0) {
-         return ret;
-       }
-       if (verbose) {
-         formatter->close_section();
-         formatter->flush(cout);
-         formatter->flush(cout);
-       } else if (!(total_entries % 1000)) {
-         cout << " " << total_entries;
-       }
-      }
-    }
-  }
-  if (verbose) {
-    formatter->close_section();
-    formatter->flush(cout);
-  } else {
-    cout << " " << total_entries << std::endl;
-  }
-
-  ret = target_shards_mgr.finish();
-  if (ret < 0) {
-    cerr << "ERROR: failed to reshard" << std::endl;
-    return EIO;
-  }
-
-  bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
-  bucket_op.set_user_id(new_bucket_info.owner);
-  string err;
-  int r = RGWBucketAdminOp::link(store, bucket_op, &err);
-  if (r < 0) {
-    cerr << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << std::endl;
-    return -r;
-  }
-  return 0;
-}
-
 #ifdef BUILDING_FOR_EMBEDDED
 extern "C" int cephd_rgw_admin(int argc, const char **argv)
 #else
@@ -5860,22 +5544,10 @@ next:
       return ret;
     }
 
-    RGWBucketInfo new_bucket_info(bucket_info);
-    ret = create_new_bucket_instance(store, num_shards, bucket_info, attrs,
-                                    new_bucket_info);
-    if (ret < 0) {
-      return ret;
-    }
+    RGWBucketReshard br(store, bucket_info, attrs);
 
-    return reshard_bucket(store,
-                         formatter,
-                         num_shards,
-                         bucket,
-                         bucket_info,
-                         new_bucket_info,
-                         max_entries,
-                         bucket_op,
-                         verbose);
+    return br.execute(num_shards, max_entries,
+                      verbose, &cout, formatter);
   }
 
   if (opt_cmd == OPT_RESHARD_ADD) {
@@ -5899,7 +5571,7 @@ next:
 
     int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
 
-    RGWReshard reshard(g_ceph_context, store);
+    RGWReshard reshard(store);
     cls_rgw_reshard_entry entry;
     entry.time = real_clock::now();
     entry.tenant = tenant;
@@ -5921,7 +5593,7 @@ next:
       max_entries = 1000;
     }
 
-    RGWReshard reshard(g_ceph_context, store);
+    RGWReshard reshard(store);
 
     formatter->open_array_section("reshard");
     do {
@@ -5944,9 +5616,9 @@ next:
     return 0;
   }
 
+#if 0
   if (opt_cmd == OPT_RESHARD_EXECUTE) {
-    RGWReshard reshard(g_ceph_context, store);
-
+    RGWReshard reshard(store);
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket not specified" << std::endl;
       return EINVAL;
@@ -5961,6 +5633,8 @@ next:
       return -ret;
     }
 
+    RGWBucketReshard bucket_reshard(store, bucket_info);
+
     cls_rgw_reshard_entry entry;
     entry.tenant = tenant;
     entry.bucket_name = bucket_name;
@@ -6016,9 +5690,10 @@ next:
 
     return 0;
   }
+#endif
 
   if (opt_cmd == OPT_RESHARD_CANCEL) {
-    RGWReshard reshard(g_ceph_context, store);
+    RGWReshard reshard(store);
 
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket not specified" << std::endl;
index 14c472faa8cd083994b4ef93062104a13cca2fd9..b4c32b2c570e26f2e7e315ce7c8637d7b030e1be 100644 (file)
@@ -210,11 +210,11 @@ struct RGWBucketAdminOpState {
 
   void set_max_aio(int value) { max_aio = value; }
 
-  void set_user_id(rgw_user& user_id) {
+  void set_user_id(const rgw_user& user_id) {
     if (!user_id.empty())
       uid = user_id;
   }
-  void set_bucket_name(std::string& bucket_str) {
+  void set_bucket_name(const std::string& bucket_str) {
     bucket_name = bucket_str; 
   }
   void set_object(std::string& object_str) {
index 003c1a0db7c9a1b419f893a4289904da38916ecd..0f88e99558c9c0f35fe0db2fd7c34d5c682ccf63 100644 (file)
@@ -4270,7 +4270,7 @@ int RGWRados::init_complete()
     obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
   }
 
-  reshard = new RGWReshard(cct, this);
+  reshard = new RGWReshard(this);
 
   return ret;
 }
index 7c2fb7a0f7ffeb37a588fd3a6fdee3377300eb33..cac34a3e4327cd2902eecaa10a4581a7e53a56b9 100644 (file)
@@ -2,10 +2,12 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "rgw_rados.h"
+#include "rgw_bucket.h"
 #include "rgw_reshard.h"
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/lock/cls_lock_client.h"
 #include "common/errno.h"
+#include "common/ceph_json.h"
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -14,8 +16,179 @@ const string reshard_oid = "reshard";
 const string reshard_lock_name = "reshard_process";
 const string bucket_instance_lock_name = "bucket_instance_lock";
 
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) :
-                                                     store(_store), bucket_info(_bucket_info),
+using namespace std;
+
+#define RESHARD_SHARD_WINDOW 64
+#define RESHARD_MAX_AIO 128
+
+class BucketReshardShard {
+  RGWRados *store;
+  const RGWBucketInfo& bucket_info;
+  int num_shard;
+  RGWRados::BucketShard bs;
+  vector<rgw_cls_bi_entry> entries;
+  map<uint8_t, rgw_bucket_category_stats> stats;
+  deque<librados::AioCompletion *>& aio_completions;
+
+  int wait_next_completion() {
+    librados::AioCompletion *c = aio_completions.front();
+    aio_completions.pop_front();
+
+    c->wait_for_safe();
+
+    int ret = c->get_return_value();
+    c->release();
+
+    if (ret < 0) {
+      cerr << "ERROR: reshard rados operation failed: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    return 0;
+  }
+
+  int get_completion(librados::AioCompletion **c) {
+    if (aio_completions.size() >= RESHARD_MAX_AIO) {
+      int ret = wait_next_completion();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+
+    *c = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+    aio_completions.push_back(*c);
+
+    return 0;
+  }
+
+public:
+  BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
+                     int _num_shard,
+                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
+                                                                       aio_completions(_completions) {
+    num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
+    bs.init(bucket_info.bucket, num_shard);
+  }
+
+  int get_num_shard() {
+    return num_shard;
+  }
+
+  int add_entry(rgw_cls_bi_entry& entry, bool account, uint8_t category,
+                const rgw_bucket_category_stats& entry_stats) {
+    entries.push_back(entry);
+    if (account) {
+      rgw_bucket_category_stats& target = stats[category];
+      target.num_entries += entry_stats.num_entries;
+      target.total_size += entry_stats.total_size;
+      target.total_size_rounded += entry_stats.total_size_rounded;
+    }
+    if (entries.size() >= RESHARD_SHARD_WINDOW) {
+      int ret = flush();
+      if (ret < 0) {
+        return ret;
+      }
+    }
+    return 0;
+  }
+  int flush() {
+    if (entries.size() == 0) {
+      return 0;
+    }
+
+    librados::ObjectWriteOperation op;
+    for (auto& entry : entries) {
+      store->bi_put(op, bs, entry);
+    }
+    cls_rgw_bucket_update_stats(op, false, stats);
+
+    librados::AioCompletion *c;
+    int ret = get_completion(&c);
+    if (ret < 0) {
+      return ret;
+    }
+    ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &op);
+    if (ret < 0) {
+      std::cerr << "ERROR: failed to store entries in target bucket shard (bs=" << bs.bucket << "/" << bs.shard_id << ") error=" << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    entries.clear();
+    stats.clear();
+    return 0;
+  }
+
+  int wait_all_aio() {
+    int ret = 0;
+    while (!aio_completions.empty()) {
+      int r = wait_next_completion();
+      if (r < 0) {
+        ret = r;
+      }
+    }
+    return ret;
+  }
+};
+
+class BucketReshardManager {
+  RGWRados *store;
+  const RGWBucketInfo& target_bucket_info;
+  deque<librados::AioCompletion *> completions;
+  int num_target_shards;
+  vector<BucketReshardShard *> target_shards;
+
+public:
+  BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
+                                                                                                       num_target_shards(_num_target_shards) {
+    target_shards.resize(num_target_shards);
+    for (int i = 0; i < num_target_shards; ++i) {
+      target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
+    }
+  }
+
+  ~BucketReshardManager() {
+    for (auto& shard : target_shards) {
+      int ret = shard->wait_all_aio();
+      if (ret < 0) {
+        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+      }
+    }
+  }
+
+  int add_entry(int shard_index,
+                rgw_cls_bi_entry& entry, bool account, uint8_t category,
+                const rgw_bucket_category_stats& entry_stats) {
+    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+    if (ret < 0) {
+      cerr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+    return 0;
+  }
+
+  int finish() {
+    int ret = 0;
+    for (auto& shard : target_shards) {
+      int r = shard->flush();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].flush() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+    }
+    for (auto& shard : target_shards) {
+      int r = shard->wait_all_aio();
+      if (r < 0) {
+        cerr << "ERROR: target_shards[" << shard->get_num_shard() << "].wait_all_aio() returned error: " << cpp_strerror(-r) << std::endl;
+        ret = r;
+      }
+      delete shard;
+    }
+    target_shards.clear();
+    return ret;
+  }
+};
+
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
+                                                     store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
                                                      reshard_lock(reshard_lock_name) {
   const rgw_bucket& b = bucket_info.bucket;                                                       
   reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
@@ -73,10 +246,189 @@ int RGWBucketReshard::clear_resharding()
   return 0;
 }
 
-RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
-                                                           instance_lock(bucket_instance_lock_name)
+int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
+                                                 RGWBucketInfo& new_bucket_info)
+{
+  store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+  new_bucket_info.bucket.oid.clear();
+
+  new_bucket_info.num_shards = new_num_shards;
+  new_bucket_info.objv_tracker.clear();
+
+  int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
+  if (ret < 0) {
+    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+    return -ret;
+  }
+
+  ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &bucket_attrs);
+  if (ret < 0) {
+    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+    return -ret;
+  }
+
+  return 0;
+}
+
+int RGWBucketReshard::do_reshard(
+                  int num_shards,
+                  const RGWBucketInfo& new_bucket_info,
+                  int max_entries,
+                   bool verbose,
+                   ostream *out,
+                  Formatter *formatter)
+{
+  rgw_bucket& bucket = bucket_info.bucket;
+
+  int ret = 0;
+
+  if (out) {
+    (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
+    (*out) << "***         these will need to be removed manually             ***" << std::endl;
+    (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
+    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
+  }
+
+  list<rgw_cls_bi_entry> entries;
+
+  if (max_entries < 0) {
+    ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
+    return -EINVAL;
+  }
+
+  int num_target_shards = (new_bucket_info.num_shards > 0 ? new_bucket_info.num_shards : 1);
+
+  BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
+
+  verbose = verbose && (formatter != nullptr);
+
+  if (verbose) {
+    formatter->open_array_section("entries");
+  }
+
+  uint64_t total_entries = 0;
+
+  if (!verbose) {
+    cout << "total entries:";
+  }
+
+  int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+  string marker;
+  for (int i = 0; i < num_source_shards; ++i) {
+    bool is_truncated = true;
+    marker.clear();
+    while (is_truncated) {
+      entries.clear();
+      ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
+      if (ret < 0) {
+       derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
+       return -ret;
+      }
+
+      list<rgw_cls_bi_entry>::iterator iter;
+      for (iter = entries.begin(); iter != entries.end(); ++iter) {
+       rgw_cls_bi_entry& entry = *iter;
+       if (verbose) {
+         formatter->open_object_section("entry");
+
+         encode_json("shard_id", i, formatter);
+         encode_json("num_entry", total_entries, formatter);
+         encode_json("entry", entry, formatter);
+       }
+       total_entries++;
+
+       marker = entry.idx;
+
+       int target_shard_id;
+       cls_rgw_obj_key cls_key;
+       uint8_t category;
+       rgw_bucket_category_stats stats;
+       bool account = entry.get_info(&cls_key, &category, &stats);
+       rgw_obj_key key(cls_key);
+       rgw_obj obj(new_bucket_info.bucket, key);
+       int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
+       if (ret < 0) {
+         lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
+         return ret;
+       }
+
+       int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
+
+       ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+       if (ret < 0) {
+         return ret;
+       }
+       if (verbose) {
+         formatter->close_section();
+         formatter->flush(*out);
+         formatter->flush(*out);
+       } else if (out && !(total_entries % 1000)) {
+         (*out) << " " << total_entries;
+       }
+      }
+    }
+  }
+  if (verbose) {
+    formatter->close_section();
+    formatter->flush(*out);
+  } else if (out) {
+    (*out) << " " << total_entries << std::endl;
+  }
+
+  ret = target_shards_mgr.finish();
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
+    return EIO;
+  }
+
+  RGWBucketAdminOpState bucket_op;
+
+  bucket_op.set_bucket_name(new_bucket_info.bucket.name);
+  bucket_op.set_bucket_id(new_bucket_info.bucket.bucket_id);
+  bucket_op.set_user_id(new_bucket_info.owner);
+  string err;
+  int r = RGWBucketAdminOp::link(store, bucket_op, &err);
+  if (r < 0) {
+    lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << err << "; " << cpp_strerror(-r) << dendl;
+    return -r;
+  }
+  return 0;
+}
+
+int RGWBucketReshard::execute(int num_shards, int max_op_entries,
+                              bool verbose, ostream *out, Formatter *formatter)
+
+{
+  int ret = lock_bucket();
+  if (ret < 0) {
+    return ret;
+  }
+
+  RGWBucketInfo new_bucket_info;
+
+  ret = create_new_bucket_instance(num_shards, new_bucket_info);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = do_reshard(num_shards,
+                  new_bucket_info,
+                  max_op_entries,
+                   verbose, out, formatter);
+
+  if (ret < 0) {
+    return ret;
+  }
+
+  unlock_bucket();
+
+  return 0;
+}
+
+
+RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
 {
-    max_jobs = cct->_conf->rgw_reshard_max_jobs;
+  max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
 }
 
 
@@ -86,7 +438,7 @@ int RGWReshard::add(cls_rgw_reshard_entry& entry)
 
   int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
   if (ret < 0)
@@ -108,7 +460,7 @@ int RGWReshard::list(string& marker, uint32_t max, std::list<cls_rgw_reshard_ent
 
   int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
   if (ret < 0)
@@ -126,7 +478,7 @@ int RGWReshard::get(cls_rgw_reshard_entry& entry)
 
   int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
   if (ret < 0)
@@ -144,7 +496,7 @@ int RGWReshard::remove(cls_rgw_reshard_entry& entry)
 
   int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
   if (ret < 0)
@@ -169,7 +521,7 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r
 
   int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
   if (ret < 0)
@@ -187,7 +539,7 @@ int RGWReshard::lock_bucket_index_shared(const string& oid)
 {
   int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
   if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
     return 0;
   }
 
@@ -217,7 +569,7 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid,
 
     ret = cls_rgw_get_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, &entry);
     if (ret < 0) {
-      ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :"  <<
+      ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :"  <<
        cpp_strerror(-ret)<< dendl;
       return ret;
     }
@@ -232,13 +584,12 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid,
     /* needed to unlock as clear resharding uses the same lock */
     sleep(default_reshard_sleep_duration);
   }
-  ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+  ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
   return -EAGAIN;
 }
 
-BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
-                                          const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
-  cct(_cct),store(_store),
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+  cct(_cct), store(_store),
   l(create_bucket_index_lock_name(bucket_instance_id)),
   oid(_oid), io_ctx(_io_ctx),locked(false)
 {
index 8f8a46df77849c848b94a4f052ec469684e06e04..c8dcda5dbece6eb227e102e4815fd9de84a37656 100644 (file)
@@ -37,6 +37,7 @@ protected:
 class RGWBucketReshard {
   RGWRados *store;
   RGWBucketInfo bucket_info;
+  std::map<string, bufferlist> bucket_attrs;
 
   string reshard_oid;
   rados::cls::lock::Lock reshard_lock;
@@ -45,15 +46,26 @@ class RGWBucketReshard {
   void unlock_bucket();
   int init_resharding(const cls_rgw_reshard_entry& entry);
   int clear_resharding();
+
+  int create_new_bucket_instance(int new_num_shards,
+                                 RGWBucketInfo& new_bucket_info);
+  int do_reshard(int num_shards,
+                const RGWBucketInfo& new_bucket_info,
+                int max_entries,
+                 bool verbose,
+                 ostream *os,
+                Formatter *formatter);
 public:
-  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info);
+  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
+                   const std::map<string, bufferlist>& _bucket_attrs);
 
-  int reshard();
-  int abort_reshard();
+  int execute(int num_shards, int max_op_entries,
+              bool verbose = false, ostream *out = nullptr,
+              Formatter *formatter = nullptr);
+  int abort();
 };
 
 class RGWReshard {
-    CephContext *cct;
     RGWRados *store;
     string lock_name;
     int max_jobs;
@@ -63,7 +75,7 @@ class RGWReshard {
     int unlock_bucket_index(const string& oid);
 
   public:
-    RGWReshard(CephContext* cct, RGWRados* _store);
+    RGWReshard(RGWRados* _store);
     int add(cls_rgw_reshard_entry& entry);
     int get(cls_rgw_reshard_entry& entry);
     int remove(cls_rgw_reshard_entry& entry);