]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Lay groundwork for multigenerational datalog
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 6 Jan 2021 08:40:50 +0000 (03:40 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 29 Mar 2021 16:25:58 +0000 (12:25 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/cls_fifo_legacy.cc
src/rgw/cls_fifo_legacy.h
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_log_backing.h

index f95b796152d3379f92f29bebf2be542382491a19..3ddb2578d35410232285d6bdd6d0d6dbf9d7f8db 100644 (file)
 #include "cls/fifo/cls_fifo_types.h"
 #include "cls/fifo/cls_fifo_ops.h"
 
-#include "librados/AioCompletionImpl.h"
-
-#include "rgw_tools.h"
-
 #include "cls_fifo_legacy.h"
 
 namespace rgw::cls::fifo {
@@ -382,67 +378,6 @@ struct partinfo_completion : public lr::ObjectOperationCompletion {
   }
 };
 
-template<typename T>
-struct Completion {
-private:
-  lr::AioCompletion* _cur = nullptr;
-  lr::AioCompletion* _super;
-public:
-
-  using Ptr = std::unique_ptr<T>;
-
-  lr::AioCompletion* cur() const {
-    return _cur;
-  }
-  lr::AioCompletion* super() const {
-    return _super;
-  }
-
-  Completion(lr::AioCompletion* super) : _super(super) {
-    super->pc->get();
-  }
-
-  ~Completion() {
-    if (_super) {
-      _super->pc->put();
-    }
-    if (_cur)
-      _cur->release();
-    _super = nullptr;
-    _cur = nullptr;
-  }
-
-  // The only times that aio_operate can return an error are:
-  // 1. The completion contains a null pointer. This should just
-  //    crash, and in our case it does.
-  // 2. An attempt is made to write to a snapshot. RGW doesn't use
-  //    snapshots, so we don't care.
-  //
-  // So we will just assert that initiating an Aio operation succeeds
-  // and not worry about recovering.
-  static lr::AioCompletion* call(Ptr&& p) {
-    p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
-                                              &cb);
-    auto c = p->_cur;
-    p.release();
-    return c;
-  }
-  static void complete(Ptr&& p, int r) {
-    auto c = p->_super;
-    p->_super = nullptr;
-    c->pc->put();
-    rgw_complete_aio_completion(c, r);
-  }
-
-  static void cb(lr::completion_t, void* arg) {
-    auto t = static_cast<T*>(arg);
-    auto r = t->_cur->get_return_value();
-    t->_cur->release();
-    t->_cur = nullptr;
-    t->handle(Ptr(t), r);
-  }
-};
-
 lr::ObjectReadOperation get_part_info(CephContext* cct,
                                      fifo::part_header* header,
                                      std::uint64_t tid, int* r = 0)
index b6b5f04bb30ad4318de09d41673f9c437c994aea..307abbb198918003bf7a3005e6f36cde82c531f6 100644 (file)
 #include "cls/fifo/cls_fifo_types.h"
 #include "cls/fifo/cls_fifo_ops.h"
 
+#include "librados/AioCompletionImpl.h"
+
+#include "rgw_tools.h"
+
 namespace rgw::cls::fifo {
 namespace cb = ceph::buffer;
 namespace fifo = rados::cls::fifo;
@@ -265,6 +269,67 @@ public:
                     lr::AioCompletion* c //< AIO Completion
     );
 };
+
+template<typename T>
+struct Completion {
+private:
+  lr::AioCompletion* _cur = nullptr;
+  lr::AioCompletion* _super;
+public:
+
+  using Ptr = std::unique_ptr<T>;
+
+  lr::AioCompletion* cur() const {
+    return _cur;
+  }
+  lr::AioCompletion* super() const {
+    return _super;
+  }
+
+  Completion(lr::AioCompletion* super) : _super(super) {
+    super->pc->get();
+  }
+
+  ~Completion() {
+    if (_super) {
+      _super->pc->put();
+    }
+    if (_cur)
+      _cur->release();
+    _super = nullptr;
+    _cur = nullptr;
+  }
+
+  // The only times that aio_operate can return an error are:
+  // 1. The completion contains a null pointer. This should just
+  //    crash, and in our case it does.
+  // 2. An attempt is made to write to a snapshot. RGW doesn't use
+  //    snapshots, so we don't care.
+  //
+  // So we will just assert that initiating an Aio operation succeeds
+  // and not worry about recovering.
+  static lr::AioCompletion* call(Ptr&& p) {
+    p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
+                                              &cb);
+    auto c = p->_cur;
+    p.release();
+    return c;
+  }
+  static void complete(Ptr&& p, int r) {
+    auto c = p->_super;
+    p->_super = nullptr;
+    rgw_complete_aio_completion(c, r);
+  }
+
+  static void cb(lr::completion_t, void* arg) {
+    auto t = static_cast<T*>(arg);
+    auto r = t->_cur->get_return_value();
+    t->_cur->release();
+    t->_cur = nullptr;
+    t->handle(Ptr(t), r);
+  }
+};
+
 }
 
 #endif // CEPH_RGW_CLS_FIFO_LEGACY_H
