]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add rgw::putobj::AioThrottle
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:11:54 +0000 (15:11 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 15 Oct 2018 21:05:12 +0000 (17:05 -0400)
implement the throttling algorithm in terms of rgw::putobj::Aio. this
differs from RGWPutObjProcessor_Aio in that it doesn't wait on the
first pending write to complete before making process on later ones

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_putobj_throttle.cc [new file with mode: 0644]
src/rgw/rgw_putobj_throttle.h [new file with mode: 0644]
src/test/rgw/CMakeLists.txt
src/test/rgw/test_rgw_throttle.cc [new file with mode: 0644]

index c830d8e79c49bfa38175b2b66d4c0f2d70c3fa4c..cb6083321fb0c48c5df6226b8b5ffd80798a29ea 100644 (file)
@@ -85,6 +85,7 @@ set(librgw_common_srcs
   rgw_otp.cc
   rgw_policy_s3.cc
   rgw_putobj.cc
+  rgw_putobj_throttle.cc
   rgw_quota.cc
   rgw_rados.cc
   rgw_resolve.cc
diff --git a/src/rgw/rgw_putobj_throttle.cc b/src/rgw/rgw_putobj_throttle.cc
new file mode 100644 (file)
index 0000000..e6bbf47
--- /dev/null
@@ -0,0 +1,150 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+
+#include "rgw_putobj_throttle.h"
+#include "rgw_rados.h"
+
+namespace rgw::putobj {
+
+void AioThrottle::aio_cb(void *cb, void *arg)
+{
+  Pending& p = *static_cast<Pending*>(arg);
+  p.result = p.completion->get_return_value();
+  p.parent->put(p);
+}
+
+bool AioThrottle::waiter_ready() const
+{
+  switch (waiter) {
+  case Wait::Available: return is_available();
+  case Wait::Completion: return has_completion();
+  case Wait::Drained: return is_drained();
+  default: return false;
+  }
+}
+
+ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+                               librados::ObjectWriteOperation *op,
+                               uint64_t cost)
+{
+  auto p = std::make_unique<Pending>();
+  p->obj = obj;
+  p->cost = cost;
+
+  if (cost > window) {
+    p->result = -EDEADLK; // would never succeed
+    completed.push_back(*p);
+  } else {
+    get(*p);
+    p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op);
+    if (p->result < 0) {
+      put(*p);
+    }
+  }
+  p.release();
+  return std::move(completed);
+}
+
+ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+                               librados::ObjectReadOperation *op,
+                               bufferlist *data, uint64_t cost)
+{
+  auto p = std::make_unique<Pending>();
+  p->obj = obj;
+  p->cost = cost;
+
+  if (cost > window) {
+    p->result = -EDEADLK; // would never succeed
+    completed.push_back(*p);
+  } else {
+    get(*p);
+    p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op, data);
+    if (p->result < 0) {
+      put(*p);
+    }
+  }
+  p.release();
+  return std::move(completed);
+}
+
+void AioThrottle::get(Pending& p)
+{
+  std::unique_lock lock{mutex};
+
+  // wait for the write size to become available
+  pending_size += p.cost;
+  if (!is_available()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Available;
+    cond.wait(lock, [this] { return is_available(); });
+    waiter = Wait::None;
+  }
+
+  // register the pending write and attach a completion
+  p.parent = this;
+  p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
+  pending.push_back(p);
+}
+
+void AioThrottle::put(Pending& p)
+{
+  p.completion->release();
+  p.completion = nullptr;
+
+  std::scoped_lock lock{mutex};
+
+  // move from pending to completed
+  pending.erase(pending.iterator_to(p));
+  completed.push_back(p);
+
+  pending_size -= p.cost;
+
+  if (waiter_ready()) {
+    cond.notify_one();
+  }
+}
+
+ResultList AioThrottle::poll()
+{
+  std::unique_lock lock{mutex};
+  return std::move(completed);
+}
+
+ResultList AioThrottle::wait()
+{
+  std::unique_lock lock{mutex};
+  if (completed.empty() && !pending.empty()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Completion;
+    cond.wait(lock, [this] { return has_completion(); });
+    waiter = Wait::None;
+  }
+  return std::move(completed);
+}
+
+ResultList AioThrottle::drain()
+{
+  std::unique_lock lock{mutex};
+  if (!pending.empty()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Drained;
+    cond.wait(lock, [this] { return is_drained(); });
+    waiter = Wait::None;
+  }
+  return std::move(completed);
+}
+
+} // namespace rgw::putobj
diff --git a/src/rgw/rgw_putobj_throttle.h b/src/rgw/rgw_putobj_throttle.h
new file mode 100644 (file)
index 0000000..9a6b78c
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include "common/ceph_mutex.h"
+#include "rgw_putobj_aio.h"
+
+namespace librados {
+class AioCompletion;
+}
+
+namespace rgw::putobj {
+
+// a throttle for aio operations that enforces a maximum window on outstanding
+// bytes. only supports a single waiter, so all public functions must be called
+// from the same thread
+class AioThrottle : public Aio {
+ protected:
+  const uint64_t window;
+  uint64_t pending_size = 0;
+
+  bool is_available() const { return pending_size <= window; }
+  bool has_completion() const { return !completed.empty(); }
+  bool is_drained() const { return pending.empty(); }
+
+  struct Pending : ResultEntry {
+    AioThrottle *parent = nullptr;
+    uint64_t cost = 0;
+    librados::AioCompletion *completion = nullptr;
+  };
+  OwningList<Pending> pending;
+  ResultList completed;
+
+  enum class Wait { None, Available, Completion, Drained };
+  Wait waiter = Wait::None;
+
+  bool waiter_ready() const;
+
+  ceph::mutex mutex = ceph::make_mutex("AioThrottle");
+  ceph::condition_variable cond;
+
+  void get(Pending& p);
+  void put(Pending& p);
+
+  static void aio_cb(void *cb, void *arg);
+
+ public:
+  AioThrottle(uint64_t window) : window(window) {}
+
+  virtual ~AioThrottle() {
+    // must drain before destructing
+    ceph_assert(pending.empty());
+    ceph_assert(completed.empty());
+  }
+
+  ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+                    librados::ObjectReadOperation *op, bufferlist *data,
+                    uint64_t cost) override;
+
+  ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+                    librados::ObjectWriteOperation *op, uint64_t cost) override;
+
+  ResultList poll() override;
+
+  ResultList wait() override;
+
+  ResultList drain() override;
+};
+
+} // namespace rgw::putobj
index e37c1723611ec81eebe44c4148cbfb1f759aaf2b..9efb55866f8c2905be8309f553bbc9f9d7f9c674 100644 (file)
@@ -96,6 +96,12 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
 add_ceph_unittest(unittest_rgw_putobj)
 target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS})
 
