From 9e1292e86e5460bd66472b199a225e5cfa04b6c8 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 10 Oct 2018 15:11:54 -0400 Subject: [PATCH] rgw: add rgw::putobj::AioThrottle implement the throttling algorithm in terms of rgw::putobj::Aio. this differs from RGWPutObjProcessor_Aio in that it doesn't wait on the first pending write to complete before making process on later ones Signed-off-by: Casey Bodley --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_putobj_throttle.cc | 150 ++++++++++++++++++++++++++++++ src/rgw/rgw_putobj_throttle.h | 83 +++++++++++++++++ src/test/rgw/CMakeLists.txt | 6 ++ src/test/rgw/test_rgw_throttle.cc | 147 +++++++++++++++++++++++++++++ 5 files changed, 387 insertions(+) create mode 100644 src/rgw/rgw_putobj_throttle.cc create mode 100644 src/rgw/rgw_putobj_throttle.h create mode 100644 src/test/rgw/test_rgw_throttle.cc diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index c830d8e79c4..cb6083321fb 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -85,6 +85,7 @@ set(librgw_common_srcs rgw_otp.cc rgw_policy_s3.cc rgw_putobj.cc + rgw_putobj_throttle.cc rgw_quota.cc rgw_rados.cc rgw_resolve.cc diff --git a/src/rgw/rgw_putobj_throttle.cc b/src/rgw/rgw_putobj_throttle.cc new file mode 100644 index 00000000000..e6bbf471195 --- /dev/null +++ b/src/rgw/rgw_putobj_throttle.cc @@ -0,0 +1,150 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "include/rados/librados.hpp" + +#include "rgw_putobj_throttle.h" +#include "rgw_rados.h" + +namespace rgw::putobj { + +void AioThrottle::aio_cb(void *cb, void *arg) +{ + Pending& p = *static_cast(arg); + p.result = p.completion->get_return_value(); + p.parent->put(p); +} + +bool AioThrottle::waiter_ready() const +{ + switch (waiter) { + case Wait::Available: return is_available(); + case Wait::Completion: return has_completion(); + case Wait::Drained: return is_drained(); + default: return false; + } +} + +ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj, + librados::ObjectWriteOperation *op, + uint64_t cost) +{ + auto p = std::make_unique(); + p->obj = obj; + p->cost = cost; + + if (cost > window) { + p->result = -EDEADLK; // would never succeed + completed.push_back(*p); + } else { + get(*p); + p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op); + if (p->result < 0) { + put(*p); + } + } + p.release(); + return std::move(completed); +} + +ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj, + librados::ObjectReadOperation *op, + bufferlist *data, uint64_t cost) +{ + auto p = std::make_unique(); + p->obj = obj; + p->cost = cost; + + if (cost > window) { + p->result = -EDEADLK; // would never succeed + completed.push_back(*p); + } else { + get(*p); + p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op, data); + if (p->result < 0) { + put(*p); + } + } + p.release(); + return std::move(completed); +} + +void AioThrottle::get(Pending& p) +{ + std::unique_lock lock{mutex}; + + // wait for the write size to become available + pending_size += p.cost; + if (!is_available()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Available; + cond.wait(lock, [this] { return is_available(); }); + waiter = Wait::None; + } + + // register the pending write and attach a completion + p.parent = this; + p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb); + pending.push_back(p); +} + +void AioThrottle::put(Pending& p) +{ + p.completion->release(); + p.completion = nullptr; + + std::scoped_lock lock{mutex}; + + // move from pending to completed + pending.erase(pending.iterator_to(p)); + completed.push_back(p); + + pending_size -= p.cost; + + if (waiter_ready()) { + cond.notify_one(); + } +} + +ResultList AioThrottle::poll() +{ + std::unique_lock lock{mutex}; + return std::move(completed); +} + +ResultList AioThrottle::wait() +{ + std::unique_lock lock{mutex}; + if (completed.empty() && !pending.empty()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Completion; + cond.wait(lock, [this] { return has_completion(); }); + waiter = Wait::None; + } + return std::move(completed); +} + +ResultList AioThrottle::drain() +{ + std::unique_lock lock{mutex}; + if (!pending.empty()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Drained; + cond.wait(lock, [this] { return is_drained(); }); + waiter = Wait::None; + } + return std::move(completed); +} + +} // namespace rgw::putobj diff --git a/src/rgw/rgw_putobj_throttle.h b/src/rgw/rgw_putobj_throttle.h new file mode 100644 index 00000000000..9a6b78ca631 --- /dev/null +++ b/src/rgw/rgw_putobj_throttle.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include "common/ceph_mutex.h" +#include "rgw_putobj_aio.h" + +namespace librados { +class AioCompletion; +} + +namespace rgw::putobj { + +// a throttle for aio operations that enforces a maximum window on outstanding +// bytes. only supports a single waiter, so all public functions must be called +// from the same thread +class AioThrottle : public Aio { + protected: + const uint64_t window; + uint64_t pending_size = 0; + + bool is_available() const { return pending_size <= window; } + bool has_completion() const { return !completed.empty(); } + bool is_drained() const { return pending.empty(); } + + struct Pending : ResultEntry { + AioThrottle *parent = nullptr; + uint64_t cost = 0; + librados::AioCompletion *completion = nullptr; + }; + OwningList pending; + ResultList completed; + + enum class Wait { None, Available, Completion, Drained }; + Wait waiter = Wait::None; + + bool waiter_ready() const; + + ceph::mutex mutex = ceph::make_mutex("AioThrottle"); + ceph::condition_variable cond; + + void get(Pending& p); + void put(Pending& p); + + static void aio_cb(void *cb, void *arg); + + public: + AioThrottle(uint64_t window) : window(window) {} + + virtual ~AioThrottle() { + // must drain before destructing + ceph_assert(pending.empty()); + ceph_assert(completed.empty()); + } + + ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj, + librados::ObjectReadOperation *op, bufferlist *data, + uint64_t cost) override; + + ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj, + librados::ObjectWriteOperation *op, uint64_t cost) override; + + ResultList poll() override; + + ResultList wait() override; + + ResultList drain() override; +}; + +} // namespace rgw::putobj diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index e37c1723611..9efb55866f8 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -96,6 +96,12 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc) add_ceph_unittest(unittest_rgw_putobj) target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS}) +add_executable(ceph_test_rgw_throttle + test_rgw_throttle.cc + $) +target_link_libraries(ceph_test_rgw_throttle rgw_a + librados global ${UNITTEST_LIBS}) + add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc) add_ceph_unittest(unittest_rgw_iam_policy) target_link_libraries(unittest_rgw_iam_policy diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc new file mode 100644 index 00000000000..ea4214a5339 --- /dev/null +++ b/src/test/rgw/test_rgw_throttle.cc @@ -0,0 +1,147 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw/rgw_putobj_throttle.h" +#include "rgw/rgw_rados.h" + +#include "include/rados/librados.hpp" + +#include + +struct RadosEnv : public ::testing::Environment { + public: + static constexpr auto poolname = "ceph_test_rgw_throttle"; + + static librados::Rados rados; + static librados::IoCtx io; + + void SetUp() override { + ASSERT_EQ(0, rados.init_with_context(g_ceph_context)); + ASSERT_EQ(0, rados.connect()); + // open/create test pool + int r = rados.ioctx_create(poolname, io); + if (r == -ENOENT) { + r = rados.pool_create(poolname); + if (r == -EEXIST) { + r = 0; + } else if (r == 0) { + r = rados.ioctx_create(poolname, io); + } + } + ASSERT_EQ(0, r); + } + void TearDown() { + rados.shutdown(); + } +}; +librados::Rados RadosEnv::rados; +librados::IoCtx RadosEnv::io; + +auto *const rados_env = ::testing::AddGlobalTestEnvironment(new RadosEnv); + +// test fixture for global setup/teardown +class RadosFixture : public ::testing::Test { + protected: + librados::IoCtx& io; + + rgw_raw_obj make_obj(const std::string& oid) { + return {{RadosEnv::poolname}, oid}; + } + rgw_rados_ref make_ref(const rgw_raw_obj& obj) { + return {obj.pool, obj.oid, "", io}; + } + public: + RadosFixture() : io(RadosEnv::io) {} +}; + +using PutObj_Throttle = RadosFixture; + +namespace rgw::putobj { + +inline bool operator==(const Result& lhs, const Result& rhs) { + return lhs.obj == rhs.obj && lhs.result == rhs.result; +} +std::ostream& operator<<(std::ostream& out, const Result& r) { + return out << "{r=" << r.result << " obj='" << r.obj << "'"; +} + +TEST_F(PutObj_Throttle, NoThrottleUpToMax) +{ + AioThrottle throttle(4); + auto obj = make_obj(__PRETTY_FUNCTION__); + auto ref = make_ref(obj); + { + librados::ObjectWriteOperation op1; + auto c1 = throttle.submit(ref, obj, &op1, 1); + EXPECT_TRUE(c1.empty()); + librados::ObjectWriteOperation op2; + auto c2 = throttle.submit(ref, obj, &op2, 1); + EXPECT_TRUE(c2.empty()); + librados::ObjectWriteOperation op3; + auto c3 = throttle.submit(ref, obj, &op3, 1); + EXPECT_TRUE(c3.empty()); + librados::ObjectWriteOperation op4; + auto c4 = throttle.submit(ref, obj, &op4, 1); + EXPECT_TRUE(c4.empty()); + // no completions because no ops had to wait + auto c5 = throttle.poll(); + } + auto completions = throttle.drain(); + ASSERT_EQ(4u, completions.size()); + for (auto& c : completions) { + EXPECT_EQ(Result({obj, -EINVAL}), c); + } +} + +TEST_F(PutObj_Throttle, CostOverWindow) +{ + AioThrottle throttle(4); + auto obj = make_obj(__PRETTY_FUNCTION__); + auto ref = make_ref(obj); + + librados::ObjectWriteOperation op; + auto c = throttle.submit(ref, obj, &op, 8); + ASSERT_EQ(1u, c.size()); + EXPECT_EQ(Result({obj, -EDEADLK}), c.front()); +} + +TEST_F(PutObj_Throttle, AioThrottleOverMax) +{ + constexpr uint64_t window = 4; + AioThrottle throttle(window); + + auto obj = make_obj(__PRETTY_FUNCTION__); + auto ref = make_ref(obj); + + // issue 32 writes, and verify that max_outstanding <= window + constexpr uint64_t total = 32; + uint64_t max_outstanding = 0; + uint64_t outstanding = 0; + + for (uint64_t i = 0; i < total; i++) { + librados::ObjectWriteOperation op; + auto c = throttle.submit(ref, obj, &op, 1); + outstanding++; + outstanding -= c.size(); + if (max_outstanding < outstanding) { + max_outstanding = outstanding; + } + } + auto c = throttle.drain(); + outstanding -= c.size(); + EXPECT_EQ(0u, outstanding); + EXPECT_EQ(window, max_outstanding); +} + +} // namespace rgw::putobj -- 2.39.5