]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: move aio throttles to namespace rgw
authorCasey Bodley <cbodley@redhat.com>
Sat, 17 Nov 2018 16:38:02 +0000 (11:38 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 5 Dec 2018 16:16:54 +0000 (11:16 -0500)
they're useful outside of putobj

Signed-off-by: Casey Bodley <cbodley@redhat.com>
13 files changed:
src/rgw/CMakeLists.txt
src/rgw/rgw_aio.h [new file with mode: 0644]
src/rgw/rgw_aio_throttle.cc [new file with mode: 0644]
src/rgw/rgw_aio_throttle.h [new file with mode: 0644]
src/rgw/rgw_file.h
src/rgw/rgw_op.cc
src/rgw/rgw_putobj_aio.h [deleted file]
src/rgw/rgw_putobj_processor.cc
src/rgw/rgw_putobj_processor.h
src/rgw/rgw_putobj_throttle.cc [deleted file]
src/rgw/rgw_putobj_throttle.h [deleted file]
src/rgw/rgw_rados.cc
src/test/rgw/test_rgw_throttle.cc

index 99add82c707f2df3bbf0dbc7a4b989204f072643..502ab0891acb35d182578a2cff93ad3b6f1aac94 100644 (file)
@@ -53,6 +53,7 @@ set(librgw_common_srcs
   rgw_acl.cc
   rgw_acl_s3.cc
   rgw_acl_swift.cc
+  rgw_aio_throttle.cc
   rgw_auth.cc
   rgw_auth_s3.cc
   rgw_basic_types.cc
@@ -97,7 +98,6 @@ set(librgw_common_srcs
   rgw_policy_s3.cc
   rgw_putobj.cc
   rgw_putobj_processor.cc
-  rgw_putobj_throttle.cc
   rgw_quota.cc
   rgw_rados.cc
   rgw_resolve.cc
diff --git a/src/rgw/rgw_aio.h b/src/rgw/rgw_aio.h
new file mode 100644 (file)
index 0000000..25f828a
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/intrusive/list.hpp>
+#include "rgw_common.h"
+#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj
+
+namespace librados {
+class ObjectReadOperation;
+class ObjectWriteOperation;
+}
+
+namespace rgw {
+
+struct AioResult {
+  rgw_raw_obj obj;
+  int result = 0;
+};
+struct AioResultEntry : AioResult, boost::intrusive::list_base_hook<> {
+  virtual ~AioResultEntry() {}
+};
+// a list of polymorphic entries that frees them on destruction
+template <typename T, typename ...Args>
+struct OwningList : boost::intrusive::list<T, Args...> {
+  OwningList() = default;
+  ~OwningList() { this->clear_and_dispose(std::default_delete<T>{}); }
+  OwningList(OwningList&&) = default;
+  OwningList& operator=(OwningList&&) = default;
+  OwningList(const OwningList&) = delete;
+  OwningList& operator=(const OwningList&) = delete;
+};
+using AioResultList = OwningList<AioResultEntry>;
+
+// returns the first error code or 0 if all succeeded
+inline int check_for_errors(const AioResultList& results) {
+  for (auto& e : results) {
+    if (e.result < 0) {
+      return e.result;
+    }
+  }
+  return 0;
+}
+
+// interface to submit async librados operations and wait on their completions.
+// each call returns a list of results from prior completions
+class Aio {
+ public:
+  virtual ~Aio() {}
+
+  virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
+                               const rgw_raw_obj& raw_obj,
+                               librados::ObjectReadOperation *op,
+                               bufferlist *data, uint64_t cost) = 0;
+
+  virtual AioResultList submit(RGWSI_RADOS::Obj& obj,
+                               const rgw_raw_obj& raw_obj,
+                               librados::ObjectWriteOperation *op,
+                               uint64_t cost) = 0;
+
+  // poll for any ready completions without waiting
+  virtual AioResultList poll() = 0;
+
+  // return any ready completions. if there are none, wait for the next
+  virtual AioResultList wait() = 0;
+
+  // wait for all outstanding completions and return their results
+  virtual AioResultList drain() = 0;
+};
+
+} // namespace rgw
diff --git a/src/rgw/rgw_aio_throttle.cc b/src/rgw/rgw_aio_throttle.cc
new file mode 100644 (file)
index 0000000..bf8a8a8
--- /dev/null
@@ -0,0 +1,152 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "include/rados/librados.hpp"
+
+#include "rgw_aio_throttle.h"
+#include "rgw_rados.h"
+
+namespace rgw {
+
+void AioThrottle::aio_cb(void *cb, void *arg)
+{
+  Pending& p = *static_cast<Pending*>(arg);
+  p.result = p.completion->get_return_value();
+  p.parent->put(p);
+}
+
+bool AioThrottle::waiter_ready() const
+{
+  switch (waiter) {
+  case Wait::Available: return is_available();
+  case Wait::Completion: return has_completion();
+  case Wait::Drained: return is_drained();
+  default: return false;
+  }
+}
+
+AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
+                                  const rgw_raw_obj& raw_obj,
+                                  librados::ObjectWriteOperation *op,
+                                  uint64_t cost)
+{
+  auto p = std::make_unique<Pending>();
+  p->obj = raw_obj;
+  p->cost = cost;
+
+  if (cost > window) {
+    p->result = -EDEADLK; // would never succeed
+    completed.push_back(*p);
+  } else {
+    get(*p);
+    p->result = obj.aio_operate(p->completion, op);
+    if (p->result < 0) {
+      put(*p);
+    }
+  }
+  p.release();
+  return std::move(completed);
+}
+
+AioResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
+                                  const rgw_raw_obj& raw_obj,
+                                  librados::ObjectReadOperation *op,
+                                  bufferlist *data, uint64_t cost)
+{
+  auto p = std::make_unique<Pending>();
+  p->obj = raw_obj;
+  p->cost = cost;
+
+  if (cost > window) {
+    p->result = -EDEADLK; // would never succeed
+    completed.push_back(*p);
+  } else {
+    get(*p);
+    p->result = obj.aio_operate(p->completion, op, data);
+    if (p->result < 0) {
+      put(*p);
+    }
+  }
+  p.release();
+  return std::move(completed);
+}
+
+void AioThrottle::get(Pending& p)
+{
+  std::unique_lock lock{mutex};
+
+  // wait for the write size to become available
+  pending_size += p.cost;
+  if (!is_available()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Available;
+    cond.wait(lock, [this] { return is_available(); });
+    waiter = Wait::None;
+  }
+
+  // register the pending write and attach a completion
+  p.parent = this;
+  p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
+  pending.push_back(p);
+}
+
+void AioThrottle::put(Pending& p)
+{
+  p.completion->release();
+  p.completion = nullptr;
+
+  std::scoped_lock lock{mutex};
+
+  // move from pending to completed
+  pending.erase(pending.iterator_to(p));
+  completed.push_back(p);
+
+  pending_size -= p.cost;
+
+  if (waiter_ready()) {
+    cond.notify_one();
+  }
+}
+
+AioResultList AioThrottle::poll()
+{
+  std::unique_lock lock{mutex};
+  return std::move(completed);
+}
+
+AioResultList AioThrottle::wait()
+{
+  std::unique_lock lock{mutex};
+  if (completed.empty() && !pending.empty()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Completion;
+    cond.wait(lock, [this] { return has_completion(); });
+    waiter = Wait::None;
+  }
+  return std::move(completed);
+}
+
+AioResultList AioThrottle::drain()
+{
+  std::unique_lock lock{mutex};
+  if (!pending.empty()) {
+    ceph_assert(waiter == Wait::None);
+    waiter = Wait::Drained;
+    cond.wait(lock, [this] { return is_drained(); });
+    waiter = Wait::None;
+  }
+  return std::move(completed);
+}
+
+} // namespace rgw
diff --git a/src/rgw/rgw_aio_throttle.h b/src/rgw/rgw_aio_throttle.h
new file mode 100644 (file)
index 0000000..1f9c53d
--- /dev/null
@@ -0,0 +1,85 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Red Hat, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include "common/ceph_mutex.h"
+#include "services/svc_rados.h"
+#include "rgw_aio.h"
+
+namespace librados {
+class AioCompletion;
+}
+
+namespace rgw {
+
+// a throttle for aio operations that enforces a maximum window on outstanding
+// bytes. only supports a single waiter, so all public functions must be called
+// from the same thread
+class AioThrottle : public Aio {
+ protected:
+  const uint64_t window;
+  uint64_t pending_size = 0;
+
+  bool is_available() const { return pending_size <= window; }
+  bool has_completion() const { return !completed.empty(); }
+  bool is_drained() const { return pending.empty(); }
+
+  struct Pending : AioResultEntry {
+    AioThrottle *parent = nullptr;
+    uint64_t cost = 0;
+    librados::AioCompletion *completion = nullptr;
+  };
+  OwningList<Pending> pending;
+  AioResultList completed;
+
+  enum class Wait { None, Available, Completion, Drained };
+  Wait waiter = Wait::None;
+
+  bool waiter_ready() const;
+
+  ceph::mutex mutex = ceph::make_mutex("AioThrottle");
+  ceph::condition_variable cond;
+
+  void get(Pending& p);
+  void put(Pending& p);
+
+  static void aio_cb(void *cb, void *arg);
+
+ public:
+  AioThrottle(uint64_t window) : window(window) {}
+
+  virtual ~AioThrottle() {
+    // must drain before destructing
+    ceph_assert(pending.empty());
+    ceph_assert(completed.empty());
+  }
+
+  AioResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj,
+                       librados::ObjectReadOperation *op,
+                       bufferlist *data, uint64_t cost) override;
+
+  AioResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj,
+                       librados::ObjectWriteOperation *op,
+                       uint64_t cost) override;
+
+  AioResultList poll() override;
+
+  AioResultList wait() override;
+
+  AioResultList drain() override;
+};
+
+} // namespace rgw
index 8702b0060dc83034029675067f171b85ea5a2415..f91399b6de529ab7f95bcb8155e868e7547b069c 100644 (file)
@@ -35,7 +35,7 @@
 #include "rgw_ldap.h"
 #include "rgw_token.h"
 #include "rgw_putobj_processor.h"
