]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add InitBucketFullSyncStatusCR
authorCasey Bodley <cbodley@redhat.com>
Tue, 6 Oct 2020 21:59:28 +0000 (17:59 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 31 Jan 2022 20:12:40 +0000 (15:12 -0500)
a coroutine to initialize a bucket for full sync using a new bucket-wide
sync status object

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index f66a0620c3afce24af8c105487c19c51ed5c79e2..119466e46b6e16a564d6d2a2a7901b98e5f1f43e 100644 (file)
@@ -2753,15 +2753,17 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   rgw_bucket_shard_sync_info& status;
   RGWObjVersionTracker& objv_tracker;
   rgw_bucket_index_marker_info info;
+  bool exclusive;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                         const rgw_bucket_sync_pair_info& _sync_pair,
                                         rgw_bucket_shard_sync_info& _status,
-                                        RGWObjVersionTracker& objv_tracker)
+                                        RGWObjVersionTracker& objv_tracker,
+                                        bool exclusive)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pair(_sync_pair),
       sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)),
-      status(_status), objv_tracker(objv_tracker)
+      status(_status), objv_tracker(objv_tracker), exclusive(exclusive)
   {}
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -2802,7 +2804,8 @@ public:
         if (write_status) {
           map<string, bufferlist> attrs;
           status.encode_all_attrs(attrs);
-          call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
+          call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                                              obj, attrs, &objv_tracker, exclusive));
         } else {
           call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
         }
@@ -2824,11 +2827,13 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
                                                const rgw_zone_id& _source_zone,
                                                RGWRESTConn *_conn,
                                                const RGWBucketInfo& source_bucket_info,
-                                               const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
+                                               const rgw_bucket& dest_bucket)
+  : dpp(_dpp), sync_env(_sync_env), conn(_conn), source_zone(_source_zone),
+    full_status_obj(sync_env->svc->zone->get_zone_params().log_pool,
+                    RGWBucketPipeSyncStatusManager::full_status_oid(source_zone,
+                                                                    source_bucket_info.bucket,
+                                                                    dest_bucket))
 {
-  conn = _conn;
-  source_zone = _source_zone;
-
   int num_shards = (source_bucket_info.layout.current_index.layout.normal.num_shards <= 0 ? 
                     1 : source_bucket_info.layout.current_index.layout.normal.num_shards);
 
@@ -2854,14 +2859,6 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
   sc.init(sync_env, conn, source_zone);
 }
 
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
-{
-  if ((size_t)num >= sync_pairs.size()) {
-    return nullptr;
-  }
-  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
-}
-
 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
 
 template <class T>
@@ -2960,6 +2957,234 @@ int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
   return 0;
 }
 
