]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Refactor Aio and AioThrottle 26461/head
authorAdam C. Emerson <aemerson@redhat.com>
Sat, 16 Feb 2019 05:57:36 +0000 (00:57 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 8 Apr 2019 16:45:44 +0000 (12:45 -0400)
To support in-development work on local caching (and in-development
work on having RGW use the 'unleashed' version of RADOS) divorce the
implementation of AioThrottle from the specifics of librados in accord
with a design that Casey Bodley <cbodley@redhat.com> outlined.

Also include a 'support' function to simplify using the aio_operate
machinery on librados. It's a bit ugly given how completions are
implemented, but all that's hidden away.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_aio.cc [new file with mode: 0644]
src/rgw/rgw_aio.h
src/rgw/rgw_aio_throttle.cc
src/rgw/rgw_aio_throttle.h
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_rados.cc
src/test/rgw/test_rgw_throttle.cc

index 412ccb62cbb9f4f6abdfe1f57e1536f165484511..23881c3c9d7dddc3637830cea6fbdeba88cfabee 100644 (file)
@@ -34,6 +34,7 @@ set(librgw_common_srcs
   rgw_acl.cc
   rgw_acl_s3.cc
   rgw_acl_swift.cc
+  rgw_aio.cc
   rgw_aio_throttle.cc
   rgw_auth.cc
   rgw_auth_s3.cc
diff --git a/src/rgw/rgw_aio.cc b/src/rgw/rgw_aio.cc
new file mode 100644 (file)
index 0000000..b311982
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <type_traits>
+#include "include/rados/librados.hpp"
+
+#include "rgw_aio.h"
+
+namespace rgw {
+namespace {
+void cb(librados::completion_t, void* arg);
+}
+struct state {
+  Aio* aio;
+  librados::AioCompletion* c;
+
+  state(Aio* aio, AioResult& r)
+    : aio(aio),
+      c(librados::Rados::aio_create_completion(&r, nullptr, &cb)) {}
+};
+
+namespace {
+void cb(librados::completion_t, void* arg) {
+  static_assert(sizeof(AioResult::user_data) >= sizeof(state));
+  static_assert(std::is_trivially_destructible_v<state>);
+  auto& r = *(static_cast<AioResult*>(arg));
+  auto s = reinterpret_cast<state*>(&r.user_data);
+  r.result = s->c->get_return_value();
+  s->c->release();
+  s->aio->put(r);
+}
+
+template<typename T,
+        bool read = std::is_same_v<std::decay_t<T>,
+                                   librados::ObjectReadOperation>,
+        typename = std::enable_if_t<std::is_base_of_v<librados::ObjectOperation,
+                                                      std::decay_t<T>> &&
+                                    !std::is_lvalue_reference_v<T> &&
+                                    !std::is_const_v<T>>>
+Aio::OpFunc aio_abstract(T&& op) {
+  return [op = std::move(op)](Aio* aio, AioResult& r) mutable {
+          auto s = new (&r.user_data) state(aio, r);
+          if constexpr (read) {
+            r.result = r.obj.aio_operate(s->c, &op, &r.data);
+          } else {
+            r.result = r.obj.aio_operate(s->c, &op);
+          }
+          if (r.result < 0) {
+            s->c->release();
+            aio->put(r);
+          }
+        };
+}
+}
+
+Aio::OpFunc Aio::librados_op(librados::ObjectReadOperation&& op) {
+  return aio_abstract(std::move(op));
+}
+Aio::OpFunc Aio::librados_op(librados::ObjectWriteOperation&& op) {
+  return aio_abstract(std::move(op));
+}
+}
index 0ca401dae048d148cb4a4726c23a23ca130b3b1e..4602be0a622a0612121f8581e19ac5ef634dca21 100644 (file)
 
 #pragma once
 
-#include "include/rados/librados_fwd.hpp"
+#include <cstdint>
+#include <memory>
+#include <type_traits>
+
 #include <boost/intrusive/list.hpp>
-#include "rgw_common.h"
+#include "include/rados/librados_fwd.hpp"
+
 #include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj
 
+#include "rgw_common.h"
+
+#include "include/function2.hpp"
+
 namespace rgw {
 
 struct AioResult {
@@ -27,6 +35,13 @@ struct AioResult {
   uint64_t id = 0; // id allows caller to associate a result with its request
   bufferlist data; // result buffer for reads
   int result = 0;
+  std::aligned_storage_t<3 * sizeof(void*)> user_data;
+
+  AioResult() = default;
+  AioResult(const AioResult&) = delete;
+  AioResult& operator =(const AioResult&) = delete;
+  AioResult(AioResult&&) = delete;
+  AioResult& operator =(AioResult&&) = delete;
 };
 struct AioResultEntry : AioResult, boost::intrusive::list_base_hook<> {
   virtual ~AioResultEntry() {}
@@ -57,15 +72,14 @@ inline int check_for_errors(const AioResultList& results) {
 // each call returns a list of results from prior completions
 class Aio {
  public:
-  virtual ~Aio() {}
+  using OpFunc = fu2::unique_function<void(Aio*, AioResult&) &&>;
 
-  virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
-                               librados::ObjectReadOperation *op,
-                               uint64_t cost, uint64_t id) = 0;
+  virtual ~Aio() {}
 
-  virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
-                               librados::ObjectWriteOperation *op,
-                               uint64_t cost, uint64_t id) = 0;
+  virtual AioResultList get(const RGWSI_RADOS::Obj& obj,
+                           OpFunc&& f,
+                           uint64_t cost, uint64_t id) = 0;
+  virtual void put(AioResult& r) = 0;
 
   // poll for any ready completions without waiting
   virtual AioResultList poll() = 0;
@@ -75,6 +89,9 @@ class Aio {
 
   // wait for all outstanding completions and return their results
   virtual AioResultList drain() = 0;
+
+  static OpFunc librados_op(librados::ObjectReadOperation&& op);
+  static OpFunc librados_op(librados::ObjectWriteOperation&& op);
 };
 
 } // namespace rgw
index 1ac1be259d3ee1c455442407b87671ca9e292efa..c596ba4ed8e816d7a2f1594ae5ec18eb2f2039b7 100644 (file)
 
 namespace rgw {
 
-void AioThrottle::aio_cb(void *cb, void *arg)
-{
-  Pending& p = *static_cast<Pending*>(arg);
-  p.result = p.completion->get_return_value();
-  p.parent->put(p);
-}
-
 bool AioThrottle::waiter_ready() const
 {
   switch (waiter) {
@@ -37,76 +30,43 @@ bool AioThrottle::waiter_ready() const
   }
 }
 
-AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
-                                  librados::ObjectWriteOperation *op,
-                                  uint64_t cost, uint64_t id)
+AioResultList AioThrottle::get(const RGWSI_RADOS::Obj& obj,
+                              OpFunc&& f,
+                              uint64_t cost, uint64_t id)
 {
   auto p = std::make_unique<Pending>();
   p->obj = obj;
   p->id = id;
   p->cost = cost;
 
+  std::unique_lock lock{mutex};
   if (cost > window) {
     p->result = -EDEADLK; // would never succeed
     completed.push_back(*p);
   } else {
-    get(*p);
-    p->result = obj.aio_operate(p->completion, op);
-    if (p->result < 0) {
-      put(*p);
+    // wait for the write size to become available
+    pending_size += p->cost;
+    if (!is_available()) {
+      ceph_assert(waiter == Wait::None);
+      waiter = Wait::Available;
+      cond.wait(lock, [this] { return is_available(); });
+      waiter = Wait::None;
     }
-  }
-  p.release();
-  return std::move(completed);
-}
 
-AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
-                                  librados::ObjectReadOperation *op,
-                                  uint64_t cost, uint64_t id)
-{
-  auto p = std::make_unique<Pending>();
-  p->obj = obj;
-  p->id = id;
-  p->cost = cost;
-
-  if (cost > window) {
-    p->result = -EDEADLK; // would never succeed
-    completed.push_back(*p);
-  } else {
-    get(*p);
-    p->result = obj.aio_operate(p->completion, op, &p->data);
-    if (p->result < 0) {
-      put(*p);
-    }
+    // register the pending write and attach a completion
+    p->parent = this;
+    pending.push_back(*p);
+    lock.unlock();
+    std::move(f)(this, *static_cast<AioResult*>(p.get()));
+    lock.lock();
   }
   p.release();
   return std::move(completed);
 }
 
-void AioThrottle::get(Pending& p)
-{
-  std::unique_lock lock{mutex};
-
-  // wait for the write size to become available
-  pending_size += p.cost;
-  if (!is_available()) {
-    ceph_assert(waiter == Wait::None);
-    waiter = Wait::Available;
-    cond.wait(lock, [this] { return is_available(); });
-    waiter = Wait::None;
-  }
-
-  // register the pending write and attach a completion
-  p.parent = this;
-  p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
-  pending.push_back(p);
-}
-
-void AioThrottle::put(Pending& p)
+void AioThrottle::put(AioResult& r)
 {
-  p.completion->release();
-  p.completion = nullptr;
-
+  auto& p = static_cast<Pending&>(r);
   std::scoped_lock lock{mutex};
 
   // move from pending to completed
index 751d7f9883ac8022bc65d2d989f78c50a4f03da3..615301622cc52cfc8e742998a8f07e16a22f6513 100644 (file)
@@ -38,7 +38,6 @@ class AioThrottle : public Aio {
   struct Pending : AioResultEntry {
     AioThrottle *parent = nullptr;
     uint64_t cost = 0;
-    librados::AioCompletion *completion = nullptr;
   };
   OwningList<Pending> pending;
   AioResultList completed;
@@ -51,11 +50,6 @@ class AioThrottle : public Aio {
   ceph::mutex mutex = ceph::make_mutex("AioThrottle");
   ceph::condition_variable cond;
 
-  void get(Pending& p);
-  void put(Pending& p);
-
-  static void aio_cb(void *cb, void *arg);
-
  public:
   AioThrottle(uint64_t window) : window(window) {}
 
@@ -65,13 +59,11 @@ class AioThrottle : public Aio {
     ceph_assert(completed.empty());
   }
 
-  AioResultList submit(RGWSI_RADOS::Obj& obj,
-                       librados::ObjectReadOperation *op,
-                       uint64_t cost, uint64_t id) override;
+  AioResultList get(const RGWSI_RADOS::Obj& obj,
+                   OpFunc&& f,
+                   uint64_t cost, uint64_t id) override;
+  void put(AioResult& r) override;
 
-  AioResultList submit(RGWSI_RADOS::Obj& obj,
-                       librados::ObjectWriteOperation *op,
-                       uint64_t cost, uint64_t id) override;
 
   AioResultList poll() override;
 
index 3ebff9223c4b7147957ba84a4407a4ffca12b8b6..45ac26c7ebca4c41dd3788b6d0b2d5027ce780b2 100644 (file)
@@ -92,7 +92,7 @@ int RadosWriter::process(bufferlist&& bl, uint64_t offset)
     op.write(offset, data);
   }
   constexpr uint64_t id = 0; // unused
-  auto c = aio->submit(stripe_obj, &op, cost, id);
+  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id);
   return process_completed(c, &written);
 }
 
@@ -105,7 +105,7 @@ int RadosWriter::write_exclusive(const bufferlist& data)
   op.write_full(data);
 
   constexpr uint64_t id = 0; // unused
-  auto c = aio->submit(stripe_obj, &op, cost, id);
+  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op)), cost, id);
   auto d = aio->drain();
   c.splice(c.end(), d);
   return process_completed(c, &written);
index a99415efe5e86823718299319b8fb59b21fda3b6..dc8932411022babcecf369b95d369a7fbcc84cf8 100644 (file)
@@ -916,12 +916,12 @@ public:
 
   // implements DoutPrefixProvider
   CephContext *get_cct() const override { return store->ctx(); }
-  unsigned get_subsys() const
+  unsigned get_subsys() const override
   {
     return dout_subsys;
   }
 
-  std::ostream& gen_prefix(std::ostream& out) const
+  std::ostream& gen_prefix(std::ostream& out) const override
   {
     return out << "sync log trim: ";
   }
@@ -6828,7 +6828,7 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
   const uint64_t cost = len;
   const uint64_t id = obj_ofs; // use logical object offset for sorting replies
 
-  auto completed = d->aio->submit(obj, &op, cost, id);
+  auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op)), cost, id);
 
   return d->flush(std::move(completed));
 }
