git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: InitBucketFullSyncStatusCR gets num shards from remote
author: Adam C. Emerson <aemerson@redhat.com>
Thu, 2 Sep 2021 21:36:09 +0000 (17:36 -0400)
committer: Adam C. Emerson <aemerson@redhat.com>
Fri, 17 Sep 2021 17:19:15 +0000 (13:19 -0400)
As specified in rgw_bucket_index_marker_info, unless we're doing the
compatibility check, in which case we look at generation 0.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 43628191678191e62a564a5dd875b40f6b9d2dcb..4d3f62688b85433854f1a27190ebeffcae239904 100644 (file)
@@ -2812,7 +2812,8 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
     full_status_obj(sync_env->svc->zone->get_zone_params().log_pool,
                     RGWBucketPipeSyncStatusManager::full_status_oid(source_zone,
                                                                     source_bucket_info.bucket,
-                                                                    dest_bucket))
+                                                                    dest_bucket)),
+    source_bucket_info(source_bucket_info)
 {
   rgw_bucket_index_marker_info remote_info;
   BucketIndexShardsManager remote_markers;
@@ -3140,24 +3141,27 @@ class InitBucketFullSyncStatusCR : public RGWCoroutine {
   const rgw_raw_obj& status_obj;
   rgw_bucket_sync_status& status;
   RGWObjVersionTracker& objv;
-  const int num_shards;
+  const RGWBucketInfo& source_info;
   const bool check_compat;
 
   const rgw_bucket_index_marker_info& info;
   BucketIndexShardsManager marker_mgr;
 
   bool all_incremental = true;
+  bool no_zero = false;
+
 public:
   InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
                              const rgw_bucket_sync_pair_info& sync_pair,
                              const rgw_raw_obj& status_obj,
                              rgw_bucket_sync_status& status,
                              RGWObjVersionTracker& objv,
-                             int num_shards, bool check_compat,
+                            const RGWBucketInfo& source_info,
+                             bool check_compat,
                              const rgw_bucket_index_marker_info& info)
     : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
       sync_pair(sync_pair), status_obj(status_obj),
-      status(status), objv(objv), num_shards(num_shards),
+      status(status), objv(objv), source_info(source_info),
       check_compat(check_compat), info(info)
   {}
 
@@ -3173,27 +3177,50 @@ public:
       status.state = BucketSyncState::Init;
 
       if (info.oldest_gen == 0) {
-        if (check_compat) {
-          // try to convert existing per-shard incremental status for backward compatibility
-          yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
-          if (retcode < 0) {
-            return set_cr_error(retcode);
-          }
-          if (all_incremental) {
-            // we can use existing status and resume incremental sync
-            status.state = BucketSyncState::Incremental;
-          }
-        }
+       if (check_compat) {
+         // use shard count from our log gen=0
+         // try to convert existing per-shard incremental status for backward compatibility
+         if (source_info.layout.logs.front().gen > 0) {
+           ldpp_dout(dpp, 20) << "no generation zero when checking compatibility" << dendl;
+           no_zero = true;
+         }
+         if (auto& log = source_info.layout.logs.front();
+             log.layout.type != rgw::BucketLogType::InIndex) {
+           ldpp_dout(dpp, 20) << "unrecognized log layout type when checking compatibility " << log.layout.type << dendl;
+           no_zero = true;
+         }
+         if (!no_zero) {
+           yield {
+             const int num_shards0 =
+               source_info.layout.logs.front().layout.in_index.layout.num_shards;
+             call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair,
+                                                             num_shards0,
+                                                             &all_incremental));
+           }
+           if (retcode < 0) {
+             return set_cr_error(retcode);
+           }
+           if (all_incremental) {
+             // we can use existing status and resume incremental sync
+             status.state = BucketSyncState::Incremental;
+           }
+         } else {
+           all_incremental = false;
+         }
+       }
       }
 
       if (status.state != BucketSyncState::Incremental) {
-        // initialize all shard sync status. this will populate the log marker
+       // initialize all shard sync status. this will populate the log marker
         // positions where incremental sync will resume after full sync
-        yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
-        if (retcode < 0) {
+       yield {
+         const int num_shards = marker_mgr.get().size();
+         call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
+       }
+       if (retcode < 0) {
           ldout(cct, 20) << "failed to init bucket shard status: "
-              << cpp_strerror(retcode) << dendl;
-          return set_cr_error(retcode);
+                        << cpp_strerror(retcode) << dendl;
+         return set_cr_error(retcode);
         }
 
         if (sync_env->sync_module->should_full_sync()) {
@@ -3203,7 +3230,7 @@ public:
         }
       }
 
-      status.shards_done_with_gen.resize(num_shards);
+      status.shards_done_with_gen.resize(marker_mgr.get().size());
       status.incremental_gen = info.latest_gen;
 
       ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl;
@@ -3211,7 +3238,7 @@ public:
       // write bucket sync status
       using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
       yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                        status_obj, status, &objv, false));
