]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Redimension bucket sync cache to include optional generation
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 22 Feb 2022 19:50:33 +0000 (14:50 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 27 May 2022 19:47:34 +0000 (15:47 -0400)
The alternative would be to compare generations and throw out older
generation/no generation if we have a (newer) one.

But if we have the potential for older generations and blank
generations coming up on error retry, then we have to keep them
around.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_bucket_sync_cache.h
src/rgw/rgw_data_sync.cc

index de81d26c9bc31fc446a8429fc70e64371a85d38d..064fdce48106e2b0d2eb39fd6dcdfffc0f76f700 100644 (file)
@@ -23,7 +23,7 @@ namespace rgw::bucket_sync {
 // per bucket-shard state cached by DataSyncShardCR
 struct State {
   // the source bucket shard to sync
-  rgw_bucket_shard key;
+  std::pair<rgw_bucket_shard, std::optional<uint64_t>> key;
   // current sync obligation being processed by DataSyncSingleEntry
   std::optional<rgw_data_sync_obligation> obligation;
   // incremented with each new obligation
@@ -31,7 +31,10 @@ struct State {
   // highest timestamp applied by all sources
   ceph::real_time progress_timestamp;
 
-  State(const rgw_bucket_shard& key) noexcept : key(key) {}
+  State(const std::pair<rgw_bucket_shard, std::optional<uint64_t>>& key ) noexcept
+    : key(key) {}
+  State(const rgw_bucket_shard& shard, std::optional<uint64_t> gen) noexcept
+    : key(shard, gen) {}
 };
 
 struct Entry;
@@ -39,7 +42,7 @@ struct EntryToKey;
 class Handle;
 
 using lru_config = ceph::common::intrusive_lru_config<
-    rgw_bucket_shard, Entry, EntryToKey>;
+  std::pair<rgw_bucket_shard, std::optional<uint64_t>>, Entry, EntryToKey>;
 
 // a recyclable cache entry
 struct Entry : State, ceph::common::intrusive_lru_base<lru_config> {
@@ -47,7 +50,7 @@ struct Entry : State, ceph::common::intrusive_lru_base<lru_config> {
 };
 
 struct EntryToKey {
-  using type = rgw_bucket_shard;
+  using type = std::pair<rgw_bucket_shard, std::optional<uint64_t>>;
   const type& operator()(const Entry& e) { return e.key; }
 };
 
@@ -71,7 +74,7 @@ class Cache : public thread_unsafe_ref_counter<Cache> {
 
   // find or create a cache entry for the given key, and return a Handle that
   // keeps it lru-pinned until destruction
-  Handle get(const rgw_bucket_shard& key);
+  Handle get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen);
 };
 
 // a State handle that keeps the Cache referenced
@@ -104,9 +107,9 @@ class Handle {
   State* operator->() const noexcept { return entry.get(); }
 };
 
-inline Handle Cache::get(const rgw_bucket_shard& key)
+inline Handle Cache::get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen)
 {
-  auto result = cache.get_or_create(key);
+  auto result = cache.get_or_create({ shard, gen });
   return {this, std::move(result.first)};
 }
 
index d625e1a0c1f65ebffec2ab75f8b42015559fbc8c..7ca5753dfcf0c22f220b308c04afb79331815a35 100644 (file)
@@ -1340,10 +1340,10 @@ public:
           obligation_counter = state->counter;
           progress = ceph::real_time{};
 
-          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
+          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key.first}
               << ' ' << *state->obligation << dendl;
           yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
-                                                   state->key, tn,
+                                                   state->key.first, tn,
                                                    state->obligation->gen,
                                                   &progress));
           if (retcode < 0) {
@@ -1355,7 +1355,7 @@ public:
         complete = std::move(*state->obligation);
         state->obligation.reset();
 
-        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
+        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key.first}
                          << " progress=" << progress << ' ' << complete << " r=" << retcode));
       }
       sync_status = retcode;
@@ -1481,7 +1481,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
                                   std::optional<uint64_t> gen,
                                   const std::string& marker,
                                   ceph::real_time timestamp, bool retry) {
-    auto state = bucket_shard_cache->get(src);
+    auto state = bucket_shard_cache->get(src, gen);
     auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
     return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                         &*marker_tracker, error_repo,