]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: actually reshard an entry from the resharding queue
authorOrit Wasserman <owasserm@redhat.com>
Fri, 19 May 2017 13:48:12 +0000 (16:48 +0300)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:52 +0000 (13:17 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/rgw_reshard.cc

index d91367cf3e4a126520658e1c826180b603c2ee4e..851be230e76e1c283e03a2e0025735c873ef4177 100644 (file)
@@ -629,6 +629,35 @@ int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bu
   return -ERR_BUSY_RESHARDING;
 }
 
+
+static  int create_new_bucket_instance(RGWRados *store,
+                                     int new_num_shards,
+                                     const RGWBucketInfo& bucket_info,
+                                     map<string, bufferlist>& attrs,
+                                     RGWBucketInfo& new_bucket_info)
+{
+
+  store->create_bucket_id(&new_bucket_info.bucket.bucket_id);
+  new_bucket_info.bucket.oid.clear();
+
+  new_bucket_info.num_shards = new_num_shards;
+  new_bucket_info.objv_tracker.clear();
+
+  int ret = store->init_bucket_index(new_bucket_info, new_bucket_info.num_shards);
+  if (ret < 0) {
+    cerr << "ERROR: failed to init new bucket indexes: " << cpp_strerror(-ret) << std::endl;
+    return -ret;
+  }
+
+  ret = store->put_bucket_instance_info(new_bucket_info, true, real_time(), &attrs);
+  if (ret < 0) {
+    cerr << "ERROR: failed to store new bucket instance info: " << cpp_strerror(-ret) << std::endl;
+    return -ret;
+  }
+
+  return 0;
+}
+
 int RGWReshard::process_single_shard(const string& shard)
 {
   string marker;
@@ -637,7 +666,7 @@ int RGWReshard::process_single_shard(const string& shard)
   CephContext *cct = store->ctx();
   int max_entries = 1000;
   int max_secs = 10;
-  
+
   rados::cls::lock::Lock l(reshard_lock_name);
 
   utime_t time(max_secs, 0);
@@ -657,6 +686,57 @@ int RGWReshard::process_single_shard(const string& shard)
                      << dendl;
       continue;
     }
+
+    for(auto& entry: entries) {
+      /* resharding has not started */
+      if(entry.new_instance_id.empty()) {
+       RGWObjectCtx obj_ctx(store);
+       rgw_bucket bucket;
+       RGWBucketInfo bucket_info;
+       map<string, bufferlist> attrs;
+
+       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+                                    &attrs);
+       if (ret < 0) {
+         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+         return -ret;
+       }
+
+       RGWBucketInfo new_bucket_info(bucket_info);
+       ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
+                                        new_bucket_info);
+       if (ret < 0) {
+         ldout(cct, 0)  << __func__ << " ERROR: could not create new bucket info: " << cpp_strerror(-ret) << dendl;
+         return ret;
+       }
+
+       entry.new_instance_id = entry.bucket_name + ":" + new_bucket_info.bucket.bucket_id;
+
+       ret = add(entry);
+       if (ret < 0) {
+         ldout(cct, 0) << __func__ << ":Error in updateing entry bucket " << entry.bucket_name << ": " <<
+           cpp_strerror(-ret) << dendl;
+         return ret;
+       }
+
+       Formatter* formatter = new JSONFormatter(false);
+       auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
+       RGWBucketAdminOpState bucket_op;
+       ret = reshard_bucket(formatter, entry.new_num_shards, bucket, bucket_info, new_bucket_info,
+                            max_entries, bucket_op, true);
+       formatter->flush(cout);
+       if (ret < 0) {
+         return ret;
+       }
+
+       ret = remove(entry);
+       if (ret < 0) {
+         ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+                      << cpp_strerror(-ret) << dendl;
+         return ret;
+       }
+      }
+    }
   } while (truncated);
 
   l.unlock(&store->reshard_pool_ctx, shard);