return -ERR_BUSY_RESHARDING;
}
+
+static int create_new_bucket_instance(RGWRados *store,
+ int new_num_shards,
+ const RGWBucketInfo& bucket_info,
+ map<string, bufferlist>& attrs,
+ RGWBucketInfo& new_bucket_info)
+{
+
+ store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+ new_bucket_info.bucket.oid.clear();
+
+ new_bucket_info.num_shards = new_num_shards;
+ new_bucket_info.objv_tracker.clear();
+
+ int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
+ if (ret < 0) {
+ cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
+ if (ret < 0) {
+ cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ return 0;
+}
+
int RGWReshard::process_single_shard(const string& shard)
{
string marker;
CephContext *cct = store->ctx();
int max_entries = 1000;
int max_secs = 10;
-
+
rados::cls::lock::Lock l(reshard_lock_name);
utime_t time(max_secs, 0);
<< dendl;
continue;
}
+
+ for(auto& entry: entries) {
+ /* resharding has not started */
+ if(entry.new_instance_id.empty()) {
+ RGWObjectCtx obj_ctx(store);
+ rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
+
+ ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+ &attrs);
+ if (ret < 0) {
+ ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+ return -ret;
+ }
+
+ RGWBucketInfo new_bucket_info(bucket_info);
+ ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
+ new_bucket_info);
+ if (ret < 0) {
+ ldout(cct, 0) << __func__ << " ERROR: could not create new bucket info: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ entry.new_instance_id = entry.bucket_name + ":" + new_bucket_info.bucket.bucket_id;
+
+ ret = add(entry);
+ if (ret < 0) {
+ ldout(cct, 0) << __func__ << ":Error in updateing entry bucket " << entry.bucket_name << ": " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ Formatter* formatter = new JSONFormatter(false);
+ auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
+ RGWBucketAdminOpState bucket_op;
+ ret = reshard_bucket(formatter, entry.new_num_shards, bucket, bucket_info, new_bucket_info,
+ max_entries, bucket_op, true);
+ formatter->flush(cout);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = remove(entry);
+ if (ret < 0) {
+ ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ }
+ }
} while (truncated);
l.unlock(&store->reshard_pool_ctx, shard);