-#include "rgw_putobj_throttle.h"
+#include "rgw_aio_throttle.h"
 #include "rgw_compression.h"
 
 
@@ -2320,7 +2320,7 @@ public:
   const std::string& bucket_name;
   const std::string& obj_name;
   RGWFileHandle* rgw_fh;
-  std::optional<rgw::putobj::AioThrottle> aio;
+  std::optional<rgw::AioThrottle> aio;
   std::optional<rgw::putobj::AtomicObjectProcessor> processor;
   rgw::putobj::DataProcessor* filter;
   boost::optional<RGWPutObj_Compress> compressor;
index 552e3894ff5d0e7ed142efaf676fb58fa4eba967..2f8044981fbae97ef76f307851a47e5296215873 100644 (file)
@@ -30,6 +30,7 @@
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h"
 #include "rgw_acl_swift.h"
+#include "rgw_aio_throttle.h"
 #include "rgw_user.h"
 #include "rgw_bucket.h"
 #include "rgw_log.h"
@@ -45,7 +46,6 @@
 #include "rgw_role.h"
 #include "rgw_tag_s3.h"
 #include "rgw_putobj_processor.h"
-#include "rgw_putobj_throttle.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_quota.h"
@@ -3468,8 +3468,8 @@ void RGWPutObj::execute()
   }
 
   // create the object processor
