]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Clean up RGWDataSyncShardCR
authorAdam C. Emerson <aemerson@redhat.com>
Mon, 8 Aug 2022 19:03:03 +0000 (15:03 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 9 Aug 2022 19:29:21 +0000 (15:29 -0400)
Remove no-longer-used functions and data members they depended on.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_data_sync.cc

index 32829f038765657c117a87d760068d6bb02cf0db..16c8726b11f7129182996ce13485ad1535d07623 100644 (file)
@@ -1617,8 +1617,6 @@ public:
   }
 };
 
-static constexpr auto DATA_SYNC_MAX_ERR_ENTRIES = 10;
-
 class RGWDataBaseSyncShardCR : public RGWCoroutine {
 protected:
   RGWDataSyncCtx *const sc;
@@ -1766,7 +1764,7 @@ public:
 };
 
 class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
-  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+  static constexpr int max_error_entries = 10;
   static constexpr uint32_t retry_backoff_secs = 60;
 
   ceph::mutex& inc_lock;
@@ -1976,83 +1974,43 @@ public:
 };
 
 class RGWDataSyncShardCR : public RGWCoroutine {
-  static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
-  RGWDataSyncCtx *sc;
-  RGWDataSyncEnv *sync_env;
-
-  rgw_pool pool;
-
-  uint32_t shard_id;
+  RGWDataSyncCtx *const sc;
+  const rgw_pool pool;
+  const uint32_t shard_id;
   rgw_data_sync_marker& sync_marker;
   rgw_data_sync_status sync_status;
-
-  RGWRadosGetOmapValsCR::ResultPtr omapvals;
-  std::map<std::string, bufferlist> entries;
-  std::map<std::string, bufferlist>::iterator iter;
-
-  string oid;
-
-  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
-
-  std::string next_marker;
-  vector<rgw_data_change_log_entry> log_entries;
-  vector<rgw_data_change_log_entry>::iterator log_iter;
-  bool truncated = false;
+  const RGWSyncTraceNodeRef tn;
+  bool *reset_backoff; // TODO We do nothing with this pointer.
 
   ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
   ceph::condition_variable inc_cond;
 
-  boost::asio::coroutine incremental_cr;
-  boost::asio::coroutine full_cr;
-
+  RGWDataSyncEnv *const sync_env{ sc->env };
 
-  bc::flat_set<rgw_data_notify_entry> modified_shards;
-  bc::flat_set<rgw_data_notify_entry> current_modified;
+  const string status_oid{ RGWDataSyncStatusManager::shard_obj_name(
+      sc->source_zone, shard_id) };
+  const rgw_raw_obj error_repo{ pool, status_oid + ".retry" };
 
-  bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
-
-  uint64_t total_entries = 0;
-  bool *reset_backoff = nullptr;
+  // target number of entries to cache before recycling idle ones
+  static constexpr size_t target_cache_size = 256;
+  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache {
+    rgw::bucket_sync::Cache::create(target_cache_size) };
 
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-  string status_oid;
-
-  rgw_raw_obj error_repo;
-  std::map<std::string, bufferlist> error_entries;
-  string error_marker;
-  ceph::real_time entry_timestamp;
-  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
-
-  ceph::coarse_real_time error_retry_time;
-  static constexpr uint32_t retry_backoff_secs = 60;
-
-  RGWSyncTraceNodeRef tn;
-
-  rgw_bucket_shard source_bs;
-  std::optional<uint64_t> gen;
 
-  // target number of entries to cache before recycling idle ones
-  static constexpr size_t target_cache_size = 256;
-  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
-
-  int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
-    return rgw_bucket_parse_bucket_key(sync_env->cct, key,
-                                       &bs.bucket, &bs.shard_id);
-  }
+  bc::flat_set<rgw_data_notify_entry> modified_shards;
 
 public:
-  RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
-                     uint32_t _shard_id, rgw_data_sync_marker& _marker,
-                     const rgw_data_sync_status& _sync_status,
-                     RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      pool(_pool), shard_id(_shard_id), sync_marker(_marker), sync_status(_sync_status),
-      status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
-      error_repo(pool, status_oid + ".retry"), tn(_tn),
-      bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
-  {
-    set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
+  RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
+                     const uint32_t shard_id, rgw_data_sync_marker& marker,
+                     const rgw_data_sync_status& sync_status,
+                     RGWSyncTraceNodeRef& tn, bool *reset_backoff)
+    : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
+      sync_marker(marker), sync_status(sync_status), tn(tn),
+      reset_backoff(reset_backoff) {
+    set_description() << "data sync shard source_zone=" << sc->source_zone
+                     << " shard_id=" << shard_id;
   }
 
   ~RGWDataSyncShardCR() override {
@@ -2146,249 +2104,6 @@ public:
                                             lock_name, lock_duration, this));
     lease_stack.reset(spawn(lease_cr.get(), false));
   }
