]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Add lock/unlock bucket instance and block_while_resharding methods
authorOrit Wasserman <owasserm@redhat.com>
Tue, 18 Apr 2017 09:30:07 +0000 (12:30 +0300)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:34 +0000 (13:17 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index bae28de06ba208fd3e5e756dcd70180ada87beb7..ab7bc8c9c038b137cdd55a818e8de22948c76685 100644 (file)
@@ -14,7 +14,8 @@ const string reshard_oid = "reshard";
 const string reshard_lock_name = "reshard_process";
 const string bucket_instance_lock_name = "bucket_instance_lock";
 
-RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store)
+RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
+                                                           instance_lock(bucket_instance_lock_name)
 {
     max_jobs = cct->_conf->rgw_reshard_max_jobs;
 }
@@ -268,3 +269,68 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r
 }
 
 
+int RGWReshard::lock_bucket_index_shared(const string& oid)
+{
+  int ret = get_io_ctx(io_ctx);
+  if (ret < 0)
+    return ret;
+
+  ret = instance_lock.lock_shared(&io_ctx, oid);
+  if (ret == -EBUSY) {
+    dout(0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+    return 0;
+  }
+
+  return ret;
+}
+
+int RGWReshard::unlock_bucket_index(const string& oid)
+{
+  int ret = get_io_ctx(io_ctx);
+  if (ret < 0)
+    return ret;
+  instance_lock.unlock(&io_ctx, oid);
+  return 0;
+}
+
+
+const int num_retries = 10;
+const int default_reshard_sleep_duration = 30;
+
+int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
+{
+  int ret = 0;
+  cls_rgw_bucket_instance_entry entry;
+  bool resharding = false;
+
+  for (int i=0; i< num_retries;i++) {
+    ret = lock_bucket_index_shared(bucket_instance_oid);
+    if (ret < 0) {
+      return ret;
+    }
+
+    ret = cls_rgw_get_bucket_resharding(io_ctx, bucket_instance_oid,
+                                       entry, resharding);
+    if (ret < 0) {
+      ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :"  <<
+       cpp_strerror(-ret)<< dendl;
+      return ret;
+    }
+
+    if (resharding) {
+      ret = unlock_bucket_index(bucket_instance_oid);
+      if (ret < 0) {
+       return ret;
+      }
+      sleep(default_reshard_sleep_duration);
+    } else {
+      return 0;
+    }
+  }
+  ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+  ret = unlock_bucket_index(bucket_instance_oid);
+  if (ret < 0) {
+    return ret;
+  }
+  return -EAGAIN;
+}
index 9a71e3078876902345a8ba52c7603ff5aaa6ebbe..ddde84635ec4607039db891c1fd04206ac4868f4 100644 (file)
@@ -7,6 +7,7 @@
 #include <vector>
 #include "include/rados/librados.hpp"
 #include "cls/rgw/cls_rgw_types.h"
+#include "cls/lock/cls_lock_client.h"
 
 class CephContext;
 class RGWRados;
@@ -16,8 +17,11 @@ class RGWReshard {
     RGWRados *store;
     string lock_name;
     int max_jobs;
+    rados::cls::lock::Lock instance_lock;
+    librados::IoCtx io_ctx;
 
     int get_io_ctx(librados::IoCtx& io_ctx);
+
   public:
     RGWReshard(CephContext* cct, RGWRados* _store);
     int add(cls_rgw_reshard_entry& entry);
@@ -27,7 +31,6 @@ class RGWReshard {
     int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
     int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
     int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
-    int get_bucket_instance_info(const rgw_bucket& bucket, RGWBucketInfo& bucket_info);
 };
 
 #endif