#define INCREMENTAL_MAX_ENTRIES 100
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
- {
- spawned_keys.clear();
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
+ spawned_keys.clear();
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+ if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
+ ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+ marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+ continue;
}
- for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
- if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
- marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
- continue;
- }
- if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
- } else {
- /*
- * don't spawn the same key more than once. We can do that as long as we don't yield
- */
- if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
- spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
- if (retcode < 0) {
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
- }
+ if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+ } else {
+ /*
+ * don't spawn the same key more than once. We can do that as long as we don't yield
+ */
+ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+ spawned_keys.insert(log_iter->entry.key);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
+ if (retcode < 0) {
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(retcode);
}
}
- }
+ }
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
}
/* not waiting for child here */
}
+ /* not waiting for child here */
}
- }
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
- if (!truncated) {
+ }
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
+ if (!truncated) {
#define INCREMENTAL_INTERVAL 20
- yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
- }
+ yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
+ }
} while (true);
}
return 0;