From: Casey Bodley Date: Sat, 17 Nov 2018 16:38:02 +0000 (-0500) Subject: rgw: move aio throttles to namespace rgw X-Git-Tag: v14.1.0~704^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=85fb04416ea1f31389498ea51685db00af7251fd;p=ceph-ci.git rgw: move aio throttles to namespace rgw they're useful outside of putobj Signed-off-by: Casey Bodley --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 99add82c707..502ab0891ac 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -53,6 +53,7 @@ set(librgw_common_srcs rgw_acl.cc rgw_acl_s3.cc rgw_acl_swift.cc + rgw_aio_throttle.cc rgw_auth.cc rgw_auth_s3.cc rgw_basic_types.cc @@ -97,7 +98,6 @@ set(librgw_common_srcs rgw_policy_s3.cc rgw_putobj.cc rgw_putobj_processor.cc - rgw_putobj_throttle.cc rgw_quota.cc rgw_rados.cc rgw_resolve.cc diff --git a/src/rgw/rgw_aio.h b/src/rgw/rgw_aio.h new file mode 100644 index 00000000000..25f828a9dc4 --- /dev/null +++ b/src/rgw/rgw_aio.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include "rgw_common.h" +#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj + +namespace librados { +class ObjectReadOperation; +class ObjectWriteOperation; +} + +namespace rgw { + +struct AioResult { + rgw_raw_obj obj; + int result = 0; +}; +struct AioResultEntry : AioResult, boost::intrusive::list_base_hook<> { + virtual ~AioResultEntry() {} +}; +// a list of polymorphic entries that frees them on destruction +template +struct OwningList : boost::intrusive::list { + OwningList() = default; + ~OwningList() { this->clear_and_dispose(std::default_delete{}); } + OwningList(OwningList&&) = default; + OwningList& operator=(OwningList&&) = default; + OwningList(const OwningList&) = delete; + OwningList& operator=(const OwningList&) = delete; +}; +using AioResultList = OwningList; + +// returns the first error code or 0 if all succeeded +inline int check_for_errors(const AioResultList& results) { + for (auto& e : results) { + if (e.result < 0) { + return e.result; + } + } + return 0; +} + +// interface to submit async librados operations and wait on their completions. +// each call returns a list of results from prior completions +class Aio { + public: + virtual ~Aio() {} + + virtual AioResultList submit(RGWSI_RADOS::Obj& obj, + const rgw_raw_obj& raw_obj, + librados::ObjectReadOperation *op, + bufferlist *data, uint64_t cost) = 0; + + virtual AioResultList submit(RGWSI_RADOS::Obj& obj, + const rgw_raw_obj& raw_obj, + librados::ObjectWriteOperation *op, + uint64_t cost) = 0; + + // poll for any ready completions without waiting + virtual AioResultList poll() = 0; + + // return any ready completions. if there are none, wait for the next + virtual AioResultList wait() = 0; + + // wait for all outstanding completions and return their results + virtual AioResultList drain() = 0; +}; + +} // namespace rgw diff --git a/src/rgw/rgw_aio_throttle.cc b/src/rgw/rgw_aio_throttle.cc new file mode 100644 index 00000000000..bf8a8a82722 --- /dev/null +++ b/src/rgw/rgw_aio_throttle.cc @@ -0,0 +1,152 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "include/rados/librados.hpp" + +#include "rgw_aio_throttle.h" +#include "rgw_rados.h" + +namespace rgw { + +void AioThrottle::aio_cb(void *cb, void *arg) +{ + Pending& p = *static_cast(arg); + p.result = p.completion->get_return_value(); + p.parent->put(p); +} + +bool AioThrottle::waiter_ready() const +{ + switch (waiter) { + case Wait::Available: return is_available(); + case Wait::Completion: return has_completion(); + case Wait::Drained: return is_drained(); + default: return false; + } +} + +AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, + const rgw_raw_obj& raw_obj, + librados::ObjectWriteOperation *op, + uint64_t cost) +{ + auto p = std::make_unique(); + p->obj = raw_obj; + p->cost = cost; + + if (cost > window) { + p->result = -EDEADLK; // would never succeed + completed.push_back(*p); + } else { + get(*p); + p->result = obj.aio_operate(p->completion, op); + if (p->result < 0) { + put(*p); + } + } + p.release(); + return std::move(completed); +} + +AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, + const rgw_raw_obj& raw_obj, + librados::ObjectReadOperation *op, + bufferlist *data, uint64_t cost) +{ + auto p = std::make_unique(); + p->obj = raw_obj; + p->cost = cost; + + if (cost > window) { + p->result = -EDEADLK; // would never succeed + completed.push_back(*p); + } else { + get(*p); + p->result = obj.aio_operate(p->completion, op, data); + if (p->result < 0) { + put(*p); + } + } + p.release(); + return std::move(completed); +} + +void AioThrottle::get(Pending& p) +{ + std::unique_lock lock{mutex}; + + // wait for the write size to become available + pending_size += p.cost; + if (!is_available()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Available; + cond.wait(lock, [this] { return is_available(); }); + waiter = Wait::None; + } + + // register the pending write and attach a completion + p.parent = this; + p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb); + pending.push_back(p); +} + +void AioThrottle::put(Pending& p) +{ + p.completion->release(); + p.completion = nullptr; + + std::scoped_lock lock{mutex}; + + // move from pending to completed + pending.erase(pending.iterator_to(p)); + completed.push_back(p); + + pending_size -= p.cost; + + if (waiter_ready()) { + cond.notify_one(); + } +} + +AioResultList AioThrottle::poll() +{ + std::unique_lock lock{mutex}; + return std::move(completed); +} + +AioResultList AioThrottle::wait() +{ + std::unique_lock lock{mutex}; + if (completed.empty() && !pending.empty()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Completion; + cond.wait(lock, [this] { return has_completion(); }); + waiter = Wait::None; + } + return std::move(completed); +} + +AioResultList AioThrottle::drain() +{ + std::unique_lock lock{mutex}; + if (!pending.empty()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Drained; + cond.wait(lock, [this] { return is_drained(); }); + waiter = Wait::None; + } + return std::move(completed); +} + +} // namespace rgw diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h new file mode 100644 index 00000000000..1f9c53d4b9b --- /dev/null +++ b/src/rgw/rgw_aio_throttle.h @@ -0,0 +1,85 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include "common/ceph_mutex.h" +#include "services/svc_rados.h" +#include "rgw_aio.h" + +namespace librados { +class AioCompletion; +} + +namespace rgw { + +// a throttle for aio operations that enforces a maximum window on outstanding +// bytes. only supports a single waiter, so all public functions must be called +// from the same thread +class AioThrottle : public Aio { + protected: + const uint64_t window; + uint64_t pending_size = 0; + + bool is_available() const { return pending_size <= window; } + bool has_completion() const { return !completed.empty(); } + bool is_drained() const { return pending.empty(); } + + struct Pending : AioResultEntry { + AioThrottle *parent = nullptr; + uint64_t cost = 0; + librados::AioCompletion *completion = nullptr; + }; + OwningList pending; + AioResultList completed; + + enum class Wait { None, Available, Completion, Drained }; + Wait waiter = Wait::None; + + bool waiter_ready() const; + + ceph::mutex mutex = ceph::make_mutex("AioThrottle"); + ceph::condition_variable cond; + + void get(Pending& p); + void put(Pending& p); + + static void aio_cb(void *cb, void *arg); + + public: + AioThrottle(uint64_t window) : window(window) {} + + virtual ~AioThrottle() { + // must drain before destructing + ceph_assert(pending.empty()); + ceph_assert(completed.empty()); + } + + AioResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj, + librados::ObjectReadOperation *op, + bufferlist *data, uint64_t cost) override; + + AioResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj, + librados::ObjectWriteOperation *op, + uint64_t cost) override; + + AioResultList poll() override; + + AioResultList wait() override; + + AioResultList drain() override; +}; + +} // namespace rgw diff --git a/src/rgw/rgw_file.h b/src/rgw/rgw_file.h index 8702b0060dc..f91399b6de5 100644 --- a/src/rgw/rgw_file.h +++ b/src/rgw/rgw_file.h @@ -35,7 +35,7 @@ #include "rgw_ldap.h" #include "rgw_token.h" #include "rgw_putobj_processor.h" -#include "rgw_putobj_throttle.h" +#include "rgw_aio_throttle.h" #include "rgw_compression.h" @@ -2320,7 +2320,7 @@ public: const std::string& bucket_name; const std::string& obj_name; RGWFileHandle* rgw_fh; - std::optional aio; + std::optional aio; std::optional processor; rgw::putobj::DataProcessor* filter; boost::optional compressor; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 552e3894ff5..2f8044981fb 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -30,6 +30,7 @@ #include "rgw_acl.h" #include "rgw_acl_s3.h" #include "rgw_acl_swift.h" +#include "rgw_aio_throttle.h" #include "rgw_user.h" #include "rgw_bucket.h" #include "rgw_log.h" @@ -45,7 +46,6 @@ #include "rgw_role.h" #include "rgw_tag_s3.h" #include "rgw_putobj_processor.h" -#include "rgw_putobj_throttle.h" #include "services/svc_zone.h" #include "services/svc_quota.h" @@ -3468,8 +3468,8 @@ void RGWPutObj::execute() } // create the object processor + rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor), sizeof(AtomicObjectProcessor)); ceph::static_ptr processor; @@ -3806,8 +3806,8 @@ void RGWPostObj::execute() store->gen_rand_obj_instance_name(&obj); } + rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size); AtomicObjectProcessor processor(&aio, store, s->bucket_info, s->bucket_owner.get_id(), *static_cast(s->obj_ctx), @@ -6579,8 +6579,8 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path, store->gen_rand_obj_instance_name(&obj); } + rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size); AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(), obj_ctx, obj, 0, s->req_id); diff --git a/src/rgw/rgw_putobj_aio.h b/src/rgw/rgw_putobj_aio.h deleted file mode 100644 index 73a07cc656b..00000000000 --- a/src/rgw/rgw_putobj_aio.h +++ /dev/null @@ -1,83 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2018 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#pragma once - -#include -#include "rgw_common.h" -#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj - -namespace librados { -class ObjectReadOperation; -class ObjectWriteOperation; -} - -namespace rgw::putobj { - -struct Result { - rgw_raw_obj obj; - int result = 0; -}; -struct ResultEntry : Result, boost::intrusive::list_base_hook<> { - virtual ~ResultEntry() {} -}; -// a list of polymorphic entries that frees them on destruction -template -struct OwningList : boost::intrusive::list { - OwningList() = default; - ~OwningList() { this->clear_and_dispose(std::default_delete{}); } - OwningList(OwningList&&) = default; - OwningList& operator=(OwningList&&) = default; - OwningList(const OwningList&) = delete; - OwningList& operator=(const OwningList&) = delete; -}; -using ResultList = OwningList; - -// returns the first error code or 0 if all succeeded -inline int check_for_errors(const ResultList& results) { - for (auto& e : results) { - if (e.result < 0) { - return e.result; - } - } - return 0; -} - -// interface to submit async librados operations and wait on their completions. -// each call returns a list of results from prior completions -class Aio { - public: - virtual ~Aio() {} - - virtual ResultList submit(RGWSI_RADOS::Obj& obj, - const rgw_raw_obj& raw_obj, - librados::ObjectReadOperation *op, - bufferlist *data, uint64_t cost) = 0; - - virtual ResultList submit(RGWSI_RADOS::Obj& obj, - const rgw_raw_obj& raw_obj, - librados::ObjectWriteOperation *op, - uint64_t cost) = 0; - - // poll for any ready completions without waiting - virtual ResultList poll() = 0; - - // return any ready completions. if there are none, wait for the next - virtual ResultList wait() = 0; - - // wait for all outstanding completions and return their results - virtual ResultList drain() = 0; -}; - -} // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index 315f36f84bd..4dbb2599fca 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -12,7 +12,7 @@ * */ -#include "rgw_putobj_aio.h" +#include "rgw_aio.h" #include "rgw_putobj_processor.h" #include "rgw_multi.h" #include "services/svc_sys_obj.h" @@ -58,7 +58,7 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) } -static int process_completed(const ResultList& completed, RawObjSet *written) +static int process_completed(const AioResultList& completed, RawObjSet *written) { std::optional error; for (auto& r : completed) { diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h index 65c26f8e3be..4aca1a3225d 100644 --- a/src/rgw/rgw_putobj_processor.h +++ b/src/rgw/rgw_putobj_processor.h @@ -20,7 +20,11 @@ #include "rgw_rados.h" #include "services/svc_rados.h" -namespace rgw::putobj { +namespace rgw { + +class Aio; + +namespace putobj { // a data consumer that writes an object in a bucket class ObjectProcessor : public DataProcessor { @@ -67,7 +71,6 @@ class HeadObjectProcessor : public ObjectProcessor { }; -class Aio; using RawObjSet = std::set; // a data sink that writes to rados objects and deletes them on cancelation @@ -210,4 +213,5 @@ class MultipartObjectProcessor : public ManifestObjectProcessor { rgw_zone_set *zones_trace, bool *canceled) override; }; -} // namespace rgw::putobj +} // namespace putobj +} // namespace rgw diff --git a/src/rgw/rgw_putobj_throttle.cc b/src/rgw/rgw_putobj_throttle.cc deleted file mode 100644 index 65878884b46..00000000000 --- a/src/rgw/rgw_putobj_throttle.cc +++ /dev/null @@ -1,152 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2018 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "include/rados/librados.hpp" - -#include "rgw_putobj_throttle.h" -#include "rgw_rados.h" - -namespace rgw::putobj { - -void AioThrottle::aio_cb(void *cb, void *arg) -{ - Pending& p = *static_cast(arg); - p.result = p.completion->get_return_value(); - p.parent->put(p); -} - -bool AioThrottle::waiter_ready() const -{ - switch (waiter) { - case Wait::Available: return is_available(); - case Wait::Completion: return has_completion(); - case Wait::Drained: return is_drained(); - default: return false; - } -} - -ResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, - const rgw_raw_obj& raw_obj, - librados::ObjectWriteOperation *op, - uint64_t cost) -{ - auto p = std::make_unique(); - p->obj = raw_obj; - p->cost = cost; - - if (cost > window) { - p->result = -EDEADLK; // would never succeed - completed.push_back(*p); - } else { - get(*p); - p->result = obj.aio_operate(p->completion, op); - if (p->result < 0) { - put(*p); - } - } - p.release(); - return std::move(completed); -} - -ResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, - const rgw_raw_obj& raw_obj, - librados::ObjectReadOperation *op, - bufferlist *data, uint64_t cost) -{ - auto p = std::make_unique(); - p->obj = raw_obj; - p->cost = cost; - - if (cost > window) { - p->result = -EDEADLK; // would never succeed - completed.push_back(*p); - } else { - get(*p); - p->result = obj.aio_operate(p->completion, op, data); - if (p->result < 0) { - put(*p); - } - } - p.release(); - return std::move(completed); -} - -void AioThrottle::get(Pending& p) -{ - std::unique_lock lock{mutex}; - - // wait for the write size to become available - pending_size += p.cost; - if (!is_available()) { - ceph_assert(waiter == Wait::None); - waiter = Wait::Available; - cond.wait(lock, [this] { return is_available(); }); - waiter = Wait::None; - } - - // register the pending write and attach a completion - p.parent = this; - p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb); - pending.push_back(p); -} - -void AioThrottle::put(Pending& p) -{ - p.completion->release(); - p.completion = nullptr; - - std::scoped_lock lock{mutex}; - - // move from pending to completed - pending.erase(pending.iterator_to(p)); - completed.push_back(p); - - pending_size -= p.cost; - - if (waiter_ready()) { - cond.notify_one(); - } -} - -ResultList AioThrottle::poll() -{ - std::unique_lock lock{mutex}; - return std::move(completed); -} - -ResultList AioThrottle::wait() -{ - std::unique_lock lock{mutex}; - if (completed.empty() && !pending.empty()) { - ceph_assert(waiter == Wait::None); - waiter = Wait::Completion; - cond.wait(lock, [this] { return has_completion(); }); - waiter = Wait::None; - } - return std::move(completed); -} - -ResultList AioThrottle::drain() -{ - std::unique_lock lock{mutex}; - if (!pending.empty()) { - ceph_assert(waiter == Wait::None); - waiter = Wait::Drained; - cond.wait(lock, [this] { return is_drained(); }); - waiter = Wait::None; - } - return std::move(completed); -} - -} // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_throttle.h b/src/rgw/rgw_putobj_throttle.h deleted file mode 100644 index 3421231d150..00000000000 --- a/src/rgw/rgw_putobj_throttle.h +++ /dev/null @@ -1,85 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2018 Red Hat, Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#pragma once - -#include -#include "common/ceph_mutex.h" -#include "services/svc_rados.h" -#include "rgw_putobj_aio.h" - -namespace librados { -class AioCompletion; -} - -namespace rgw::putobj { - -// a throttle for aio operations that enforces a maximum window on outstanding -// bytes. only supports a single waiter, so all public functions must be called -// from the same thread -class AioThrottle : public Aio { - protected: - const uint64_t window; - uint64_t pending_size = 0; - - bool is_available() const { return pending_size <= window; } - bool has_completion() const { return !completed.empty(); } - bool is_drained() const { return pending.empty(); } - - struct Pending : ResultEntry { - AioThrottle *parent = nullptr; - uint64_t cost = 0; - librados::AioCompletion *completion = nullptr; - }; - OwningList pending; - ResultList completed; - - enum class Wait { None, Available, Completion, Drained }; - Wait waiter = Wait::None; - - bool waiter_ready() const; - - ceph::mutex mutex = ceph::make_mutex("AioThrottle"); - ceph::condition_variable cond; - - void get(Pending& p); - void put(Pending& p); - - static void aio_cb(void *cb, void *arg); - - public: - AioThrottle(uint64_t window) : window(window) {} - - virtual ~AioThrottle() { - // must drain before destructing - ceph_assert(pending.empty()); - ceph_assert(completed.empty()); - } - - ResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj, - librados::ObjectReadOperation *op, - bufferlist *data, uint64_t cost) override; - - ResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj, - librados::ObjectWriteOperation *op, - uint64_t cost) override; - - ResultList poll() override; - - ResultList wait() override; - - ResultList drain() override; -}; - -} // namespace rgw::putobj diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 93ce2247ee4..080195b6597 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -24,12 +24,12 @@ #include "rgw_cache.h" #include "rgw_acl.h" #include "rgw_acl_s3.h" /* for dumping s3policy in debug log */ +#include "rgw_aio_throttle.h" #include "rgw_bucket.h" #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" #include "rgw_cr_rest.h" #include "rgw_putobj_processor.h" -#include "rgw_putobj_throttle.h" #include "cls/rgw/cls_rgw_ops.h" #include "cls/rgw/cls_rgw_client.h" @@ -4183,8 +4183,8 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, obj_time_weight set_mtime_weight; set_mtime_weight.high_precision = high_precision_time; + rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); AtomicObjectProcessor processor(&aio, this, dest_bucket_info, user_id, obj_ctx, dest_obj, olh_epoch, tag); int ret = processor.prepare(); @@ -4724,8 +4724,8 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, string tag; append_rand_alpha(cct, tag, tag, 32); + rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); using namespace rgw::putobj; - AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size); AtomicObjectProcessor processor(&aio, this, dest_bucket_info, dest_bucket_info.owner, obj_ctx, dest_obj, olh_epoch, tag); diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index 322e58ce815..37132816c77 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -12,7 +12,7 @@ * */ -#include "rgw/rgw_putobj_throttle.h" +#include "rgw/rgw_aio_throttle.h" #include "rgw/rgw_rados.h" #include "include/rados/librados.hpp" @@ -54,18 +54,18 @@ class RadosFixture : public ::testing::Test { } }; -using PutObj_Throttle = RadosFixture; +using Aio_Throttle = RadosFixture; -namespace rgw::putobj { +namespace rgw { -inline bool operator==(const Result& lhs, const Result& rhs) { +inline bool operator==(const AioResult& lhs, const AioResult& rhs) { return lhs.obj == rhs.obj && lhs.result == rhs.result; } -std::ostream& operator<<(std::ostream& out, const Result& r) { +std::ostream& operator<<(std::ostream& out, const AioResult& r) { return out << "{r=" << r.result << " obj='" << r.obj << "'"; } -TEST_F(PutObj_Throttle, NoThrottleUpToMax) +TEST_F(Aio_Throttle, NoThrottleUpToMax) { AioThrottle throttle(4); auto raw = make_raw_obj(__PRETTY_FUNCTION__); @@ -89,11 +89,11 @@ TEST_F(PutObj_Throttle, NoThrottleUpToMax) auto completions = throttle.drain(); ASSERT_EQ(4u, completions.size()); for (auto& c : completions) { - EXPECT_EQ(Result({raw, -EINVAL}), c); + EXPECT_EQ(AioResult({raw, -EINVAL}), c); } } -TEST_F(PutObj_Throttle, CostOverWindow) +TEST_F(Aio_Throttle, CostOverWindow) { AioThrottle throttle(4); auto raw = make_raw_obj(__PRETTY_FUNCTION__); @@ -102,10 +102,10 @@ TEST_F(PutObj_Throttle, CostOverWindow) librados::ObjectWriteOperation op; auto c = throttle.submit(obj, raw, &op, 8); ASSERT_EQ(1u, c.size()); - EXPECT_EQ(Result({raw, -EDEADLK}), c.front()); + EXPECT_EQ(AioResult({raw, -EDEADLK}), c.front()); } -TEST_F(PutObj_Throttle, AioThrottleOverMax) +TEST_F(Aio_Throttle, ThrottleOverMax) { constexpr uint64_t window = 4; AioThrottle throttle(window); @@ -133,4 +133,4 @@ TEST_F(PutObj_Throttle, AioThrottleOverMax) EXPECT_EQ(window, max_outstanding); } -} // namespace rgw::putobj +} // namespace rgw