]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: DataSyncSingleEntry takes cached state
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:12 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:18 +0000 (14:08 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 4b485599dcc92c8012f224cbd2e8aaf08fd38315..dd7e3b99a5d25bf9c4228fc07e485b25cfbe65ac 100644 (file)
@@ -22,6 +22,7 @@
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
 #include "rgw_bucket_sync.h"
+#include "rgw_bucket_sync_cache.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_error_repo.h"
@@ -1288,7 +1289,7 @@ public:
 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
-  rgw_bucket_shard source_bs;
+  rgw::bucket_sync::Handle state; // cached bucket-shard state
   rgw_data_sync_obligation obligation;
   RGWDataSyncShardMarkerTrack *marker_tracker;
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
@@ -1297,14 +1298,13 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   ceph::real_time progress;
   int sync_status = 0;
 public:
-  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
-                          rgw_data_sync_obligation obligation, RGWDataSyncShardMarkerTrack *_marker_tracker,
+  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
+                           rgw_data_sync_obligation obligation,
+                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                            RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sc->cct),
-      sc(_sc), sync_env(_sc->env), source_bs(source_bs),
-      obligation(std::move(obligation)),
-      marker_tracker(_marker_tracker),
-      error_repo(_error_repo) {
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      state(std::move(state)), obligation(std::move(obligation)),
+      marker_tracker(_marker_tracker), error_repo(_error_repo) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
     tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
   }
@@ -1315,11 +1315,11 @@ public:
         if (marker_tracker) {
           marker_tracker->reset_need_retry(obligation.key);
         }
-        tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
+        tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key}));
 
         yield call(new RGWRunBucketSourcesSyncCR(sc,
                                                  std::nullopt, /* target_bs */
-                                                 source_bs,
+                                                 state->key,
                                                  tn, &progress));
         if (retcode == 0) {
           tn->log(20, SSTR("RunBucketSources progress=" << progress));
@@ -1400,7 +1400,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   std::string next_marker;
   list<rgw_data_change_log_entry> log_entries;
   list<rgw_data_change_log_entry>::iterator log_iter;
-  bool truncated;
+  bool truncated = false;
 
   ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
   ceph::condition_variable inc_cond;
@@ -1414,11 +1414,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   set<string>::iterator modified_iter;
 
-  int total_entries;
-
-  int spawn_window;
-
-  bool *reset_backoff;
+  uint64_t total_entries = 0;
+  static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
+  bool *reset_backoff = nullptr;
 
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
@@ -1426,23 +1424,27 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
 
   string error_oid;
-  RGWOmapAppend *error_repo;
+  RGWOmapAppend *error_repo = nullptr;
   std::map<std::string, bufferlist> error_entries;
   string error_marker;
   ceph::real_time entry_timestamp;
-  int max_error_entries;
+  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
 
   ceph::coarse_real_time error_retry_time;
 
 #define RETRY_BACKOFF_SECS_MIN 60
 #define RETRY_BACKOFF_SECS_DEFAULT 60
 #define RETRY_BACKOFF_SECS_MAX 600
-  uint32_t retry_backoff_secs;
+  uint32_t retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
 
   RGWSyncTraceNodeRef tn;
 
   rgw_bucket_shard source_bs;
 
+  // target number of entries to cache before recycling idle ones
+  static constexpr size_t target_cache_size = 256;
+  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
+
   int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
     return rgw_bucket_parse_bucket_key(sync_env->cct, key,
                                        &bs.bucket, &bs.shard_id);
@@ -1451,24 +1453,19 @@ class RGWDataSyncShardCR : public RGWCoroutine {
                                   const std::string& key,
                                   const std::string& marker,
                                   ceph::real_time timestamp, bool retry) {
+    auto state = bucket_shard_cache->get(src);
     auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
-    return new RGWDataSyncSingleEntryCR(sc, src, std::move(obligation),
+    return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                         &*marker_tracker, error_repo, tn);
   }
 public:
-  RGWDataSyncShardCR(RGWDataSyncCtx *_sc,
-                     rgw_pool& _pool,
+  RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
                      uint32_t _shard_id, rgw_data_sync_marker& _marker,
-                     RGWSyncTraceNodeRef& _tn,
-                     bool *_reset_backoff) : RGWCoroutine(_sc->cct),
-                                                      sc(_sc), sync_env(_sc->env),
-                                                     pool(_pool),
-                                                     shard_id(_shard_id),
-                                                     sync_marker(_marker),
-                                                      truncated(false),
-                                                      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
-                                                      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
-                                                      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
+                     RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      pool(_pool), shard_id(_shard_id), sync_marker(_marker), tn(_tn),
+      bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
+  {
     set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
     error_oid = status_oid + ".retry";