From: Adam C. Emerson Date: Sat, 16 Feb 2019 05:57:36 +0000 (-0500) Subject: rgw: Refactor Aio and AioThrottle X-Git-Tag: v15.1.0~2921^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=103627e77209203b36bffd9c010d7ec8ea41b593;p=ceph.git rgw: Refactor Aio and AioThrottle To support in-development work on local caching (and in-development work on having RGW use the 'unleashed' version of RADOS) divorce the implementation of AioThrottle from the specifics of librados in accord with a design that Casey Bodley outlined. Also include a 'support' function to simplify using the aio_operate machinery on librados. It's a bit ugly given how completions are implemented, but all that's hidden away. Signed-off-by: Adam C. Emerson --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 412ccb62cbb9..23881c3c9d7d 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -34,6 +34,7 @@ set(librgw_common_srcs rgw_acl.cc rgw_acl_s3.cc rgw_acl_swift.cc + rgw_aio.cc rgw_aio_throttle.cc rgw_auth.cc rgw_auth_s3.cc diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc new file mode 100644 index 000000000000..b311982872f1 --- /dev/null +++ b/src/rgw/rgw_aio.cc @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include "include/rados/librados.hpp" + +#include "rgw_aio.h" + +namespace rgw { +namespace { +void cb(librados::completion_t, void* arg); +} +struct state { + Aio* aio; + librados::AioCompletion* c; + + state(Aio* aio, AioResult& r) + : aio(aio), + c(librados::Rados::aio_create_completion(&r, nullptr, &cb)) {} +}; + +namespace { +void cb(librados::completion_t, void* arg) { + static_assert(sizeof(AioResult::user_data) >= sizeof(state)); + static_assert(std::is_trivially_destructible_v); + auto& r = *(static_cast(arg)); + auto s = reinterpret_cast(&r.user_data); + r.result = s->c->get_return_value(); + s->c->release(); + s->aio->put(r); +} + +template, + librados::ObjectReadOperation>, + typename = std::enable_if_t> && + !std::is_lvalue_reference_v && + !std::is_const_v>> +Aio::OpFunc aio_abstract(T&& op) { + return [op = std::move(op)](Aio* aio, AioResult& r) mutable { + auto s = new (&r.user_data) state(aio, r); + if constexpr (read) { + r.result = r.obj.aio_operate(s->c, &op, &r.data); + } else { + r.result = r.obj.aio_operate(s->c, &op); + } + if (r.result < 0) { + s->c->release(); + aio->put(r); + } + }; +} +} + +Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op) { + return aio_abstract(std::move(op)); +} +Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op) { + return aio_abstract(std::move(op)); +} +} diff --git a/src/rgw/rgw_aio.h b/src/rgw/rgw_aio.h index 0ca401dae048..4602be0a622a 100644 --- a/src/rgw/rgw_aio.h +++ b/src/rgw/rgw_aio.h @@ -15,11 +15,19 @@ #pragma once -#include "include/rados/librados_fwd.hpp" +#include +#include +#include + #include -#include "rgw_common.h" +#include "include/rados/librados_fwd.hpp" + #include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj +#include "rgw_common.h" + +#include "include/function2.hpp" + namespace rgw { struct AioResult { @@ -27,6 +35,13 @@ struct AioResult { uint64_t id = 0; // id allows caller to associate a result with its request bufferlist data; // result buffer for reads int result = 0; + std::aligned_storage_t<3 * sizeof(void*)> user_data; + + AioResult() = default; + AioResult(const AioResult&) = delete; + AioResult& operator =(const AioResult&) = delete; + AioResult(AioResult&&) = delete; + AioResult& operator =(AioResult&&) = delete; }; struct AioResultEntry : AioResult, boost::intrusive::list_base_hook<> { virtual ~AioResultEntry() {} @@ -57,15 +72,14 @@ inline int check_for_errors(const AioResultList& results) { // each call returns a list of results from prior completions class Aio { public: - virtual ~Aio() {} + using OpFunc = fu2::unique_function; - virtual AioResultList submit(RGWSI_RADOS::Obj& obj, - librados::ObjectReadOperation *op, - uint64_t cost, uint64_t id) = 0; + virtual ~Aio() {} - virtual AioResultList submit(RGWSI_RADOS::Obj& obj, - librados::ObjectWriteOperation *op, - uint64_t cost, uint64_t id) = 0; + virtual AioResultList get(const RGWSI_RADOS::Obj& obj, + OpFunc&& f, + uint64_t cost, uint64_t id) = 0; + virtual void put(AioResult& r) = 0; // poll for any ready completions without waiting virtual AioResultList poll() = 0; @@ -75,6 +89,9 @@ class Aio { // wait for all outstanding completions and return their results virtual AioResultList drain() = 0; + + static OpFunc librados_op(librados::ObjectReadOperation&& op); + static OpFunc librados_op(librados::ObjectWriteOperation&& op); }; } // namespace rgw diff --git a/src/rgw/rgw_aio_throttle.cc b/src/rgw/rgw_aio_throttle.cc index 1ac1be259d3e..c596ba4ed8e8 100644 --- a/src/rgw/rgw_aio_throttle.cc +++ b/src/rgw/rgw_aio_throttle.cc @@ -20,13 +20,6 @@ namespace rgw { -void AioThrottle::aio_cb(void *cb, void *arg) -{ - Pending& p = *static_cast(arg); - p.result = p.completion->get_return_value(); - p.parent->put(p); -} - bool AioThrottle::waiter_ready() const { switch (waiter) { @@ -37,76 +30,43 @@ bool AioThrottle::waiter_ready() const } } -AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, - librados::ObjectWriteOperation *op, - uint64_t cost, uint64_t id) +AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj, + OpFunc&& f, + uint64_t cost, uint64_t id) { auto p = std::make_unique(); p->obj = obj; p->id = id; p->cost = cost; + std::unique_lock lock{mutex}; if (cost > window) { p->result = -EDEADLK; // would never succeed completed.push_back(*p); } else { - get(*p); - p->result = obj.aio_operate(p->completion, op); - if (p->result < 0) { - put(*p); + // wait for the write size to become available + pending_size += p->cost; + if (!is_available()) { + ceph_assert(waiter == Wait::None); + waiter = Wait::Available; + cond.wait(lock, [this] { return is_available(); }); + waiter = Wait::None; } - } - p.release(); - return std::move(completed); -} -AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj, - librados::ObjectReadOperation *op, - uint64_t cost, uint64_t id) -{ - auto p = std::make_unique(); - p->obj = obj; - p->id = id; - p->cost = cost; - - if (cost > window) { - p->result = -EDEADLK; // would never succeed - completed.push_back(*p); - } else { - get(*p); - p->result = obj.aio_operate(p->completion, op, &p->data); - if (p->result < 0) { - put(*p); - } + // register the pending write and attach a completion + p->parent = this; + pending.push_back(*p); + lock.unlock(); + std::move(f)(this, *static_cast(p.get())); + lock.lock(); } p.release(); return std::move(completed); } -void AioThrottle::get(Pending& p) -{ - std::unique_lock lock{mutex}; - - // wait for the write size to become available - pending_size += p.cost; - if (!is_available()) { - ceph_assert(waiter == Wait::None); - waiter = Wait::Available; - cond.wait(lock, [this] { return is_available(); }); - waiter = Wait::None; - } - - // register the pending write and attach a completion - p.parent = this; - p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb); - pending.push_back(p); -} - -void AioThrottle::put(Pending& p) +void AioThrottle::put(AioResult& r) { - p.completion->release(); - p.completion = nullptr; - + auto& p = static_cast(r); std::scoped_lock lock{mutex}; // move from pending to completed diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h index 751d7f9883ac..615301622cc5 100644 --- a/src/rgw/rgw_aio_throttle.h +++ b/src/rgw/rgw_aio_throttle.h @@ -38,7 +38,6 @@ class AioThrottle : public Aio { struct Pending : AioResultEntry { AioThrottle *parent = nullptr; uint64_t cost = 0; - librados::AioCompletion *completion = nullptr; }; OwningList pending; AioResultList completed; @@ -51,11 +50,6 @@ class AioThrottle : public Aio { ceph::mutex mutex = ceph::make_mutex("AioThrottle"); ceph::condition_variable cond; - void get(Pending& p); - void put(Pending& p); - - static void aio_cb(void *cb, void *arg); - public: AioThrottle(uint64_t window) : window(window) {} @@ -65,13 +59,11 @@ class AioThrottle : public Aio { ceph_assert(completed.empty()); } - AioResultList submit(RGWSI_RADOS::Obj& obj, - librados::ObjectReadOperation *op, - uint64_t cost, uint64_t id) override; + AioResultList get(const RGWSI_RADOS::Obj& obj, + OpFunc&& f, + uint64_t cost, uint64_t id) override; + void put(AioResult& r) override; - AioResultList submit(RGWSI_RADOS::Obj& obj, - librados::ObjectWriteOperation *op, - uint64_t cost, uint64_t id) override; AioResultList poll() override; diff --git a/src/rgw/rgw_putobj_processor.cc b/src/rgw/rgw_putobj_processor.cc index 3ebff9223c4b..45ac26c7ebca 100644 --- a/src/rgw/rgw_putobj_processor.cc +++ b/src/rgw/rgw_putobj_processor.cc @@ -92,7 +92,7 @@ int RadosWriter::process(bufferlist&& bl, uint64_t offset) op.write(offset, data); } constexpr uint64_t id = 0; // unused - auto c = aio->submit(stripe_obj, &op, cost, id); + auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id); return process_completed(c, &written); } @@ -105,7 +105,7 @@ int RadosWriter::write_exclusive(const bufferlist& data) op.write_full(data); constexpr uint64_t id = 0; // unused - auto c = aio->submit(stripe_obj, &op, cost, id); + auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id); auto d = aio->drain(); c.splice(c.end(), d); return process_completed(c, &written); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index a99415efe5e8..dc8932411022 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -916,12 +916,12 @@ public: // implements DoutPrefixProvider CephContext *get_cct() const override { return store->ctx(); } - unsigned get_subsys() const + unsigned get_subsys() const override { return dout_subsys; } - std::ostream& gen_prefix(std::ostream& out) const + std::ostream& gen_prefix(std::ostream& out) const override { return out << "sync log trim: "; } @@ -6828,7 +6828,7 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, const uint64_t cost = len; const uint64_t id = obj_ofs; // use logical object offset for sorting replies - auto completed = d->aio->submit(obj, &op, cost, id); + auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op)), cost, id); return d->flush(std::move(completed)); } diff --git a/src/test/rgw/test_rgw_throttle.cc b/src/test/rgw/test_rgw_throttle.cc index c1b97a0482ae..88eb6cecead4 100644 --- a/src/test/rgw/test_rgw_throttle.cc +++ b/src/test/rgw/test_rgw_throttle.cc @@ -33,7 +33,7 @@ struct RadosEnv : public ::testing::Environment { r = 0; ASSERT_EQ(0, r); } - void TearDown() { + void TearDown() override { rados.reset(); } }; @@ -61,16 +61,16 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax) auto obj = make_obj(__PRETTY_FUNCTION__); { librados::ObjectWriteOperation op1; - auto c1 = throttle.submit(obj, &op1, 1, 0); + auto c1 = throttle.get(obj, rgw::Aio::librados_op(std::move(op1)), 1, 0); EXPECT_TRUE(c1.empty()); librados::ObjectWriteOperation op2; - auto c2 = throttle.submit(obj, &op2, 1, 0); + auto c2 = throttle.get(obj, rgw::Aio::librados_op(std::move(op2)), 1, 0); EXPECT_TRUE(c2.empty()); librados::ObjectWriteOperation op3; - auto c3 = throttle.submit(obj, &op3, 1, 0); + auto c3 = throttle.get(obj, rgw::Aio::librados_op(std::move(op3)), 1, 0); EXPECT_TRUE(c3.empty()); librados::ObjectWriteOperation op4; - auto c4 = throttle.submit(obj, &op4, 1, 0); + auto c4 = throttle.get(obj, rgw::Aio::librados_op(std::move(op4)), 1, 0); EXPECT_TRUE(c4.empty()); // no completions because no ops had to wait auto c5 = throttle.poll(); @@ -88,7 +88,7 @@ TEST_F(Aio_Throttle, CostOverWindow) auto obj = make_obj(__PRETTY_FUNCTION__); librados::ObjectWriteOperation op; - auto c = throttle.submit(obj, &op, 8, 0); + auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 8, 0); ASSERT_EQ(1u, c.size()); EXPECT_EQ(-EDEADLK, c.front().result); } @@ -107,7 +107,7 @@ TEST_F(Aio_Throttle, ThrottleOverMax) for (uint64_t i = 0; i < total; i++) { librados::ObjectWriteOperation op; - auto c = throttle.submit(obj, &op, 1, 0); + auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 1, 0); outstanding++; outstanding -= c.size(); if (max_outstanding < outstanding) {