From: Yehuda Sadeh
Date: Fri, 13 May 2016 18:13:48 +0000 (-0700)
Subject: rgw: data sync retries sync on previously failed bucket shards
X-Git-Tag: v11.0.0~361^2~1
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f1ccc4cd973d16e7676b2374eeefe4ee6f6a4630;p=ceph.git

rgw: data sync retries sync on previously failed bucket shards

Signed-off-by: Yehuda Sadeh
---

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index d9c577b25e67..ae771911b24b 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -786,16 +786,19 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
   RGWOmapAppend *error_repo;
+  bool remove_from_repo;
+
+  set<string> keys;
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
-                          RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct),
+                          RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
                                                       sync_env(_sync_env),
                                                       raw_key(_raw_key), entry_marker(_entry_marker),
                                                       sync_status(0),
                                                       marker_tracker(_marker_tracker),
-                                                      error_repo(_error_repo) {
+                                                      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
     set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
   }
 
@@ -808,20 +811,34 @@ public:
         if (ret < 0) {
           return set_cr_error(-EIO);
         }
-        marker_tracker->reset_need_retry(raw_key);
+        if (marker_tracker) {
+          marker_tracker->reset_need_retry(raw_key);
+        }
         call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
       }
-    } while (marker_tracker->need_retry(raw_key));
+    } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
     sync_status = retcode;
 
     if (sync_status < 0) {
       yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
                                                       -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
-      yield error_repo->append(raw_key);
+      if (retcode < 0) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+      }
+      if (!error_repo->append(raw_key)) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
+      }
+    } else if (remove_from_repo) {
+      keys = {raw_key};
+      yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_pool(), error_repo->get_oid(), keys));
+      if (retcode < 0) {
+        ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
+            << error_repo->get_pool() << ":" << error_repo->get_oid() << " retcode=" << retcode << dendl;
+      }
     }
     /* FIXME: what do do in case of error */
-    if (!entry_marker.empty()) {
+    if (marker_tracker && !entry_marker.empty()) {
       /* update marker */
       yield call(marker_tracker->finish(entry_marker));
     }
@@ -838,6 +855,7 @@ public:
 };
 
 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
+#define DATA_SYNC_MAX_ERR_ENTRIES 10
 
 class RGWDataSyncShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
@@ -884,7 +902,12 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   RGWContinuousLeaseCR *lease_cr;
 
   string status_oid;
+
+  string error_oid;
   RGWOmapAppend *error_repo;
+  map<string, bufferlist> error_entries;
+  string error_marker;
+  int max_error_entries;
 
 public:
   RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
@@ -896,10 +919,10 @@ public:
                                                       sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
-                                                      lease_cr(NULL) {
+                                                      lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
-    error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry");
+    error_oid = status_oid + ".retry";
   }
 
   ~RGWDataSyncShardCR() {
@@ -908,7 +931,9 @@ public:
       lease_cr->abort();
       lease_cr->put();
     }
-    delete error_repo;
+    if (error_repo) {
+      error_repo->put();
+    }
   }
 
   void append_modified_shards(set<string>& keys) {
@@ -994,7 +1019,7 @@ public:
           ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
         } else {
           // fetch remote and write locally
-          yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false);
+          yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
           if (retcode < 0) {
             lease_cr->go_down();
             drain_all();
@@ -1028,6 +1053,9 @@ public:
 
   int incremental_sync() {
     reenter(&incremental_cr) {
+      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, error_oid, 1 /* no buffer */);
+      error_repo->get();
+      spawn(error_repo, false);
       yield init_lease_cr();
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
@@ -1050,14 +1078,28 @@ public:
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
             ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false);
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
           }
         }
 
+        /* process bucket shards that previously failed */
+        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, error_oid, error_marker, &error_entries, max_error_entries));
+        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+        iter = error_entries.begin();
+        for (; iter != error_entries.end(); ++iter) {
+          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
+          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
+          error_marker = iter->first;
+        }
+        if ((int)error_entries.size() != max_error_entries) {
+          error_marker.clear();
+        }
+
+
         yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
-          lease_cr->go_down();
+          stop_spawned_services();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -1069,7 +1111,7 @@ public:
       yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
-        lease_cr->go_down();
+        stop_spawned_services();
         drain_all();
         return set_cr_error(retcode);
       }
@@ -1088,9 +1130,9 @@ public:
              */
             if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
               spawned_keys.insert(log_iter->entry.key);
-              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false);
+              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
               if (retcode < 0) {
-                lease_cr->go_down();
+                stop_spawned_services();
                 drain_all();
                 return set_cr_error(retcode);
               }
@@ -1119,6 +1161,13 @@ public:
     }
     return 0;
   }
+  void stop_spawned_services() {
+    lease_cr->go_down();
+    if (error_repo) {
+      error_repo->put();
+      error_repo = NULL;
+    }
+  }
 };
 
 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
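
The retry flow this commit introduces is easiest to read outside the coroutine machinery: a bucket shard that fails to sync has its key appended to a per-shard omap object named status_oid + ".retry"; each incremental pass lists up to DATA_SYNC_MAX_ERR_ENTRIES keys from that object (paging with error_marker), re-runs the sync with remove_from_repo set, and a retry that succeeds removes its own key. What follows is a minimal standalone sketch of that logic, not Ceph code: the omap object is modeled as an in-memory std::map, and sync_bucket_shard(), sync_entry() and retry_failed_shards() are hypothetical stand-ins for RGWRunBucketSyncCoroutine, RGWDataSyncSingleEntryCR and the new error loop in incremental_sync().

// sketch_retry.cc -- illustration only; assumes the behavior described above.
#include <iostream>
#include <map>
#include <string>
#include <vector>

static const int MAX_ERR_ENTRIES = 10;   // mirrors DATA_SYNC_MAX_ERR_ENTRIES

static std::map<std::string, std::string> error_repo;  // stand-in for the ".retry" omap

// hypothetical sync: pretend the first attempt on any key fails, the retry succeeds
static bool sync_bucket_shard(const std::string& key) {
  static std::map<std::string, int> attempts;
  return ++attempts[key] > 1;
}

// analogue of RGWDataSyncSingleEntryCR with the new remove_from_repo flag
static void sync_entry(const std::string& key, bool remove_from_repo) {
  if (!sync_bucket_shard(key)) {
    error_repo[key] = "";          // failure: remember the bucket shard for a later pass
  } else if (remove_from_repo) {
    error_repo.erase(key);         // successful retry: drop the error entry
  }
}

// one incremental pass over the error repo, paged like the commit's
// error_marker/max_error_entries loop
static void retry_failed_shards(std::string& error_marker) {
  std::vector<std::string> page;   // snapshot the keys first; retries may erase entries
  for (auto it = error_repo.upper_bound(error_marker);
       it != error_repo.end() && (int)page.size() < MAX_ERR_ENTRIES; ++it) {
    page.push_back(it->first);
  }
  for (const auto& key : page) {
    sync_entry(key, true /* remove_from_repo */);
    error_marker = key;
  }
  if ((int)page.size() != MAX_ERR_ENTRIES) {
    error_marker.clear();          // reached the end; restart the scan next pass
  }
}

int main() {
  sync_entry("bucket1:inst1:0", false);   // first attempts fail and get logged
  sync_entry("bucket2:inst2:3", false);
  std::cout << "failed shards pending retry: " << error_repo.size() << "\n";

  std::string error_marker;
  retry_failed_shards(error_marker);      // next pass retries and clears them
  std::cout << "failed shards after retry: " << error_repo.size() << "\n";
  return 0;
}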