+  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
   constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor),
                                                sizeof(AtomicObjectProcessor));
   ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
@@ -3806,8 +3806,8 @@ void RGWPostObj::execute()
       store->gen_rand_obj_instance_name(&obj);
     }
 
+    rgw::AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
     using namespace rgw::putobj;
-    AioThrottle aio(s->cct->_conf->rgw_put_obj_min_window_size);
     AtomicObjectProcessor processor(&aio, store, s->bucket_info,
                                     s->bucket_owner.get_id(),
                                     *static_cast<RGWObjectCtx*>(s->obj_ctx),
@@ -6579,8 +6579,8 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     store->gen_rand_obj_instance_name(&obj);
   }
 
+  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
 
   AtomicObjectProcessor processor(&aio, store, binfo, bowner.get_id(),
                                   obj_ctx, obj, 0, s->req_id);
diff --git a/src/rgw/rgw_putobj_aio.h b/src/rgw/rgw_putobj_aio.h
deleted file mode 100644 (file)
index 73a07cc..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2018 Red Hat, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#pragma once
-
-#include <boost/intrusive/list.hpp>
-#include "rgw_common.h"
-#include "services/svc_rados.h" // cant forward declare RGWSI_RADOS::Obj
-
-namespace librados {
-class ObjectReadOperation;
-class ObjectWriteOperation;
-}
-
-namespace rgw::putobj {
-
-struct Result {
-  rgw_raw_obj obj;
-  int result = 0;
-};
-struct ResultEntry : Result, boost::intrusive::list_base_hook<> {
-  virtual ~ResultEntry() {}
-};
-// a list of polymorphic entries that frees them on destruction
-template <typename T, typename ...Args>
-struct OwningList : boost::intrusive::list<T, Args...> {
-  OwningList() = default;
-  ~OwningList() { this->clear_and_dispose(std::default_delete<T>{}); }
-  OwningList(OwningList&&) = default;
-  OwningList& operator=(OwningList&&) = default;
-  OwningList(const OwningList&) = delete;
-  OwningList& operator=(const OwningList&) = delete;
-};
-using ResultList = OwningList<ResultEntry>;
-
-// returns the first error code or 0 if all succeeded
-inline int check_for_errors(const ResultList& results) {
-  for (auto& e : results) {
-    if (e.result < 0) {
-      return e.result;
-    }
-  }
-  return 0;
-}
-
-// interface to submit async librados operations and wait on their completions.
-// each call returns a list of results from prior completions
-class Aio {
- public:
-  virtual ~Aio() {}
-
-  virtual ResultList submit(RGWSI_RADOS::Obj& obj,
-                            const rgw_raw_obj& raw_obj,
-                            librados::ObjectReadOperation *op,
-                            bufferlist *data, uint64_t cost) = 0;
-
-  virtual ResultList submit(RGWSI_RADOS::Obj& obj,
-                            const rgw_raw_obj& raw_obj,
-                            librados::ObjectWriteOperation *op,
-                            uint64_t cost) = 0;
-
-  // poll for any ready completions without waiting
-  virtual ResultList poll() = 0;
-
-  // return any ready completions. if there are none, wait for the next
-  virtual ResultList wait() = 0;
-
-  // wait for all outstanding completions and return their results
-  virtual ResultList drain() = 0;
-};
-
-} // namespace rgw::putobj
index 315f36f84bd370f99a2a3455ece71cff9943dca4..4dbb2599fca2433d2cc43225f9d90e847fdc5c71 100644 (file)
@@ -12,7 +12,7 @@
  *
  */
 