index c1b97a0482ae832553636350c43632d379163a04..88eb6cecead4012a89c5e7e0d15a938705902682 100644 (file)
@@ -33,7 +33,7 @@ struct RadosEnv : public ::testing::Environment {
       r = 0;
     ASSERT_EQ(0, r);
   }
-  void TearDown() {
+  void TearDown() override {
     rados.reset();
   }
 };
@@ -61,16 +61,16 @@ TEST_F(Aio_Throttle, NoThrottleUpToMax)
   auto obj = make_obj(__PRETTY_FUNCTION__);
   {
     librados::ObjectWriteOperation op1;
-    auto c1 = throttle.submit(obj, &op1, 1, 0);
+    auto c1 = throttle.get(obj, rgw::Aio::librados_op(std::move(op1)), 1, 0);
     EXPECT_TRUE(c1.empty());
     librados::ObjectWriteOperation op2;
-    auto c2 = throttle.submit(obj, &op2, 1, 0);
+    auto c2 = throttle.get(obj, rgw::Aio::librados_op(std::move(op2)), 1, 0);
     EXPECT_TRUE(c2.empty());
     librados::ObjectWriteOperation op3;
-    auto c3 = throttle.submit(obj, &op3, 1, 0);
+    auto c3 = throttle.get(obj, rgw::Aio::librados_op(std::move(op3)), 1, 0);
     EXPECT_TRUE(c3.empty());
     librados::ObjectWriteOperation op4;
-    auto c4 = throttle.submit(obj, &op4, 1, 0);
+    auto c4 = throttle.get(obj, rgw::Aio::librados_op(std::move(op4)), 1, 0);
     EXPECT_TRUE(c4.empty());
     // no completions because no ops had to wait
     auto c5 = throttle.poll();
@@ -88,7 +88,7 @@ TEST_F(Aio_Throttle, CostOverWindow)
   auto obj = make_obj(__PRETTY_FUNCTION__);
 
   librados::ObjectWriteOperation op;
-  auto c = throttle.submit(obj, &op, 8, 0);
+  auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 8, 0);
   ASSERT_EQ(1u, c.size());
   EXPECT_EQ(-EDEADLK, c.front().result);
 }
@@ -107,7 +107,7 @@ TEST_F(Aio_Throttle, ThrottleOverMax)
 
   for (uint64_t i = 0; i < total; i++) {
     librados::ObjectWriteOperation op;
-    auto c = throttle.submit(obj, &op, 1, 0);
+    auto c = throttle.get(obj, rgw::Aio::librados_op(std::move(op)), 1, 0);
     outstanding++;
     outstanding -= c.size();
     if (max_outstanding < outstanding) {