]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: replace reshard blocking sleep with interruptible condition wait
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 18 May 2017 18:51:21 +0000 (11:51 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:52 +0000 (13:17 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 51a21a89350c06d47a0ecf65fa85ec9e02e67fed..fb3abc2cc283db35d25d57e10cafb5af3b797264 100644 (file)
@@ -3719,6 +3719,10 @@ void RGWRados::finalize()
   delete obj_tombstone_cache;
   delete sync_modules_manager;
 
+  if (reshard_wait.get()) {
+    reshard_wait->stop();
+    reshard_wait.reset();
+  }
   delete reshard;
   delete index_completion_manager;
 }
@@ -4532,6 +4536,8 @@ int RGWRados::init_complete()
     obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
   }
 
+  reshard_wait = std::make_shared<RGWReshardWait>(this);
+
   reshard = new RGWReshard(this);
   index_completion_manager = new RGWIndexCompletionManager(this);
   ret = index_completion_manager->start();
@@ -9804,9 +9810,8 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::functio
       break;
     }
     ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
-    RGWReshard reshard(store);
     string new_bucket_id;
-    r = reshard.block_while_resharding(bs, &new_bucket_id);
+    r = store->block_while_resharding(bs, &new_bucket_id);
     if (r == -ERR_BUSY_RESHARDING) {
       continue;
     }
@@ -10772,9 +10777,8 @@ int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::f
       break;
     }
     ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
-    RGWReshard reshard(this);
     string new_bucket_id;
-    r = reshard.block_while_resharding(bs, &new_bucket_id);
+    r = block_while_resharding(bs, &new_bucket_id);
     if (r == -ERR_BUSY_RESHARDING) {
       continue;
     }
@@ -10796,6 +10800,13 @@ int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::f
   return 0;
 }
 
+int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+{
+  std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
+
+  return waiter->block_while_resharding(bs, new_bucket_id);
+}
+
 int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
                                     bool delete_marker,
                                     const string& op_tag,
index 88594eba0266e1850946afddf7380e9dda052d8f..fe6633b9da3a936013cd37cd6f12d0990756ba54 100644 (file)
@@ -39,6 +39,7 @@ class RGWRESTConn;
 struct RGWZoneGroup;
 struct RGWZoneParams;
 class RGWReshard;
+class RGWReshardWait;
 
 /* flags for put_obj_meta() */
 #define PUT_OBJ_CREATE      0x01
@@ -2456,6 +2457,7 @@ public:
   RGWDataChangesLog *data_log;
 
   RGWReshard *reshard;
+  std::shared_ptr<RGWReshardWait> reshard_wait;
 
   virtual ~RGWRados() = default;
 
@@ -3252,6 +3254,7 @@ public:
   int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
 
   int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);
+  int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
 
   void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
   int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
index 851be230e76e1c283e03a2e0025735c873ef4177..10b7f87df38123b8576b3e6fab1e1dc600d5b988 100644 (file)
@@ -604,15 +604,28 @@ int RGWReshard::unlock_bucket_index(const string& oid)
 const int num_retries = 10;
 const int default_reshard_sleep_duration = 5;
 
-int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+int RGWReshardWait::do_wait()
+{
+  Mutex::Locker l(lock);
+
+  cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
+
+  if (going_down) {
+    return -ECANCELED;
+  }
+
+  return 0;
+}
+
+int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
 {
   int ret = 0;
   cls_rgw_bucket_instance_entry entry;
 
-  for (int i=0; i< num_retries;i++) {
+  for (int i=0; i < num_retries;i++) {
     ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
     if (ret < 0) {
-      ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :"  <<
+      ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :"  <<
        cpp_strerror(-ret)<< dendl;
       return ret;
     }
@@ -622,10 +635,18 @@ int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bu
     }
     ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
     /* needed to unlock as clear resharding uses the same lock */
-#warning replace sleep with interruptible condition
-    sleep(default_reshard_sleep_duration);
+
+    if (i == num_retries - 1) {
+      break;
+    }
+
+    ret = do_wait();
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+      return ret;
+    }
   }
-  ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+  ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
   return -ERR_BUSY_RESHARDING;
 }
 
index bb13753fc2e272f231c5091f4a8682adf73aa22d..1d4366203674c1f5667cb5d90a3900f8b32bf905 100644 (file)
@@ -106,7 +106,6 @@ protected:
     int remove(cls_rgw_reshard_entry& entry);
     int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
     int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
-    int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
 
     int reshard_bucket(Formatter *formatter,
                       int num_shards,
@@ -125,4 +124,27 @@ protected:
     void stop_processor();
 };
 
+
+class RGWReshardWait {
+  RGWRados *store;
+  Mutex lock{"RGWReshardWait::lock"};
+  Cond cond;
+
+  bool going_down{false};
+
+  int do_wait();
+public:
+  RGWReshardWait(RGWRados *_store) : store(_store) {}
+  ~RGWReshardWait() {
+    assert(going_down);
+  }
+  int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+
+  void stop() {
+    Mutex::Locker l(lock);
+    going_down = true;
+    cond.SignalAll();
+  }
+};
+
 #endif