-#include "rgw_putobj_aio.h"
+#include "rgw_aio.h"
 #include "rgw_putobj_processor.h"
 #include "rgw_multi.h"
 #include "services/svc_sys_obj.h"
@@ -58,7 +58,7 @@ int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
 }
 
 
-static int process_completed(const ResultList& completed, RawObjSet *written)
+static int process_completed(const AioResultList& completed, RawObjSet *written)
 {
   std::optional<int> error;
   for (auto& r : completed) {
index 65c26f8e3be3af131126d014856a98e3ef55055f..4aca1a3225dde134991253b08f328937ca583403 100644 (file)
 #include "rgw_rados.h"
 #include "services/svc_rados.h"
 
-namespace rgw::putobj {
+namespace rgw {
+
+class Aio;
+
+namespace putobj {
 
 // a data consumer that writes an object in a bucket
 class ObjectProcessor : public DataProcessor {
@@ -67,7 +71,6 @@ class HeadObjectProcessor : public ObjectProcessor {
 };
 
 
-class Aio;
 using RawObjSet = std::set<rgw_raw_obj>;
 
 // a data sink that writes to rados objects and deletes them on cancelation
@@ -210,4 +213,5 @@ class MultipartObjectProcessor : public ManifestObjectProcessor {
                rgw_zone_set *zones_trace, bool *canceled) override;
 };
 
-} // namespace rgw::putobj
+} // namespace putobj
+} // namespace rgw
diff --git a/src/rgw/rgw_putobj_throttle.cc b/src/rgw/rgw_putobj_throttle.cc
deleted file mode 100644 (file)
index 6587888..0000000
+++ /dev/null
@@ -1,152 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2018 Red Hat, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "include/rados/librados.hpp"
-
-#include "rgw_putobj_throttle.h"
-#include "rgw_rados.h"
-
-namespace rgw::putobj {
-
-void AioThrottle::aio_cb(void *cb, void *arg)
-{
-  Pending& p = *static_cast<Pending*>(arg);
-  p.result = p.completion->get_return_value();
-  p.parent->put(p);
-}
-
-bool AioThrottle::waiter_ready() const
-{
-  switch (waiter) {
-  case Wait::Available: return is_available();
-  case Wait::Completion: return has_completion();
-  case Wait::Drained: return is_drained();
-  default: return false;
-  }
-}
-
-ResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
-                               const rgw_raw_obj& raw_obj,
-                               librados::ObjectWriteOperation *op,
-                               uint64_t cost)
-{
-  auto p = std::make_unique<Pending>();
-  p->obj = raw_obj;
-  p->cost = cost;
-
-  if (cost > window) {
-    p->result = -EDEADLK; // would never succeed
-    completed.push_back(*p);
-  } else {
-    get(*p);
-    p->result = obj.aio_operate(p->completion, op);
-    if (p->result < 0) {
-      put(*p);
-    }
-  }
-  p.release();
-  return std::move(completed);
-}
-
-ResultList AioThrottle::submit(RGWSI_RADOS::Obj& obj,
-                               const rgw_raw_obj& raw_obj,
-                               librados::ObjectReadOperation *op,
-                               bufferlist *data, uint64_t cost)
-{
-  auto p = std::make_unique<Pending>();
-  p->obj = raw_obj;
-  p->cost = cost;
-
-  if (cost > window) {
-    p->result = -EDEADLK; // would never succeed
-    completed.push_back(*p);
-  } else {
-    get(*p);
-    p->result = obj.aio_operate(p->completion, op, data);
-    if (p->result < 0) {
-      put(*p);
-    }
-  }
-  p.release();
-  return std::move(completed);
-}
-
-void AioThrottle::get(Pending& p)
-{
-  std::unique_lock lock{mutex};
-
-  // wait for the write size to become available
-  pending_size += p.cost;
-  if (!is_available()) {
-    ceph_assert(waiter == Wait::None);
-    waiter = Wait::Available;
-    cond.wait(lock, [this] { return is_available(); });
-    waiter = Wait::None;
-  }
-
-  // register the pending write and attach a completion
-  p.parent = this;
-  p.completion = librados::Rados::aio_create_completion(&p, nullptr, aio_cb);
-  pending.push_back(p);
-}
-
-void AioThrottle::put(Pending& p)
-{
-  p.completion->release();
-  p.completion = nullptr;
-
-  std::scoped_lock lock{mutex};
-
-  // move from pending to completed
-  pending.erase(pending.iterator_to(p));
-  completed.push_back(p);
-
-  pending_size -= p.cost;
-
-  if (waiter_ready()) {
-    cond.notify_one();
-  }
-}
-
-ResultList AioThrottle::poll()
-{
-  std::unique_lock lock{mutex};
-  return std::move(completed);
-}
-
-ResultList AioThrottle::wait()
-{
-  std::unique_lock lock{mutex};
-  if (completed.empty() && !pending.empty()) {
-    ceph_assert(waiter == Wait::None);
-    waiter = Wait::Completion;
-    cond.wait(lock, [this] { return has_completion(); });
-    waiter = Wait::None;
-  }
-  return std::move(completed);
-}
-
-ResultList AioThrottle::drain()
-{
-  std::unique_lock lock{mutex};
-  if (!pending.empty()) {
-    ceph_assert(waiter == Wait::None);
-    waiter = Wait::Drained;
-    cond.wait(lock, [this] { return is_drained(); });
-    waiter = Wait::None;
-  }
-  return std::move(completed);
-}
-
-} // namespace rgw::putobj
diff --git a/src/rgw/rgw_putobj_throttle.h b/src/rgw/rgw_putobj_throttle.h
deleted file mode 100644 (file)
index 3421231..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2018 Red Hat, Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#pragma once
-
-#include <memory>
-#include "common/ceph_mutex.h"
-#include "services/svc_rados.h"
-#include "rgw_putobj_aio.h"
-
-namespace librados {
-class AioCompletion;
-}
-
-namespace rgw::putobj {
-
-// a throttle for aio operations that enforces a maximum window on outstanding
-// bytes. only supports a single waiter, so all public functions must be called
-// from the same thread
-class AioThrottle : public Aio {
- protected:
-  const uint64_t window;
-  uint64_t pending_size = 0;
-
-  bool is_available() const { return pending_size <= window; }
-  bool has_completion() const { return !completed.empty(); }
-  bool is_drained() const { return pending.empty(); }
-
-  struct Pending : ResultEntry {
-    AioThrottle *parent = nullptr;
-    uint64_t cost = 0;
-    librados::AioCompletion *completion = nullptr;
-  };
-  OwningList<Pending> pending;
-  ResultList completed;
-
-  enum class Wait { None, Available, Completion, Drained };
-  Wait waiter = Wait::None;
-
-  bool waiter_ready() const;
-
-  ceph::mutex mutex = ceph::make_mutex("AioThrottle");
-  ceph::condition_variable cond;
-
-  void get(Pending& p);
-  void put(Pending& p);
-
-  static void aio_cb(void *cb, void *arg);
-
- public:
-  AioThrottle(uint64_t window) : window(window) {}
-
-  virtual ~AioThrottle() {
-    // must drain before destructing
-    ceph_assert(pending.empty());
-    ceph_assert(completed.empty());
-  }
-
-  ResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj,
-                    librados::ObjectReadOperation *op,
-                    bufferlist *data, uint64_t cost) override;
-
-  ResultList submit(RGWSI_RADOS::Obj& obj, const rgw_raw_obj& raw_obj,
-                    librados::ObjectWriteOperation *op,
-                    uint64_t cost) override;
-
-  ResultList poll() override;
-
-  ResultList wait() override;
-
-  ResultList drain() override;
-};
-
-} // namespace rgw::putobj
index 93ce2247ee4358b668d3a87d413c5cc8b270df05..080195b6597b1cc62b0bc6962952c56e321d2ad8 100644 (file)
 #include "rgw_cache.h"
 #include "rgw_acl.h"
 #include "rgw_acl_s3.h" /* for dumping s3policy in debug log */