+                       status_obj, status, &objv, false));
       if (retcode < 0) {
         ldout(cct, 20) << "failed to write bucket shard status: "
             << cpp_strerror(retcode) << dendl;
@@ -3226,9 +3253,9 @@ public:
 RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info)
 {
   constexpr bool check_compat = false;
-  const int num_shards = num_pipes();
   return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
-                                        full_status, objv, num_shards, check_compat, info);
+                                        full_status, objv, source_bucket_info,
+                                       check_compat, info);
 }
 
 #define OMAP_READ_MAX_ENTRIES 10
@@ -3303,7 +3330,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw::sal::RadosStore* store;
-  
+
   const int shard_id;
   int max_entries;
 
@@ -5171,8 +5198,6 @@ class RGWSyncBucketCR : public RGWCoroutine {
   rgw_bucket_index_marker_info info;
 
   RGWSyncTraceNodeRef tn;
-  rgw_bucket_index_marker_info remote_info;
-  BucketIndexShardsManager remote_markers;
 
 public:
   RGWSyncBucketCR(RGWDataSyncCtx *_sc,
@@ -5284,7 +5309,7 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
               yield set_sleeping(true);
             }
           }
-          
+
           // if state was incremental, remove all per-shard status objects
           if (bucket_status.state == BucketSyncState::Incremental) {
             yield {
@@ -5346,22 +5371,16 @@ int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
         yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
                             status_obj, &bucket_status, false, &objv));
         if (retcode < 0) {
-          RELEASE_LOCK(bucket_lease_cr);       
+          RELEASE_LOCK(bucket_lease_cr);
           tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
           return set_cr_error(retcode);
         }
         tn->log(20, SSTR("status after acquiring the lock is: " << bucket_status.state));
 
-        // init sync status
-        yield {
-          init_check_compat = objv.read_version.ver <= 1; // newly-created
-          rgw_read_remote_bilog_info(dpp, sc->conn, sync_pair.source_bs.bucket,
-                                      remote_info, remote_markers, null_yield);
-          const int num_shards = remote_markers.get().size();
-          call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
-                                              bucket_status, objv, num_shards,
-                                              init_check_compat, info));
-        }
+       yield call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+                                                 bucket_status, objv,
+                                                 sync_pipe.source_bucket_info,
+                                                 init_check_compat, info));
 
         if (retcode < 0) {
           tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
index 5f45ecb2f3cbd63361a65095777dd6779095639d..7c2abed736acc6126201375a73e27b19b494c404 100644 (file)
@@ -674,10 +674,12 @@ class RGWRemoteBucketManager {
 
   RGWDataSyncCtx sc;
   rgw_bucket_sync_status full_status;
+  const RGWBucketInfo source_bucket_info;
   rgw_bucket_shard_sync_info shard_status;
 
   RGWBucketSyncCR *sync_cr{nullptr};
 
+
 public:
   RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
                      RGWDataSyncEnv *_sync_env,