*/
#include "rgw/rgw_aio_throttle.h"
-#include "rgw/rgw_rados.h"
-#include "include/rados/librados.hpp"
+#include <optional>
+#include <thread>
+#include "include/scope_guard.h"
#ifdef HAVE_BOOST_CONTEXT
#include <boost/asio/spawn.hpp>
namespace rgw {
+struct scoped_completion {
+ Aio* aio = nullptr;
+ AioResult* result = nullptr;
+ ~scoped_completion() { if (aio) { complete(-ECANCELED); } }
+ void complete(int r) {
+ result->result = r;
+ aio->put(*result);
+ aio = nullptr;
+ }
+};
+
+auto wait_on(scoped_completion& c) {
+ return [&c] (Aio* aio, AioResult& r) { c.aio = aio; c.result = &r; };
+}
+
+auto wait_for(boost::asio::io_context& context, ceph::timespan duration) {
+ return [&context, duration] (Aio* aio, AioResult& r) {
+ using Clock = ceph::coarse_mono_clock;
+ using Timer = boost::asio::basic_waitable_timer<Clock>;
+ auto t = std::make_unique<Timer>(context);
+ t->expires_after(duration);
+ t->async_wait([aio, &r, t=std::move(t)] (boost::system::error_code ec) {
+ if (ec != boost::asio::error::operation_aborted) {
+ aio->put(r);
+ }
+ });
+ };
+}
+
TEST_F(Aio_Throttle, NoThrottleUpToMax)
{
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
{
- librados::ObjectWriteOperation op1;
- auto c1 = throttle.get(obj, rgw::Aio::librados_op(std::move(op1)), 1, 0);
+ scoped_completion op1;
+ auto c1 = throttle.get(obj, wait_on(op1), 1, 0);
EXPECT_TRUE(c1.empty());
- librados::ObjectWriteOperation op2;
- auto c2 = throttle.get(obj, rgw::Aio::librados_op(std::move(op2)), 1, 0);
+ scoped_completion op2;
+ auto c2 = throttle.get(obj, wait_on(op2), 1, 0);
EXPECT_TRUE(c2.empty());
- librados::ObjectWriteOperation op3;
- auto c3 = throttle.get(obj, rgw::Aio::librados_op(std::move(op3)), 1, 0);
+ scoped_completion op3;
+ auto c3 = throttle.get(obj, wait_on(op3), 1, 0);
EXPECT_TRUE(c3.empty());
- librados::ObjectWriteOperation op4;
- auto c4 = throttle.get(obj, rgw::Aio::librados_op(std::move(op4)), 1, 0);
+ scoped_completion op4;
+ auto c4 = throttle.get(obj, wait_on(op4), 1, 0);
EXPECT_TRUE(c4.empty());
// no completions because no ops had to wait
auto c5 = throttle.poll();
+ EXPECT_TRUE(c5.empty());
}
auto completions = throttle.drain();
ASSERT_EQ(4u, completions.size());
for (auto& c : completions) {
- EXPECT_EQ(-EINVAL, c.result);
+ EXPECT_EQ(-ECANCELED, c.result);
}
}
BlockingAioThrottle throttle(4);
auto obj = make_obj(__PRETTY_FUNCTION__);
- librados::ObjectWriteOperation op;
- auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 8, 0);
+ scoped_completion op;
+ auto c = throttle.get(obj, wait_on(op), 8, 0);
ASSERT_EQ(1u, c.size());
EXPECT_EQ(-EDEADLK, c.front().result);
}
uint64_t max_outstanding = 0;
uint64_t outstanding = 0;
+ // timer thread
+ boost::asio::io_context context;
+ using Executor = boost::asio::io_context::executor_type;
+ using Work = boost::asio::executor_work_guard<Executor>;
+ std::optional<Work> work(context.get_executor());
+ std::thread worker([&context] { context.run(); });
+ auto g = make_scope_guard([&work, &worker] {
+ work.reset();
+ worker.join();
+ });
+
for (uint64_t i = 0; i < total; i++) {
- librados::ObjectWriteOperation op;
- auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 1, 0);
+ using namespace std::chrono_literals;
+ auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0);
outstanding++;
outstanding -= c.size();
if (max_outstanding < outstanding) {
boost::asio::spawn(context,
[&] (boost::asio::yield_context yield) {
YieldingAioThrottle throttle(4, context, yield);
- librados::ObjectWriteOperation op;
- auto c = throttle.submit(obj, &op, 8, 0);
+ scoped_completion op;
+ auto c = throttle.get(obj, wait_on(op), 8, 0);
ASSERT_EQ(1u, c.size());
EXPECT_EQ(-EDEADLK, c.front().result);
});
[&] (boost::asio::yield_context yield) {
YieldingAioThrottle throttle(window, context, yield);
for (uint64_t i = 0; i < total; i++) {
- librados::ObjectWriteOperation op;
- auto c = throttle.submit(obj, &op, 1, 0);
+ using namespace std::chrono_literals;
+ auto c = throttle.get(obj, wait_for(context, 10ms), 1, 0);
outstanding++;
outstanding -= c.size();
if (max_outstanding < outstanding) {
auto c = throttle.drain();
outstanding -= c.size();
});
+ context.poll(); // run until we block
+ EXPECT_EQ(window, outstanding);
+
context.run();
EXPECT_EQ(0u, outstanding);
EXPECT_EQ(window, max_outstanding);