rgw/multisite: allow bucket sync disable/enable
author    Yuval Lifshitz <ylifshit@redhat.com>
          Tue, 18 May 2021 15:59:54 +0000 (18:59 +0300)
committer Adam C. Emerson <aemerson@redhat.com>
          Wed, 27 Apr 2022 21:47:02 +0000 (17:47 -0400)
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
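
Overview: this patch moves the remote bilog query (RGWReadRemoteBucketIndexLogInfoCR) out of InitBucketFullSyncStatusCR and up into RGWSyncBucketCR. When the bucket status is Init or Stopped, or a shard reports a SYNCSTOP bilog entry, the remote bilog is re-queried; if it reports syncstopped, the bucket transitions to BucketSyncState::Stopped under the bucket-wide lease, its per-shard incremental sync status objects are removed, and sync can resume later once re-enabled. The stand-alone C++ sketch below illustrates only this control flow; the helpers and the synchronous structure are illustrative stand-ins, not the actual coroutine code.

// Hedged sketch of the state loop this patch adds to RGWSyncBucketCR::operate();
// all helper names below are hypothetical stand-ins for the coroutines it calls.
#include <cstdint>
#include <iostream>

enum class BucketSyncState { Init, Stopped, Full, Incremental };

struct BilogInfo { bool syncstopped = false; uint64_t latest_gen = 0; };

// stand-in for RGWReadRemoteBucketIndexLogInfoCR
BilogInfo read_remote_bilog_info(bool sync_disabled) {
  return {sync_disabled, 1};
}

// stand-in for RemoveBucketShardStatusCollectCR
void remove_shard_status(uint64_t gen) {
  std::cout << "remove per-shard status objects, gen=" << gen << "\n";
}

void sync_bucket(bool sync_disabled) {
  auto state = BucketSyncState::Init;
  bool bucket_stopped = false;  // set when a shard sees a SYNCSTOP bilog entry
  do {
    if (state == BucketSyncState::Init ||
        state == BucketSyncState::Stopped || bucket_stopped) {
      const auto info = read_remote_bilog_info(sync_disabled);
      if (info.syncstopped) {
        // under the bucket-wide lease: persist Stopped, then clean up
        state = BucketSyncState::Stopped;
        remove_shard_status(info.latest_gen);
        std::cout << "bucket sync stopped\n";
        return;
      }
      bucket_stopped = false;
    }
    if (state != BucketSyncState::Incremental) {
      // init shard status + full sync, both under the bucket-wide lease
      state = BucketSyncState::Incremental;
    }
    std::cout << "incremental sync\n";
  } while (state != BucketSyncState::Incremental || bucket_stopped);
}

int main() {
  sync_bucket(false);  // enabled: ends in incremental sync
  sync_bucket(true);   // disabled: transitions to Stopped and cleans up
}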

src/rgw/rgw_data_sync.cc
index 4a96ae46f15ae07c27614026481d6fa275fe9dc4..48b76f2a5c55cbefcbde4acf99a9d6607aeb7d9d 100644
@@ -2746,60 +2746,43 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
 
   rgw_bucket_shard_sync_info& status;
   RGWObjVersionTracker& objv_tracker;
-  rgw_bucket_index_marker_info& info;
   const BucketIndexShardsManager& marker_mgr;
   bool exclusive;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                         const rgw_bucket_sync_pair_info& _sync_pair,
                                         rgw_bucket_shard_sync_info& _status,
-                                        rgw_bucket_index_marker_info& _info,
+                                        uint64_t latest_gen,
                                         const BucketIndexShardsManager& _marker_mgr,
                                         RGWObjVersionTracker& objv_tracker,
                                         bool exclusive)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pair(_sync_pair),
-      sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, _info.latest_gen)),
-      status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive)
+      sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, latest_gen)),
+      status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
   {}
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       yield {
-        auto store = sync_env->store;
         rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
-        const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
-        bool write_status = false;
 
-        auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
-
-        if (info.syncstopped) {
-          if (stopped && !sync_env->sync_module->should_full_sync()) {
-            // preserve our current incremental marker position
-            write_status = true;
-          }
-        } else {
-          // whether or not to do full sync, incremental sync will follow anyway
-          if (sync_env->sync_module->should_full_sync()) {
-            status.inc_marker.position = max_marker;
-          }
-          write_status = true;
-          status.inc_marker.timestamp = ceph::real_clock::now();
-          status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
-        }
-
-        if (write_status) {
-          map<string, bufferlist> attrs;
-          status.encode_all_attrs(attrs);
-          call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                              obj, attrs, &objv_tracker, exclusive));
-        } else {
-          call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
+        // whether or not to do full sync, incremental sync will follow anyway
+        if (sync_env->sync_module->should_full_sync()) {
+          const auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
+          status.inc_marker.position = max_marker;
         }
+        status.inc_marker.timestamp = ceph::real_clock::now();
+        status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+
+        map<string, bufferlist> attrs;
+        status.encode_all_attrs(attrs);
+        call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                                            obj, attrs, &objv_tracker, exclusive));
+        ldout(cct, 20) << "init marker position: " << status.inc_marker.position << 
+          ". written to shard status object: " << sync_status_oid << dendl;
       }
-      if (info.syncstopped) {
-        retcode = -ENOENT;
-      }
+
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -3007,7 +2990,7 @@ class InitBucketShardStatusCR : public RGWCoroutine {
   rgw_bucket_sync_pair_info pair;
   rgw_bucket_shard_sync_info status;
   RGWObjVersionTracker objv;
-  rgw_bucket_index_marker_info& info;
+  const uint64_t latest_gen;
   const BucketIndexShardsManager& marker_mgr;
 
   int tries = 10; // retry on racing writes
@@ -3017,15 +3000,15 @@ class InitBucketShardStatusCR : public RGWCoroutine {
  public:
   InitBucketShardStatusCR(RGWDataSyncCtx* sc,
                          const rgw_bucket_sync_pair_info& pair,
-                         rgw_bucket_index_marker_info& info,
+                         uint64_t latest_gen,
                          const BucketIndexShardsManager& marker_mgr)
-    : RGWCoroutine(sc->cct), sc(sc), pair(pair), info(info), marker_mgr(marker_mgr)
+    : RGWCoroutine(sc->cct), sc(sc), pair(pair), latest_gen(latest_gen), marker_mgr(marker_mgr)
   {}
   int operate(const DoutPrefixProvider *dpp) {
     reenter(this) {
       // try exclusive create with empty status
       objv.generate_new_write_ver(cct);
-      yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+      yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
       if (retcode >= 0) {
         return set_cr_done();
       } else if (retcode != -EEXIST) {
@@ -3037,11 +3020,11 @@ class InitBucketShardStatusCR : public RGWCoroutine {
       while (--tries) {
         objv.clear();
         // read current status and objv
-        yield call(new ReadCR(sc, pair, &status, &objv, info.latest_gen));
+        yield call(new ReadCR(sc, pair, &status, &objv, latest_gen));
         if (retcode < 0) {
           return set_cr_error(retcode);
         }
-        yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+        yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
         if (retcode >= 0) {
           return set_cr_done();
         } else if (retcode != -ECANCELED) {
@@ -3058,7 +3041,7 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
   static constexpr int max_concurrent_shards = 16;
   RGWDataSyncCtx* sc;
   rgw_bucket_sync_pair_info sync_pair;
-  rgw_bucket_index_marker_info& info;
+  const uint64_t latest_gen;
   const BucketIndexShardsManager& marker_mgr;
 
   const int num_shards;
@@ -3074,11 +3057,11 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
  public:
   InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
                                  const rgw_bucket_sync_pair_info& sync_pair,
-                                 rgw_bucket_index_marker_info& info,
+                                 uint64_t latest_gen,
                                  const BucketIndexShardsManager& marker_mgr,
                                  int num_shards)
     : RGWShardCollectCR(sc->cct, max_concurrent_shards),
-      sc(sc), sync_pair(sync_pair), info(info), marker_mgr(marker_mgr), num_shards(num_shards)
+      sc(sc), sync_pair(sync_pair), latest_gen(latest_gen), marker_mgr(marker_mgr), num_shards(num_shards)
   {}
 
   bool spawn_next() override {
@@ -3086,7 +3069,75 @@ class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
       return false;
     }
     sync_pair.source_bs.shard_id = shard++;
-    spawn(new InitBucketShardStatusCR(sc, sync_pair, info, marker_mgr), false);
+    spawn(new InitBucketShardStatusCR(sc, sync_pair, latest_gen, marker_mgr), false);
+    return true;
+  }
+};
+
+class RemoveBucketShardStatusCR : public RGWCoroutine {
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env;
+
+  rgw_bucket_sync_pair_info sync_pair;
+  rgw_raw_obj obj;
+  RGWObjVersionTracker objv;
+
+public:
+  RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
+                             const rgw_bucket_sync_pair_info& sync_pair, uint64_t latest_gen)
+    : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
+      sync_pair(sync_pair),
+      obj(sync_env->svc->zone->get_zone_params().log_pool, 
+          RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, latest_gen))
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield call(new RGWRadosRemoveCR(sync_env->store, obj, &objv));
+      if (retcode < 0 && retcode != -ENOENT) {
+        ldout(cct, 20) << "ERROR: failed to remove bucket shard status for: " << sync_pair << 
+          ". with error: " << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+      ldout(cct, 20) << "removed bucket shard status object: " << obj.oid << dendl;
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env;
+  rgw_bucket_sync_pair_info sync_pair;
+  const uint64_t latest_gen;
+
+  const int num_shards;
+  int shard = 0;
+
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldout(cct, 4) << "failed to remove bucket shard status object: "
+          << cpp_strerror(r) << dendl;
+    }
+    return r;
+  }
+ public:
+  RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
+                                 const rgw_bucket_sync_pair_info& sync_pair,
+                                 uint64_t latest_gen,
+                                 int num_shards)
+    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
+      sc(sc), sync_env(sc->env), sync_pair(sync_pair), latest_gen(latest_gen), num_shards(num_shards)
+  {}
+
+  bool spawn_next() override {
+    if (shard >= num_shards || status < 0) { // stop spawning on any error
+      return false;
+    }
+    sync_pair.source_bs.shard_id = shard++;
+    spawn(new RemoveBucketShardStatusCR(sc, sync_pair, latest_gen), false);
     return true;
   }
 };
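
Note: RemoveBucketShardStatusCollectCR above (like InitBucketShardStatusCollectCR) relies on the RGWShardCollectCR fan-out: the framework repeatedly calls spawn_next() to launch one child per shard, keeps at most max_concurrent_shards (16) in flight, and the status < 0 check stops spawning once a child has failed. A plain, synchronous C++ approximation of that contract, with simplified names that are not the real RGWShardCollectCR API:

#include <functional>
#include <iostream>

class ShardCollector {
  const int num_shards;
  int shard = 0;
  int status = 0;                 // first error seen, mirrors handle_result()
  std::function<int(int)> child;  // per-shard operation
public:
  ShardCollector(int n, std::function<int(int)> op)
    : num_shards(n), child(std::move(op)) {}

  // mirrors spawn_next(): one child per shard, stop once an error is recorded
  bool spawn_next() {
    if (shard >= num_shards || status < 0) {
      return false;
    }
    const int r = child(shard++);  // the real CR spawns a child coroutine here
    if (r < 0 && status == 0) {
      status = r;
    }
    return true;
  }

  int run() {
    while (spawn_next()) {}  // the real CR also bounds children in flight
    return status;
  }
};

int main() {
  ShardCollector c(4, [](int shard) {
    std::cout << "removing status object for shard " << shard << "\n";
    return shard == 2 ? -5 : 0;  // simulate one shard failing with -EIO
  });
  std::cout << "collector status: " << c.run() << "\n";  // prints -5
}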
@@ -3102,7 +3153,7 @@ class InitBucketFullSyncStatusCR : public RGWCoroutine {
   const int num_shards;
   const bool check_compat;
 
-  rgw_bucket_index_marker_info info;
+  const rgw_bucket_index_marker_info& info;
   BucketIndexShardsManager marker_mgr;
 
   bool all_incremental = true;
@@ -3112,22 +3163,16 @@ public:
                              const rgw_raw_obj& status_obj,
                              rgw_bucket_sync_status& status,
                              RGWObjVersionTracker& objv,
-                             int num_shards, bool check_compat)
+                             int num_shards, bool check_compat,
+                             const rgw_bucket_index_marker_info& info)
     : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
       sync_pair(sync_pair), status_obj(status_obj),
       status(status), objv(objv), num_shards(num_shards),
-      check_compat(check_compat)
+      check_compat(check_compat), info(info)
   {}
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
-      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
-      if (retcode < 0) {
-        lderr(cct) << "failed to read remote bilog info: "
-            << cpp_strerror(retcode) << dendl;
-        return set_cr_error(retcode);
-      }
-
       retcode = marker_mgr.from_string(info.max_marker, -1);
       if (retcode < 0) {
         lderr(cct) << "failed to parse bilog shard markers: "
@@ -3141,6 +3186,7 @@ public:
         if (check_compat) {
           // try to convert existing per-shard incremental status for backward compatibility
           yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+          ldout(cct, 20) << "check for 'all incremental' in compatibility mode" << dendl;
           if (retcode < 0) {
             return set_cr_error(retcode);
           }
@@ -3154,7 +3200,7 @@ public:
       if (status.state != BucketSyncState::Incremental) {
         // initialize all shard sync status. this will populate the log marker
         // positions where incremental sync will resume after full sync
-        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info, marker_mgr, num_shards));
+        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
         if (retcode < 0) {
           ldout(cct, 20) << "failed to init bucket shard status: "
               << cpp_strerror(retcode) << dendl;
@@ -3171,7 +3217,7 @@ public:
       status.shards_done_with_gen.resize(num_shards);
       status.incremental_gen = info.latest_gen;
 
-      ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
+      ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl;
 
       // write bucket sync status
       using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
@@ -3188,12 +3234,12 @@ public:
   }
 };
 
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv)
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info)
 {
   constexpr bool check_compat = false;
   const int num_shards = num_pipes();
   return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
-                                        full_status, objv, num_shards, check_compat);
+                                        full_status, objv, num_shards, check_compat, info);
 }
 
 #define OMAP_READ_MAX_ENTRIES 10
@@ -4302,12 +4348,13 @@ int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
       for (; entries_iter != entries_end; ++entries_iter) {
         auto e = *entries_iter;
         if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
-          ldpp_dout(dpp, 20) << "syncstop on " << e.timestamp << dendl;
+          ldpp_dout(dpp, 20) << "syncstop at: " << e.timestamp << ". marker: " << e.id << dendl;
           syncstopped = true;
           entries_end = std::next(entries_iter); // stop after this entry
           break;
         }
         if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
+          ldpp_dout(dpp, 20) << "syncstart at: " << e.timestamp << ". marker: " << e.id << dendl;
           continue;
         }
         if (e.op == CLS_RGW_OP_CANCEL) {
@@ -4688,7 +4735,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
   reenter(this) {
     yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
-      tn->log(0, "ERROR: failed to read sync status for bucket");
+      tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
       return set_cr_error(retcode);
     }
 
@@ -4742,7 +4789,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
                            [&](uint64_t stack_id, int ret) {
                              handle_complete_stack(stack_id);
                              if (ret < 0) {
-                               tn->log(10, "a sync operation returned error");
+                               tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
                              }
                              return ret;
                            });
@@ -4751,7 +4798,7 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
     drain_all_cb([&](uint64_t stack_id, int ret) {
                    handle_complete_stack(stack_id);
                    if (ret < 0) {
-                     tn->log(10, "a sync operation returned error");
+                     tn->log(10, SSTR("a sync operation returned error: " << ret));
                    }
                    return ret;
                  });
@@ -5046,7 +5093,7 @@ class RGWSyncBucketShardCR : public RGWCoroutine {
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe& sync_pipe;
-  BucketSyncState& bucket_state;
+  bool& bucket_stopped;
   uint64_t generation;
   ceph::real_time* progress;
 
@@ -5062,13 +5109,13 @@ public:
                        boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                        const rgw_bucket_sync_pair_info& _sync_pair,
                        rgw_bucket_sync_pipe& sync_pipe,
-                       BucketSyncState& bucket_state,
+                       bool& bucket_stopped,
                        uint64_t generation,
                        const RGWSyncTraceNodeRef& tn,
                        ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
-      sync_pipe(sync_pipe), bucket_state(bucket_state), generation(generation), progress(progress),
+      sync_pipe(sync_pipe), bucket_stopped(bucket_stopped), generation(generation), progress(progress),
       shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)),
       bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
                  RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
@@ -5085,7 +5132,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
   reenter(this) {
     yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation));
     if (retcode < 0 && retcode != -ENOENT) {
-      tn->log(0, "ERROR: failed to read sync status for bucket");
+      tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
       return set_cr_error(retcode);
     }
 
@@ -5103,7 +5150,12 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
       tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
       return set_cr_error(retcode);
     }
-    // TODO: handle transition to StateStopped
+
+    if (sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
+      tn->log(20, SSTR("syncstopped indication for source bucket shard"));
+      bucket_stopped = true;
+    }
+
     return set_cr_done();
   }
 
@@ -5124,8 +5176,10 @@ class RGWSyncBucketCR : public RGWCoroutine {
   const uint32_t lock_duration;
   const rgw_raw_obj status_obj;
   rgw_bucket_sync_status bucket_status;
+  bool bucket_stopped = false;
   RGWObjVersionTracker objv;
   bool init_check_compat = false;
+  rgw_bucket_index_marker_info info;
 
   RGWSyncTraceNodeRef tn;
 
@@ -5162,6 +5216,9 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
                              gen, tn, progress);
 }
 
+#define RELEASE_LOCK(cr) \
+  if (cr) { cr->go_down(); drain_all(); cr.reset(); }
+
 int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
@@ -5184,113 +5241,170 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
 
     // read bucket sync status
     using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+    using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+
     yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
                           status_obj, &bucket_status, false, &objv));
     if (retcode == -ENOENT) {
       // use exclusive create to set state=Init
       objv.generate_new_write_ver(cct);
-      using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
       yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
                              status_obj, bucket_status, &objv, true));
+      tn->log(20, "bucket status object does not exist, create a new one");
       if (retcode == -EEXIST) {
         // raced with another create, read its status
+        tn->log(20, "raced with another create, read its status");
         yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
                               status_obj, &bucket_status, false, &objv));
       }
     }
     if (retcode < 0) {
+      tn->log(20, SSTR("ERROR: failed to read bucket status object. error: " << retcode));
       return set_cr_error(retcode);
     }
 
     do {
-      tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
+      tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state << 
+            ". lease is: " << (bucket_lease_cr ? "taken" : "not taken") << ". stop indications is: " << bucket_stopped));
+
+      if (bucket_status.state == BucketSyncState::Init ||
+          bucket_status.state == BucketSyncState::Stopped ||
+          bucket_stopped) {
+        // if state is Init or Stopped, we query the remote RGW for the state
+        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+        if (info.syncstopped) {
+          // remote indicates stopped state
+          tn->log(20, "remote bilog indicates that sync was stopped");
+          if (!bucket_lease_cr) {
+            bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+                                                         lock_name, lock_duration, this));
+            yield spawn(bucket_lease_cr.get(), false);
+            while (!bucket_lease_cr->is_locked()) {
+              if (bucket_lease_cr->is_done()) {
+                tn->log(5, "ERROR: failed to take bucket lease");
+                set_status("lease lock failed, early abort");
+                drain_all();
+                return set_cr_error(bucket_lease_cr->get_ret_status());
+              }
+              tn->log(5, "waiting on bucket lease");
+              yield set_sleeping(true);
+            }
+          }
+
+          // check if local state is "stopped"
+          yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+                status_obj, &bucket_status, false, &objv));
+          if (retcode < 0) {
+            tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
+            RELEASE_LOCK(bucket_lease_cr);
+            return set_cr_error(retcode);
+          }
+          if (bucket_status.state != BucketSyncState::Stopped) {
+            // make sure that the state is changed to stopped locally
+            bucket_status.state = BucketSyncState::Stopped;
+            yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
+                  status_obj, bucket_status, &objv, false));
+            if (retcode < 0) {
+              tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
+              RELEASE_LOCK(bucket_lease_cr);
+              return set_cr_error(retcode);
+            }
+          }
+          yield {
+            const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+            call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, num_shards));
+          }
+          RELEASE_LOCK(bucket_lease_cr);
+          return set_cr_done();
+        }
+        bucket_stopped = false;
+      }
 
-      // if the state wasn't Incremental, take a bucket-wide lease to prevent
-      // different shards from duplicating the init and full sync
       if (bucket_status.state != BucketSyncState::Incremental) {
-        assert(!bucket_lease_cr);
-        bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+        // if the state wasn't Incremental, take a bucket-wide lease to prevent
+        // different shards from duplicating the init and full sync
+        if (!bucket_lease_cr) {
+          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
                                                        lock_name, lock_duration, this));
-        yield spawn(bucket_lease_cr.get(), false);
-        while (!bucket_lease_cr->is_locked()) {
-          if (bucket_lease_cr->is_done()) {
-            tn->log(5, "failed to take bucket lease");
-            set_status("lease lock failed, early abort");
-            drain_all();
-            return set_cr_error(bucket_lease_cr->get_ret_status());
+          yield spawn(bucket_lease_cr.get(), false);
+          while (!bucket_lease_cr->is_locked()) {
+            if (bucket_lease_cr->is_done()) {
+              tn->log(5, "ERROR: failed to take bucket lease");
+              set_status("lease lock failed, early abort");
+              drain_all();
+              return set_cr_error(bucket_lease_cr->get_ret_status());
+            }
+            tn->log(5, "waiting on bucket lease");
+            yield set_sleeping(true);
           }
-          tn->log(5, "waiting on bucket lease");
-          yield set_sleeping(true);
         }
+
         // reread the status after acquiring the lock
         yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
-                              status_obj, &bucket_status, false, &objv));
+                            status_obj, &bucket_status, false, &objv));
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          RELEASE_LOCK(bucket_lease_cr);
+          tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
           return set_cr_error(retcode);
         }
-      }
 
-      if (bucket_status.state == BucketSyncState::Init ||
-          bucket_status.state == BucketSyncState::Stopped) {
-        assert(bucket_lease_cr);
         // init sync status
         yield {
           init_check_compat = objv.read_version.ver <= 1; // newly-created
-          int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+          const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
           call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
                                               bucket_status, objv, num_shards,
-                                              init_check_compat));
+                                              init_check_compat, info));
         }
+
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
+          RELEASE_LOCK(bucket_lease_cr);
           return set_cr_error(retcode);
         }
       }
 
+      assert(bucket_status.state == BucketSyncState::Incremental || 
+          bucket_status.state == BucketSyncState::Full);
+
       if (bucket_status.state == BucketSyncState::Full) {
         assert(bucket_lease_cr);
         yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
                                            bucket_lease_cr, bucket_status,
                                            tn, objv));
         if (retcode < 0) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
+          tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
+          RELEASE_LOCK(bucket_lease_cr);
           return set_cr_error(retcode);
         }
       }
 
       if (bucket_status.state == BucketSyncState::Incremental) {
         // lease not required for incremental sync
-        if (bucket_lease_cr) {
-          bucket_lease_cr->go_down();
-          drain_all();
-          bucket_lease_cr.reset();
-        }
+        RELEASE_LOCK(bucket_lease_cr);
 
         // if a specific gen was requested, compare that to the sync status
         if (gen) {
           const auto current_gen = bucket_status.incremental_gen;
           if (*gen > current_gen) {
             retcode = -EAGAIN;
-            tn->log(10, SSTR("requested sync of future generation "
+            tn->log(10, SSTR("ERROR: requested sync of future generation "
                              << *gen << " > " << current_gen
                              << ", returning " << retcode << " for later retry"));
             return set_cr_error(retcode);
           } else if (*gen < current_gen) {
-            tn->log(10, SSTR("requested sync of past generation "
+            tn->log(10, SSTR("WARNING: requested sync of past generation "
                              << *gen << " < " << current_gen
                              << ", returning success"));
             return set_cr_done();
           }
         }
 
-        if (size_t(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
+        assert(sync_pair.source_bs.shard_id >= 0);
+        if (static_cast<std::size_t>(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
           tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
           return set_cr_done(); // return success so we don't retry
         }
@@ -5301,14 +5415,15 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
         }
 
         yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
-                                            sync_pipe, bucket_status.state,
+                                            sync_pipe, bucket_stopped,
                                             bucket_status.incremental_gen, tn, progress));
         if (retcode < 0) {
+          tn->log(20, SSTR("ERROR: incremental sync failed. error: " << retcode));
           return set_cr_error(retcode);
         }
       }
       // loop back to previous states unless incremental sync returns normally
-    } while (bucket_status.state != BucketSyncState::Incremental);
+    } while (bucket_status.state != BucketSyncState::Incremental || bucket_stopped);
 
     return set_cr_done();
   }
@@ -5386,11 +5501,13 @@ int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *d
   list<RGWCoroutinesStack *> stacks;
   // pass an empty objv tracker to each so that the version gets incremented
   std::list<RGWObjVersionTracker> objvs;
+  std::list<rgw_bucket_index_marker_info> infos;
 
   for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
     objvs.emplace_back();
-    stack->call(mgr->init_sync_status_cr(objvs.back()));
+    infos.emplace_back();
+    stack->call(mgr->init_sync_status_cr(objvs.back(), infos.back()));
 
     stacks.push_back(stack);
   }
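
A side note on the call-site change above: each coroutine created by init_sync_status_cr holds references to objvs.back() and infos.back() for its whole lifetime, which is presumably why both are collected in std::list rather than std::vector; list nodes never move when elements are appended, so the references stay valid. A small illustrative sketch of that distinction:

#include <cassert>
#include <list>
#include <vector>

struct Tracker { int ver = 0; };

int main() {
  // std::list: element addresses are stable across emplace_back()
  std::list<Tracker> stable;
  stable.emplace_back();
  Tracker* first = &stable.front();
  for (int i = 0; i < 1000; ++i) {
    stable.emplace_back();
  }
  assert(first == &stable.front());  // still valid: list nodes never move

  // std::vector: growth may reallocate and move all elements
  std::vector<Tracker> unstable;
  unstable.emplace_back();
  Tracker* p = &unstable.front();
  unstable.reserve(unstable.capacity() + 1);  // forces reallocation
  // 'p' now dangles; a coroutine holding such a reference would misbehave
  (void)p;
  return 0;
}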
src/rgw/rgw_data_sync.h
index e1b5b2d9cb0a5680a7b937477fc1a0eeaea4742f..5f45ecb2f3cbd63361a65095777dd6779095639d 100644
@@ -686,7 +686,7 @@ public:
                      const rgw_bucket& dest_bucket);
 
   RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
-  RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker);
+  RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker, rgw_bucket_index_marker_info& info);
   RGWCoroutine *run_sync_cr(int num);
 
   int num_pipes() {