git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: start marker tracker sync entry can fail
author: Yehuda Sadeh <yehuda@redhat.com>
Fri, 13 Nov 2015 04:50:12 +0000 (20:50 -0800)
committer: Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:32 +0000 (16:13 -0800)
We might hit a duplicate entry; just skip it, don't assert.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index 774213f6b42a5a2607f839d9ce3a85d3bbaed293..ac8b1362d4b73aeba089b6be750ef356d5ff8a6c 100644 (file)
@@ -918,11 +918,14 @@ public:
         for (; iter != entries.end(); ++iter) {
           ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
           total_entries++;
-          marker_tracker->start(iter->first, total_entries, utime_t());
+          if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
+            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+          } else {
             // fetch remote and write locally
-          yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
-          if (retcode < 0) {
-            return set_cr_error(retcode);
+            yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+            if (retcode < 0) {
+              return set_cr_error(retcode);
+            }
           }
           sync_marker.marker = iter->first;
         }
@@ -989,10 +992,13 @@ public:
               ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
               continue;
             }
-            marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp);
-            yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
-            if (retcode < 0) {
-              return set_cr_error(retcode);
+            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+              ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+            } else {
+              yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+              if (retcode < 0) {
+                return set_cr_error(retcode);
+              }
             }
          }
        }
@@ -2033,13 +2039,15 @@ int RGWBucketShardFullSyncCR::operate()
         yield {
           bucket_list_entry& entry = *entries_iter;
           total_entries++;
-          marker_tracker->start(entry.key, total_entries, utime_t());
           list_marker = entry.key;
+          if (!marker_tracker->start(entry.key, total_entries, utime_t())) {
+            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.key << ". Duplicate entry?" << dendl;
+          } else {
+            RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
 
-          RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
-
-          spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
-                                               entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+            spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+                                                              entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+          }
         }
         while ((int)num_spawned() > spawn_window) {
           yield wait_for_child();
@@ -2144,14 +2152,17 @@ int RGWBucketShardIncrementalSyncCR::operate()
           rgw_obj_key key(entries_iter->object, entries_iter->instance);
           ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
           rgw_bi_log_entry& entry = *entries_iter;
-          marker_tracker->start(entry.id, 0, entries_iter->timestamp);
           inc_marker.position = entry.id;
-          uint64_t versioned_epoch = 0;
-          if (entry.ver.pool < 0) {
-            versioned_epoch = entry.ver.epoch;
+          if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) {
+            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl;
+          } else {
+            uint64_t versioned_epoch = 0;
+            if (entry.ver.pool < 0) {
+              versioned_epoch = entry.ver.epoch;
+            }
+            spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
+                                                         key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
           }
-          spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
-                                               key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
         }
         while ((int)num_spawned() > spawn_window) {
           yield wait_for_child();
index 3ececd2af3376af0a61d46b5d7fd60e73ab6606d..8b9c73016a7ff5c39012d4c0235f484ceb651c4e 100644 (file)
@@ -1209,19 +1209,21 @@ public:
         for (; iter != entries.end(); ++iter) {
           ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
           total_entries++;
-          marker_tracker->start(iter->first, total_entries, utime_t());
-
+          if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+          } else {
             // fetch remote and write locally
-          yield {
-            RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
-            if (retcode < 0) {
-              return retcode;
-            }
-            assert(stack);
-            stack->get();
+            yield {
+              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
+              if (retcode < 0) {
+                return retcode;
+              }
+              assert(stack);
+              stack->get();
 
-            stack_to_pos[stack] = iter->first;
-            pos_to_prev[iter->first] = marker;
+              stack_to_pos[stack] = iter->first;
+              pos_to_prev[iter->first] = marker;
+            }
           }
           marker = iter->first;
         }
@@ -1340,17 +1342,20 @@ public:
           yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
-            marker_tracker->start(log_iter->id, 0, log_iter->timestamp);
-            raw_key = log_iter->section + ":" + log_iter->name;
-            yield {
-              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
-              assert(stack);
-              stack->get();
-
-              stack_to_pos[stack] = log_iter->id;
-              pos_to_prev[log_iter->id] = marker;
-              marker = log_iter->id;
+            if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp)) {
+              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
+            } else {
+              raw_key = log_iter->section + ":" + log_iter->name;
+              yield {
+                RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
+                assert(stack);
+                stack->get();
+
+                stack_to_pos[stack] = log_iter->id;
+                pos_to_prev[log_iter->id] = marker;
+              }
             }
+            marker = log_iter->id;
           }
         }
         collect_children();
index 51f3e4ef0419e194c9a42df0410dd91ee6240780..dfd1a57f73fe8f64bf5749e1fc5edf60cca7b404 100644 (file)
@@ -226,8 +226,12 @@ public:
   RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
   virtual ~RGWSyncShardMarkerTrack() {}
 
-  void start(const T& pos, int index_pos, const utime_t& timestamp) {
+  bool start(const T& pos, int index_pos, const utime_t& timestamp) {
+    if (pending.find(pos) != pending.end()) {
+      return false;
+    }
     pending[pos] = marker_entry(index_pos, timestamp);
+    return true;
   }
 
   RGWCoroutine *finish(const T& pos) {