rgw_otp.cc
rgw_policy_s3.cc
rgw_putobj.cc
+ rgw_putobj_throttle.cc
rgw_quota.cc
rgw_rados.cc
rgw_resolve.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+
+#include "rgw_putobj_throttle.h"
+#include "rgw_rados.h"
+
+namespace rgw::putobj {
+
+void AioThrottle::aio_cb(void *cb, void *arg)
+{
+ Pending& p = *static_cast<Pending*>(arg);
+ p.result = p.completion->get_return_value();
+ p.parent->put(p);
+}
+
+bool AioThrottle::waiter_ready() const
+{
+ switch (waiter) {
+ case Wait::Available: return is_available();
+ case Wait::Completion: return has_completion();
+ case Wait::Drained: return is_drained();
+ default: return false;
+ }
+}
+
+ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+ librados::ObjectWriteOperation *op,
+ uint64_t cost)
+{
+ auto p = std::make_unique<Pending>();
+ p->obj = obj;
+ p->cost = cost;
+
+ if (cost > window) {
+ p->result = -EDEADLK; // would never succeed
+ completed.push_back(*p);
+ } else {
+ get(*p);
+ p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op);
+ if (p->result < 0) {
+ put(*p);
+ }
+ }
+ p.release();
+ return std::move(completed);
+}
+
+ResultList AioThrottle::submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+ librados::ObjectReadOperation *op,
+ bufferlist *data, uint64_t cost)
+{
+ auto p = std::make_unique<Pending>();
+ p->obj = obj;
+ p->cost = cost;
+
+ if (cost > window) {
+ p->result = -EDEADLK; // would never succeed
+ completed.push_back(*p);
+ } else {
+ get(*p);
+ p->result = ref.ioctx.aio_operate(ref.oid, p->completion, op, data);
+ if (p->result < 0) {
+ put(*p);
+ }
+ }
+ p.release();
+ return std::move(completed);
+}
+
+void AioThrottle::get(Pending& p)
+{
+ std::unique_lock lock{mutex};
+
+ // wait for the write size to become available
+ pending_size += p.cost;
+ if (!is_available()) {
+ ceph_assert(waiter == Wait::None);
+ waiter = Wait::Available;
+ cond.wait(lock, [this] { return is_available(); });
+ waiter = Wait::None;
+ }
+
+ // register the pending write and attach a completion
+ p.parent = this;
+ p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
+ pending.push_back(p);
+}
+
+void AioThrottle::put(Pending& p)
+{
+ p.completion->release();
+ p.completion = nullptr;
+
+ std::scoped_lock lock{mutex};
+
+ // move from pending to completed
+ pending.erase(pending.iterator_to(p));
+ completed.push_back(p);
+
+ pending_size -= p.cost;
+
+ if (waiter_ready()) {
+ cond.notify_one();
+ }
+}
+
+ResultList AioThrottle::poll()
+{
+ std::unique_lock lock{mutex};
+ return std::move(completed);
+}
+
+ResultList AioThrottle::wait()
+{
+ std::unique_lock lock{mutex};
+ if (completed.empty() && !pending.empty()) {
+ ceph_assert(waiter == Wait::None);
+ waiter = Wait::Completion;
+ cond.wait(lock, [this] { return has_completion(); });
+ waiter = Wait::None;
+ }
+ return std::move(completed);
+}
+
+ResultList AioThrottle::drain()
+{
+ std::unique_lock lock{mutex};
+ if (!pending.empty()) {
+ ceph_assert(waiter == Wait::None);
+ waiter = Wait::Drained;
+ cond.wait(lock, [this] { return is_drained(); });
+ waiter = Wait::None;
+ }
+ return std::move(completed);
+}
+
+} // namespace rgw::putobj
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include "common/ceph_mutex.h"
+#include "rgw_putobj_aio.h"
+
+namespace librados {
+class AioCompletion;
+}
+
+namespace rgw::putobj {
+
+// a throttle for aio operations that enforces a maximum window on outstanding
+// bytes. only supports a single waiter, so all public functions must be called
+// from the same thread
+class AioThrottle : public Aio {
+ protected:
+ const uint64_t window;
+ uint64_t pending_size = 0;
+
+ bool is_available() const { return pending_size <= window; }
+ bool has_completion() const { return !completed.empty(); }
+ bool is_drained() const { return pending.empty(); }
+
+ struct Pending : ResultEntry {
+ AioThrottle *parent = nullptr;
+ uint64_t cost = 0;
+ librados::AioCompletion *completion = nullptr;
+ };
+ OwningList<Pending> pending;
+ ResultList completed;
+
+ enum class Wait { None, Available, Completion, Drained };
+ Wait waiter = Wait::None;
+
+ bool waiter_ready() const;
+
+ ceph::mutex mutex = ceph::make_mutex("AioThrottle");
+ ceph::condition_variable cond;
+
+ void get(Pending& p);
+ void put(Pending& p);
+
+ static void aio_cb(void *cb, void *arg);
+
+ public:
+ AioThrottle(uint64_t window) : window(window) {}
+
+ virtual ~AioThrottle() {
+ // must drain before destructing
+ ceph_assert(pending.empty());
+ ceph_assert(completed.empty());
+ }
+
+ ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+ librados::ObjectReadOperation *op, bufferlist *data,
+ uint64_t cost) override;
+
+ ResultList submit(rgw_rados_ref& ref, const rgw_raw_obj& obj,
+ librados::ObjectWriteOperation *op, uint64_t cost) override;
+
+ ResultList poll() override;
+
+ ResultList wait() override;
+
+ ResultList drain() override;
+};
+
+} // namespace rgw::putobj
add_ceph_unittest(unittest_rgw_putobj)
target_link_libraries(unittest_rgw_putobj rgw_a ${UNITTEST_LIBS})
+add_executable(ceph_test_rgw_throttle
+ test_rgw_throttle.cc
+ $<TARGET_OBJECTS:unit-main>)
+target_link_libraries(ceph_test_rgw_throttle rgw_a
+ librados global ${UNITTEST_LIBS})
+
add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
add_ceph_unittest(unittest_rgw_iam_policy)
target_link_libraries(unittest_rgw_iam_policy
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "rgw/rgw_putobj_throttle.h"
+#include "rgw/rgw_rados.h"
+
+#include "include/rados/librados.hpp"
+
+#include <gtest/gtest.h>
+
+struct RadosEnv : public ::testing::Environment {
+ public:
+ static constexpr auto poolname = "ceph_test_rgw_throttle";
+
+ static librados::Rados rados;
+ static librados::IoCtx io;
+
+ void SetUp() override {
+ ASSERT_EQ(0, rados.init_with_context(g_ceph_context));
+ ASSERT_EQ(0, rados.connect());
+ // open/create test pool
+ int r = rados.ioctx_create(poolname, io);
+ if (r == -ENOENT) {
+ r = rados.pool_create(poolname);
+ if (r == -EEXIST) {
+ r = 0;
+ } else if (r == 0) {
+ r = rados.ioctx_create(poolname, io);
+ }
+ }
+ ASSERT_EQ(0, r);
+ }
+ void TearDown() {
+ rados.shutdown();
+ }
+};
+librados::Rados RadosEnv::rados;
+librados::IoCtx RadosEnv::io;
+
+auto *const rados_env = ::testing::AddGlobalTestEnvironment(new RadosEnv);
+
+// test fixture for global setup/teardown
+class RadosFixture : public ::testing::Test {
+ protected:
+ librados::IoCtx& io;
+
+ rgw_raw_obj make_obj(const std::string& oid) {
+ return {{RadosEnv::poolname}, oid};
+ }
+ rgw_rados_ref make_ref(const rgw_raw_obj& obj) {
+ return {obj.pool, obj.oid, "", io};
+ }
+ public:
+ RadosFixture() : io(RadosEnv::io) {}
+};
+
+using PutObj_Throttle = RadosFixture;
+
+namespace rgw::putobj {
+
+inline bool operator==(const Result& lhs, const Result& rhs) {
+ return lhs.obj == rhs.obj && lhs.result == rhs.result;
+}
+std::ostream& operator<<(std::ostream& out, const Result& r) {
+ return out << "{r=" << r.result << " obj='" << r.obj << "'";
+}
+
+TEST_F(PutObj_Throttle, NoThrottleUpToMax)
+{
+ AioThrottle throttle(4);
+ auto obj = make_obj(__PRETTY_FUNCTION__);
+ auto ref = make_ref(obj);
+ {
+ librados::ObjectWriteOperation op1;
+ auto c1 = throttle.submit(ref, obj, &op1, 1);
+ EXPECT_TRUE(c1.empty());
+ librados::ObjectWriteOperation op2;
+ auto c2 = throttle.submit(ref, obj, &op2, 1);
+ EXPECT_TRUE(c2.empty());
+ librados::ObjectWriteOperation op3;
+ auto c3 = throttle.submit(ref, obj, &op3, 1);
+ EXPECT_TRUE(c3.empty());
+ librados::ObjectWriteOperation op4;
+ auto c4 = throttle.submit(ref, obj, &op4, 1);
+ EXPECT_TRUE(c4.empty());
+ // no completions because no ops had to wait
+ auto c5 = throttle.poll();
+ }
+ auto completions = throttle.drain();
+ ASSERT_EQ(4u, completions.size());
+ for (auto& c : completions) {
+ EXPECT_EQ(Result({obj, -EINVAL}), c);
+ }
+}
+
+TEST_F(PutObj_Throttle, CostOverWindow)
+{
+ AioThrottle throttle(4);
+ auto obj = make_obj(__PRETTY_FUNCTION__);
+ auto ref = make_ref(obj);
+
+ librados::ObjectWriteOperation op;
+ auto c = throttle.submit(ref, obj, &op, 8);
+ ASSERT_EQ(1u, c.size());
+ EXPECT_EQ(Result({obj, -EDEADLK}), c.front());
+}
+
+TEST_F(PutObj_Throttle, AioThrottleOverMax)
+{
+ constexpr uint64_t window = 4;
+ AioThrottle throttle(window);
+
+ auto obj = make_obj(__PRETTY_FUNCTION__);
+ auto ref = make_ref(obj);
+
+ // issue 32 writes, and verify that max_outstanding <= window
+ constexpr uint64_t total = 32;
+ uint64_t max_outstanding = 0;
+ uint64_t outstanding = 0;
+
+ for (uint64_t i = 0; i < total; i++) {
+ librados::ObjectWriteOperation op;
+ auto c = throttle.submit(ref, obj, &op, 1);
+ outstanding++;
+ outstanding -= c.size();
+ if (max_outstanding < outstanding) {
+ max_outstanding = outstanding;
+ }
+ }
+ auto c = throttle.drain();
+ outstanding -= c.size();
+ EXPECT_EQ(0u, outstanding);
+ EXPECT_EQ(window, max_outstanding);
+}
+
+} // namespace rgw::putobj