From: Yehuda Sadeh Date: Wed, 11 May 2016 22:59:27 +0000 (-0700) Subject: rgw: store failed data sync entries in separate omap X-Git-Tag: v11.0.0~361^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b7deb7cb09304f1b0963139296bdb3abb22895ff;p=ceph.git rgw: store failed data sync entries in separate omap so that we can reiterate over them Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index ef4e43412cb1..d9c577b25e67 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -785,13 +785,17 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncShardMarkerTrack *marker_tracker; + RGWOmapAppend *error_repo; + public: RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, - const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct), + const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker, + RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), raw_key(_raw_key), entry_marker(_entry_marker), sync_status(0), - marker_tracker(_marker_tracker) { + marker_tracker(_marker_tracker), + error_repo(_error_repo) { set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker; } @@ -814,6 +818,7 @@ public: if (sync_status < 0) { yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance, -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status))); + yield error_repo->append(raw_key); } /* FIXME: what do do in case of error */ if (!entry_marker.empty()) { @@ -878,6 +883,9 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWContinuousLeaseCR *lease_cr; string status_oid; + + RGWOmapAppend *error_repo; + public: RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env, rgw_bucket& _pool, @@ -891,6 +899,7 @@ public: lease_cr(NULL) { set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id; status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id); + error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry"); } ~RGWDataSyncShardCR() { @@ -899,6 +908,7 @@ public: lease_cr->abort(); lease_cr->put(); } + delete error_repo; } void append_modified_shards(set& keys) { @@ -984,7 +994,7 @@ public: ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false); + yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false); if (retcode < 0) { lease_cr->go_down(); drain_all(); @@ -1040,7 +1050,7 @@ public: for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) { yield { ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl; - spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker), false); + spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false); } } @@ -1078,7 +1088,7 @@ public: */ if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) { spawned_keys.insert(log_iter->entry.key); - spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false); + spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false); if (retcode < 0) { lease_cr->go_down(); drain_all(); @@ -2229,7 +2239,7 @@ int RGWBucketShardIncrementalSyncCR::operate() yield { set_status("acquiring sync lock"); uint32_t lock_duration = cct->_conf->rgw_sync_lease_period; - string lock_name = "sync_lock"; + string lock_name = "sync_lock.incremental"; /* allow concurrent full sync and incremental sync on the same bucket */ RGWRados *store = sync_env->store; lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid, lock_name, lock_duration, this);