-
-  int full_sync() {
-    int max_entries = OMAP_GET_MAX_ENTRIES;
-    reenter(&full_cr) {
-      tn->log(10, "start full sync");
-      yield init_lease_cr();
-      while (!lease_cr->is_locked()) {
-        if (lease_cr->is_done()) {
-          tn->log(5, "failed to take lease");
-          set_status("lease lock failed, early abort");
-          drain_all();
-          return set_cr_error(lease_cr->get_ret_status());
-        }
-        set_sleeping(true);
-        yield;
-      }
-      tn->log(10, "took lease");
-      oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
-      total_entries = sync_marker.pos;
-      entry_timestamp = sync_marker.timestamp; // time when full sync started
-      do {
-        if (!lease_cr->is_locked()) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(-ECANCELED);
-        }
-        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
-        yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, max_entries, omapvals));
-        if (retcode < 0) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(retcode);
-        }
-        entries = std::move(omapvals->entries);
-        if (entries.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
-        iter = entries.begin();
-        for (; iter != entries.end(); ++iter) {
-          retcode = parse_bucket_key(iter->first, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
-            marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
-            continue;
-          }
-          tn->log(20, SSTR("full sync: " << iter->first));
-          total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
-            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
-          } else {
-            tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
-            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, pool, source_bs, iter->first, sync_status,
-                            error_repo, entry_timestamp, lease_cr, bucket_shard_cache, &*marker_tracker, tn),
-                            cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-          }
-         sync_marker.marker = iter->first;
-        }
-      } while (omapvals->more);
-      omapvals.reset();
-
-      drain_all_but_stack(lease_stack.get());
-
-      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-
-      yield {
-        /* update marker to reflect we're done with full sync */
-        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
-        sync_marker.marker = sync_marker.next_step_marker;
-        sync_marker.next_step_marker.clear();
-        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                                             rgw_raw_obj(pool, status_oid),
-                                                             sync_marker));
-      }
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
-      }
-      // clean up full sync index
-      yield {
-        const auto& pool = sync_env->svc->zone->get_zone_params().log_pool;
-        auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
-        call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
-      }
-      // keep lease and transition to incremental_sync()
-    }
-    return 0;
-  }
-
-  int incremental_sync() {
-    reenter(&incremental_cr) {
-      tn->log(10, "start incremental sync");
-      if (lease_cr) {
-        tn->log(10, "lease already held from full sync");
-      } else {
-        yield init_lease_cr();
-        while (!lease_cr->is_locked()) {
-          if (lease_cr->is_done()) {
-            tn->log(5, "failed to take lease");
-            set_status("lease lock failed, early abort");
-            drain_all();
-            return set_cr_error(lease_cr->get_ret_status());
-          }
-          set_sleeping(true);
-          yield;
-        }
-        set_status("lease acquired");
-        tn->log(10, "took lease");
-      }
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
-      do {
-        if (!lease_cr->is_locked()) {
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(-ECANCELED);
-        }
-        current_modified.clear();
-        inc_lock.lock();
-        current_modified.swap(modified_shards);
-        inc_lock.unlock();
-
-        if (current_modified.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-        /* process out of band updates */
-        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
-          retcode = parse_bucket_key(modified_iter->key, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
-            continue;
-          }
-          tn->log(20, SSTR("received async update notification: " << modified_iter->key));
-          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, string(),
-                                  ceph::real_time{}, lease_cr, bucket_shard_cache, &*marker_tracker, error_repo, tn, false), false);
-        }
-
-        if (error_retry_time <= ceph::coarse_real_clock::now()) {
-          /* process bucket shards that previously failed */
-          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
-          yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
-                                               error_marker, max_error_entries, omapvals));
-          error_entries = std::move(omapvals->entries);
-          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
-          iter = error_entries.begin();
-          for (; iter != error_entries.end(); ++iter) {
-            error_marker = iter->first;
-            entry_timestamp = rgw::error_repo::decode_value(iter->second);
-            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
-            if (retcode == -EINVAL) {
-              // backward compatibility for string keys that don't encode a gen
-              retcode = parse_bucket_key(error_marker, source_bs);
-            }
-            if (retcode < 0) {
-              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
-              spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
-                                               error_marker, entry_timestamp), false);
-              continue;
-            }
-            tn->log(10, SSTR("gen is " << gen));
-            if (!gen) {
-              // write all full sync obligations for the bucket to error repo
-              spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
-            } else {
-              tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
-              spawn(data_sync_single_entry(sc, source_bs, gen, "", entry_timestamp, lease_cr,
-                                      bucket_shard_cache, &*marker_tracker, error_repo, tn, true), false);
-            }
-          }
-          if (!omapvals->more) {
-            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
-            error_marker.clear();
-          }
-        }
-        omapvals.reset();
-
-        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
-        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,
-                                                   &next_marker, &log_entries, &truncated));
-        if (retcode < 0 && retcode != -ENOENT) {
-          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
-          lease_cr->go_down();
-          drain_all();
-          return set_cr_error(retcode);
-        }
-
-        if (log_entries.size() > 0) {
-          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
-        }
-
-        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
-          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
-          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
-          if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
-            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
-            continue;
-          }
-          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
-            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
-          } else {
-            tn->log(1, SSTR("incremental sync on " << log_iter->entry.key  << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
-            yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
-                                                 log_iter->log_timestamp, lease_cr,bucket_shard_cache,
-                                                 &*marker_tracker, error_repo, tn, false),
-                               cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
-          }
-        }
-
-        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
-                         << " next_marker=" << next_marker << " truncated=" << truncated));
-        if (!next_marker.empty()) {
-          sync_marker.marker = next_marker;
-        } else if (!log_entries.empty()) {
-          sync_marker.marker = log_entries.back().log_id;
-        }
-        if (!truncated) {
-          // we reached the end, wait a while before checking for more
-          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-         yield wait(get_idle_interval());
-       }
-      } while (true);
-    }
-    return 0;
-  }
-
-  utime_t get_idle_interval() const {
-    ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
-    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
-      auto now = ceph::coarse_real_clock::now();
-      if (error_retry_time > now) {
-        auto d = error_retry_time - now;
-        if (interval > d) {
-          interval = d;
-        }
-      }
-    }
-    // convert timespan -> time_point -> utime_t
-    return utime_t(ceph::coarse_real_clock::zero() + interval);
-  }
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {