}
};
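+/* upper bound on concurrently spawned per-entry sync coroutines; once exceeded,
+ * the shard coroutine yields and waits for children to complete */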
+#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
+
class RGWDataSyncShardCR : public RGWCoroutine {
RGWRados *store;
RGWHTTPManager *http_manager;
int total_entries;
+ int spawn_window;
+
bool *reset_backoff;
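+ /* keys for which a sync coroutine was already spawned in the current batch,
+ * used to squash duplicate datalog entries */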
+ set<string> spawned_keys;
+
public:
RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
shard_id(_shard_id),
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
- total_entries(0), reset_backoff(NULL) {
+ total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL) {
set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
}
#define INCREMENTAL_MAX_ENTRIES 100
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker > sync_marker.marker) {
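+ /* reset the per-batch dedup set before fetching the next batch of log entries */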
+ spawned_keys.clear();
yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
} else {
- yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
- if (retcode < 0) {
- drain_all();
- return set_cr_error(retcode);
+ /*
+ * don't spawn the same key more than once; squashing duplicates this way
+ * is only safe because we don't yield while iterating over this batch
+ */
+ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+ spawned_keys.insert(log_iter->entry.key);
+ spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+ if (retcode < 0) {
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
}
}
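+ /* throttle: if more than spawn_window children are in flight, wait for
+ * completions and collect their return codes before continuing */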
+ while ((int)num_spawned() > spawn_window) {
+ set_status() << "num_spawned() > spawn_window";
+ yield wait_for_child();
+ int ret;
+ while (collect(&ret)) {
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: a sync operation returned an error" << dendl;
+ /* we should have reported this error */
+#warning deal with error
+ }
+ /* not waiting for child here */
+ }
+ }
}
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker == sync_marker.marker) {
ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
continue;
}
- yield {
+ // yield {
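+ // spawn without yielding; completions are collected by the spawn_window loop below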
set_status() << "start object sync";
if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false);
}
- }
+ // }
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();