+#include "rgw_aio_throttle.h"
 #include "rgw_bucket.h"
 #include "rgw_rest_conn.h"
 #include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
 #include "rgw_putobj_processor.h"
-#include "rgw_putobj_throttle.h"
 
 #include "cls/rgw/cls_rgw_ops.h"
 #include "cls/rgw/cls_rgw_client.h"
@@ -4183,8 +4183,8 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   obj_time_weight set_mtime_weight;
   set_mtime_weight.high_precision = high_precision_time;
 
+  rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   AtomicObjectProcessor processor(&aio, this, dest_bucket_info, user_id,
                                   obj_ctx, dest_obj, olh_epoch, tag);
   int ret = processor.prepare();
@@ -4724,8 +4724,8 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   string tag;
   append_rand_alpha(cct, tag, tag, 32);
 
+  rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   AtomicObjectProcessor processor(&aio, this, dest_bucket_info,
                                   dest_bucket_info.owner, obj_ctx,
                                   dest_obj, olh_epoch, tag);
index 322e58ce8154da96d349bd3cf05397c1bfc116b6..37132816c7723e2c37ad1ab069598198f51950a8 100644 (file)
@@ -12,7 +12,7 @@
  *
  */
 
-#include "rgw/rgw_putobj_throttle.h"
+#include "rgw/rgw_aio_throttle.h"
 #include "rgw/rgw_rados.h"
 
 #include "include/rados/librados.hpp"