+// wrap ReadSyncStatus and set a flag if it's not in incremental
+class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCoroutine {
+  bool* result;
+  rgw_bucket_shard_sync_info status;
+ public:
+  CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
+                                      const rgw_bucket_sync_pair_info& sync_pair,
+                                      bool* result)
+    : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr),
+      result(result)
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    int r = RGWReadBucketPipeSyncStatusCoroutine::operate(dpp);
+    if (state == RGWCoroutine_Done &&
+        status.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+      *result = false;
+    }
+    return r;
+  }
+};
+
+class CheckAllBucketShardStatusIsIncremental : public RGWShardCollectCR {
+  // start with 1 shard, and only spawn more if we detect an existing shard.
+  // this makes the backward compatilibility check far less expensive in the
+  // general case where no shards exist
+  static constexpr int initial_concurrent_shards = 1;
+  static constexpr int max_concurrent_shards = 16;
+
+  RGWDataSyncCtx* sc;
+  rgw_bucket_sync_pair_info sync_pair;
+  const int num_shards;
+  bool* result;
+  int shard = 0;
+ public:
+  CheckAllBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
+                                         const rgw_bucket_sync_pair_info& sync_pair,
+                                         int num_shards, bool* result)
+    : RGWShardCollectCR(sc->cct, initial_concurrent_shards),
+      sc(sc), sync_pair(sync_pair), num_shards(num_shards), result(result)
+  {}
+
+  bool spawn_next() override {
+    // stop spawning if we saw any errors or non-incremental shards
+    if (shard >= num_shards || status < 0 || !*result) {
+      return false;
+    }
+    sync_pair.dest_bs.shard_id = shard++;
+    spawn(new CheckBucketShardStatusIsIncremental(sc, sync_pair, result), false);
+    return true;
+  }
+
+ private:
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldout(cct, 4) << "failed to read bucket shard status: "
+          << cpp_strerror(r) << dendl;
+    } else if (shard == 0) {
+      // enable concurrency once the first shard succeeds
+      max_concurrent = max_concurrent_shards;
+    }
+    return r;
+  }
+};
+
+// wrap InitBucketShardSyncStatus with local storage for 'status' and 'objv'
+// and a loop to retry on racing writes
+class InitBucketShardStatusCR : public RGWCoroutine {
+  RGWDataSyncCtx* sc;
+  const rgw_bucket_sync_pair_info& pair;
+  rgw_bucket_shard_sync_info status;
+  RGWObjVersionTracker objv;
+  int tries = 10; // retry on racing writes
+  bool exclusive = true; // first try is exclusive
+  using ReadCR = RGWReadBucketPipeSyncStatusCoroutine;
+  using InitCR = RGWInitBucketShardSyncStatusCoroutine;
+ public:
+  InitBucketShardStatusCR(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& pair)
+    : RGWCoroutine(sc->cct), sc(sc), pair(pair)
+  {}
+  int operate(const DoutPrefixProvider *dpp) {
+    reenter(this) {
+      // try exclusive create with empty status
+      objv.generate_new_write_ver(cct);
+      yield call(new InitCR(sc, pair, status, objv, exclusive));
+      if (retcode >= 0) {
+        return set_cr_done();
+      } else if (retcode != -EEXIST) {
+        return set_cr_error(retcode);
+      }
+
+      exclusive = false;
+      // retry loop to reinitialize
+      while (--tries) {
+        objv.clear();
+        // read current status and objv
+        yield call(new ReadCR(sc, pair, &status, &objv));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+        yield call(new InitCR(sc, pair, status, objv, exclusive));
+        if (retcode >= 0) {
+          return set_cr_done();
+        } else if (retcode != -ECANCELED) {
+          return set_cr_error(retcode);
+        }
+      }
+      return set_cr_error(retcode);
+    }
+    return 0;
+  }
+};
+
+class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
+  static constexpr int max_concurrent_shards = 16;
+  RGWDataSyncCtx* sc;
+  rgw_bucket_sync_pair_info sync_pair;
+  const int num_shards;
+  int shard = 0;
+
+  int handle_result(int r) override {
+    if (r < 0) {
+      ldout(cct, 4) << "failed to init bucket shard status: "
+          << cpp_strerror(r) << dendl;
+    }
+    return r;
+  }
+ public:
+  InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
+                                 const rgw_bucket_sync_pair_info& sync_pair,
+                                 int num_shards)
+    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
+      sc(sc), sync_pair(sync_pair), num_shards(num_shards)
+  {}
+
+  bool spawn_next() override {
+    if (shard >= num_shards || status < 0) { // stop spawning on any errors
+      return false;
+    }
+    sync_pair.dest_bs.shard_id = shard++;
+    spawn(new InitBucketShardStatusCR(sc, sync_pair), false);
+    return true;
+  }
+};
+
+class InitBucketFullSyncStatusCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+
+  const rgw_bucket_sync_pair_info& sync_pair;
+  const rgw_raw_obj& status_obj;
+  rgw_bucket_sync_status& status;
+  RGWObjVersionTracker& objv;
+  const int num_shards;
+  const bool check_compat;
+
+  bool all_incremental = true;
+public:
+  InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
+                             const rgw_bucket_sync_pair_info& sync_pair,
+                             const rgw_raw_obj& status_obj,
+                             rgw_bucket_sync_status& status,
+                             RGWObjVersionTracker& objv,
+                             int num_shards, bool check_compat)
+    : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
+      sync_pair(sync_pair), status_obj(status_obj),
+      status(status), objv(objv), num_shards(num_shards),
+      check_compat(check_compat)
+  {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      status.state = BucketSyncState::Init;
+
+      if (check_compat) {
+        // try to convert existing per-shard incremental status for backward compatibility
+        yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+        if (all_incremental) {
+          // we can use existing status and resume incremental sync
+          status.state = BucketSyncState::Incremental;
+        }
+      }
+
+      if (status.state != BucketSyncState::Incremental) {
+        // initialize all shard sync status. this will populate the log marker
+        // positions where incremental sync will resume after full sync
+        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, num_shards));
+        if (retcode < 0) {
+          ldout(cct, 20) << "failed to init bucket shard status: "
+              << cpp_strerror(retcode) << dendl;
+          return set_cr_error(retcode);
+        }
+
+        if (sync_env->sync_module->should_full_sync()) {
+          status.state = BucketSyncState::Full;
+        } else {
+          status.state = BucketSyncState::Incremental;
+        }
+      }
+
+      ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
+
+      // write bucket sync status
+      using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+      yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                        status_obj, status, &objv, false));
+      if (retcode < 0) {
+        ldout(cct, 20) << "failed to write bucket shard status: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv)
+{
+  constexpr bool check_compat = false;
+  const int num_shards = num_pipes();
+  return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
+                                        full_status, objv, num_shards, check_compat);
+}
+
 #define OMAP_READ_MAX_ENTRIES 10
 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
@@ -4749,7 +4974,7 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
           sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
-        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
+        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker, false));
         if (retcode == -ENOENT) {
           tn->log(0, "bucket sync disabled");
           drain_all();
@@ -4868,11 +5093,8 @@ int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *d
 
   for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
-
-    for (int i = 0; i < mgr->num_pipes(); ++i) {
-      objvs.emplace_back();
-      stack->call(mgr->init_sync_status_cr(i, objvs.back()));
-    }
+    objvs.emplace_back();
+    stack->call(mgr->init_sync_status_cr(objvs.back()));
 
     stacks.push_back(stack);
   }
index 2f33a7eb0e5489b2062204b1d1311377c17341a7..27885783533f2d89632c21471162e64ab4191698 100644 (file)
@@ -648,10 +648,12 @@ class RGWRemoteBucketManager {
   RGWRESTConn *conn{nullptr};
   rgw_zone_id source_zone;
 
+  rgw_raw_obj full_status_obj;
   std::vector<rgw_bucket_sync_pair_info> sync_pairs;
 
   RGWDataSyncCtx sc;
-  rgw_bucket_shard_sync_info init_status;
+  rgw_bucket_sync_status full_status;
+  rgw_bucket_shard_sync_info shard_status;
 
   RGWBucketSyncCR *sync_cr{nullptr};
 
@@ -662,12 +664,8 @@ public:
                      const RGWBucketInfo& source_bucket_info,
                      const rgw_bucket& dest_bucket);
 
-  void init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn,
-            const rgw_bucket& source_bucket, int shard_id,
-            const rgw_bucket& dest_bucket);
-
   RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
-  RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker);
+  RGWCoroutine *init_sync_status_cr(RGWObjVersionTracker& objv_tracker);
   RGWCoroutine *run_sync_cr(int num);
 
   int num_pipes() {