]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: bucket sync: track progress by stack id
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 25 Jun 2020 21:04:52 +0000 (14:04 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 16 Jul 2020 19:23:50 +0000 (12:23 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index 082916d97ac87acada1d47d49a6121ea66d8cf02..300a5d862dd5e22cb25465b149e78b5dea8acbeb 100644 (file)
@@ -1217,8 +1217,10 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
   RGWSyncTraceNodeRef tn;
   ceph::real_time* progress;
-  std::vector<ceph::real_time> shard_progress;
-  std::vector<ceph::real_time>::iterator cur_shard_progress;
+  std::map<uint64_t, ceph::real_time> shard_progress;
+
+  ceph::real_time *cur_progress{nullptr};
+  std::optional<ceph::real_time> min_progress;
 
   RGWRESTConn *conn{nullptr};
   rgw_zone_id last_zone;
@@ -1241,6 +1243,25 @@ public:
                             ceph::real_time* progress);
 
   int operate() override;
+
+  void handle_complete_stack(uint64_t stack_id) {
+    auto iter = shard_progress.find(stack_id);
+    if (iter == shard_progress.end()) {
+      lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl;
+      return;
+    }
+    if (progress) {
+      if (!min_progress) {
+        min_progress = iter->second;
+      } else {
+        if (iter->second < *min_progress) {
+          min_progress = iter->second;
+        }
+      }
+    }
+
+    shard_progress.erase(stack_id);
+  }
 };
 
 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
@@ -4332,9 +4353,7 @@ int RGWRunBucketSourcesSyncCR::operate()
 
       ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
 
-      shard_progress.resize(num_shards);
-      cur_shard_progress = shard_progress.begin();
-      for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) {
+      for (; num_shards > 0; --num_shards, ++cur_shard) {
         /*
          * use a negatvie shard_id for backward compatibility,
          * this affects the crafted status oid
@@ -4348,10 +4367,13 @@ int RGWRunBucketSourcesSyncCR::operate()
 
         ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
 
+        cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
+
         yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
-                                                         &*cur_shard_progress),
+                                                         cur_progress),
                            BUCKET_SYNC_SPAWN_WINDOW,
                            [&](uint64_t stack_id, int ret) {
+                             handle_complete_stack(stack_id);
                              if (ret < 0) {
                                tn->log(10, "a sync operation returned error");
                              }
@@ -4360,13 +4382,14 @@ int RGWRunBucketSourcesSyncCR::operate()
       }
     }
     drain_all_cb([&](uint64_t stack_id, int ret) {
+                   handle_complete_stack(stack_id);
                    if (ret < 0) {
                      tn->log(10, "a sync operation returned error");
                    }
                    return ret;
                  });
-    if (progress) {
-      *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
+    if (progress && min_progress) {
+      *progress = *min_progress;
     }
     return set_cr_done();
   }