return std::move(completed);
}
+#ifdef HAVE_BOOST_CONTEXT
+
+template <typename CompletionToken>
+auto YieldingAioThrottle::async_wait(CompletionToken&& token)
+{
+ using boost::asio::async_completion;
+ using Signature = void(boost::system::error_code);
+ async_completion<CompletionToken, Signature> init(token);
+ completion = Completion::create(context.get_executor(),
+ std::move(init.completion_handler));
+ return init.result.get();
+}
+
+AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
+ OpFunc&& f,
+ uint64_t cost, uint64_t id)
+{
+ auto p = std::make_unique<Pending>();
+ p->obj = obj;
+ p->id = id;
+ p->cost = cost;
+
+ if (cost > window) {
+ p->result = -EDEADLK; // would never succeed
+ completed.push_back(*p);
+ } else {
+ // wait for the write size to become available
+ pending_size += p->cost;
+ if (!is_available()) {
+ ceph_assert(waiter == Wait::None);
+ ceph_assert(!completion);
+
+ boost::system::error_code ec;
+ waiter = Wait::Available;
+ async_wait(yield[ec]);
+ }
+
+ // register the pending write and initiate the operation
+ pending.push_back(*p);
+ std::move(f)(this, *static_cast<AioResult*>(p.get()));
+ }
+ p.release();
+ return std::move(completed);
+}
+
+void YieldingAioThrottle::put(AioResult& r)
+{
+ auto& p = static_cast<Pending&>(r);
+
+ // move from pending to completed
+ pending.erase(pending.iterator_to(p));
+ completed.push_back(p);
+
+ pending_size -= p.cost;
+
+ if (waiter_ready()) {
+ ceph_assert(completion);
+ ceph::async::post(std::move(completion), boost::system::error_code{});
+ waiter = Wait::None;
+ }
+}
+
+AioResultList YieldingAioThrottle::poll()
+{
+ return std::move(completed);
+}
+
+AioResultList YieldingAioThrottle::wait()
+{
+ if (!has_completion() && !pending.empty()) {
+ ceph_assert(waiter == Wait::None);
+ ceph_assert(!completion);
+
+ boost::system::error_code ec;
+ waiter = Wait::Completion;
+ async_wait(yield[ec]);
+ }
+ return std::move(completed);
+}
+
+AioResultList YieldingAioThrottle::drain()
+{
+ if (!is_drained()) {
+ ceph_assert(waiter == Wait::None);
+ ceph_assert(!completion);
+
+ boost::system::error_code ec;
+ waiter = Wait::Drained;
+ async_wait(yield[ec]);
+ }
+ return std::move(completed);
+}
+#endif // HAVE_BOOST_CONTEXT
+
} // namespace rgw
#include "include/rados/librados_fwd.hpp"
#include <memory>
#include "common/ceph_mutex.h"
+#include "common/async/completion.h"
#include "services/svc_rados.h"
#include "rgw_aio.h"
AioResultList drain() override final;
};
+#ifdef HAVE_BOOST_CONTEXT
+// a throttle that yields the coroutine instead of blocking. all public
+// functions must be called within the coroutine strand
+class YieldingAioThrottle final : public Aio, private Throttle {
+ boost::asio::io_context& context;
+ boost::asio::yield_context yield;
+ struct Handler;
+
+ // completion callback associated with the waiter
+ using Completion = ceph::async::Completion<void(boost::system::error_code)>;
+ std::unique_ptr<Completion> completion;
+
+ template <typename CompletionToken>
+ auto async_wait(CompletionToken&& token);
+
+ struct Pending : AioResultEntry { uint64_t cost = 0; };
+
+ public:
+ YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
+ boost::asio::yield_context yield)
+ : Throttle(window), context(context), yield(yield)
+ {}
+
+ AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
+ uint64_t cost, uint64_t id) override final;
+
+ void put(AioResult& r) override final;
+
+ AioResultList poll() override final;
+
+ AioResultList wait() override final;
+
+ AioResultList drain() override final;
+};
+#endif // HAVE_BOOST_CONTEXT
+
} // namespace rgw
#include "include/rados/librados.hpp"
+#ifdef HAVE_BOOST_CONTEXT
+#include <boost/asio/spawn.hpp>
+#endif
#include <gtest/gtest.h>
struct RadosEnv : public ::testing::Environment {
EXPECT_EQ(window, max_outstanding);
}
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(Aio_Throttle, YieldCostOverWindow)
+{
+ auto obj = make_obj(__PRETTY_FUNCTION__);
+
+ boost::asio::io_context context;
+ boost::asio::spawn(context,
+ [&] (boost::asio::yield_context yield) {
+ YieldingAioThrottle throttle(4, context, yield);
+ librados::ObjectWriteOperation op;
+ auto c = throttle.submit(obj, &op, 8, 0);
+ ASSERT_EQ(1u, c.size());
+ EXPECT_EQ(-EDEADLK, c.front().result);
+ });
+}
+
+TEST_F(Aio_Throttle, YieldingThrottleOverMax)
+{
+ constexpr uint64_t window = 4;
+
+ auto obj = make_obj(__PRETTY_FUNCTION__);
+
+ // issue 32 writes, and verify that max_outstanding <= window
+ constexpr uint64_t total = 32;
+ uint64_t max_outstanding = 0;
+ uint64_t outstanding = 0;
+
+ boost::asio::io_context context;
+ boost::asio::spawn(context,
+ [&] (boost::asio::yield_context yield) {
+ YieldingAioThrottle throttle(window, context, yield);
+ for (uint64_t i = 0; i < total; i++) {
+ librados::ObjectWriteOperation op;
+ auto c = throttle.submit(obj, &op, 1, 0);
+ outstanding++;
+ outstanding -= c.size();
+ if (max_outstanding < outstanding) {
+ max_outstanding = outstanding;
+ }
+ }
+ auto c = throttle.drain();
+ outstanding -= c.size();
+ });
+ context.run();
+ EXPECT_EQ(0u, outstanding);
+ EXPECT_EQ(window, max_outstanding);
+}
+#endif // HAVE_BOOST_CONTEXT
+
} // namespace rgw