namespace rgw {
-bool AioThrottle::waiter_ready() const
+bool Throttle::waiter_ready() const
{
switch (waiter) {
case Wait::Available: return is_available();
}
}
-AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
- OpFunc&& f,
- uint64_t cost, uint64_t id)
+AioResultList BlockingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
+ OpFunc&& f,
+ uint64_t cost, uint64_t id)
{
auto p = std::make_unique<Pending>();
p->obj = obj;
return std::move(completed);
}
-void AioThrottle::put(AioResult& r)
+void BlockingAioThrottle::put(AioResult& r)
{
auto& p = static_cast<Pending&>(r);
std::scoped_lock lock{mutex};
}
}
-AioResultList AioThrottle::poll()
+AioResultList BlockingAioThrottle::poll()
{
std::unique_lock lock{mutex};
return std::move(completed);
}
-AioResultList AioThrottle::wait()
+AioResultList BlockingAioThrottle::wait()
{
std::unique_lock lock{mutex};
if (completed.empty() && !pending.empty()) {
return std::move(completed);
}
-AioResultList AioThrottle::drain()
+AioResultList BlockingAioThrottle::drain()
{
std::unique_lock lock{mutex};
if (!pending.empty()) {
namespace rgw {
-// a throttle for aio operations that enforces a maximum window on outstanding
-// bytes. only supports a single waiter, so all public functions must be called
-// from the same thread
-class AioThrottle : public Aio {
+class Throttle {
protected:
const uint64_t window;
uint64_t pending_size = 0;
+ AioResultList pending;
+ AioResultList completed;
+
bool is_available() const { return pending_size <= window; }
bool has_completion() const { return !completed.empty(); }
bool is_drained() const { return pending.empty(); }
- struct Pending : AioResultEntry {
- AioThrottle *parent = nullptr;
- uint64_t cost = 0;
- };
- OwningList<Pending> pending;
- AioResultList completed;
-
enum class Wait { None, Available, Completion, Drained };
Wait waiter = Wait::None;
bool waiter_ready() const;
- ceph::mutex mutex = ceph::make_mutex("AioThrottle");
- ceph::condition_variable cond;
-
public:
- AioThrottle(uint64_t window) : window(window) {}
+ Throttle(uint64_t window) : window(window) {}
- virtual ~AioThrottle() {
+ ~Throttle() {
// must drain before destructing
ceph_assert(pending.empty());
ceph_assert(completed.empty());
}
+};
+
+// a throttle for aio operations. all public functions must be called from
+// the same thread
+class BlockingAioThrottle final : public Aio, private Throttle {
+ ceph::mutex mutex = ceph::make_mutex("AioThrottle");
+ ceph::condition_variable cond;
+
+ struct Pending : AioResultEntry {
+ BlockingAioThrottle *parent = nullptr;
+ uint64_t cost = 0;
+ librados::AioCompletion *completion = nullptr;
+ };
+ public:
+ BlockingAioThrottle(uint64_t window) : Throttle(window) {}
- AioResultList get(const RGWSI_RADOS::Obj& obj,
- OpFunc&& f,
- uint64_t cost, uint64_t id) override;
- void put(AioResult& r) override;
+ AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
+ uint64_t cost, uint64_t id) override final;
+ void put(AioResult& r) override final;
- AioResultList poll() override;
+ AioResultList poll() override final;
- AioResultList wait() override;
+ AioResultList wait() override final;
- AioResultList drain() override;
+ AioResultList drain() override final;
};
} // namespace rgw
const std::string& bucket_name;
const std::string& obj_name;
RGWFileHandle* rgw_fh;
- std::optional<rgw::AioThrottle> aio;
+ std::optional<rgw::BlockingAioThrottle> aio;
std::optional<rgw::putobj::AtomicObjectProcessor> processor;
rgw::putobj::DataProcessor* filter;
boost::optional<RGWPutObj_Compress> compressor;
}
// create the object processor
- rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
sizeof(AtomicObjectProcessor),
store->gen_rand_obj_instance_name(&obj);
}
- rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, s->bucket_info,
rgw_placement_rule dest_placement = s->dest_placement;
dest_placement.inherit_from(binfo.placement_rule);
- rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
-
AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(),
obj_ctx, obj, 0, s->req_id, this);
set_mtime_weight.high_precision = high_precision_time;
int ret;
- rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
string tag;
append_rand_alpha(cct, tag, tag, 32);
- rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, this, dest_bucket_info, &dest_placement,
dest_bucket_info.owner, obj_ctx,
const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
- rgw::AioThrottle aio(window_size);
+ rgw::BlockingAioThrottle aio(window_size);
get_obj_data data(store, cb, &aio, ofs);
int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,
RGWBucketInfo& bucket_info = bucket->bucket_info;
- using namespace rgw::putobj;
- rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
RGWObjectCtx obj_ctx(store);
rgw_obj obj(bucket_info.bucket, key);
string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
+ using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, bucket_info,
nullptr,
owner.get_id(),
if (ret < 0)
return ret;
- using namespace rgw::putobj;
-
DataProcessor *filter = &processor;
CompressorRef plugin;
TEST_F(Aio_Throttle, NoThrottleUpToMax)
{
- AioThrottle throttle(4);
+ BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
{
librados::ObjectWriteOperation op1;
TEST_F(Aio_Throttle, CostOverWindow)
{
- AioThrottle throttle(4);
+ BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
librados::ObjectWriteOperation op;
TEST_F(Aio_Throttle, ThrottleOverMax)
{
constexpr uint64_t window = 4;
- AioThrottle throttle(window);
+ BlockingAioThrottle throttle(window);
auto obj = make_obj(__PRETTY_FUNCTION__);