]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Break out RGWDataIncSyncShardCR
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 4 Aug 2022 22:00:31 +0000 (18:00 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 9 Aug 2022 19:29:21 +0000 (15:29 -0400)
This was formerly the function
RGWDataSyncShardCR::incremental_sync. As with full_sync, we transfer
responsibility for acquiring the lease to the top level
RGWDataSyncShardCR coroutine.

Fixes: https://tracker.ceph.com/issues/57063
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_data_sync.cc

index 18c9fa04f951a5b341ac27bdaffbf4274dab5da3..5b74150cc8dc8d96096d898fab5912841e7c0f0b 100644 (file)
@@ -1765,6 +1765,216 @@ public:
   }
 };
 
+class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
+  static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
+  static constexpr uint32_t retry_backoff_secs = 60;
+
+  ceph::mutex& inc_lock;
+  bc::flat_set<rgw_data_notify_entry>& modified_shards;
+
+  bc::flat_set<rgw_data_notify_entry> current_modified;
+  decltype(current_modified)::iterator modified_iter;
+
+  ceph::coarse_real_time error_retry_time;
+  string error_marker;
+  std::map<std::string, bufferlist> error_entries;
+  decltype(error_entries)::iterator iter;
+  ceph::real_time entry_timestamp;
+  std::optional<uint64_t> gen;
+
+  string next_marker;
+  vector<rgw_data_change_log_entry> log_entries;
+  decltype(log_entries)::iterator log_iter;
+  bool truncated = false;
+
+  utime_t get_idle_interval() const {
+    ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
+    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
+      auto now = ceph::coarse_real_clock::now();
+      if (error_retry_time > now) {
+        auto d = error_retry_time - now;
+        if (interval > d) {
+          interval = d;
+        }
+      }
+    }
+    // convert timespan -> time_point -> utime_t
+    return utime_t(ceph::coarse_real_clock::zero() + interval);
+  }
+
+
+public:
+
+  RGWDataIncSyncShardCR(
+    RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
+    rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
+    const string& status_oid, const rgw_raw_obj& error_repo,
+    const boost::intrusive_ptr<RGWContinuousLeaseCR>& lease_cr,
+    const rgw_data_sync_status& sync_status,
+    const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
+    ceph::mutex& inc_lock,
+    bc::flat_set<rgw_data_notify_entry>& modified_shards)
+    : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
+                            status_oid, error_repo, lease_cr, sync_status,
+                            bucket_shard_cache),
+      inc_lock(inc_lock), modified_shards(modified_shards) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      tn->log(10, "start incremental sync");
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      do {
+        if (!lease_cr->is_locked()) {
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
+       {
+         current_modified.clear();
+         std::unique_lock il(inc_lock);
+         current_modified.swap(modified_shards);
+         il.unlock();
+       }
+
+        if (current_modified.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+        /* process out of band updates */
+        for (modified_iter = current_modified.begin();
+            modified_iter != current_modified.end();
+            ++modified_iter) {
+          retcode = parse_bucket_key(modified_iter->key, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: "
+                           << modified_iter->key));
+           continue;
+          }
+          tn->log(20, SSTR("received async update notification: "
+                          << modified_iter->key));
+          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
+                                      ceph::real_time{}, lease_cr,
+                                      bucket_shard_cache, &*marker_tracker,
+                                      error_repo, tn, false), false);
+       }
+
+        if (error_retry_time <= ceph::coarse_real_clock::now()) {
+          /* process bucket shards that previously failed */
+          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+          yield call(new RGWRadosGetOmapValsCR(sc->env->store, error_repo,
+                                               error_marker, max_error_entries,
+                                              omapvals));
+          error_entries = std::move(omapvals->entries);
+          tn->log(20, SSTR("read error repo, got " << error_entries.size()
+                          << " entries"));
+          iter = error_entries.begin();
+          for (; iter != error_entries.end(); ++iter) {
+            error_marker = iter->first;
+            entry_timestamp = rgw::error_repo::decode_value(iter->second);
+            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+            if (retcode == -EINVAL) {
+              // backward compatibility for string keys that don't encode a gen
+              retcode = parse_bucket_key(error_marker, source_bs);
+            }
+            if (retcode < 0) {
+              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
+              spawn(rgw::error_repo::remove_cr(sc->env->store->svc()->rados,
+                                              error_repo, error_marker,
+                                              entry_timestamp),
+                   false);
+              continue;
+            }
+            tn->log(10, SSTR("gen is " << gen));
+            if (!gen) {
+              // write all full sync obligations for the bucket to error repo
+              spawn(new RGWDataIncrementalSyncFullObligationCR(
+                     sc, source_bs,error_marker, entry_timestamp, tn), false);
+            } else {
+              tn->log(20, SSTR("handle error entry key="
+                              << to_string(source_bs, gen)
+                              << " timestamp=" << entry_timestamp));
+              spawn(data_sync_single_entry(sc, source_bs, gen, "",
+                                          entry_timestamp, lease_cr,
+                                          bucket_shard_cache, &*marker_tracker,
+                                          error_repo, tn, true), false);
+            }
+          }
+          if (!omapvals->more) {
+            error_retry_time = ceph::coarse_real_clock::now() +
+             make_timespan(retry_backoff_secs);
+            error_marker.clear();
+          }
+        }
+        omapvals.reset();
+
+        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="
+                        << sync_marker.marker));
+        yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
+                                                  sync_marker.marker,
+                                                   &next_marker, &log_entries,
+                                                  &truncated));
+        if (retcode < 0 && retcode != -ENOENT) {
+          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
+                         << retcode));
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
+
+        if (log_entries.size() > 0) {
+          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+        }
+
+        for (log_iter = log_entries.begin();
+            log_iter != log_entries.end();
+            ++log_iter) {
+          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: "
+                          << log_iter->log_id << ":" << log_iter->log_timestamp
+                          << ":" << log_iter->entry.key));
+          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: "
+                           << log_iter->entry.key));
+            marker_tracker->try_update_high_marker(log_iter->log_id, 0,
+                                                  log_iter->log_timestamp);
+            continue;
+          }
+          if (!marker_tracker->start(log_iter->log_id, 0,
+                                    log_iter->log_timestamp)) {
+            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
+                           << ". Duplicate entry?"));
+          } else {
+            tn->log(1, SSTR("incremental sync on " << log_iter->entry.key
+                           << "shard: " << shard_id << "on gen "
+                           << log_iter->entry.gen));
+            yield_spawn_window(
+             data_sync_single_entry(sc, source_bs,log_iter->entry.gen,
+                                    log_iter->log_id, log_iter->log_timestamp,
+                                    lease_cr,bucket_shard_cache,
+                                    &*marker_tracker, error_repo, tn, false),
+             cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+          }
+        }
+
+        tn->log(20, SSTR("shard_id=" << shard_id <<
+                        " sync_marker="<< sync_marker.marker
+                        << " next_marker=" << next_marker
+                        << " truncated=" << truncated));
+        if (!next_marker.empty()) {
+          sync_marker.marker = next_marker;
+        } else if (!log_entries.empty()) {
+          sync_marker.marker = log_entries.back().log_id;
+        }
+        if (!truncated) {
+          // we reached the end, wait a while before checking for more
+          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+         yield wait(get_idle_interval());
+       }
+      } while (true);
+    }
+    return 0;
+  }
+};
+
 class RGWDataSyncShardCR : public RGWCoroutine {
   static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
   RGWDataSyncCtx *sc;