]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: DataSyncSingleEntry does not duplicate bucket sync
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:14 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:18 +0000 (14:08 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index dd7e3b99a5d25bf9c4228fc07e485b25cfbe65ac..f0162da1679f3a135d20c32ab266cb936daf33c7 100644 (file)
@@ -1290,7 +1290,8 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw::bucket_sync::Handle state; // cached bucket-shard state
-  rgw_data_sync_obligation obligation;
+  rgw_data_sync_obligation obligation; // input obligation
+  std::optional<rgw_data_sync_obligation> complete; // obligation to complete
   RGWDataSyncShardMarkerTrack *marker_tracker;
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
   RGWSyncTraceNodeRef tn;
@@ -1311,59 +1312,78 @@ public:
 
   int operate() override {
     reenter(this) {
-      do {
-        if (marker_tracker) {
-          marker_tracker->reset_need_retry(obligation.key);
+      if (state->obligation) {
+        // this is already syncing in another DataSyncSingleEntryCR
+        if (state->obligation->timestamp < obligation.timestamp) {
+          // cancel existing obligation and overwrite it
+          tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
+          complete = std::move(*state->obligation);
+          *state->obligation = std::move(obligation);
+        } else {
+          // cancel new obligation
+          tn->log(10, SSTR("canceling new obligation " << obligation));
+          complete = std::move(obligation);
         }
-        tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key}));
+      } else {
+        // start syncing a new obligation
+        state->obligation = obligation;
 
-        yield call(new RGWRunBucketSourcesSyncCR(sc,
-                                                 std::nullopt, /* target_bs */
-                                                 state->key,
-                                                 tn, &progress));
-        if (retcode == 0) {
-          tn->log(20, SSTR("RunBucketSources progress=" << progress));
-        }
-      } while (marker_tracker && marker_tracker->need_retry(obligation.key));
+        do {
+          yield {
+            tn->log(4, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{state->key} << " " << *state->obligation));
+
+            call(new RGWRunBucketSourcesSyncCR(sc,
+                                               std::nullopt, /* target_bs */
+                                               state->key,
+                                               tn, &progress));
+          }
+        } while (marker_tracker && marker_tracker->need_retry(obligation.key));
 
+        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}));
+
+        // any new obligations will process themselves
+        complete = std::move(*state->obligation);
+        state->obligation.reset();
+      }
       sync_status = retcode;
 
       if (sync_status == -ENOENT) {
         // this was added when 'tenant/' was added to datalog entries, because
         // preexisting tenant buckets could never sync and would stay in the
         // error_repo forever
-        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << obligation.key));
+        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
         sync_status = 0;
       }
 
       if (sync_status < 0) {
         // write actual sync failures for 'radosgw-admin sync error list'
         if (sync_status != -EBUSY && sync_status != -EAGAIN) {
-          yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", obligation.key,
+          yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", complete->key,
                                                           -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
           }
         }
-        if (error_repo) {
+        if (error_repo && complete->timestamp != ceph::real_time{}) {
+          tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
           yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
-                                             obligation.key, obligation.timestamp));
+                                            complete->key, complete->timestamp));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
           }
         }
-      } else if (error_repo && obligation.retry) {
+      } else if (error_repo && complete->retry) {
         yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
-                                            obligation.key, obligation.timestamp));
+                                            complete->key, complete->timestamp));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << " retcode=" << retcode));
         }
       }
       /* FIXME: what do do in case of error */
-      if (marker_tracker && !obligation.marker.empty()) {
+      if (marker_tracker && !complete->marker.empty()) {
         /* update marker */
-        yield call(marker_tracker->finish(obligation.marker));
+        yield call(marker_tracker->finish(complete->marker));
       }
       if (sync_status == 0) {
         sync_status = retcode;