From: Casey Bodley Date: Wed, 10 Oct 2018 12:04:31 +0000 (-0400) Subject: rgw: add YieldingAioThrottle X-Git-Tag: v15.1.0~2838^2~7 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3a2830325d01342f3f892174620485daf93e673f;p=ceph.git rgw: add YieldingAioThrottle Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_aio_throttle.cc b/src/rgw/rgw_aio_throttle.cc index 8d3369c47c8..3bad71eb8ff 100644 --- a/src/rgw/rgw_aio_throttle.cc +++ b/src/rgw/rgw_aio_throttle.cc @@ -110,4 +110,98 @@ AioResultList BlockingAioThrottle::drain() return std::move(completed); } +#ifdef HAVE_BOOST_CONTEXT + +template +auto YieldingAioThrottle::async_wait(CompletionToken&& token) +{ + using boost::asio::async_completion; + using Signature = void(boost::system::error_code); + async_completion init(token); + completion = Completion::create(context.get_executor(), + std::move(init.completion_handler)); + return init.result.get(); +} + +AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj, + OpFunc&& f, + uint64_t cost, uint64_t id) +{ + auto p = std::make_unique(); + p->obj = obj; + p->id = id; + p->cost = cost; + + if (cost > window) { + p->result = -EDEADLK; // would never succeed + completed.push_back(*p); + } else { + // wait for the write size to become available + pending_size += p->cost; + if (!is_available()) { + ceph_assert(waiter == Wait::None); + ceph_assert(!completion); + + boost::system::error_code ec; + waiter = Wait::Available; + async_wait(yield[ec]); + } + + // register the pending write and initiate the operation + pending.push_back(*p); + std::move(f)(this, *static_cast(p.get())); + } + p.release(); + return std::move(completed); +} + +void YieldingAioThrottle::put(AioResult& r) +{ + auto& p = static_cast(r); + + // move from pending to completed + pending.erase(pending.iterator_to(p)); + completed.push_back(p); + + pending_size -= p.cost; + + if (waiter_ready()) { + ceph_assert(completion); + ceph::async::post(std::move(completion), boost::system::error_code{}); + waiter = Wait::None; + } +} + +AioResultList YieldingAioThrottle::poll() +{ + return std::move(completed); +} + +AioResultList YieldingAioThrottle::wait() +{ + if (!has_completion() && !pending.empty()) { + ceph_assert(waiter == Wait::None); + ceph_assert(!completion); + + boost::system::error_code ec; + waiter = Wait::Completion; + async_wait(yield[ec]); + } + return std::move(completed); +} + +AioResultList YieldingAioThrottle::drain() +{ + if (!is_drained()) { + ceph_assert(waiter == Wait::None); + ceph_assert(!completion); + + boost::system::error_code ec; + waiter = Wait::Drained; + async_wait(yield[ec]); + } + return std::move(completed); +} +#endif // HAVE_BOOST_CONTEXT + } // namespace rgw diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h index eeb12f826f5..613abb534b0 100644 --- a/src/rgw/rgw_aio_throttle.h +++ b/src/rgw/rgw_aio_throttle.h @@ -18,6 +18,7 @@ #include "include/rados/librados_fwd.hpp" #include #include "common/ceph_mutex.h" +#include "common/async/completion.h" #include "services/svc_rados.h" #include "rgw_aio.h" @@ -76,4 +77,40 @@ class BlockingAioThrottle final : public Aio, private Throttle { AioResultList drain() override final; }; +#ifdef HAVE_BOOST_CONTEXT +// a throttle that yields the coroutine instead of blocking. all public +// functions must be called within the coroutine strand +class YieldingAioThrottle final : public Aio, private Throttle { + boost::asio::io_context& context; + boost::asio::yield_context yield; + struct Handler; + + // completion callback associated with the waiter + using Completion = ceph::async::Completion; + std::unique_ptr completion; + + template + auto async_wait(CompletionToken&& token); + + struct Pending : AioResultEntry { uint64_t cost = 0; }; + + public: + YieldingAioThrottle(uint64_t window, boost::asio::io_context& context, + boost::asio::yield_context yield) + : Throttle(window), context(context), yield(yield) + {} + + AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f, + uint64_t cost, uint64_t id) override final; + + void put(AioResult& r) override final; + + AioResultList poll() override final; + + AioResultList wait() override final; + + AioResultList drain() override final; +}; +#endif // HAVE_BOOST_CONTEXT + } // namespace rgw diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index 5a14b1a9cca..4eb3110725f 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -17,6 +17,9 @@ #include "include/rados/librados.hpp" +#ifdef HAVE_BOOST_CONTEXT +#include +#endif #include struct RadosEnv : public ::testing::Environment { @@ -120,4 +123,53 @@ TEST_F(Aio_Throttle, ThrottleOverMax) EXPECT_EQ(window, max_outstanding); } +#ifdef HAVE_BOOST_CONTEXT +TEST_F(Aio_Throttle, YieldCostOverWindow) +{ + auto obj = make_obj(__PRETTY_FUNCTION__); + + boost::asio::io_context context; + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(4, context, yield); + librados::ObjectWriteOperation op; + auto c = throttle.submit(obj, &op, 8, 0); + ASSERT_EQ(1u, c.size()); + EXPECT_EQ(-EDEADLK, c.front().result); + }); +} + +TEST_F(Aio_Throttle, YieldingThrottleOverMax) +{ + constexpr uint64_t window = 4; + + auto obj = make_obj(__PRETTY_FUNCTION__); + + // issue 32 writes, and verify that max_outstanding <= window + constexpr uint64_t total = 32; + uint64_t max_outstanding = 0; + uint64_t outstanding = 0; + + boost::asio::io_context context; + boost::asio::spawn(context, + [&] (boost::asio::yield_context yield) { + YieldingAioThrottle throttle(window, context, yield); + for (uint64_t i = 0; i < total; i++) { + librados::ObjectWriteOperation op; + auto c = throttle.submit(obj, &op, 1, 0); + outstanding++; + outstanding -= c.size(); + if (max_outstanding < outstanding) { + max_outstanding = outstanding; + } + } + auto c = throttle.drain(); + outstanding -= c.size(); + }); + context.run(); + EXPECT_EQ(0u, outstanding); + EXPECT_EQ(window, max_outstanding); +} +#endif // HAVE_BOOST_CONTEXT + } // namespace rgw