]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: bucket sync transitions back to StateInit on OP_SYNCSTOP 23574/head
authorCasey Bodley <cbodley@redhat.com>
Tue, 14 Aug 2018 18:06:40 +0000 (14:06 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 14 Aug 2018 18:38:32 +0000 (14:38 -0400)
the handling for OP_SYNCSTOP->OP_RESYNC is incorrect because it's not
safe to continue incremental sync after SYNCSTOP. any changes between
SYNCSTOP and RESYNC will not be written to the bilog, so incremental
sync cannot sync them

SYNCSTOP now transitions back to StateInit, where
RGWInitBucketShardSyncStatusCoroutine will query the remote bilog info
to see if it's still disabled. if so, it deletes the sync status object
and finishes. otherwise, StateInit will transition back to StateFull to
resync the bucket

Fixes: http://tracker.ceph.com/issues/26895
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index a61fb9bc29b366f1a8016542eab24b21aa9a3bf4..1c550a6381cc2c83f54caa7c4e47130536233702 100644 (file)
@@ -2873,7 +2873,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWBucketInfo *bucket_info;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   list<rgw_bi_log_entry> list_result;
-  list<rgw_bi_log_entry>::iterator entries_iter;
+  list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
   map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
   rgw_bucket_shard_sync_info& sync_info;
   rgw_obj_key key;
@@ -2882,7 +2882,6 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   bool updated_status{false};
   const string& status_oid;
   const string& zone_id;
-  ceph::real_time sync_modify_time;
 
   string cur_id;
 
@@ -2934,28 +2933,23 @@ int RGWBucketShardIncrementalSyncCR::operate()
       set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
       yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
                                              &list_result));
-      if (retcode < 0 && retcode != -ENOENT ) {
+      if (retcode < 0 && retcode != -ENOENT) {
+        /* wait for all operations to complete */
         drain_all();
-        if (!syncstopped) {
-          /* wait for all operations to complete */
-          return set_cr_error(retcode);
-        } else {
-          /* no need to retry */
-          break;       
-       }
+        return set_cr_error(retcode);
       }
       squash_map.clear();
-      for (auto& e : list_result) {
-        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
-          ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
-          sync_modify_time = e.timestamp;
+      entries_iter = list_result.begin();
+      entries_end = list_result.end();
+      for (; entries_iter != entries_end; ++entries_iter) {
+        auto e = *entries_iter;
+        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
+          ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
           syncstopped = true;
-          continue;
+          entries_end = entries_iter; // dont sync past here
+          break;
         }
-        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
-          ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
-          sync_modify_time = e.timestamp;
-          syncstopped = false;
+        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
           continue;
         }
         if (e.op == CLS_RGW_OP_CANCEL) {
@@ -2978,7 +2972,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
 
       entries_iter = list_result.begin();
-      for (; entries_iter != list_result.end(); ++entries_iter) {
+      for (; entries_iter != entries_end; ++entries_iter) {
         if (!lease_cr->is_locked()) {
           drain_all();
           return set_cr_error(-ECANCELED);
@@ -2995,7 +2989,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
         sync_info.inc_marker.position = cur_id;
 
         if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
-          ldout(sync_env->cct, 20) << "detected syncstop or resync  on " << entries_iter->timestamp << " , skipping entry" << dendl;
+          ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
           marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
@@ -3112,19 +3106,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           }
         }
       }
-    } while (!list_result.empty() && sync_status == 0);
-
-    if (syncstopped) {
-      drain_all();
-
-      yield {
-        const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
-        RGWRados *store = sync_env->store;
-        call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
-      }
-      lease_cr->abort();
-      return set_cr_done();
-    }
+    } while (!list_result.empty() && sync_status == 0 && !syncstopped);
 
     while (num_spawned()) {
       yield wait_for_child();
@@ -3139,8 +3121,17 @@ int RGWBucketShardIncrementalSyncCR::operate()
         /* not waiting for child here */
       }
     }
-
     tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
+    if (syncstopped) {
+      // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
+      // still disabled, we'll delete the sync status object. otherwise we'll
+      // restart full sync to catch any changes that happened while sync was
+      // disabled
+      sync_info.state = rgw_bucket_shard_sync_info::StateInit;
+      return set_cr_done();
+    }
+
     yield call(marker_tracker.flush());
     if (retcode < 0) {
       tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
@@ -3148,15 +3139,8 @@ int RGWBucketShardIncrementalSyncCR::operate()
     }
     if (sync_status < 0) {
       tn->log(0, SSTR("ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")"));
-    }
-
-    /* wait for all operations to complete */
-    drain_all();
-
-    if (sync_status < 0) {
       return set_cr_error(sync_status);
     }
-
     return set_cr_done();
   }
   return 0;
@@ -3228,45 +3212,48 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_cr_error(retcode);
     }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
-      yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
-      if (retcode == -ENOENT) {
-        tn->log(0, "bucket sync disabled");
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_done();
-      }
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+    do {
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
+        yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+        if (retcode == -ENOENT) {
+          tn->log(0, "bucket sync disabled");
+          lease_cr->abort(); // deleted lease object, abort instead of unlock
+          drain_all();
+          return set_cr_done();
+        }
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-    }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
-      yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
-                                              status_oid, lease_cr.get(),
-                                              sync_status, tn));
-      if (retcode < 0) {
-        tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
+        yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+                                                status_oid, lease_cr.get(),
+                                                sync_status, tn));
+        if (retcode < 0) {
+          tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-    }
 
-    if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
-      yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
-                                                     status_oid, lease_cr.get(),
-                                                     sync_status, tn));
-      if (retcode < 0) {
-        tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
-        lease_cr->go_down();
-        drain_all();
-        return set_cr_error(retcode);
+      if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
+        yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+                                                       status_oid, lease_cr.get(),
+                                                       sync_status, tn));
+        if (retcode < 0) {
+          tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
+          lease_cr->go_down();
+          drain_all();
+          return set_cr_error(retcode);
+        }
       }
-    }
+      // loop back to previous states unless incremental sync returns normally
+    } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
 
     lease_cr->go_down();
     drain_all();