git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: store failed data sync entries in separate omap
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 11 May 2016 22:59:27 +0000 (15:59 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 13 May 2016 18:22:03 +0000 (11:22 -0700)
so that we can iterate over the failed entries again later and retry them

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index ef4e43412cb198fdb17b1197a4842e3056656028..d9c577b25e670a91901a66d3dd026e181066954b 100644 (file)
@@ -785,13 +785,17 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
 
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
+  RGWOmapAppend *error_repo;
+
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
-                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
+                           RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                      raw_key(_raw_key), entry_marker(_entry_marker),
                                                       sync_status(0),
-                                                      marker_tracker(_marker_tracker) {
+                                                      marker_tracker(_marker_tracker),
+                                                      error_repo(_error_repo) {
     set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
   }
 
@@ -814,6 +818,7 @@ public:
       if (sync_status < 0) {
         yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
                                                         -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
+        yield error_repo->append(raw_key);
       }
       /* FIXME: what do do in case of error */
       if (!entry_marker.empty()) {
@@ -878,6 +883,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   RGWContinuousLeaseCR *lease_cr;
   string status_oid;
+
+  RGWOmapAppend *error_repo;
+
 public:
   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                      rgw_bucket& _pool,
@@ -891,6 +899,7 @@ public:
                                                       lease_cr(NULL) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+    error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry");
   }
 
   ~RGWDataSyncShardCR() {
@@ -899,6 +908,7 @@ public:
       lease_cr->abort();
       lease_cr->put();
     }
+    delete error_repo;
   }
 
   void append_modified_shards(set<string>& keys) {
@@ -984,7 +994,7 @@ public:
             ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false);
             if (retcode < 0) {
               lease_cr->go_down();
               drain_all();
@@ -1040,7 +1050,7 @@ public:
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
             ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker), false);
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false);
           }
         }
 
@@ -1078,7 +1088,7 @@ public:
                */
               if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                 spawned_keys.insert(log_iter->entry.key);
-                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false);
                 if (retcode < 0) {
                   lease_cr->go_down();
                   drain_all();
@@ -2229,7 +2239,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
     yield {
       set_status("acquiring sync lock");
       uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-      string lock_name = "sync_lock";
+      string lock_name = "sync_lock.incremental"; /* allow concurrent full sync and incremental sync on the same bucket */
       RGWRados *store = sync_env->store;
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                           lock_name, lock_duration, this);