git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move full sync from SyncBucketShard to SyncBucket
author Casey Bodley <cbodley@redhat.com>
Tue, 6 Oct 2020 21:59:41 +0000 (17:59 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Mon, 31 Jan 2022 20:32:56 +0000 (15:32 -0500)
renamed ListBucketShardCR to ListRemoteBucketCR and removed the shard-id
parameter
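
For illustration, a condensed caller fragment (lifted from the RGWBucketFullSyncCR::operate() hunk below, coroutine boilerplate omitted): the listing now covers the whole remote bucket, so no shard id is passed and the resume marker comes from the bucket-level status.

    // coroutine fragment, condensed from RGWBucketFullSyncCR::operate() below;
    // the remote zone lists the bucket as a whole, no shard id is involved
    bucket_list_result list_result;
    rgw_obj_key list_marker = sync_status.full.position;  // bucket-level resume point
    yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result));
    if (retcode < 0 && retcode != -ENOENT) {
      set_status("failed bucket listing, going down");
      drain_all();
      return set_cr_error(retcode);
    }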

renamed BucketFullSyncShardMarkerTrack to BucketFullSyncMarkerTrack,
which now updates the bucket-level rgw_bucket_sync_status
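
For orientation, an assumed, simplified sketch of that bucket-level record: the state and full.position/full.count fields match the hunks below; the underlying types, defaults, and any omitted members are illustrative only.

    // assumed, simplified layout; only the fields touched by this commit are shown
    enum class BucketSyncState : uint8_t { Init, Full, Incremental, Stopped };

    struct rgw_bucket_full_sync_status {
      rgw_obj_key position;   // full-sync resume marker (last listed key)
      uint64_t count = 0;     // entries processed so far
    };

    struct rgw_bucket_sync_status {
      BucketSyncState state = BucketSyncState::Init;  // Init/Stopped -> Full -> Incremental
      rgw_bucket_full_sync_status full;               // written by BucketFullSyncMarkerTrack
    };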

renamed BucketShardFullSyncCR to BucketFullSyncCR

BucketSyncCR now takes a bucket-wide lease during full sync
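
A condensed sketch of that lease pattern, taken from the RGWSyncBucketCR hunk at the end of the diff (logging and the yield blocks trimmed); the lease is held on the bucket-wide status object so concurrent shard workers don't repeat the full sync.

    // coroutine fragment, condensed from RGWSyncBucketCR::operate() below
    bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
                                                   "full sync", cct->_conf->rgw_sync_lease_period,
                                                   this));
    spawn(bucket_lease_cr.get(), false);      // keeps renewing the lock in the background
    while (!bucket_lease_cr->is_locked()) {
      if (bucket_lease_cr->is_done()) {       // lease coroutine exited without the lock
        drain_all();
        return set_cr_error(bucket_lease_cr->get_ret_status());
      }
      yield set_sleeping(true);               // woken when the lease state changes
    }
    yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
                                       bucket_lease_cr, bucket_status, tn, objv));
    bucket_lease_cr->go_down();               // release the lock
    drain_all();
    bucket_lease_cr.reset();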

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 287b531e747b1ea177572671642a3595d8a629bf..3778784fb3481a4b74fa48951a922749678b629f 100644
@@ -3468,34 +3468,30 @@ struct bucket_list_result {
   }
 };
 
-class RGWListBucketShardCR: public RGWCoroutine {
+class RGWListRemoteBucketCR: public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   const rgw_bucket_shard& bs;
-  const string instance_key;
   rgw_obj_key marker_position;
 
   bucket_list_result *result;
 
 public:
-  RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
-                       rgw_obj_key& _marker_position, bucket_list_result *_result)
+  RGWListRemoteBucketCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
+                        rgw_obj_key& _marker_position, bucket_list_result *_result)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
-      instance_key(bs.get_key()), marker_position(_marker_position),
-      result(_result) {}
+      marker_position(_marker_position), result(_result) {}
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       yield {
-        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
-                                       { "versions" , NULL },
+        rgw_http_param_pair pairs[] = { { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
-        // don't include tenant in the url, it's already part of instance_key
-        string p = string("/") + bs.bucket.name;
+        string p = string("/") + bs.bucket.get_key(':', 0);
         call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
       }
       if (retcode < 0) {
@@ -3551,37 +3547,35 @@ public:
 
 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
 
-class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
+class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
 
-  string marker_oid;
-  rgw_bucket_shard_full_sync_marker sync_marker;
+  const rgw_raw_obj& status_obj;
+  rgw_bucket_sync_status& sync_status;
   RGWSyncTraceNodeRef tn;
   RGWObjVersionTracker& objv_tracker;
 
 public:
-  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
-                                    const string& _marker_oid,
-                                    const rgw_bucket_shard_full_sync_marker& _marker,
-                                    RGWSyncTraceNodeRef tn,
-                                    RGWObjVersionTracker& objv_tracker)
+  RGWBucketFullSyncMarkerTrack(RGWDataSyncCtx *_sc,
+                               const rgw_raw_obj& status_obj,
+                               rgw_bucket_sync_status& sync_status,
+                               RGWSyncTraceNodeRef tn,
+                               RGWObjVersionTracker& objv_tracker)
     : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
-      sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
-      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
+      sc(_sc), sync_env(_sc->env), status_obj(status_obj),
+      sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker)
   {}
 
-  RGWCoroutine* store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
-    sync_marker.position = new_marker;
-    sync_marker.count = index_pos;
 
-    map<string, bufferlist> attrs;
-    sync_marker.encode_attr(attrs);
+  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+    sync_status.full.position = new_marker;
+    sync_status.full.count = index_pos;
 
-    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
-    return new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
-                                          attrs, &objv_tracker);
+    tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
+    return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
+        sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
+       status_obj, sync_status, &objv_tracker);
   }
 
   RGWOrderCallCR *allocate_order_control_cr() override {
@@ -3880,28 +3874,29 @@ done:
 
 #define BUCKET_SYNC_SPAWN_WINDOW 20
 
-class RGWBucketShardFullSyncCR : public RGWCoroutine {
+class RGWBucketFullSyncCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_sync_pipe& sync_pipe;
+  rgw_bucket_sync_status& sync_status;
   rgw_bucket_shard& bs;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   bucket_list_result list_result;
   list<bucket_list_entry>::iterator entries_iter;
-  rgw_bucket_shard_sync_info& sync_info;
   rgw_obj_key list_marker;
   bucket_list_entry *entry{nullptr};
 
   int total_entries{0};
 
-  int sync_status{0};
+  int sync_result{0};
 
-  const string& status_oid;
+  const rgw_raw_obj& status_obj;
+  RGWObjVersionTracker& objv;
 
   rgw_zone_set zones_trace;
 
   RGWSyncTraceNodeRef tn;
-  RGWBucketFullSyncShardMarkerTrack marker_tracker;
+  RGWBucketFullSyncMarkerTrack marker_tracker;
 
   struct _prefix_handler {
     RGWBucketSyncFlowManager::pipe_rules_ref rules;
@@ -3948,20 +3943,20 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   } prefix_handler;
 
 public:
-  RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
-                           rgw_bucket_sync_pipe& _sync_pipe,
-                           const std::string& status_oid,
-                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
-                           rgw_bucket_shard_sync_info& sync_info,
-                           RGWSyncTraceNodeRef tn_parent,
-                           RGWObjVersionTracker& objv_tracker)
+  RGWBucketFullSyncCR(RGWDataSyncCtx *_sc,
+                      rgw_bucket_sync_pipe& _sync_pipe,
+                      const rgw_raw_obj& status_obj,
+                      boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+                      rgw_bucket_sync_status& sync_status,
+                      RGWSyncTraceNodeRef tn_parent,
+                      RGWObjVersionTracker& objv_tracker)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
-      lease_cr(std::move(lease_cr)), sync_info(sync_info),
-      status_oid(status_oid),
+      sync_pipe(_sync_pipe), sync_status(sync_status),
+      bs(_sync_pipe.info.source_bs),
+      lease_cr(std::move(lease_cr)), status_obj(status_obj), objv(objv_tracker),
       tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                          SSTR(bucket_shard_str{bs}))),
-      marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker)
+      marker_tracker(sc, status_obj, sync_status, tn, objv_tracker)
   {
     zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key());
     prefix_handler.set_rules(sync_pipe.get_rules());
@@ -3970,12 +3965,12 @@ public:
   int operate(const DoutPrefixProvider *dpp) override;
 };
 
-int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
+int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
-    list_marker = sync_info.full_marker.position;
+    list_marker = sync_status.full.position;
 
-    total_entries = sync_info.full_marker.count;
+    total_entries = sync_status.full.count;
     do {
       if (lease_cr && !lease_cr->is_locked()) {
         drain_all();
@@ -3990,8 +3985,7 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
         break;
       }
 
-      yield call(new RGWListBucketShardCR(sc, bs, list_marker,
-                                          &list_result));
+      yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
         set_status("failed bucket listing, going down");
         drain_all();
@@ -4031,19 +4025,19 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
                       [&](uint64_t stack_id, int ret) {
                 if (ret < 0) {
                   tn->log(10, "a sync operation returned error");
-                  sync_status = ret;
+                  sync_result = ret;
                 }
                 return 0;
               });
       }
-    } while (list_result.is_truncated && sync_status == 0);
+    } while (list_result.is_truncated && sync_result == 0);
     set_status("done iterating over all objects");
-    /* wait for all operations to complete */
 
+    /* wait for all operations to complete */
     drain_all_cb([&](uint64_t stack_id, int ret) {
       if (ret < 0) {
         tn->log(10, "a sync operation returned error");
-        sync_status = ret;
+        sync_result = ret;
       }
       return 0;
     });
@@ -4051,26 +4045,29 @@ int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
     if (lease_cr && !lease_cr->is_locked()) {
       return set_cr_error(-ECANCELED);
     }
+    yield call(marker_tracker.flush());
+    if (retcode < 0) {
+      tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
+      return set_cr_error(retcode);
+    }
     /* update sync state to incremental */
-    if (sync_status == 0) {
-      yield {
-        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
-        map<string, bufferlist> attrs;
-        sync_info.encode_state_attr(attrs);
-        call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                            rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
-                                            attrs));
-      }
+    if (sync_result == 0) {
+      sync_status.state = BucketSyncState::Incremental;
+      tn->log(5, SSTR("set bucket state=" << sync_status.state));
+      yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
+             dpp, sync_env->async_rados, sync_env->svc->sysobj,
+              status_obj, sync_status, &objv));
+      tn->log(5, SSTR("bucket status objv=" << objv));
     } else {
-      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
+      tn->log(10, SSTR("backing out with sync_status=" << sync_result));
     }
-    if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
+    if (retcode < 0 && sync_result == 0) { /* actually tried to set incremental state and failed */
       tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
           << bucket_shard_str{bs} << " retcode=" << retcode));
       return set_cr_error(retcode);
     }
-    if (sync_status < 0) {
-      return set_cr_error(sync_status);
+    if (sync_result < 0) {
+      return set_cr_error(sync_result);
     }
     return set_cr_done();
   }
@@ -4901,6 +4898,7 @@ class RGWSyncBucketShardCR : public RGWCoroutine {
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe& sync_pipe;
+  BucketSyncState& bucket_state;
   ceph::real_time* progress;
 
   const std::string status_oid;
@@ -4914,10 +4912,12 @@ public:
                        boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                        const rgw_bucket_sync_pair_info& _sync_pair,
                        rgw_bucket_sync_pipe& sync_pipe,
+                       BucketSyncState& bucket_state,
                        const RGWSyncTraceNodeRef& tn,
                        ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress),
+      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
+      sync_pipe(sync_pipe), bucket_state(bucket_state), progress(progress),
       status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
       tn(tn) {
   }
@@ -4931,57 +4931,24 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
     yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
-      drain_all();
       return set_cr_error(retcode);
     }
 
-    tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
-
-    do {
-      if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
-          sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
-        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false));
-        if (retcode == -ENOENT) {
-          tn->log(0, "bucket sync disabled");
-          drain_all();
-          return set_cr_done();
-        }
-        if (retcode < 0) {
-          tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
-          drain_all();
-          return set_cr_error(retcode);
-        }
-      }
-      if (progress) {
-        *progress = sync_status.inc_marker.timestamp;
-      }
-
-      if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
-        yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
-                                                status_oid, lease_cr,
-                                                sync_status, tn, objv_tracker));
-        if (retcode < 0) {
-          tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
-          drain_all();
-          return set_cr_error(retcode);
-        }
-      }
-
-      if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
-        yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
-                                                       status_oid, lease_cr,
-                                                       sync_status, tn,
-                                                       objv_tracker, progress));
-        if (retcode < 0) {
-          tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
-          drain_all();
-          return set_cr_error(retcode);
-        }
-      }
-      // loop back to previous states unless incremental sync returns normally
-    } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
+    tn->log(20, SSTR("sync status for source bucket shard: " << sync_status.state));
+    sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+    if (progress) {
+      *progress = sync_status.inc_marker.timestamp;
+    }
 
-    drain_all();
+    yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
+                                                   status_oid, lease_cr,
+                                                   sync_status, tn,
+                                                   objv_tracker, progress));
+    if (retcode < 0) {
+      tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
+      return set_cr_error(retcode);
+    }
+    // TODO: handle transition to StateStopped
     return set_cr_done();
   }
 
@@ -4991,7 +4958,8 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
 class RGWSyncBucketCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *env;
-  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> data_lease_cr;
+  boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe sync_pipe;
   ceph::real_time* progress;
@@ -4999,6 +4967,7 @@ class RGWSyncBucketCR : public RGWCoroutine {
   const rgw_raw_obj status_obj;
   rgw_bucket_sync_status bucket_status;
   RGWObjVersionTracker objv;
+  bool init_check_compat = false;
 
   RGWSyncTraceNodeRef tn;
 
@@ -5009,7 +4978,7 @@ public:
                   const RGWSyncTraceNodeRef& _tn_parent,
                   ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
-      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+      data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
       status_obj(env->svc->zone->get_zone_params().log_pool,
                  RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
                                                                  sync_pair.source_bs.bucket,
@@ -5058,34 +5027,67 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
       return set_cr_error(retcode);
     }
 
-    if (bucket_status.state == BucketSyncState::Init) {
-      // init sync status
-      yield {
-       // use exclusive create if it didn't exist. if we lose the race to
-       // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come
-       // back later and read the new status
-       const bool exclusive = objv.version_for_check() == nullptr;
-        int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
-        call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
-                                            bucket_status, objv, num_shards,
-                                            exclusive));
-      }
-      if (retcode < 0) {
-        return set_cr_error(retcode);
+    do {
+      tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
+
+      if (bucket_status.state == BucketSyncState::Init ||
+          bucket_status.state == BucketSyncState::Stopped) {
+        // init sync status
+        yield {
+          init_check_compat = objv.read_version.ver <= 1; // newly-created
+          int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+          call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+                                              bucket_status, objv, num_shards,
+                                              init_check_compat));
+        }
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
       }
-    }
 
-    if (bucket_status.state == BucketSyncState::Full) {
-      // TODO: full sync
-    }
+      if (bucket_status.state == BucketSyncState::Full) {
+        // take a bucket-wide lease for full sync to prevent the bucket shards
+        // from duplicating the work
+        yield {
+          const std::string lock_name = "full sync";
+          const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
+          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+                                                         lock_name, lock_duration, this));
+          spawn(bucket_lease_cr.get(), false);
+        }
+        while (!bucket_lease_cr->is_locked()) {
+          if (bucket_lease_cr->is_done()) {
+            tn->log(5, "failed to take bucket lease");
+            set_status("lease lock failed, early abort");
+            drain_all();
+            return set_cr_error(bucket_lease_cr->get_ret_status());
+          }
+          tn->log(5, "waiting on bucket lease");
+          yield set_sleeping(true);
+        }
 
-    if (bucket_status.state == BucketSyncState::Incremental) {
-      yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair,
-                                          sync_pipe, tn, progress));
-      if (retcode < 0) {
-        return set_cr_error(retcode);
+        yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
+                                           bucket_lease_cr, bucket_status,
+                                           tn, objv));
+        bucket_lease_cr->go_down();
+        drain_all();
+        bucket_lease_cr.reset();
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
       }
-    }
+
+      if (bucket_status.state == BucketSyncState::Incremental) {
+        yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
+                                            sync_pipe, bucket_status.state,
+                                            tn, progress));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+      }
+      // loop back to previous states unless incremental sync returns normally
+    } while (bucket_status.state != BucketSyncState::Incremental);
+
     return set_cr_done();
   }