]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: update entry in resharding log when resharding has started
authorOrit Wasserman <owasserm@redhat.com>
Tue, 23 May 2017 10:37:08 +0000 (13:37 +0300)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:18:01 +0000 (13:18 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index a80291d09d101846e5ef9d2b02f1e52ff4110929..7ef28f0fcd8d1a135f5deb4581ec9abaa93249df 100644 (file)
@@ -9,6 +9,8 @@
 #include "common/errno.h"
 #include "common/ceph_json.h"
 
+#include "common/dout.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
@@ -190,7 +192,7 @@ public:
 RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
                                                      store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
                                                      reshard_lock(reshard_lock_name) {
-  const rgw_bucket& b = bucket_info.bucket;                                                       
+  const rgw_bucket& b = bucket_info.bucket;
   reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
 
   utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
@@ -510,7 +512,7 @@ int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
 }
 
 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
-                              bool verbose, ostream *out, Formatter *formatter)
+                              bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
 
 {
   int ret = lock_bucket();
@@ -519,12 +521,19 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
   }
 
   RGWBucketInfo new_bucket_info;
-
   ret = create_new_bucket_instance(num_shards, new_bucket_info);
   if (ret < 0) {
     unlock_bucket();
     return ret;
   }
+
+  if (reshard_log) {
+    ret = reshard_log->update(bucket_info, new_bucket_info);
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
   ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
   if (ret < 0) {
     unlock_bucket();
@@ -599,6 +608,28 @@ int RGWReshard::add(cls_rgw_reshard_entry& entry)
   return 0;
 }
 
+int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
+{
+  cls_rgw_reshard_entry entry;
+  entry.bucket_name = bucket_info.bucket.name;
+  entry.bucket_id = bucket_info.bucket.bucket_id;
+
+  int ret = get(entry);
+  if (ret < 0) {
+    return ret;
+  }
+
+  entry.new_instance_id = new_bucket_info.bucket.name + ":"  + new_bucket_info.bucket.bucket_id;
+
+  ret = add(entry);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+      cpp_strerror(-ret) << dendl;
+  }
+
+  return ret;
+}
+
 
 int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
 {
@@ -713,7 +744,7 @@ int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *ne
 int RGWReshard::process_single_logshard(int logshard_num)
 {
   string marker;
-  bool truncated = false;
+  bool truncated = true;
 
   CephContext *cct = store->ctx();
   int max_entries = 1000;
@@ -750,54 +781,41 @@ int RGWReshard::process_single_logshard(int logshard_num)
     }
 
     for(auto& entry: entries) {
-      /* resharding has not started */
-      RGWObjectCtx obj_ctx(store);
-      RGWBucketInfo bucket_info;
-      map<string, bufferlist> attrs;
+      if(entry.new_instance_id.empty()) {
 
-      ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
-                                   &attrs);
-      if (ret < 0) {
-        ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
-        return -ret;
-      }
-      rgw_bucket bucket = bucket_info.bucket;
-      RGWBucketInfo new_bucket_info(bucket_info);
+       ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name  << dendl;
 
-      if(entry.new_instance_id.empty()) {
-       ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
-                                        new_bucket_info);
+       RGWObjectCtx obj_ctx(store);
+       rgw_bucket bucket;
+       RGWBucketInfo bucket_info;
+       map<string, bufferlist> attrs;
+
+       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+                                   &attrs);
        if (ret < 0) {
-         ldout(cct, 0)  << __func__ << " ERROR: could not create new bucket info: " << cpp_strerror(-ret) << dendl;
-         return ret;
+         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+         return -ret;
        }
 
-       entry.new_instance_id = new_bucket_info.bucket.bucket_id;
-
-        ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name
-            << " new_instance_id=" << entry.new_instance_id << dendl;
+       RGWBucketReshard br(store, bucket_info, attrs);
 
-       ret = add(entry);
+       Formatter* formatter = new JSONFormatter(false);
+       auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
+       ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
        if (ret < 0) {
-         ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
-           cpp_strerror(-ret) << dendl;
+         ldout (store->ctx(), 0) <<  __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+           cpp_strerror(-ret)<< dendl;
          return ret;
        }
-      }
 
-      RGWBucketAdminOpState bucket_op;
-      RGWBucketReshard reshard_op(store, bucket_info, attrs);
-      ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
-                                  max_entries, verbose, out, formatter);
-      if (ret < 0) {
-        return ret;
-      }
+       ldout (store->ctx(), 20) <<  " removing entry" << entry.bucket_name<< dendl;
 
-      ret = remove(entry);
-      if (ret < 0) {
-        ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
-          << cpp_strerror(-ret) << dendl;
-        return ret;
+       ret = remove(entry);
+       if (ret < 0) {
+         ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+                      << cpp_strerror(-ret) << dendl;
+         return ret;
+       }
       }
       utime_t now = ceph_clock_now();
 
index 4237d2dc4fc3f3d2c71b094d54da9b6a994297ac..9873707f97d35a187f61e2210460114c4e9e15b8 100644 (file)
@@ -29,8 +29,7 @@ class RGWBucketReshard {
   int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
   int clear_resharding();
 
-  int create_new_bucket_instance(int new_num_shards,
-                                 RGWBucketInfo& new_bucket_info);
+  int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
   int do_reshard(int num_shards,
                 const RGWBucketInfo& new_bucket_info,
                 int max_entries,
@@ -43,7 +42,8 @@ public:
 
   int execute(int num_shards, int max_op_entries,
               bool verbose = false, ostream *out = nullptr,
-              Formatter *formatter = nullptr);
+              Formatter *formatter = nullptr,
+             RGWReshard *reshard_log = nullptr);
   int abort();
   int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
 };
@@ -87,6 +87,7 @@ protected:
 public:
   RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
   int add(cls_rgw_reshard_entry& entry);
+  int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
   int get(cls_rgw_reshard_entry& entry);
   int remove(cls_rgw_reshard_entry& entry);
   int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);