]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: RGWReshardWait takes optional_yield
authorCasey Bodley <cbodley@redhat.com>
Sat, 24 Nov 2018 03:33:38 +0000 (22:33 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 30 Nov 2018 19:37:03 +0000 (14:37 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 93ce2247ee4358b668d3a87d413c5cc8b270df05..e0aca7084f0c061039960168824c218890326a7d 100644 (file)
@@ -6216,7 +6216,8 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::functio
     }
     ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
     string new_bucket_id;
-    r = store->block_while_resharding(bs, &new_bucket_id, target->bucket_info);
+    r = store->block_while_resharding(bs, &new_bucket_id,
+                                      target->bucket_info, null_yield);
     if (r == -ERR_BUSY_RESHARDING) {
       continue;
     }
@@ -7091,7 +7092,7 @@ int RGWRados::guard_reshard(BucketShard *bs,
     }
     ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
     string new_bucket_id;
-    r = block_while_resharding(bs, &new_bucket_id, bucket_info);
+    r = block_while_resharding(bs, &new_bucket_id, bucket_info, null_yield);
     if (r == -ERR_BUSY_RESHARDING) {
       continue;
     }
@@ -7115,11 +7116,12 @@ int RGWRados::guard_reshard(BucketShard *bs,
 
 int RGWRados::block_while_resharding(RGWRados::BucketShard *bs,
                                     string *new_bucket_id,
-                                    const RGWBucketInfo& bucket_info)
+                                     const RGWBucketInfo& bucket_info,
+                                     optional_yield y)
 {
   std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
 
-  return waiter->block_while_resharding(bs, new_bucket_id, bucket_info);
+  return waiter->block_while_resharding(bs, new_bucket_id, bucket_info, y);
 }
 
 int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
index 5a9c142cfd05a5c7a341e64319f51f536a75a410..59703c49fc2301c1b5fceb0e49601dd34c78fb2d 100644 (file)
@@ -2044,7 +2044,8 @@ public:
                    std::function<int(BucketShard *)> call);
   int block_while_resharding(RGWRados::BucketShard *bs,
                             string *new_bucket_id,
-                            const RGWBucketInfo& bucket_info);
+                            const RGWBucketInfo& bucket_info,
+                             optional_yield y);
 
   void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
   int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
index 9d8b94efefe5e938b4866a4dc5e2e932b493ab69..ed6b935f7ec735791daf74f6cb4095ad165d4e62 100644 (file)
@@ -865,10 +865,34 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r
 const int num_retries = 10;
 static const std::chrono::seconds default_reshard_sleep_duration(5);
 
-int RGWReshardWait::do_wait()
+int RGWReshardWait::do_wait(optional_yield y)
 {
   std::unique_lock lock(mutex);
 
+  if (going_down) {
+    return -ECANCELED;
+  }
+
+#ifdef HAVE_BOOST_CONTEXT
+  if (y) {
+    auto& context = y.get_io_context();
+    auto& yield = y.get_yield_context();
+
+    Waiter waiter(context);
+    waiters.push_back(waiter);
+    lock.unlock();
+
+    waiter.timer.expires_after(default_reshard_sleep_duration);
+
+    boost::system::error_code ec;
+    waiter.timer.async_wait(yield[ec]);
+
+    lock.lock();
+    waiters.erase(waiters.iterator_to(waiter));
+    return -ec.value();
+  }
+#endif
+
   cond.wait_for(lock, default_reshard_sleep_duration);
 
   if (going_down) {
@@ -878,9 +902,21 @@ int RGWReshardWait::do_wait()
   return 0;
 }
 
+void RGWReshardWait::stop()
+{
+  std::scoped_lock lock(mutex);
+  going_down = true;
+  cond.notify_all();
+  for (auto& waiter : waiters) {
+    // unblock any waiters with ECANCELED
+    waiter.timer.cancel();
+  }
+}
+
 int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs,
                                           string *new_bucket_id,
-                                          const RGWBucketInfo& bucket_info)
+                                          const RGWBucketInfo& bucket_info,
+                                           optional_yield y)
 {
   int ret = 0;
   cls_rgw_bucket_instance_entry entry;
@@ -939,7 +975,7 @@ int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs,
       } // if taking of lock succeeded
     } // block to encapsulate recovery from incomplete reshard
 
-    ret = do_wait();
+    ret = do_wait(y);
     if (ret < 0) {
       ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
       return ret;
index bf113ff527acc6d6350a167dc03c58db1ddf068e..74233db8e93825d63d713557936f50f97618a09b 100644 (file)
@@ -7,6 +7,8 @@
 #include <vector>
 #include <functional>
 
+#include <boost/intrusive/list.hpp>
+
 #include "include/rados/librados.hpp"
 #include "common/ceph_time.h"
 #include "cls/rgw/cls_rgw_types.h"
@@ -175,9 +177,16 @@ class RGWReshardWait {
   ceph::mutex mutex = ceph::make_mutex("RGWReshardWait::lock");
   ceph::condition_variable cond;
 
+  using Clock = ceph::coarse_real_clock;
+  struct Waiter : boost::intrusive::list_base_hook<> {
+    boost::asio::basic_waitable_timer<Clock> timer;
+    explicit Waiter(boost::asio::io_context& ioc) : timer(ioc) {}
+  };
+  boost::intrusive::list<Waiter> waiters;
+
   bool going_down{false};
 
-  int do_wait();
+  int do_wait(optional_yield y);
 public:
   explicit RGWReshardWait(RGWRados *_store) : store(_store) {}
   ~RGWReshardWait() {
@@ -185,13 +194,10 @@ public:
   }
   int block_while_resharding(RGWRados::BucketShard *bs,
                             string *new_bucket_id,
-                            const RGWBucketInfo& bucket_info);
-
-  void stop() {
-    std::scoped_lock lock(mutex);
-    going_down = true;
-    cond.notify_all();
-  }
+                            const RGWBucketInfo& bucket_info,
+                             optional_yield y);
+  // unblock any threads waiting on reshard
+  void stop();
 };
 
 #endif