]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync retries sync on previously failed bucket shards
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 13 May 2016 18:13:48 +0000 (11:13 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 13 May 2016 18:29:47 +0000 (11:29 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index d9c577b25e670a91901a66d3dd026e181066954b..ae771911b24bf045d943255125ccc83c118d6442 100644 (file)
@@ -786,16 +786,19 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
   RGWOmapAppend *error_repo;
+  bool remove_from_repo;
+
+  set<string> keys;
 
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
-                           RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct),
+                           RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                      raw_key(_raw_key), entry_marker(_entry_marker),
                                                       sync_status(0),
                                                       marker_tracker(_marker_tracker),
-                                                      error_repo(_error_repo) {
+                                                      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
     set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
   }
 
@@ -808,20 +811,34 @@ public:
           if (ret < 0) {
             return set_cr_error(-EIO);
           }
-          marker_tracker->reset_need_retry(raw_key);
+          if (marker_tracker) {
+            marker_tracker->reset_need_retry(raw_key);
+          }
           call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
         }
-      } while (marker_tracker->need_retry(raw_key));
+      } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
       sync_status = retcode;
 
       if (sync_status < 0) {
         yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
                                                         -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
-        yield error_repo->append(raw_key);
+        if (retcode < 0) {
+          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+        }
+        if (!error_repo->append(raw_key)) {
+          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
+        }
+      } else if (remove_from_repo) {
+        keys = {raw_key};
+        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_pool(), error_repo->get_oid(), keys));
+        if (retcode < 0) {
+          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
+             << error_repo->get_pool() << ":" << error_repo->get_oid() << " retcode=" << retcode << dendl;
+        }
       }
       /* FIXME: what do do in case of error */
-      if (!entry_marker.empty()) {
+      if (marker_tracker && !entry_marker.empty()) {
         /* update marker */
         yield call(marker_tracker->finish(entry_marker));
       }
@@ -838,6 +855,7 @@ public:
 };
 
 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
+#define DATA_SYNC_MAX_ERR_ENTRIES 10
 
 class RGWDataSyncShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
@@ -884,7 +902,12 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   RGWContinuousLeaseCR *lease_cr;
   string status_oid;
 
+
+  string error_oid;
   RGWOmapAppend *error_repo;
+  map<string, bufferlist> error_entries;
+  string error_marker;
+  int max_error_entries;
 
 public:
   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
@@ -896,10 +919,10 @@ public:
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
-                                                      lease_cr(NULL) {
+                                                      lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
-    error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry");
+    error_oid = status_oid + ".retry";
   }
 
   ~RGWDataSyncShardCR() {
@@ -908,7 +931,9 @@ public:
       lease_cr->abort();
       lease_cr->put();
     }
-    delete error_repo;
+    if (error_repo) {
+      error_repo->put();
+    }
   }
 
   void append_modified_shards(set<string>& keys) {
@@ -994,7 +1019,7 @@ public:
             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
             if (retcode < 0) {
               lease_cr->go_down();
               drain_all();
@@ -1028,6 +1053,9 @@ public:
 
   int incremental_sync() {
     reenter(&incremental_cr) {
+      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, error_oid, 1 /* no buffer */);
+      error_repo->get();
+      spawn(error_repo, false);
       yield init_lease_cr();
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
@@ -1050,14 +1078,28 @@ public:
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
             ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false);
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
           }
         }
 
+        /* process bucket shards that previously failed */
+        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, error_oid, error_marker, &error_entries, max_error_entries));
+        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+        iter = error_entries.begin();
+        for (; iter != error_entries.end(); ++iter) {
+          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
+          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
+          error_marker = iter->first;
+        }
+        if ((int)error_entries.size() != max_error_entries) {
+          error_marker.clear();
+        }
+
+
         yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
-          lease_cr->go_down();
+          stop_spawned_services();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -1069,7 +1111,7 @@ public:
           yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
           if (retcode < 0) {
             ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
-            lease_cr->go_down();
+            stop_spawned_services();
             drain_all();
             return set_cr_error(retcode);
           }
@@ -1088,9 +1130,9 @@ public:
                */
               if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                 spawned_keys.insert(log_iter->entry.key);
-                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false);
+                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                 if (retcode < 0) {
-                  lease_cr->go_down();
+                  stop_spawned_services();
                   drain_all();
                   return set_cr_error(retcode);
                 }
@@ -1119,6 +1161,13 @@ public:
     }
     return 0;
   }
+  void stop_spawned_services() {
+    lease_cr->go_down();
+    if (error_repo) {
+      error_repo->put();
+      error_repo = NULL;
+    }
+  }
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {