From: Yehuda Sadeh Date: Tue, 9 Jun 2020 22:38:00 +0000 (-0700) Subject: rgw: data sync: refactor boilerplate spawn window handling X-Git-Tag: v16.1.0~421^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d8d598d5314f08a73da7da0a2aa859351cc91636;p=ceph.git rgw: data sync: refactor boilerplate spawn window handling Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index deaf913559e5..5a1dbff17655 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1575,16 +1575,10 @@ public: } sync_marker.marker = iter->first; - while ((int)num_spawned() > spawn_window) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - int ret; - while (collect(&ret, lease_stack.get())) { - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - } - } - } + drain_all_but_stack_cb(lease_stack.get(), + [&](int ret) { + tn->log(10, "a sync operation returned error"); + }); } } while (omapvals->more); omapvals.reset(); @@ -1726,18 +1720,11 @@ public: spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id, log_iter->log_timestamp, false), false); } - while ((int)num_spawned() > spawn_window) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - int ret; - while (collect(&ret, lease_stack.get())) { - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - /* we have reported this error */ - } - /* not waiting for child here */ - } - } + + drain_all_but_stack_cb(lease_stack.get(), + [&](int ret) { + tn->log(10, "a sync operation returned error"); + }); } tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker @@ -3745,7 +3732,6 @@ public: int RGWBucketShardFullSyncCR::operate() { - int ret; reenter(this) { list_marker = sync_info.full_marker.position; @@ -3801,34 +3787,22 @@ int RGWBucketShardFullSyncCR::operate() entry->key, &marker_tracker, zones_trace, tn), false); } - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - sync_status = ret; - /* we have reported this error */ - } - } - } + drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW, + [&](int ret) { + tn->log(10, "a sync operation returned error"); + sync_status = ret; + return 0; + }); } } while (list_result.is_truncated && sync_status == 0); set_status("done iterating over all objects"); /* wait for all operations to complete */ - while (num_spawned()) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - sync_status = ret; - /* we have reported this error */ - } - } - } + + drain_all_cb([&](int ret) { + tn->log(10, "a sync operation returned error"); + sync_status = ret; + return 0; + }); tn->unset_flag(RGW_SNS_FLAG_ACTIVE); if (lease_cr && !lease_cr->is_locked()) { return set_cr_error(-ECANCELED); @@ -4106,36 +4080,20 @@ int RGWBucketShardIncrementalSyncCR::operate() false); } // } - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - sync_status = ret; - /* we have reported this error */ - } - /* not waiting for child here */ - } - } + drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW, + [&](int ret) { + tn->log(10, "a sync operation returned error"); + sync_status = ret; + return 0; + }); } } while (!list_result.empty() && sync_status == 0 && !syncstopped); - while (num_spawned()) { - yield wait_for_child(); - bool again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - sync_status = ret; - /* we have reported this error */ - } - /* not waiting for child here */ - } - } + drain_all_cb([&](int ret) { + tn->log(10, "a sync operation returned error"); + sync_status = ret; + return 0; + }); tn->unset_flag(RGW_SNS_FLAG_ACTIVE); if (syncstopped) { @@ -4378,36 +4336,19 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; - yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn, - &*cur_shard_progress), false); - while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { - set_status() << "num_spawned() > spawn_window"; - yield wait_for_child(); - again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - drain_all(); - return set_cr_error(ret); - } - } - } - } - } - while (num_spawned()) { - set_status() << "draining"; - yield wait_for_child(); - again = true; - while (again) { - again = collect(&ret, nullptr); - if (ret < 0) { - tn->log(10, "a sync operation returned error"); - drain_all(); - return set_cr_error(ret); - } + yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn, + &*cur_shard_progress), + BUCKET_SYNC_SPAWN_WINDOW, + [&](int ret) { + tn->log(10, "a sync operation returned error"); + return ret; + }); } } + drain_all_cb([&](int ret) { + tn->log(10, "a sync operation returned error"); + return ret; + }); if (progress) { *progress = *std::min_element(shard_progress.begin(), shard_progress.end()); }