From: Yehuda Sadeh Date: Tue, 1 Dec 2015 21:27:20 +0000 (-0800) Subject: rgw: data sync, don't yield when spawning X-Git-Tag: v10.1.0~354^2~159 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=52486a9635f64a4b04f596e95a27c9df8880f1a2;p=ceph.git rgw: data sync, don't yield when spawning. This enables us to filter out duplicate keys without worrying about races, since spawned coroutines (crs) will only be executed once we yield. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f741c014c664..c46cbce9880c 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -739,6 +739,8 @@ public: } }; +#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20 + class RGWDataSyncShardCR : public RGWCoroutine { RGWRados *store; RGWHTTPManager *http_manager; @@ -779,8 +781,12 @@ class RGWDataSyncShardCR : public RGWCoroutine { int total_entries; + int spawn_window; + bool *reset_backoff; + set spawned_keys; + public: RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone, @@ -793,7 +799,7 @@ public: shard_id(_shard_id), sync_marker(_marker), marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"), - total_entries(0), reset_backoff(NULL) { + total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL) { set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id; } @@ -920,6 +926,7 @@ public: #define INCREMENTAL_MAX_ENTRIES 100 ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; if (datalog_marker > sync_marker.marker) { + spawned_keys.clear(); yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated)); for (log_iter 
= log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl; @@ -930,13 +937,32 @@ public: if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl; } else { - yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); - if (retcode < 0) { - drain_all(); - return set_cr_error(retcode); + /* + * don't spawn the same key more than once. We can do that as long as we don't yield + */ + if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) { + spawned_keys.insert(log_iter->entry.key); + spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); + if (retcode < 0) { + drain_all(); + return set_cr_error(retcode); + } } } } + while ((int)num_spawned() > spawn_window) { + set_status() << "num_spawned() > spawn_window"; + yield wait_for_child(); + int ret; + while (collect(&ret)) { + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + /* not waiting for child here */ + } + } } ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; if (datalog_marker == sync_marker.marker) { @@ -2089,7 +2115,7 @@ int RGWBucketShardIncrementalSyncCR::operate() ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; continue; } - yield { + 
// yield { set_status() << "start object sync"; if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) { ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; @@ -2101,7 +2127,7 @@ int RGWBucketShardIncrementalSyncCR::operate() spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false); } - } + // } while ((int)num_spawned() > spawn_window) { set_status() << "num_spawned() > spawn_window"; yield wait_for_child();