]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Actually pull logbacking_generations into datalog
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 27 Jan 2021 01:07:45 +0000 (20:07 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 5 Apr 2021 17:45:20 +0000 (13:45 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit eb0f8ffcc785146a1fb249f4531620787be216ba)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_log_backing.h

index 2b04d530d1c6f0de0328b1e3cc5cefa27e38ec39..c64b22d518a9ffca525344ff59529a4893202041 100644 (file)
@@ -178,8 +178,8 @@ public:
     lr::ObjectWriteOperation op;
     cls_log_trim(op, {}, {}, {}, std::string(marker));
     auto r = rgw_rados_operate(ioctx, oids[index], &op, null_yield);
-    if (r == -ENOENT) r = 0;
-    if (r < 0) {
+    if (r == -ENOENT) r = -ENODATA;
+    if (r < 0 && r != -ENODATA) {
       lderr(cct) << __PRETTY_FUNCTION__
                 << ": failed to get info from " << oids[index]
                 << cpp_strerror(-r) << dendl;
@@ -191,7 +191,7 @@ public:
     lr::ObjectWriteOperation op;
     cls_log_trim(op, {}, {}, {}, std::string(marker));
     auto r = ioctx.aio_operate(oids[index], c, &op, 0);
-    if (r == -ENOENT) r = 0;
+    if (r == -ENOENT) r = -ENODATA;
     if (r < 0) {
       lderr(cct) << __PRETTY_FUNCTION__
                 << ": failed to get info from " << oids[index]
@@ -333,7 +333,7 @@ public:
           librados::AioCompletion* c) override {
     int r = 0;
     if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
-      rgw_complete_aio_completion(c, 0);
+      rgw_complete_aio_completion(c, -ENODATA);
     } else {
       fifos[index]->trim(marker, false, c);
     }
@@ -352,6 +352,65 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
     prefix(get_prefix()),
     changes(cct->_conf->rgw_data_log_changes_size) {}
 
+bs::error_code DataLogBackends::handle_init(entries_t e) noexcept {
+  std::unique_lock l(m);
+
+  for (const auto& [gen_id, gen] : e) {
+    if (gen.empty) {
+      lderr(datalog.cct)
+       << __PRETTY_FUNCTION__ << ":" << __LINE__
+       << ": ERROR: given empty generation: gen_id=" << gen_id << dendl;
+    }
+    if (count(gen_id) != 0) {
+      lderr(datalog.cct)
+       << __PRETTY_FUNCTION__ << ":" << __LINE__
+       << ": ERROR: generation already exists: gen_id=" << gen_id << dendl;
+    }
+    try {
+      switch (gen.type) {
+      case log_type::omap:
+       emplace(gen_id, new RGWDataChangesOmap(ioctx, datalog, gen_id, shards));
+       break;
+      case log_type::fifo:
+       emplace(gen_id, new RGWDataChangesFIFO(ioctx, datalog, gen_id, shards));
+       break;
+      default:
+       lderr(datalog.cct)
+         << __PRETTY_FUNCTION__ << ":" << __LINE__
+         << ": IMPOSSIBLE: invalid log type: gen_id=" << gen_id
+         << ", type" << gen.type << dendl;
+       return bs::error_code(EFAULT, bs::system_category());
+      }
+    } catch (const bs::system_error& err) {
+      lderr(datalog.cct)
+         << __PRETTY_FUNCTION__ << ":" << __LINE__
+         << ": error setting up backend: gen_id=" << gen_id
+         << ", err=" << err.what() << dendl;
+      return err.code();
+    }
+  }
+  return {};
+}
+bs::error_code DataLogBackends::handle_new_gens(entries_t e) noexcept {
+  return handle_init(std::move(e));
+}
+bs::error_code DataLogBackends::handle_empty_to(uint64_t new_tail) noexcept {
+  std::unique_lock l(m);
+  auto i = cbegin();
+  if (i->first < new_tail) {
+    return {};
+  }
+  if (new_tail >= (cend() - 1)->first) {
+    lderr(datalog.cct)
+      << __PRETTY_FUNCTION__ << ":" << __LINE__
+      << ": ERROR: attempt to trim head: new_tail=" << new_tail << dendl;
+    return bs::error_code(EFAULT, bs::system_category());
+  }
+  erase(i, upper_bound(new_tail));
+  return {};
+}
+
+
 int RGWDataChangesLog::start(const RGWZone* _zone,
                             const RGWZoneParams& zoneparams,
                             librados::Rados* lr)
@@ -371,31 +430,21 @@ int RGWDataChangesLog::start(const RGWZone* _zone,
     return -r;
   }
 
-  auto found = log_backing_type(ioctx, *defbacking, num_shards,
-                               [this](int i) { return get_oid(0, i); },
-                               null_yield);
+  auto besr = logback_generations::init<DataLogBackends>(
+    ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
+      return get_oid(gen_id, shard);
+    },
+    num_shards, *defbacking, null_yield, *this);
 
-  if (!found) {
-    lderr(cct) << __PRETTY_FUNCTION__
-              << ": Error when checking log type: "
-              << found.error().message() << dendl;
-  }
-  try {
-    switch (*found) {
-    case log_type::omap:
-      bes.set_zero(new RGWDataChangesOmap(ioctx, *this, 0, num_shards));
-      break;
-    case log_type::fifo:
-      bes.set_zero(new RGWDataChangesFIFO(ioctx, *this, 0, num_shards));
-      break;
-    }
-  } catch (bs::system_error& e) {
+
+  if (!besr) {
     lderr(cct) << __PRETTY_FUNCTION__
-              << ": Error when starting backend: "
-              << e.what() << dendl;
-    return ceph::from_error_code(e.code());
+              << ": Error initializing backends: "
+              << besr.error().message() << dendl;
+    return ceph::from_error_code(besr.error());
   }
 
+  bes = std::move(*besr);
   renew_thread = make_named_thread("rgw_dt_lg_renew",
                                   &RGWDataChangesLog::renew_run, this);
   return 0;
@@ -425,7 +474,7 @@ int RGWDataChangesLog::renew_entries()
   l.unlock();
 
   auto ut = real_clock::now();
-  auto be = bes.head();
+  auto be = bes->head();
   for (const auto& bs : entries) {
     auto index = choose_oid(bs);
 
@@ -592,7 +641,7 @@ int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
 
     ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
 
-    auto be = bes.head();
+    auto be = bes->head();
     ret = be->push(index, now, change.key, std::move(bl));
 
     now = real_clock::now();
@@ -634,7 +683,9 @@ int DataLogBackends::list(int shard, int max_entries,
     if (r < 0)
       return r;
 
-    *out_marker = gencursor(gen_id, out_cursor);
+    if (out_marker && !out_cursor.empty()) {
+      *out_marker = gencursor(gen_id, out_cursor);
+    }
     for (auto& g : gentries) {
       g.log_id = gencursor(gen_id, g.log_id);
     }
@@ -653,7 +704,7 @@ int RGWDataChangesLog::list_entries(int shard, int max_entries,
                                    std::string* out_marker, bool* truncated)
 {
   assert(shard < num_shards);
-  return bes.list(shard, max_entries, entries, marker, out_marker, truncated);
+  return bes->list(shard, max_entries, entries, marker, out_marker, truncated);
 }
 
 int RGWDataChangesLog::list_entries(int max_entries,
@@ -684,8 +735,12 @@ int RGWDataChangesLog::list_entries(int max_entries,
 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
 {
   assert(shard_id < num_shards);
-  auto be = bes.head();
-  return be->get_info(shard_id, info);
+  auto be = bes->head();
+  auto r = be->get_info(shard_id, info);
+  if (!info->marker.empty()) {
+    info->marker = gencursor(be->gen_id, info->marker);
+  }
+  return r;
 }
 
 int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
@@ -696,13 +751,13 @@ int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
   const auto tail_gen = begin()->first;
   if (target_gen < tail_gen) return 0;
   auto r = 0;
-  for (auto i = lower_bound(0);
-       i != end() && i->first <= target_gen && i->first <= head_gen && r >= 0;
-       i = upper_bound(i->first)) {
-    auto be = i->second;
+  for (auto be = lower_bound(0)->second;
+       be->gen_id <= target_gen && be->gen_id <= head_gen && r >= 0;
+       be = upper_bound(be->gen_id)->second) {
     l.unlock();
     auto c = be->gen_id == target_gen ? cursor : be->max_marker();
     r = be->trim(shard_id, c);
+    if (r == -ENODATA && be->gen_id < target_gen) r = 0;
     l.lock();
   };
   return r;
@@ -711,7 +766,7 @@ int DataLogBackends::trim_entries(int shard_id, std::string_view marker)
 int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
 {
   assert(shard_id < num_shards);
-  return bes.trim_entries(shard_id, marker);
+  return bes->trim_entries(shard_id, marker);
 }
 
 class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
@@ -735,6 +790,8 @@ public:
   void handle(Ptr&& p, int r) {
     auto gen_id = be->gen_id;
     be.reset();
+    if (r == -ENOENT) r = -ENODATA;
+    if (r == -ENODATA && gen_id < target_gen) r = 0;
     if (r < 0) {
       complete(std::move(p), r);
       return;
@@ -781,7 +838,7 @@ int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
                                    librados::AioCompletion* c)
 {
   assert(shard_id < num_shards);
-  bes.trim_entries(shard_id, marker, c);
+  bes->trim_entries(shard_id, marker, c);
   return 0;
 }
 
index 0915bebde11cf31333164bb8aa263edd2512a6b8..e9a768d546c00e12c508ef6d6514801e2e03c68e 100644 (file)
@@ -36,6 +36,7 @@
 #include "cls/log/cls_log_types.h"
 
 #include "rgw_basic_types.h"
+#include "rgw_log_backing.h"
 #include "rgw_sync_policy.h"
 #include "rgw_zone.h"
 #include "rgw_trim_bilog.h"
@@ -121,11 +122,22 @@ class RGWDataChangesLog;
 
 class RGWDataChangesBE;
 
-class DataLogBackends
-  : private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+class DataLogBackends final
+  : public logback_generations,
+    private bc::flat_map<uint64_t, boost::intrusive_ptr<RGWDataChangesBE>> {
+  friend class logback_generations;
   friend class GenTrim;
 
   std::mutex m;
+  RGWDataChangesLog& datalog;
+
+  DataLogBackends(librados::IoCtx& ioctx,
+                 std::string oid,
+                 fu2::unique_function<std::string(
+                   uint64_t, int) const>&& get_oid,
+                 int shards, RGWDataChangesLog& datalog) noexcept
+    : logback_generations(ioctx, oid, std::move(get_oid),
+                         shards), datalog(datalog) {}
 public:
 
   boost::intrusive_ptr<RGWDataChangesBE> head() {
@@ -144,20 +156,28 @@ public:
   void set_zero(RGWDataChangesBE* be) {
     emplace(0, be);
   }
+
+  bs::error_code handle_init(entries_t e) noexcept override;
+  bs::error_code handle_new_gens(entries_t e) noexcept override;
+  bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
 };
 
 class RGWDataChangesLog {
+  friend DataLogBackends;
   CephContext *cct;
   librados::IoCtx ioctx;
   rgw::BucketChangeObserver *observer = nullptr;
   const RGWZone* zone;
-  DataLogBackends bes;
+  std::unique_ptr<DataLogBackends> bes;
 
   const int num_shards;
   std::string get_prefix() {
     auto prefix = cct->_conf->rgw_data_log_obj_prefix;
     return prefix.empty() ? prefix : "data_log"s;
   }
+  std::string metadata_log_oid() {
+    return get_prefix() + "generations_metadata"s;
+  }
   std::string prefix;
 
   ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
index 55a3139d11e2b468f9763eaf051a268f8197817f..ef2583c35b2046df2be3e0fe0c6e469f6afdb074 100644 (file)
@@ -135,6 +135,8 @@ private:
 protected:
   const int shards;
 
+private:
+
   uint64_t watchcookie = 0;
 
   obj_version version;