]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add YieldingAioThrottle
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 12:04:31 +0000 (08:04 -0400)
committerCasey Bodley <cbodley@redhat.com>
Wed, 24 Apr 2019 13:44:14 +0000 (09:44 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_aio_throttle.cc
src/rgw/rgw_aio_throttle.h
src/test/rgw/test_rgw_throttle.cc

index 8d3369c47c888204c0634ad0a9bb6ca0668b08af..3bad71eb8ffba31a54f6b7032d3e62d40c764a9e 100644 (file)
@@ -110,4 +110,98 @@ AioResultList BlockingAioThrottle::drain()
   return std::move(completed);
 }
 
+#ifdef HAVE_BOOST_CONTEXT
+
+template <typename CompletionToken>
+auto YieldingAioThrottle::async_wait(CompletionToken&& token)
+{
+  using boost::asio::async_completion;
+  using Signature = void(boost::system::error_code);
+  async_completion<CompletionToken, Signature> init(token);
+  completion = Completion::create(context.get_executor(),
+                                  std::move(init.completion_handler));
+  return init.result.get();
+}
+
+AioResultList YieldingAioThrottle::get(const RGWSI_RADOS::Obj& obj,
+                                       OpFunc&& f,
+                                       uint64_t cost, uint64_t id)
+{
+  auto p = std::make_unique<Pending>();
+  p->obj = obj;
+  p->id = id;
+  p->cost = cost;
+
+  if (cost > window) {
+    p->result = -EDEADLK; // would never succeed
+    completed.push_back(*p);
+  } else {
+    // wait for the write size to become available
+    pending_size += p->cost;
+    if (!is_available()) {
+      ceph_assert(waiter == Wait::None);
+      ceph_assert(!completion);
+
+      boost::system::error_code ec;
+      waiter = Wait::Available;
+      async_wait(yield[ec]);
+    }
+
+    // register the pending write and initiate the operation
+    pending.push_back(*p);
+    std::move(f)(this, *static_cast<AioResult*>(p.get()));
+  }
+  p.release();
+  return std::move(completed);
+}
+
+void YieldingAioThrottle::put(AioResult& r)
+{
+  auto& p = static_cast<Pending&>(r);
+
+  // move from pending to completed
+  pending.erase(pending.iterator_to(p));
+  completed.push_back(p);
+
+  pending_size -= p.cost;
+
+  if (waiter_ready()) {
+    ceph_assert(completion);
+    ceph::async::post(std::move(completion), boost::system::error_code{});
+    waiter = Wait::None;
+  }
+}
+
+AioResultList YieldingAioThrottle::poll()
+{
+  return std::move(completed);
+}
+
+AioResultList YieldingAioThrottle::wait()
+{
+  if (!has_completion() && !pending.empty()) {
+    ceph_assert(waiter == Wait::None);
+    ceph_assert(!completion);
+
+    boost::system::error_code ec;
+    waiter = Wait::Completion;
+    async_wait(yield[ec]);
+  }
+  return std::move(completed);
+}
+
+AioResultList YieldingAioThrottle::drain()
+{
+  if (!is_drained()) {
+    ceph_assert(waiter == Wait::None);
+    ceph_assert(!completion);
+
+    boost::system::error_code ec;
+    waiter = Wait::Drained;
+    async_wait(yield[ec]);
+  }
+  return std::move(completed);
+}
+#endif // HAVE_BOOST_CONTEXT
+
 } // namespace rgw
index eeb12f826f588c37257372f7cb0bd08b3a5f4267..613abb534b00dd48a56e792d2966c5cfcabcee78 100644 (file)
@@ -18,6 +18,7 @@
 #include "include/rados/librados_fwd.hpp"
 #include <memory>
 #include "common/ceph_mutex.h"
+#include "common/async/completion.h"
 #include "services/svc_rados.h"
 #include "rgw_aio.h"
 
@@ -76,4 +77,40 @@ class BlockingAioThrottle final : public Aio, private Throttle {
   AioResultList drain() override final;
 };
 
+#ifdef HAVE_BOOST_CONTEXT
+// a throttle that yields the coroutine instead of blocking. all public
+// functions must be called within the coroutine strand
+class YieldingAioThrottle final : public Aio, private Throttle {
+  boost::asio::io_context& context;
+  boost::asio::yield_context yield;
+  struct Handler;
+
+  // completion callback associated with the waiter
+  using Completion = ceph::async::Completion<void(boost::system::error_code)>;
+  std::unique_ptr<Completion> completion;
+
+  template <typename CompletionToken>
+  auto async_wait(CompletionToken&& token);
+
+  struct Pending : AioResultEntry { uint64_t cost = 0; };
+
+ public:
+  YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
+                      boost::asio::yield_context yield)
+    : Throttle(window), context(context), yield(yield)
+  {}
+
+  AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
+                    uint64_t cost, uint64_t id) override final;
+
+  void put(AioResult& r) override final;
+
+  AioResultList poll() override final;
+
+  AioResultList wait() override final;
+
+  AioResultList drain() override final;
+};
+#endif // HAVE_BOOST_CONTEXT
+
 } // namespace rgw
index 5a14b1a9cca7f0179b3b4350e441cc23cbd51278..4eb3110725f2dd37da2081ff2b55386aaaa09ae4 100644 (file)
@@ -17,6 +17,9 @@
 
 #include "include/rados/librados.hpp"
 
+#ifdef HAVE_BOOST_CONTEXT
+#include <boost/asio/spawn.hpp>
+#endif
 #include <gtest/gtest.h>
 
 struct RadosEnv : public ::testing::Environment {
@@ -120,4 +123,53 @@ TEST_F(Aio_Throttle, ThrottleOverMax)
   EXPECT_EQ(window, max_outstanding);
 }
 
+#ifdef HAVE_BOOST_CONTEXT
+TEST_F(Aio_Throttle, YieldCostOverWindow)
+{
+  auto obj = make_obj(__PRETTY_FUNCTION__);
+
+  boost::asio::io_context context;
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(4, context, yield);
+      librados::ObjectWriteOperation op;
+      auto c = throttle.submit(obj, &op, 8, 0);
+      ASSERT_EQ(1u, c.size());
+      EXPECT_EQ(-EDEADLK, c.front().result);
+    });
+}
+
+TEST_F(Aio_Throttle, YieldingThrottleOverMax)
+{
+  constexpr uint64_t window = 4;
+
+  auto obj = make_obj(__PRETTY_FUNCTION__);
+
+  // issue 32 writes, and verify that max_outstanding <= window
+  constexpr uint64_t total = 32;
+  uint64_t max_outstanding = 0;
+  uint64_t outstanding = 0;
+
+  boost::asio::io_context context;
+  boost::asio::spawn(context,
+    [&] (boost::asio::yield_context yield) {
+      YieldingAioThrottle throttle(window, context, yield);
+      for (uint64_t i = 0; i < total; i++) {
+        librados::ObjectWriteOperation op;
+        auto c = throttle.submit(obj, &op, 1, 0);
+        outstanding++;
+        outstanding -= c.size();
+        if (max_outstanding < outstanding) {
+          max_outstanding = outstanding;
+        }
+      }
+      auto c = throttle.drain();
+      outstanding -= c.size();
+    });
+  context.run();
+  EXPECT_EQ(0u, outstanding);
+  EXPECT_EQ(window, max_outstanding);
+}
+#endif // HAVE_BOOST_CONTEXT
+
 } // namespace rgw