]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: refactor boilerplate spawn window handling
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Jun 2020 22:38:00 +0000 (15:38 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 25 Jun 2020 19:15:12 +0000 (12:15 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index deaf913559e5a5f2e5e8cb97e8c1e7964c85e61d..5a1dbff1765534048362e4ab83ad2153ad552726 100644 (file)
@@ -1575,16 +1575,10 @@ public:
           }
           sync_marker.marker = iter->first;
 
-          while ((int)num_spawned() > spawn_window) {
-            set_status() << "num_spawned() > spawn_window";
-            yield wait_for_child();
-            int ret;
-            while (collect(&ret, lease_stack.get())) {
-              if (ret < 0) {
-                tn->log(10, "a sync operation returned error");
-              }
-            }
-          }
+          drain_all_but_stack_cb(lease_stack.get(),
+                                 [&](int ret) {
+                                   tn->log(10, "a sync operation returned error");
+                                 });
         }
       } while (omapvals->more);
       omapvals.reset();
@@ -1726,18 +1720,11 @@ public:
             spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
                                     log_iter->log_timestamp, false), false);
           }
-          while ((int)num_spawned() > spawn_window) {
-            set_status() << "num_spawned() > spawn_window";
-            yield wait_for_child();
-            int ret;
-            while (collect(&ret, lease_stack.get())) {
-              if (ret < 0) {
-                tn->log(10, "a sync operation returned error");
-                /* we have reported this error */
-              }
-              /* not waiting for child here */
-            }
-          }
+
+          drain_all_but_stack_cb(lease_stack.get(),
+                                 [&](int ret) {
+                                   tn->log(10, "a sync operation returned error");
+                                 });
         }
 
         tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
@@ -3745,7 +3732,6 @@ public:
 
 int RGWBucketShardFullSyncCR::operate()
 {
-  int ret;
   reenter(this) {
     list_marker = sync_info.full_marker.position;
 
@@ -3801,34 +3787,22 @@ int RGWBucketShardFullSyncCR::operate()
                                  entry->key, &marker_tracker, zones_trace, tn),
                       false);
         }
-        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
-          yield wait_for_child();
-          bool again = true;
-          while (again) {
-            again = collect(&ret, nullptr);
-            if (ret < 0) {
-              tn->log(10, "a sync operation returned error");
-              sync_status = ret;
-              /* we have reported this error */
-            }
-          }
-        }
+        drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
+                      [&](int ret) {
+                tn->log(10, "a sync operation returned error");
+                sync_status = ret;
+                return 0;
+              });
       }
     } while (list_result.is_truncated && sync_status == 0);
     set_status("done iterating over all objects");
     /* wait for all operations to complete */
-    while (num_spawned()) {
-      yield wait_for_child();
-      bool again = true;
-      while (again) {
-        again = collect(&ret, nullptr);
-        if (ret < 0) {
-          tn->log(10, "a sync operation returned error");
-          sync_status = ret;
-          /* we have reported this error */
-        }
-      }
-    }
+
+    drain_all_cb([&](int ret) {
+      tn->log(10, "a sync operation returned error");
+      sync_status = ret;
+      return 0;
+    });
     tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
     if (lease_cr && !lease_cr->is_locked()) {
       return set_cr_error(-ECANCELED);
@@ -4106,36 +4080,20 @@ int RGWBucketShardIncrementalSyncCR::operate()
                   false);
           }
         // }
-        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
-          set_status() << "num_spawned() > spawn_window";
-          yield wait_for_child();
-          bool again = true;
-          while (again) {
-            again = collect(&ret, nullptr);
-            if (ret < 0) {
-              tn->log(10, "a sync operation returned error");
-              sync_status = ret;
-              /* we have reported this error */
-            }
-            /* not waiting for child here */
-          }
-        }
+        drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
+                      [&](int ret) {
+                tn->log(10, "a sync operation returned error");
+                sync_status = ret;
+                return 0;
+              });
       }
     } while (!list_result.empty() && sync_status == 0 && !syncstopped);
 
-    while (num_spawned()) {
-      yield wait_for_child();
-      bool again = true;
-      while (again) {
-        again = collect(&ret, nullptr);
-        if (ret < 0) {
-          tn->log(10, "a sync operation returned error");
-          sync_status = ret;
-          /* we have reported this error */
-        }
-        /* not waiting for child here */
-      }
-    }
+    drain_all_cb([&](int ret) {
+      tn->log(10, "a sync operation returned error");
+      sync_status = ret;
+      return 0;
+    });
     tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
 
     if (syncstopped) {
@@ -4378,36 +4336,19 @@ int RGWRunBucketSourcesSyncCR::operate()
 
         ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
 
-        yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
-                                                  &*cur_shard_progress), false);
-        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
-          set_status() << "num_spawned() > spawn_window";
-          yield wait_for_child();
-          again = true;
-          while (again) {
-            again = collect(&ret, nullptr);
-            if (ret < 0) {
-              tn->log(10, "a sync operation returned error");
-              drain_all();
-              return set_cr_error(ret);
-            }
-          }
-        }
-      }
-    }
-    while (num_spawned()) {
-      set_status() << "draining";
-      yield wait_for_child();
-      again = true;
-      while (again) {
-        again = collect(&ret, nullptr);
-        if (ret < 0) {
-          tn->log(10, "a sync operation returned error");
-          drain_all();
-          return set_cr_error(ret);
-        }
+        yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
+                                                         &*cur_shard_progress),
+                           BUCKET_SYNC_SPAWN_WINDOW,
+                           [&](int ret) {
+                             tn->log(10, "a sync operation returned error");
+                             return ret;
+                           });
       }
     }
+    drain_all_cb([&](int ret) {
+                   tn->log(10, "a sync operation returned error");
+                   return ret;
+                 });
     if (progress) {
       *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
     }