@@ -54,18 +54,18 @@ class RadosFixture : public ::testing::Test {
   }
 };
 
-using PutObj_Throttle = RadosFixture;
+using Aio_Throttle = RadosFixture;
 
-namespace rgw::putobj {
+namespace rgw {
 
-inline bool operator==(const Result& lhs, const Result& rhs) {
+inline bool operator==(const AioResult& lhs, const AioResult& rhs) {
   return lhs.obj == rhs.obj && lhs.result == rhs.result;
 }
-std::ostream& operator<<(std::ostream& out, const Result& r) {
+std::ostream& operator<<(std::ostream& out, const AioResult& r) {
   return out << "{r=" << r.result << " obj='" << r.obj << "'";
 }
 
-TEST_F(PutObj_Throttle, NoThrottleUpToMax)
+TEST_F(Aio_Throttle, NoThrottleUpToMax)
 {
   AioThrottle throttle(4);
   auto raw = make_raw_obj(__PRETTY_FUNCTION__);
@@ -89,11 +89,11 @@ TEST_F(PutObj_Throttle, NoThrottleUpToMax)
   auto completions = throttle.drain();
   ASSERT_EQ(4u, completions.size());
   for (auto& c : completions) {
-    EXPECT_EQ(Result({raw, -EINVAL}), c);
+    EXPECT_EQ(AioResult({raw, -EINVAL}), c);
   }
 }
 
-TEST_F(PutObj_Throttle, CostOverWindow)
+TEST_F(Aio_Throttle, CostOverWindow)
 {
   AioThrottle throttle(4);
   auto raw = make_raw_obj(__PRETTY_FUNCTION__);
@@ -102,10 +102,10 @@ TEST_F(PutObj_Throttle, CostOverWindow)
   librados::ObjectWriteOperation op;
   auto c = throttle.submit(obj, raw, &op, 8);
   ASSERT_EQ(1u, c.size());
-  EXPECT_EQ(Result({raw, -EDEADLK}), c.front());
+  EXPECT_EQ(AioResult({raw, -EDEADLK}), c.front());
 }
 
-TEST_F(PutObj_Throttle, AioThrottleOverMax)
+TEST_F(Aio_Throttle, ThrottleOverMax)
 {
   constexpr uint64_t window = 4;
   AioThrottle throttle(window);
@@ -133,4 +133,4 @@ TEST_F(PutObj_Throttle, AioThrottleOverMax)
   EXPECT_EQ(window, max_outstanding);
 }
 
-} // namespace rgw::putobj
+} // namespace rgw