]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add BlockingAioThrottle
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 12:03:55 +0000 (08:03 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 24 Apr 2019 13:44:14 +0000 (09:44 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_aio_throttle.cc
src/rgw/rgw_aio_throttle.h
src/rgw/rgw_file.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_tools.cc
src/test/rgw/test_rgw_throttle.cc

index c596ba4ed8e816d7a2f1594ae5ec18eb2f2039b7..8d3369c47c888204c0634ad0a9bb6ca0668b08af 100644 (file)
@@ -20,7 +20,7 @@
 
 namespace rgw {
 
-bool AioThrottle::waiter_ready() const
+bool Throttle::waiter_ready() const
 {
   switch (waiter) {
   case Wait::Available: return is_available();
@@ -30,9 +30,9 @@ bool AioThrottle::waiter_ready() const
   }
 }
 
-AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
-                              OpFunc&& f,
-                              uint64_t cost, uint64_t id)
+AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
+                                       OpFunc&& f,
+                                       uint64_t cost, uint64_t id)
 {
   auto p = std::make_unique<Pending>();
   p->obj = obj;
@@ -64,7 +64,7 @@ AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
   return std::move(completed);
 }
 
-void AioThrottle::put(AioResult& r)
+void BlockingAioThrottle::put(AioResult& r)
 {
   auto& p = static_cast<Pending&>(r);
   std::scoped_lock lock{mutex};
@@ -80,13 +80,13 @@ void AioThrottle::put(AioResult& r)
   }
 }
 
-AioResultList AioThrottle::poll()
+AioResultList BlockingAioThrottle::poll()
 {
   std::unique_lock lock{mutex};
   return std::move(completed);
 }
 
