rgw_acl.cc
rgw_acl_s3.cc
rgw_acl_swift.cc
+ rgw_aio.cc
rgw_aio_throttle.cc
rgw_auth.cc
rgw_auth_s3.cc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <type_traits>
+#include "include/rados/librados.hpp"
+
+#include "rgw_aio.h"
+
+namespace rgw {
+namespace {
+void cb(librados::completion_t, void* arg);
+}
+struct state {
+ Aio* aio;
+ librados::AioCompletion* c;
+
+ state(Aio* aio, AioResult& r)
+ : aio(aio),
+ c(librados::Rados::aio_create_completion(&r, nullptr, &cb)) {}
+};
+
+namespace {
+void cb(librados::completion_t, void* arg) {
+ static_assert(sizeof(AioResult::user_data) >= sizeof(state));
+ static_assert(std::is_trivially_destructible_v<state>);
+ auto& r = *(static_cast<AioResult*>(arg));
+ auto s = reinterpret_cast<state*>(&r.user_data);
+ r.result = s->c->get_return_value();
+ s->c->release();
+ s->aio->put(r);
+}
+
+template<typename T,
+ bool read = std::is_same_v<std::decay_t<T>,
+ librados::ObjectReadOperation>,
+ typename = std::enable_if_t<std::is_base_of_v<librados::ObjectOperation,
+ std::decay_t<T>> &&
+ !std::is_lvalue_reference_v<T> &&
+ !std::is_const_v<T>>>
+Aio::OpFunc aio_abstract(T&& op) {
+ return [op = std::move(op)](Aio* aio, AioResult& r) mutable {
+ auto s = new (&r.user_data) state(aio, r);
+ if constexpr (read) {
+ r.result = r.obj.aio_operate(s->c, &op, &r.data);
+ } else {
+ r.result = r.obj.aio_operate(s->c, &op);
+ }
+ if (r.result < 0) {
+ s->c->release();
+ aio->put(r);
+ }
+ };
+}
+}
+
+Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op) {
+ return aio_abstract(std::move(op));
+}
+Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op) {
+ return aio_abstract(std::move(op));
+}
+}
#pragma once
-#include "include/rados/librados_fwd.hpp"
+#include <cstdint>
+#include <memory>
+#include <type_traits>
+
#include <boost/intrusive/list.hpp>
-#include "rgw_common.h"
+#include "include/rados/librados_fwd.hpp"
+
#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj
+#include "rgw_common.h"
+
+#include "include/function2.hpp"
+
namespace rgw {
struct AioResult {
uint64_t id = 0; // id allows caller to associate a result with its request
bufferlist data; // result buffer for reads
int result = 0;
+ std::aligned_storage_t<3 * sizeof(void*)> user_data;
+
+ AioResult() = default;
+ AioResult(const AioResult&) = delete;
+ AioResult& operator =(const AioResult&) = delete;
+ AioResult(AioResult&&) = delete;
+ AioResult& operator =(AioResult&&) = delete;
};
struct AioResultEntry : AioResult, boost::intrusive::list_base_hook<> {
virtual ~AioResultEntry() {}
// each call returns a list of results from prior completions
class Aio {
public:
- virtual ~Aio() {}
+ using OpFunc = fu2::unique_function<void(Aio*, AioResult&) &&>;
- virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectReadOperation *op,
- uint64_t cost, uint64_t id) = 0;
+ virtual ~Aio() {}
- virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectWriteOperation *op,
- uint64_t cost, uint64_t id) = 0;
+ virtual AioResultList get(const RGWSI_RADOS::Obj& obj,
+ OpFunc&& f,
+ uint64_t cost, uint64_t id) = 0;
+ virtual void put(AioResult& r) = 0;
// poll for any ready completions without waiting
virtual AioResultList poll() = 0;
// wait for all outstanding completions and return their results
virtual AioResultList drain() = 0;
+
+ static OpFunc librados_op(librados::ObjectReadOperation&& op);
+ static OpFunc librados_op(librados::ObjectWriteOperation&& op);
};
} // namespace rgw
namespace rgw {
-void AioThrottle::aio_cb(void *cb, void *arg)
-{
- Pending& p = *static_cast<Pending*>(arg);
- p.result = p.completion->get_return_value();
- p.parent->put(p);
-}
-
bool AioThrottle::waiter_ready() const
{
switch (waiter) {
}
}
-AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectWriteOperation *op,
- uint64_t cost, uint64_t id)
+AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
+ OpFunc&& f,
+ uint64_t cost, uint64_t id)
{
auto p = std::make_unique<Pending>();
p->obj = obj;
p->id = id;
p->cost = cost;
+ std::unique_lock lock{mutex};
if (cost > window) {
p->result = -EDEADLK; // would never succeed
completed.push_back(*p);
} else {
- get(*p);
- p->result = obj.aio_operate(p->completion, op);
- if (p->result < 0) {
- put(*p);
+ // wait for the write size to become available
+ pending_size += p->cost;
+ if (!is_available()) {
+ ceph_assert(waiter == Wait::None);
+ waiter = Wait::Available;
+ cond.wait(lock, [this] { return is_available(); });
+ waiter = Wait::None;
}
- }
- p.release();
- return std::move(completed);
-}
-AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectReadOperation *op,
- uint64_t cost, uint64_t id)
-{
- auto p = std::make_unique<Pending>();
- p->obj = obj;
- p->id = id;
- p->cost = cost;
-
- if (cost > window) {
- p->result = -EDEADLK; // would never succeed
- completed.push_back(*p);
- } else {
- get(*p);
- p->result = obj.aio_operate(p->completion, op, &p->data);
- if (p->result < 0) {
- put(*p);
- }
+ // register the pending write and attach a completion
+ p->parent = this;
+ pending.push_back(*p);
+ lock.unlock();
+ std::move(f)(this, *static_cast<AioResult*>(p.get()));
+ lock.lock();
}
p.release();
return std::move(completed);
}
-void AioThrottle::get(Pending& p)
-{
- std::unique_lock lock{mutex};
-
- // wait for the write size to become available
- pending_size += p.cost;
- if (!is_available()) {
- ceph_assert(waiter == Wait::None);
- waiter = Wait::Available;
- cond.wait(lock, [this] { return is_available(); });
- waiter = Wait::None;
- }
-
- // register the pending write and attach a completion
- p.parent = this;
- p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
- pending.push_back(p);
-}
-
-void AioThrottle::put(Pending& p)
+void AioThrottle::put(AioResult& r)
{
- p.completion->release();
- p.completion = nullptr;
-
+ auto& p = static_cast<Pending&>(r);
std::scoped_lock lock{mutex};
// move from pending to completed
struct Pending : AioResultEntry {
AioThrottle *parent = nullptr;
uint64_t cost = 0;
- librados::AioCompletion *completion = nullptr;
};
OwningList<Pending> pending;
AioResultList completed;
ceph::mutex mutex = ceph::make_mutex("AioThrottle");
ceph::condition_variable cond;
- void get(Pending& p);
- void put(Pending& p);
-
- static void aio_cb(void *cb, void *arg);
-
public:
AioThrottle(uint64_t window) : window(window) {}
ceph_assert(completed.empty());
}
- AioResultList submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectReadOperation *op,
- uint64_t cost, uint64_t id) override;
+ AioResultList get(const RGWSI_RADOS::Obj& obj,
+ OpFunc&& f,
+ uint64_t cost, uint64_t id) override;
+ void put(AioResult& r) override;
- AioResultList submit(RGWSI_RADOS::Obj& obj,
- librados::ObjectWriteOperation *op,
- uint64_t cost, uint64_t id) override;
AioResultList poll() override;
op.write(offset, data);
}
constexpr uint64_t id = 0; // unused
- auto c = aio->submit(stripe_obj, &op, cost, id);
+ auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id);
return process_completed(c, &written);
}
op.write_full(data);
constexpr uint64_t id = 0; // unused
- auto c = aio->submit(stripe_obj, &op, cost, id);
+ auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id);
auto d = aio->drain();
c.splice(c.end(), d);
return process_completed(c, &written);
// implements DoutPrefixProvider
CephContext *get_cct() const override { return store->ctx(); }
- unsigned get_subsys() const
+ unsigned get_subsys() const override
{
return dout_subsys;
}
- std::ostream& gen_prefix(std::ostream& out) const
+ std::ostream& gen_prefix(std::ostream& out) const override
{
return out << "sync log trim: ";
}
const uint64_t cost = len;
const uint64_t id = obj_ofs; // use logical object offset for sorting replies
- auto completed = d->aio->submit(obj, &op, cost, id);
+ auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op)), cost, id);
return d->flush(std::move(completed));
}
r = 0;
ASSERT_EQ(0, r);
}
- void TearDown() {
+ void TearDown() override {
rados.reset();
}
};
auto obj = make_obj(__PRETTY_FUNCTION__);
{
librados::ObjectWriteOperation op1;
- auto c1 = throttle.submit(obj, &op1, 1, 0);
+ auto c1 = throttle.get(obj, rgw::Aio::librados_op(std::move(op1)), 1, 0);
EXPECT_TRUE(c1.empty());
librados::ObjectWriteOperation op2;
- auto c2 = throttle.submit(obj, &op2, 1, 0);
+ auto c2 = throttle.get(obj, rgw::Aio::librados_op(std::move(op2)), 1, 0);
EXPECT_TRUE(c2.empty());
librados::ObjectWriteOperation op3;
- auto c3 = throttle.submit(obj, &op3, 1, 0);
+ auto c3 = throttle.get(obj, rgw::Aio::librados_op(std::move(op3)), 1, 0);
EXPECT_TRUE(c3.empty());
librados::ObjectWriteOperation op4;
- auto c4 = throttle.submit(obj, &op4, 1, 0);
+ auto c4 = throttle.get(obj, rgw::Aio::librados_op(std::move(op4)), 1, 0);
EXPECT_TRUE(c4.empty());
// no completions because no ops had to wait
auto c5 = throttle.poll();
auto obj = make_obj(__PRETTY_FUNCTION__);
librados::ObjectWriteOperation op;
- auto c = throttle.submit(obj, &op, 8, 0);
+ auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 8, 0);
ASSERT_EQ(1u, c.size());
EXPECT_EQ(-EDEADLK, c.front().result);
}
for (uint64_t i = 0; i < total; i++) {
librados::ObjectWriteOperation op;
- auto c = throttle.submit(obj, &op, 1, 0);
+ auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 1, 0);
outstanding++;
outstanding -= c.size();
if (max_outstanding < outstanding) {