From ddfb0193828b09104f24595bd43de3c074437dee Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 7 Oct 2015 16:24:56 -0700 Subject: [PATCH] rgw: tie incremental data sync and other fixes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f64f85c9d8bea..35117d905e03f 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -748,12 +748,14 @@ public: sync_status = retcode; #warning what do do in case of error - yield { - /* update marker */ - int ret = call(marker_tracker->finish(entry_marker)); - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl; - return set_state(RGWCoroutine_Error, sync_status); + if (!entry_marker.empty()) { + yield { + /* update marker */ + int ret = call(marker_tracker->finish(entry_marker)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl; + return set_state(RGWCoroutine_Error, sync_status); + } } } if (sync_status == 0) { @@ -802,6 +804,9 @@ class RGWDataSyncShardCR : public RGWCoroutine { set modified_shards; + set current_modified; + + set::iterator modified_iter; public: @@ -899,6 +904,19 @@ public: RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), sync_marker)); do { + current_modified.clear(); + inc_lock.Lock(); + current_modified.swap(modified_shards); + inc_lock.Unlock(); + + /* process out of band updates */ + for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { + yield { + ldout(store->ctx(), 20) << __func__ << "(): async update notification: " << *modified_iter << dendl; + spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, *modified_iter, string(), marker_tracker), false); + } + } + yield { int ret = call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, shard_id, &shard_info)); if (ret < 0) { @@ -1995,6 +2013,8 @@ int RGWRunBucketSyncCoroutine::operate() return set_state(RGWCoroutine_Error, retcode); } + ldout(store->ctx(), 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl; + yield { int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info)); if (r < 0) { @@ -2016,13 +2036,12 @@ int RGWRunBucketSyncCoroutine::operate() ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl; return r; } + sync_status.state = rgw_bucket_shard_sync_info::StateFullSync; } - - sync_status.state = rgw_bucket_shard_sync_info::StateFullSync; } if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(store->ctx(), 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; return set_state(RGWCoroutine_Error, retcode); } yield { @@ -2034,8 +2053,8 @@ int RGWRunBucketSyncCoroutine::operate() ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl; return r; } + sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; } - sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; } if (retcode < 0) { -- 2.39.5