-AioResultList AioThrottle::wait()
+AioResultList BlockingAioThrottle::wait()
 {
   std::unique_lock lock{mutex};
   if (completed.empty() && !pending.empty()) {
@@ -98,7 +98,7 @@ AioResultList AioThrottle::wait()
   return std::move(completed);
 }
 
-AioResultList AioThrottle::drain()
+AioResultList BlockingAioThrottle::drain()
 {
   std::unique_lock lock{mutex};
   if (!pending.empty()) {
index 615301622cc52cfc8e742998a8f07e16a22f6513..eeb12f826f588c37257372f7cb0bd08b3a5f4267 100644 (file)
 
 namespace rgw {
 
-// a throttle for aio operations that enforces a maximum window on outstanding
-// bytes. only supports a single waiter, so all public functions must be called
-// from the same thread
-class AioThrottle : public Aio {
+class Throttle {
  protected:
   const uint64_t window;
   uint64_t pending_size = 0;
 
+  AioResultList pending;
+  AioResultList completed;
+
   bool is_available() const { return pending_size <= window; }
   bool has_completion() const { return !completed.empty(); }
   bool is_drained() const { return pending.empty(); }
 
-  struct Pending : AioResultEntry {
-    AioThrottle *parent = nullptr;
-    uint64_t cost = 0;
-  };
-  OwningList<Pending> pending;
-  AioResultList completed;
-
   enum class Wait { None, Available, Completion, Drained };
   Wait waiter = Wait::None;
 
   bool waiter_ready() const;
 
-  ceph::mutex mutex = ceph::make_mutex("AioThrottle");
-  ceph::condition_variable cond;
-
  public:
-  AioThrottle(uint64_t window) : window(window) {}
+  Throttle(uint64_t window) : window(window) {}
 
-  virtual ~AioThrottle() {
+  ~Throttle() {
     // must drain before destructing
     ceph_assert(pending.empty());
     ceph_assert(completed.empty());
   }
+};
+
+// a throttle for aio operations. all public functions must be called from
+// the same thread
+class BlockingAioThrottle final : public Aio, private Throttle {
+  ceph::mutex mutex = ceph::make_mutex("AioThrottle");
+  ceph::condition_variable cond;
+
+  struct Pending : AioResultEntry {
+    BlockingAioThrottle *parent = nullptr;
+    uint64_t cost = 0;
+    librados::AioCompletion *completion = nullptr;
+  };
+ public:
+  BlockingAioThrottle(uint64_t window) : Throttle(window) {}
 
-  AioResultList get(const RGWSI_RADOS::Obj& obj,
-                   OpFunc&& f,
-                   uint64_t cost, uint64_t id) override;
-  void put(AioResult& r) override;
+  AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
+                    uint64_t cost, uint64_t id) override final;
 
+  void put(AioResult& r) override final;
 
-  AioResultList poll() override;
+  AioResultList poll() override final;
 
-  AioResultList wait() override;
+  AioResultList wait() override final;
 
-  AioResultList drain() override;
+  AioResultList drain() override final;
 };
 
 } // namespace rgw
index 1bd9433240370b2c518178c1474aeed0cf83f96d..2da5187deb2bccc211cc1c09a40f37b3689bf6f0 100644 (file)
@@ -2351,7 +2351,7 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  std::optional<rgw::AioThrottle> aio;
+  std::optional<rgw::BlockingAioThrottle> aio;
   std::optional<rgw::putobj::AtomicObjectProcessor> processor;
   rgw::putobj::DataProcessor* filter;
   boost::optional<RGWPutObj_Compress> compressor;
index 6e97957e1e606c0529074461539dea57c361d438..13d472ec3368e250aa2a9cbdbda908e92d566176 100644 (file)
@@ -3636,7 +3636,7 @@ void RGWPutObj::execute()
   }
 
   // create the object processor
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
   constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
                                                sizeof(AtomicObjectProcessor),
@@ -3999,7 +3999,7 @@ void RGWPostObj::execute()
       store->gen_rand_obj_instance_name(&obj);
     }
 
-    rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
+    rgw::BlockingAioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
 
     using namespace rgw::putobj;
     AtomicObjectProcessor processor(&aio, store, s->bucket_info,
@@ -6745,10 +6745,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
   rgw_placement_rule dest_placement = s->dest_placement;
   dest_placement.inherit_from(binfo.placement_rule);
 
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
 
   using namespace rgw::putobj;
-
   AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(),
                                   obj_ctx, obj, 0, s->req_id, this);
 
index 49b7d13d4ead252d96a1746fcdc3c0f6d34fd8e8..bd665b9ba602d09c2319c588b3ec2b56410b6304 100644 (file)
@@ -4278,7 +4278,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   set_mtime_weight.high_precision = high_precision_time;
   int ret;
 
-  rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
   const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
   AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
@@ -4856,7 +4856,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   string tag;
   append_rand_alpha(cct, tag, tag, 32);
 
-  rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
   AtomicObjectProcessor processor(&aio, this, dest_bucket_info, &dest_placement,
                                   dest_bucket_info.owner, obj_ctx,
@@ -6826,7 +6826,7 @@ int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb)
   const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
   const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
 
-  rgw::AioThrottle aio(window_size);
+  rgw::BlockingAioThrottle aio(window_size);
   get_obj_data data(store, cb, &aio, ofs);
 
   int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,
index 1c84ebfc6f33a65dd0efb2d37732efde1dfec05e..4c719ee41154eb5a240bb97363c92f8694c6cec9 100644 (file)
@@ -429,8 +429,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
 
   RGWBucketInfo& bucket_info = bucket->bucket_info;
 
-  using namespace rgw::putobj;
-  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+  rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
 
   RGWObjectCtx obj_ctx(store);
   rgw_obj obj(bucket_info.bucket, key);
@@ -439,6 +438,7 @@ int RGWDataAccess::Object::put(bufferlist& data,
 
   string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
 
+  using namespace rgw::putobj;
   AtomicObjectProcessor processor(&aio, store, bucket_info,
                                   nullptr,
                                   owner.get_id(),
@@ -448,8 +448,6 @@ int RGWDataAccess::Object::put(bufferlist& data,
   if (ret < 0)
     return ret;
 
-  using namespace rgw::putobj;
-
   DataProcessor *filter = &processor;
 
   CompressorRef plugin;
index 88eb6cecead4012a89c5e7e0d15a938705902682..5a14b1a9cca7f0179b3b4350e441cc23cbd51278 100644 (file)
@@ -57,7 +57,7 @@ namespace rgw {
 
 TEST_F(Aio_Throttle, NoThrottleUpToMax)
 {
-  AioThrottle throttle(4);
+  BlockingAioThrottle throttle(4);
   auto obj = make_obj(__PRETTY_FUNCTION__);
   {
     librados::ObjectWriteOperation op1;
@@ -84,7 +84,7 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax)
 
 TEST_F(Aio_Throttle, CostOverWindow)
 {
-  AioThrottle throttle(4);
+  BlockingAioThrottle throttle(4);
   auto obj = make_obj(__PRETTY_FUNCTION__);
 
   librados::ObjectWriteOperation op;
@@ -96,7 +96,7 @@ TEST_F(Aio_Throttle, CostOverWindow)
 TEST_F(Aio_Throttle, ThrottleOverMax)
 {
   constexpr uint64_t window = 4;
-  AioThrottle throttle(window);
+  BlockingAioThrottle throttle(window);
 
   auto obj = make_obj(__PRETTY_FUNCTION__);