]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync, only use complete log entries
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 24 Nov 2015 18:48:16 +0000 (10:48 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:38 +0000 (16:13 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index 4abed26a39c1a959885af07d20f77b4c3bb8dd5b..8a3a13dd4c3501333875a5d3a2ebac9a9b03f259 100644 (file)
@@ -1788,6 +1788,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   uint64_t versioned_epoch;
   utime_t timestamp;
   RGWModifyOp op;
+  RGWPendingState op_state;
 
   T entry_marker;
   RGWSyncShardMarkerTrack<T> *marker_tracker;
@@ -1817,6 +1818,10 @@ public:
 
   int operate() {
     reenter(this) {
+      /* skip entries that are not complete */
+      if (op_state != CLS_RGW_STATE_COMPLETE) {
+        goto done;
+      }
       yield {
         if (op == CLS_RGW_OP_ADD ||
             op == CLS_RGW_OP_LINK_OLH) {
@@ -1841,6 +1846,7 @@ public:
         ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
         sync_status = retcode;
       }
+done:
       /* update marker */
       yield call(marker_tracker->finish(entry_marker));
       if (sync_status == 0) {
@@ -1928,7 +1934,7 @@ int RGWBucketShardFullSyncCR::operate()
             RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
 
             spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
-                                                              entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+                                                              entry.key, entry.versioned_epoch, entry.mtime, op, CLS_RGW_STATE_COMPLETE, entry.key, marker_tracker), false);
           }
         }
         while ((int)num_spawned() > spawn_window) {
@@ -2033,7 +2039,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
               versioned_epoch = entry.ver.epoch;
             }
             spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
-                                                         key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
+                                                         key, versioned_epoch, entry.timestamp, entry.op, entry.state, entry.id, marker_tracker), false);
           }
         }
         while ((int)num_spawned() > spawn_window) {