]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: RunBucketSourcesSync no longer takes optional target
authorCasey Bodley <cbodley@redhat.com>
Mon, 22 Nov 2021 18:05:40 +0000 (13:05 -0500)
committerCasey Bodley <cbodley@redhat.com>
Fri, 27 May 2022 19:47:33 +0000 (15:47 -0400)
RGWDataSyncSingleEntryCR is the only caller of RGWRunBucketSourcesSyncCR

it always provides a source_bs, and never provides a target_bs. so remove
all the complexity related to target_bs, and the idea that we'd need to
sync several source bucket shards related to the target bucket

we now just have the single loop over the target buckets that use the
given bucket as a source

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 5ebb21909e5ec1522b27b5eae21cfd566e7f647c..4640cb942f8a268c0d08ab8fcb3407a35f1e7042 100644 (file)
@@ -1250,12 +1250,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
 
-  std::optional<rgw_bucket_shard> target_bs;
-  std::optional<rgw_bucket_shard> source_bs;
-
-  std::optional<rgw_bucket> target_bucket;
-  std::optional<rgw_bucket> source_bucket;
-
   rgw_sync_pipe_info_set pipes;
   rgw_sync_pipe_info_set::iterator siter;
 
@@ -1271,12 +1265,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWRESTConn *conn{nullptr};
   rgw_zone_id last_zone;
 
-  int source_num_shards{0};
-  int target_num_shards{0};
-
-  int num_shards{0};
-  int cur_shard{0};
-  bool again = false;
   std::optional<uint64_t> gen;
   rgw_bucket_index_marker_info marker_info;
   BucketIndexShardsManager marker_mgr;
@@ -1284,8 +1272,7 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                             boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
-                            std::optional<rgw_bucket_shard> _target_bs,
-                            std::optional<rgw_bucket_shard> _source_bs,
+                            const rgw_bucket_shard& source_bs,
                             const RGWSyncTraceNodeRef& _tn_parent,
                            std::optional<uint64_t> gen,
                             ceph::real_time* progress);
@@ -1373,7 +1360,6 @@ public:
           ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
               << ' ' << *state->obligation << dendl;
           yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
-                                                   std::nullopt, /* target_bs */
                                                    state->key, tn,
                                                    state->obligation->gen,
                                                   &progress));
@@ -4732,39 +4718,32 @@ static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
 
 RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                                                      boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
-                                                     std::optional<rgw_bucket_shard> _target_bs,
-                                                     std::optional<rgw_bucket_shard> _source_bs,
+                                                     const rgw_bucket_shard& source_bs,
                                                      const RGWSyncTraceNodeRef& _tn_parent,
                                                     std::optional<uint64_t> gen,
                                                      ceph::real_time* progress)
   : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
-    lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
+    lease_cr(std::move(lease_cr)),
     tn(sync_env->sync_tracer->add_node(
         _tn_parent, "bucket_sync_sources",
-        SSTR( "target=" << target_bucket.value_or(rgw_bucket()) <<
-              ":source_bucket=" << source_bucket.value_or(rgw_bucket()) <<
-              ":source_zone=" << sc->source_zone))),
+        SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))),
     progress(progress),
     gen(gen)
 {
-  if (target_bs) {
-    target_bucket = target_bs->bucket;
-  }
-  if (source_bs) {
-    source_bucket = source_bs->bucket;
-  }
+  sync_pair.source_bs = source_bs;
 }
 
 int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
-    yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
+    yield call(new RGWGetBucketPeersCR(sync_env, std::nullopt, sc->source_zone,
+                                       sync_pair.source_bs.bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
       return set_cr_error(retcode);
     }
 
-    ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
+    ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << sync_pair.source_bs << dendl;
 
     if (pipes.empty()) {
       ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
@@ -4772,66 +4751,25 @@ int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
     }
 
     for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
-      {
-        ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
-       yield call(new RGWReadRemoteBucketIndexLogInfoCR(
-                    sc, siter->source.get_bucket_info().bucket, &marker_info));
-       if (retcode < 0) {
-         tn->log(0, SSTR("ERROR: failed to fetch markers for bucket: "
-                         << siter->source.get_bucket_info().bucket));
-         return set_cr_error(retcode);
-       }
-       retcode = marker_mgr.from_string(marker_info.max_marker, -1);
-       if (retcode < 0) {
-         ldpp_dout(dpp, 0) << "ERROR: failed to decode markers for bucket: "
-                           << siter->source.get_bucket_info().bucket << dendl;
-         return set_cr_error(retcode);
-       }
-       source_num_shards = marker_mgr.get().size();
+      ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
 
-       target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
-        if (source_bs) {
-          sync_pair.source_bs = *source_bs;
-        } else {
-          sync_pair.source_bs.bucket = siter->source.get_bucket();
-        }
-        sync_pair.dest_bucket = siter->target.get_bucket();
+      sync_pair.dest_bucket = siter->target.get_bucket();
+      sync_pair.handler = siter->handler;
 
-        sync_pair.handler = siter->handler;
+      ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
 
-        if (sync_pair.source_bs.shard_id >= 0) {
-          num_shards = 1;
-          cur_shard = sync_pair.source_bs.shard_id;
-        } else {
-          num_shards = std::max<int>(1, source_num_shards);
-          cur_shard = std::min<int>(0, source_num_shards);
-        }
-      }
-
-      ldpp_dout(dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
+      cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
 
-      for (; num_shards > 0; --num_shards, ++cur_shard) {
-        /*
-         * use a negatvie shard_id for backward compatibility,
-         * this affects the crafted status oid
-         */
-        sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1);
-
-        ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
-
-        cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
-
-        yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
-                                                gen, tn, cur_progress),
-                           BUCKET_SYNC_SPAWN_WINDOW,
-                           [&](uint64_t stack_id, int ret) {
-                             handle_complete_stack(stack_id);
-                             if (ret < 0) {
-                               tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
-                             }
-                             return ret;
-                           });
-      }
+      yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
+                                              gen, tn, cur_progress),
+                         BUCKET_SYNC_SPAWN_WINDOW,
+                         [&](uint64_t stack_id, int ret) {
+                           handle_complete_stack(stack_id);
+                           if (ret < 0) {
+                             tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
+                           }
+                           return ret;
+                         });
     }
     drain_all_cb([&](uint64_t stack_id, int ret) {
                    handle_complete_stack(stack_id);