]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: reshard: renew lease and handle marker when listing reshard repo
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 24 May 2017 18:21:46 +0000 (11:21 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:58 +0000 (13:17 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_reshard.cc

index e1a9027007ec711a3d1047e070b58b192d0183ee..7f12c1f36be292c633ae8b61fa2a60df30c7e091 100644 (file)
@@ -5650,10 +5650,10 @@ next:
         for (auto iter=entries.begin(); iter != entries.end(); ++iter) {
           cls_rgw_reshard_entry& entry = *iter;
           encode_json("entry", entry, formatter);
+          entry.get_key(&marker);
         }
         count += entries.size();
         formatter->flush(cout);
-#warning marker?
       } while (is_truncated && count < max_entries);
 
       if (count >= max_entries) {
index 5bdf32d0afbe72eb49c7761eefff55c11b68b178..7bc885dcdcdc6914af6f4b023a3c25537b2c5790 100644 (file)
@@ -197,6 +197,7 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke
 #define COOKIE_LEN 16
   char cookie_buf[COOKIE_LEN + 1];
   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+  cookie_buf[COOKIE_LEN] = '\0';
 
   reshard_lock.set_cookie(cookie_buf);
   reshard_lock.set_duration(lock_duration);
@@ -671,6 +672,12 @@ int RGWReshard::process_single_logshard(int logshard_num)
   utime_t time(max_secs, 0);
   l.set_duration(time);
 
+  char cookie_buf[COOKIE_LEN + 1];
+  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+  cookie_buf[COOKIE_LEN] = '\0';
+
+  l.set_cookie(cookie_buf);
+
   string logshard_oid;
   get_logshard_oid(logshard_num, &logshard_oid);
 
@@ -680,6 +687,8 @@ int RGWReshard::process_single_logshard(int logshard_num)
     return ret;
   }
 
+  utime_t lock_start_time = ceph_clock_now();
+
   do {
     std::list<cls_rgw_reshard_entry> entries;
     ret = list(logshard_num, marker, max_entries, entries, &truncated);
@@ -690,21 +699,20 @@ int RGWReshard::process_single_logshard(int logshard_num)
 
     for(auto& entry: entries) {
       /* resharding has not started */
-      if(entry.new_instance_id.empty()) {
-       RGWObjectCtx obj_ctx(store);
-       rgw_bucket bucket;
-       RGWBucketInfo bucket_info;
-       map<string, bufferlist> attrs;
+      RGWObjectCtx obj_ctx(store);
+      RGWBucketInfo bucket_info;
+      map<string, bufferlist> attrs;
 
-       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
-                                    &attrs);
-       if (ret < 0) {
-         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
-         return -ret;
-       }
-        bucket = bucket_info.bucket;
+      ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+                                   &attrs);
+      if (ret < 0) {
+        ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+        return -ret;
+      }
+      rgw_bucket bucket = bucket_info.bucket;
+      RGWBucketInfo new_bucket_info(bucket_info);
 
-       RGWBucketInfo new_bucket_info(bucket_info);
+      if(entry.new_instance_id.empty()) {
        ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
                                         new_bucket_info);
        if (ret < 0) {
@@ -714,31 +722,44 @@ int RGWReshard::process_single_logshard(int logshard_num)
 
        entry.new_instance_id = new_bucket_info.bucket.bucket_id;
 
+        ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name
+            << " new_instance_id=" << entry.new_instance_id << dendl;
+
        ret = add(entry);
        if (ret < 0) {
          ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
            cpp_strerror(-ret) << dendl;
          return ret;
        }
+      }
 
-        RGWBucketAdminOpState bucket_op;
-        RGWBucketReshard reshard_op(store, bucket_info, attrs);
-        ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
-                                    max_entries, false, nullptr, nullptr);
-       if (ret < 0) {
-         return ret;
-       }
+      RGWBucketAdminOpState bucket_op;
+      RGWBucketReshard reshard_op(store, bucket_info, attrs);
+      ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
+                                  max_entries, false, nullptr, nullptr);
+      if (ret < 0) {
+        return ret;
+      }
 
-       ret = remove(entry);
-       if (ret < 0) {
-         ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
-                      << cpp_strerror(-ret) << dendl;
-         return ret;
-       }
+      ret = remove(entry);
+      if (ret < 0) {
+        ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+          << cpp_strerror(-ret) << dendl;
+        return ret;
+      }
+      utime_t now = ceph_clock_now();
+
+      if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
+        l.set_renew(true);
+        ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
+        if (ret == -EBUSY) { /* already locked by another processor */
+          ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
+          return ret;
+        }
+        lock_start_time = now;
       }
+      entry.get_key(&marker);
     }
-#warning update marker?
-#warning do work here, renew lock
   } while (truncated);
 
   l.unlock(&store->reshard_pool_ctx, logshard_oid);