+add_executable(ceph_test_rgw_throttle
+  test_rgw_throttle.cc
+  $<TARGET_OBJECTS:unit-main>)
+target_link_libraries(ceph_test_rgw_throttle rgw_a
+  librados global ${UNITTEST_LIBS})
+
 add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
 add_ceph_unittest(unittest_rgw_iam_policy)
 target_link_libraries(unittest_rgw_iam_policy
diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc
new file mode 100644 (file)
index 0000000..ea4214a
--- /dev/null
@@ -0,0 +1,147 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw/rgw_putobj_throttle.h"
+#include "rgw/rgw_rados.h"
+
+#include "include/rados/librados.hpp"
+
+#include <gtest/gtest.h>
+
+struct RadosEnv : public ::testing::Environment {
+ public:
+  static constexpr auto poolname = "ceph_test_rgw_throttle";
+
+  static librados::Rados rados;
+  static librados::IoCtx io;
+
+  void SetUp() override {
+    ASSERT_EQ(0, rados.init_with_context(g_ceph_context));
+    ASSERT_EQ(0, rados.connect());
+    // open/create test pool
+    int r = rados.ioctx_create(poolname, io);
+    if (r == -ENOENT) {
+      r = rados.pool_create(poolname);
+      if (r == -EEXIST) {
+        r = 0;
+      } else if (r == 0) {
+        r = rados.ioctx_create(poolname, io);
+      }
+    }
+    ASSERT_EQ(0, r);
+  }
+  void TearDown() {
+    rados.shutdown();
+  }
+};
+librados::Rados RadosEnv::rados;
+librados::IoCtx RadosEnv::io;
+
+auto *const rados_env = ::testing::AddGlobalTestEnvironment(new RadosEnv);
+
+// test fixture for global setup/teardown
+class RadosFixture : public ::testing::Test {
+ protected:
+  librados::IoCtx& io;
+
+  rgw_raw_obj make_obj(const std::string& oid) {
+    return {{RadosEnv::poolname}, oid};
+  }
+  rgw_rados_ref make_ref(const rgw_raw_obj& obj) {
+    return {obj.pool, obj.oid, "", io};
+  }
+ public:
+  RadosFixture() : io(RadosEnv::io) {}
+};
+
+using PutObj_Throttle = RadosFixture;
+
+namespace rgw::putobj {
+
+inline bool operator==(const Result& lhs, const Result& rhs) {
+  return lhs.obj == rhs.obj && lhs.result == rhs.result;
+}
+std::ostream& operator<<(std::ostream& out, const Result& r) {
+  return out << "{r=" << r.result << " obj='" << r.obj << "'";
+}
+
+TEST_F(PutObj_Throttle, NoThrottleUpToMax)
+{
+  AioThrottle throttle(4);
+  auto obj = make_obj(__PRETTY_FUNCTION__);
+  auto ref = make_ref(obj);
+  {
+    librados::ObjectWriteOperation op1;
+    auto c1 = throttle.submit(ref, obj, &op1, 1);
+    EXPECT_TRUE(c1.empty());
+    librados::ObjectWriteOperation op2;
+    auto c2 = throttle.submit(ref, obj, &op2, 1);
+    EXPECT_TRUE(c2.empty());
+    librados::ObjectWriteOperation op3;
+    auto c3 = throttle.submit(ref, obj, &op3, 1);
+    EXPECT_TRUE(c3.empty());
+    librados::ObjectWriteOperation op4;
+    auto c4 = throttle.submit(ref, obj, &op4, 1);
+    EXPECT_TRUE(c4.empty());
+    // no completions because no ops had to wait
+    auto c5 = throttle.poll();
+  }
+  auto completions = throttle.drain();
+  ASSERT_EQ(4u, completions.size());
+  for (auto& c : completions) {
+    EXPECT_EQ(Result({obj, -EINVAL}), c);
+  }
+}
+
+TEST_F(PutObj_Throttle, CostOverWindow)
+{
+  AioThrottle throttle(4);
+  auto obj = make_obj(__PRETTY_FUNCTION__);
+  auto ref = make_ref(obj);
+
+  librados::ObjectWriteOperation op;
+  auto c = throttle.submit(ref, obj, &op, 8);
+  ASSERT_EQ(1u, c.size());
+  EXPECT_EQ(Result({obj, -EDEADLK}), c.front());
+}
+
+TEST_F(PutObj_Throttle, AioThrottleOverMax)
+{
+  constexpr uint64_t window = 4;
+  AioThrottle throttle(window);
+
+  auto obj = make_obj(__PRETTY_FUNCTION__);
+  auto ref = make_ref(obj);
+
+  // issue 32 writes, and verify that max_outstanding <= window
+  constexpr uint64_t total = 32;
+  uint64_t max_outstanding = 0;
+  uint64_t outstanding = 0;
+
+  for (uint64_t i = 0; i < total; i++) {
+    librados::ObjectWriteOperation op;
+    auto c = throttle.submit(ref, obj, &op, 1);
+    outstanding++;
+    outstanding -= c.size();
+    if (max_outstanding < outstanding) {
+      max_outstanding = outstanding;
+    }
+  }
+  auto c = throttle.drain();
+  outstanding -= c.size();
+  EXPECT_EQ(0u, outstanding);
+  EXPECT_EQ(window, max_outstanding);
+}
+
+} // namespace rgw::putobj