From 346b924cb22c0cbeef544f5bf0c8c20abed175b9 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 08:03:55 -0400 Subject: [PATCH] rgw: add BlockingAioThrottle Signed-off-by: Casey Bodley --- src/rgw/rgw_aio_throttle.cc | 16 +++++----- src/rgw/rgw_aio_throttle.h | 50 +++++++++++++++++-------------- src/rgw/rgw_file.h | 2 +- src/rgw/rgw_op.cc | 7 ++--- src/rgw/rgw_rados.cc | 6 ++-- src/rgw/rgw_tools.cc | 6 ++-- src/test/rgw/test_rgw_throttle.cc | 6 ++-- 7 files changed, 47 insertions(+), 46 deletions(-) diff --git a/src/rgw/rgw_aio_throttle.cc b/src/rgw/rgw_aio_throttle.cc index c596ba4ed8e..8d3369c47c8 100644 --- a/src/rgw/rgw_aio_throttle.cc +++ b/src/rgw/rgw_aio_throttle.cc @@ -20,7 +20,7 @@ namespace rgw { -bool AioThrottle::waiter_ready() const +bool Throttle::waiter_ready() const { switch (waiter) { case Wait::Available: return is_available(); @@ -30,9 +30,9 @@ bool AioThrottle::waiter_ready() const } } -AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj, - OpFunc&& f, - uint64_t cost, uint64_t id) +AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj, + OpFunc&& f, + uint64_t cost, uint64_t id) { auto p = std::make_unique(); p->obj = obj; @@ -64,7 +64,7 @@ AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj, return std::move(completed); } -void AioThrottle::put(AioResult& r) +void BlockingAioThrottle::put(AioResult& r) { auto& p = static_cast(r); std::scoped_lock lock{mutex}; @@ -80,13 +80,13 @@ void AioThrottle::put(AioResult& r) } } -AioResultList AioThrottle::poll() +AioResultList BlockingAioThrottle::poll() { std::unique_lock lock{mutex}; return std::move(completed); } -AioResultList AioThrottle::wait() +AioResultList BlockingAioThrottle::wait() { std::unique_lock lock{mutex}; if (completed.empty() && !pending.empty()) { @@ -98,7 +98,7 @@ AioResultList AioThrottle::wait() return std::move(completed); } -AioResultList AioThrottle::drain() +AioResultList BlockingAioThrottle::drain() { std::unique_lock lock{mutex}; if (!pending.empty()) { diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h index 615301622cc..eeb12f826f5 100644 --- a/src/rgw/rgw_aio_throttle.h +++ b/src/rgw/rgw_aio_throttle.h @@ -23,53 +23,57 @@ namespace rgw { -// a throttle for aio operations that enforces a maximum window on outstanding -// bytes. only supports a single waiter, so all public functions must be called -// from the same thread -class AioThrottle : public Aio { +class Throttle { protected: const uint64_t window; uint64_t pending_size = 0; + AioResultList pending; + AioResultList completed; + bool is_available() const { return pending_size <= window; } bool has_completion() const { return !completed.empty(); } bool is_drained() const { return pending.empty(); } - struct Pending : AioResultEntry { - AioThrottle *parent = nullptr; - uint64_t cost = 0; - }; - OwningList pending; - AioResultList completed; - enum class Wait { None, Available, Completion, Drained }; Wait waiter = Wait::None; bool waiter_ready() const; - ceph::mutex mutex = ceph::make_mutex("AioThrottle"); - ceph::condition_variable cond; - public: - AioThrottle(uint64_t window) : window(window) {} + Throttle(uint64_t window) : window(window) {} - virtual ~AioThrottle() { + ~Throttle() { // must drain before destructing ceph_assert(pending.empty()); ceph_assert(completed.empty()); } +}; + +// a throttle for aio operations. all public functions must be called from +// the same thread +class BlockingAioThrottle final : public Aio, private Throttle { + ceph::mutex mutex = ceph::make_mutex("AioThrottle"); + ceph::condition_variable cond; + + struct Pending : AioResultEntry { + BlockingAioThrottle *parent = nullptr; + uint64_t cost = 0; + librados::AioCompletion *completion = nullptr; + }; + public: + BlockingAioThrottle(uint64_t window) : Throttle(window) {} - AioResultList get(const RGWSI_RADOS::Obj& obj, - OpFunc&& f, - uint64_t cost, uint64_t id) override; - void put(AioResult& r) override; + AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f, + uint64_t cost, uint64_t id) override final; + void put(AioResult& r) override final; - AioResultList poll() override; + AioResultList poll() override final; - AioResultList wait() override; + AioResultList wait() override final; - AioResultList drain() override; + AioResultList drain() override final; }; } // namespace rgw diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 1bd94332403..2da5187deb2 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -2351,7 +2351,7 @@ public: const std::string& bucket_name; const std::string& obj_name; RGWFileHandle* rgw_fh; - std::optional aio; + std::optional aio; std::optional processor; rgw::putobj::DataProcessor* filter; boost::optional compressor; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 6e97957e1e6..13d472ec336 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3636,7 +3636,7 @@ void RGWPutObj::execute() } // create the object processor - rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor), sizeof(AtomicObjectProcessor), @@ -3999,7 +3999,7 @@ void RGWPostObj::execute() store->gen_rand_obj_instance_name(&obj); } - rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; AtomicObjectProcessor processor(&aio, store, s->bucket_info, @@ -6745,10 +6745,9 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, rgw_placement_rule dest_placement = s->dest_placement; dest_placement.inherit_from(binfo.placement_rule); - rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(), obj_ctx, obj, 0, s->req_id, this); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 49b7d13d4ea..bd665b9ba60 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -4278,7 +4278,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, set_mtime_weight.high_precision = high_precision_time; int ret; - rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr); AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id, @@ -4856,7 +4856,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, string tag; append_rand_alpha(cct, tag, tag, 32); - rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; AtomicObjectProcessor processor(&aio, this, dest_bucket_info, &dest_placement, dest_bucket_info.owner, obj_ctx, @@ -6826,7 +6826,7 @@ int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb) const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size; const uint64_t window_size = cct->_conf->rgw_get_obj_window_size; - rgw::AioThrottle aio(window_size); + rgw::BlockingAioThrottle aio(window_size); get_obj_data data(store, cb, &aio, ofs); int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj, diff --git a/src/rgw/rgw_tools.cc b/src/rgw/rgw_tools.cc index 1c84ebfc6f3..4c719ee4115 100644 --- a/src/rgw/rgw_tools.cc +++ b/src/rgw/rgw_tools.cc @@ -429,8 +429,7 @@ int RGWDataAccess::Object::put(bufferlist& data, RGWBucketInfo& bucket_info = bucket->bucket_info; - using namespace rgw::putobj; - rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); + rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); RGWObjectCtx obj_ctx(store); rgw_obj obj(bucket_info.bucket, key); @@ -439,6 +438,7 @@ int RGWDataAccess::Object::put(bufferlist& data, string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id()); + using namespace rgw::putobj; AtomicObjectProcessor processor(&aio, store, bucket_info, nullptr, owner.get_id(), @@ -448,8 +448,6 @@ int RGWDataAccess::Object::put(bufferlist& data, if (ret < 0) return ret; - using namespace rgw::putobj; - DataProcessor *filter = &processor; CompressorRef plugin; diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index 88eb6cecead..5a14b1a9cca 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -57,7 +57,7 @@ namespace rgw { TEST_F(Aio_Throttle, NoThrottleUpToMax) { - AioThrottle throttle(4); + BlockingAioThrottle throttle(4); auto obj = make_obj(__PRETTY_FUNCTION__); { librados::ObjectWriteOperation op1; @@ -84,7 +84,7 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax) TEST_F(Aio_Throttle, CostOverWindow) { - AioThrottle throttle(4); + BlockingAioThrottle throttle(4); auto obj = make_obj(__PRETTY_FUNCTION__); librados::ObjectWriteOperation op; @@ -96,7 +96,7 @@ TEST_F(Aio_Throttle, CostOverWindow) TEST_F(Aio_Throttle, ThrottleOverMax) { constexpr uint64_t window = 4; - AioThrottle throttle(window); + BlockingAioThrottle throttle(window); auto obj = make_obj(__PRETTY_FUNCTION__); -- 2.39.5