]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Add generation to ChangeStatus
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 8 Feb 2022 18:11:44 +0000 (13:11 -0500)
committerCasey Bodley <cbodley@redhat.com>
Tue, 8 Feb 2022 21:31:32 +0000 (16:31 -0500)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h

index 246332e195958dca2db1edb3b3665c34abed89da..1212a9398c51c3b09a830ef2a7412baf345cf8f7 100644 (file)
@@ -514,7 +514,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
 
   /* we can't keep the bucket name as part of the cls_log_entry, and we need
    * it later, so we keep two lists under the map */
-  bc::flat_map<int, std::pair<std::vector<rgw_bucket_shard>,
+  bc::flat_map<int, std::pair<std::vector<BucketGen>,
                              RGWDataChangesBE::entries>> m;
 
   std::unique_lock l(lock);
@@ -535,7 +535,7 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
     change.gen = gen;
     encode(change, bl);
 
-    m[index].first.push_back(bs);
+    m[index].first.push_back({bs, gen});
     be->prepare(ut, change.key, std::move(bl), m[index].second);
   }
 
@@ -554,22 +554,25 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
 
     auto expiration = now;
     expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
-    for (auto& bs : buckets) {
-      update_renewed(bs, expiration);
+    for (auto& [bs, gen] : buckets) {
+      update_renewed(bs, gen, expiration);
     }
   }
 
   return 0;
 }
 
-void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
-                                   ChangeStatusPtr& status)
+auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs,
+                                   uint64_t gen)
+  -> ChangeStatusPtr
 {
   ceph_assert(ceph_mutex_is_locked(lock));
-  if (!changes.find(bs, status)) {
-    status = ChangeStatusPtr(new ChangeStatus);
-    changes.add(bs, status);
+  ChangeStatusPtr status;
+  if (!changes.find({bs, gen}, status)) {
+    status = std::make_shared<ChangeStatus>();
+    changes.add({bs, gen}, status);
   }
+  return status;
 }
 
 void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
@@ -580,11 +583,11 @@ void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
 }
 
 void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
+                                      uint64_t gen,
                                       real_time expiration)
 {
   std::scoped_lock l{lock};
-  ChangeStatusPtr status;
-  _get_change(bs, status);
+  auto status = _get_change(bs, gen);
 
   ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name="
                 << bs.bucket.name << " shard_id=" << bs.shard_id
@@ -637,8 +640,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
 
   std::unique_lock l(lock);
 
-  ChangeStatusPtr status;
-  _get_change(bs, status);
+  auto status = _get_change(bs, gen.gen);
   l.unlock();
 
   auto now = real_clock::now();
@@ -646,8 +648,8 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
   std::unique_lock sl(status->lock);
 
   ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
-                << " shard_id=" << shard_id << " now=" << now
-                << " cur_expiration=" << status->cur_expiration << dendl;
+                    << " shard_id=" << shard_id << " now=" << now
+                    << " cur_expiration=" << status->cur_expiration << dendl;
 
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
index 423316bee0d34181778e567efe0ae2d85bfd2252..d54f76a3249a22055c072d862fadc97b76b903c9 100644 (file)
@@ -198,6 +198,38 @@ public:
   int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
 };
 
+struct BucketGen {
+  rgw_bucket_shard shard;
+  uint64_t gen;
+
+  BucketGen(const rgw_bucket_shard& shard, uint64_t gen)
+    : shard(shard), gen(gen) {}
+
+  BucketGen(rgw_bucket_shard&& shard, uint64_t gen)
+    : shard(std::move(shard)), gen(gen) {}
+
+  BucketGen(const BucketGen&) = default;
+  BucketGen(BucketGen&&) = default;
+  BucketGen& operator =(const BucketGen&) = default;
+  BucketGen& operator =(BucketGen&&) = default;
+
+  ~BucketGen() = default;
+};
+
+inline bool operator ==(const BucketGen& l, const BucketGen& r) {
+  return (l.shard == r.shard) && (l.gen == r.gen);
+}
+
+inline bool operator <(const BucketGen& l, const BucketGen& r) {
+  if (l.shard < r.shard) {
+    return true;
+  } else if (l.shard == r.shard) {
+    return l.gen < r.gen;
+  } else {
+    return false;
+  }
+}
+
 class RGWDataChangesLog {
   friend DataLogBackends;
   CephContext *cct;
@@ -234,14 +266,16 @@ class RGWDataChangesLog {
 
   using ChangeStatusPtr = std::shared_ptr<ChangeStatus>;
 
-  lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
+  lru_map<BucketGen, ChangeStatusPtr> changes;
 
-  bc::flat_set<std::pair<rgw_bucket_shard, uint64_t>> cur_cycle;
+  bc::flat_set<BucketGen> cur_cycle;
 
-  void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+  ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen);
   void register_renew(const rgw_bucket_shard& bs,
                      const rgw::bucket_log_layout_generation& gen);
-  void update_renewed(const rgw_bucket_shard& bs, ceph::real_time expiration);
+  void update_renewed(const rgw_bucket_shard& bs,
+                     uint64_t gen,
+                     ceph::real_time expiration);
 
   ceph::mutex renew_lock = ceph::make_mutex("ChangesRenewThread::lock");
   ceph::condition_variable renew_cond;