index d2c985b29dbf186344df9ac72e7047116e3049e7..45c9d1ccaa18e461f4583e7ec8a792ad4087db6a 100644 (file)
@@ -383,10 +383,10 @@ int RGWDataChangesLog::start(const RGWZone* _zone,
   try {
     switch (*found) {
     case log_type::omap:
-      be = std::make_unique<RGWDataChangesOmap>(ioctx, *this, 0, num_shards);
+      bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards));
       break;
     case log_type::fifo:
-      be = std::make_unique<RGWDataChangesFIFO>(ioctx, *this, 0, num_shards);
+      bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards));
       break;
     }
   } catch (bs::system_error& e) {
@@ -396,7 +396,6 @@ int RGWDataChangesLog::start(const RGWZone* _zone,
     return ceph::from_error_code(e.code());
   }
 
-  ceph_assert(be);
   renew_thread = make_named_thread("rgw_dt_lg_renew",
                                   &RGWDataChangesLog::renew_run, this);
   return 0;
@@ -426,6 +425,7 @@ int RGWDataChangesLog::renew_entries()
   l.unlock();
 
   auto ut = real_clock::now();
+  auto be = bes.head();
   for (const auto& bs : entries) {
     auto index = choose_oid(bs);
 
@@ -593,6 +593,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
 
     ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
 
+    auto be = bes.head();
     ret = be->push(index, now, change.key, std::move(bl));
 
     now = real_clock::now();
@@ -616,14 +617,44 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp, const RGWBucketI
   return ret;
 }
 
+int DataLogBackends::list(int shard, int max_entries,
+                         std::vector<rgw_data_change_log_entry>& entries,
+                         std::optional<std::string_view> marker,
+                         std::string* out_marker, bool* truncated)
+{
+  auto [gen_id, cursor] = cursorgeno(marker);
+  std::string out_cursor;
+  while (max_entries > 0) {
+    std::vector<rgw_data_change_log_entry> gentries;
+    std::unique_lock l(m);
+    auto i = lower_bound(gen_id);
+    if (i == end()) return 0;
+    auto be = i->second;
+    auto r = be->list(shard, max_entries, gentries, cursor,
+                     &out_cursor, truncated);
+    if (r < 0)
+      return r;
+
+    *out_marker = gencursor(gen_id, out_cursor);
+    for (auto& g : gentries) {
+      g.log_id = gencursor(gen_id, g.log_id);
+    }
+    max_entries -= gentries.size();
+    std::move(gentries.begin(), gentries.end(),
+             std::back_inserter(entries));
+    cursor = {};
+    ++gen_id;
+  }
+  return 0;
+}
+
 int RGWDataChangesLog::list_entries(int shard, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    std::optional<std::string_view> marker,
                                    std::string* out_marker, bool* truncated)
 {
   assert(shard < num_shards);
-  return be->list(shard, max_entries, entries, std::string(marker.value_or("")),
-                 out_marker, truncated);
+  return bes.list(shard, max_entries, entries, marker, out_marker, truncated);
 }
 
 int RGWDataChangesLog::list_entries(int max_entries,
@@ -654,20 +685,105 @@ int RGWDataChangesLog::list_entries(int max_entries,
 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
 {
   assert(shard_id < num_shards);
+  auto be = bes.head();
   return be->get_info(shard_id, info);
 }
 
+int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
+{
+  auto [target_gen, cursor] = cursorgen(marker);
+  std::unique_lock l(m);
+  const auto head_gen = (end() - 1)->second->gen_id;
+  const auto tail_gen = begin()->first;
+  if (target_gen < tail_gen) return 0;
+  auto r = 0;
+  for (auto i = lower_bound(0);
+       i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0;
+       i = upper_bound(i->first)) {
+    auto be = i->second;
+    l.unlock();
+    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+    r = be->trim(shard_id, c);
+    l.lock();
+  };
+  return r;
+}
+
 int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
 {
   assert(shard_id < num_shards);
-  return be->trim(shard_id, marker);
+  return bes.trim_entries(shard_id, marker);
+}
+
+class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
+public:
+  DataLogBackends* const bes;
+  const int shard_id;
+  const uint64_t target_gen;
+  const std::string cursor;
+  const uint64_t head_gen;
+  const uint64_t tail_gen;
+  boost::intrusive_ptr<RGWDataChangesBE> be;
+
+  GenTrim(DataLogBackends* bes, int shard_id, uint64_t target_gen, std::string cursor,
+         uint64_t head_gen, uint64_t tail_gen,
+         boost::intrusive_ptr<RGWDataChangesBE>&& be,
+         lr::AioCompletion* super)
+    : Completion(super), bes(bes), shard_id(shard_id), target_gen(target_gen),
+      cursor(std::move(cursor)), head_gen(head_gen), tail_gen(tail_gen),
+      be(std::move(be)) {}
+
+  void handle(Ptr&& p, int r) {
+    auto gen_id = be->gen_id;
+    be.reset();
+    if (r < 0) {
+      complete(std::move(p), r);
+      return;
+    }
+
+    {
+      std::unique_lock l(bes->m);
+      auto i = bes->upper_bound(gen_id);
+      if (i == bes->end() || i->first > target_gen || i->first > head_gen) {
+       l.unlock();
+       complete(std::move(p), r);
+       return;
+      }
+      be = i->second;
+    }
+    auto c = be->gen_id == target_gen ? cursor : be->max_marker();
+    r = be->trim(shard_id, c, call(std::move(p)));
+  }
+};
+
+void DataLogBackends::trim_entries(int shard_id, std::string_view marker,
+                                  librados::AioCompletion* c)
+{
+  auto [target_gen, cursor] = cursorgen(marker);
+  std::unique_lock l(m);
+  const auto head_gen = (end() - 1)->second->gen_id;
+  const auto tail_gen = begin()->first;
+  if (target_gen < tail_gen) {
+    l.unlock();
+    rgw_complete_aio_completion(c, 0);
+    return;
+  }
+  auto be = lower_bound(0)->second;
+  l.unlock();
+  auto p = be.get();
+  auto gt = std::make_unique<GenTrim>(this, shard_id, target_gen,
+                                     std::string(cursor), head_gen, tail_gen,
+                                     std::move(be), c);
+
+  p->trim(shard_id, cursor,  GenTrim::call(std::move(gt)));
 }
 
 int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
                                    librados::AioCompletion* c)
 {
   assert(shard_id < num_shards);
-  return be->trim(shard_id, marker, c);
+  bes.trim_entries(shard_id, marker, c);
+  return 0;
 }
 
 bool RGWDataChangesLog::going_down() const
@@ -721,6 +837,7 @@ void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
   modified_shards[shard_id].insert(key);
 }
 
