omapkeys.reset();
#define INCREMENTAL_MAX_ENTRIES 100
- tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
- {
- spawned_keys.clear();
- yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
- }
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
+ spawned_keys.clear();
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(retcode);
+ }
- if (log_entries.size() > 0) {
- tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
- }
+ if (log_entries.size() > 0) {
+ tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
+ }
- for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
- if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
- marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
- continue;
- }
- if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
- } else {
- /*
- * don't spawn the same key more than once. We can do that as long as we don't yield
- */
- if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
- spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
- if (retcode < 0) {
- stop_spawned_services();
- drain_all();
- return set_cr_error(retcode);
- }
+ for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+ tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
+ if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
+ tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
+ marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+ continue;
+ }
+ if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+ tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
+ } else {
+ /*
+ * don't spawn the same key more than once. We can do that as long as we don't yield
+ */
+ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+ spawned_keys.insert(log_iter->entry.key);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
+ if (retcode < 0) {
+ stop_spawned_services();
+ drain_all();
+ return set_cr_error(retcode);
}
}
- }
- while ((int)num_spawned() > spawn_window) {
- set_status() << "num_spawned() > spawn_window";
- yield wait_for_child();
- int ret;
- while (collect(&ret, lease_stack.get())) {
- if (ret < 0) {
- tn->log(10, "a sync operation returned error");
- /* we have reported this error */
- }
- /* not waiting for child here */
+ }
+ }
+ while ((int)num_spawned() > spawn_window) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ int ret;
+ while (collect(&ret, lease_stack.get())) {
+ if (ret < 0) {
+ tn->log(10, "a sync operation returned error");
+ /* we have reported this error */
}
+ /* not waiting for child here */
}
- }
- tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated));
- if (!truncated) {
+ }
+
+ tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated));
+ if (!truncated) {
// we reached the end, wait a while before checking for more
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
- yield wait(get_idle_interval());
+ yield wait(get_idle_interval());
}
} while (true);
}