}
sync_marker.marker = iter->first;
- while ((int)num_spawned() > spawn_window) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- int ret;
- while (collect(&ret, lease_stack.get())) {
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- }
- }
- }
+ drain_all_but_stack_cb(lease_stack.get(),
+ [&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ });
}
} while (omapvals->more);
omapvals.reset();
spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
log_iter->log_timestamp, false), false);
}
- while ((int)num_spawned() > spawn_window) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- int ret;
- while (collect(&ret, lease_stack.get())) {
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- /* we have reported this error */
- }
- /* not waiting for child here */
- }
- }
+
+ drain_all_but_stack_cb(lease_stack.get(),
+ [&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ });
}
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
int RGWBucketShardFullSyncCR::operate()
{
- int ret;
reenter(this) {
list_marker = sync_info.full_marker.position;
entry->key, &marker_tracker, zones_trace, tn),
false);
}
- while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
- /* we have reported this error */
- }
- }
- }
+ drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
+ [&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ return 0;
+ });
}
} while (list_result.is_truncated && sync_status == 0);
set_status("done iterating over all objects");
/* wait for all operations to complete */
- while (num_spawned()) {
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
- /* we have reported this error */
- }
- }
- }
+
+ drain_all_cb([&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ return 0;
+ });
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
if (lease_cr && !lease_cr->is_locked()) {
return set_cr_error(-ECANCELED);
false);
}
// }
- while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
- /* we have reported this error */
- }
- /* not waiting for child here */
- }
- }
+ drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
+ [&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ return 0;
+ });
}
} while (!list_result.empty() && sync_status == 0 && !syncstopped);
- while (num_spawned()) {
- yield wait_for_child();
- bool again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- sync_status = ret;
- /* we have reported this error */
- }
- /* not waiting for child here */
- }
- }
+ drain_all_cb([&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ sync_status = ret;
+ return 0;
+ });
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
if (syncstopped) {
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
- yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
- &*cur_shard_progress), false);
- while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- drain_all();
- return set_cr_error(ret);
- }
- }
- }
- }
- }
- while (num_spawned()) {
- set_status() << "draining";
- yield wait_for_child();
- again = true;
- while (again) {
- again = collect(&ret, nullptr);
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- drain_all();
- return set_cr_error(ret);
- }
+ yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
+ &*cur_shard_progress),
+ BUCKET_SYNC_SPAWN_WINDOW,
+ [&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ return ret;
+ });
}
}
+ drain_all_cb([&](int ret) {
+ tn->log(10, "a sync operation returned error");
+ return ret;
+ });
if (progress) {
*progress = *std::min_element(shard_progress.begin(), shard_progress.end());
}