-std::string_view RGWDataChangesLog::max_marker() const {
-  return be->max_marker();
+std::string RGWDataChangesLog::max_marker() const {
+  return gencursor(std::numeric_limits<uint64_t>::max(),
+                  "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
 }
index c84449e45bc787e33d2ee34f2e8784bccc8fde08..c207b30aa1e591186cc54a738b8e8816548092e2 100644 (file)
@@ -13,6 +13,8 @@
 #include <vector>
 
 #include <boost/container/flat_map.hpp>
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
 
 #undef FMT_HEADER_ONLY
 #define FMT_HEADER_ONLY 1
@@ -119,12 +121,37 @@ class RGWDataChangesLog;
 
 class RGWDataChangesBE;
 
+class DataLogBackends
+  : private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+  friend class GenTrim;
+
+  std::mutex m;
+public:
+
+  boost::intrusive_ptr<RGWDataChangesBE> head() {
+    std::unique_lock l(m);
+    auto i = end();
+    --i;
+    return i->second;
+  }
+  int list(int shard, int max_entries,
+          std::vector<rgw_data_change_log_entry>& entries,
+          std::optional<std::string_view> marker,
+          std::string* out_marker, bool* truncated);
+  int trim_entries(int shard_id, std::string_view marker);
+  void trim_entries(int shard_id, std::string_view marker,
+                   librados::AioCompletion* c);
+  void set_zero(RGWDataChangesBE* be) {
+    emplace(0, be);
+  }
+};
+
 class RGWDataChangesLog {
   CephContext *cct;
   librados::IoCtx ioctx;
   rgw::BucketChangeObserver *observer = nullptr;
   const RGWZone* zone;
-  std::unique_ptr<RGWDataChangesBE> be;
+  DataLogBackends bes;
 
   const int num_shards;
   std::string get_prefix() {
@@ -213,16 +240,15 @@ public:
     bucket_filter = std::move(f);
   }
   // a marker that compares greater than any other
-  std::string_view max_marker() const;
+  std::string max_marker() const;
   std::string get_oid(uint64_t gen_id, int shard_id) const;
 };
 
-class RGWDataChangesBE {
+class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
 protected:
   librados::IoCtx& ioctx;
   CephContext* const cct;
   RGWDataChangesLog& datalog;
-  uint64_t gen_id;
 
   std::string get_oid(int shard_id) {
     return datalog.get_oid(gen_id, shard_id);
@@ -231,6 +257,8 @@ public:
   using entries = std::variant<std::list<cls_log_entry>,
                               std::vector<ceph::buffer::list>>;
 
+  const uint64_t gen_id;
+
   RGWDataChangesBE(librados::IoCtx& ioctx,
                   RGWDataChangesLog& datalog,
                   uint64_t gen_id)
index 242bf0e1c00a4397f74f8babd4b71b99dc795fd4..55a3139d11e2b468f9763eaf051a268f8197817f 100644 (file)
@@ -244,4 +244,13 @@ cursorgen(std::string_view cursor_) {
   return { *gen_id, cursor };
 }
 
+inline std::pair<uint64_t, std::string_view>
+cursorgeno(std::optional<std::string_view> cursor) {
+  if (cursor) {
+    return cursorgen(*cursor);
+  } else {
+    return { 0, ""s };
+  }
+}
+
 #endif