RGWDataSyncShardMarkerTrack *marker_tracker;
RGWOmapAppend *error_repo;
+ bool remove_from_repo;
+
+ set<string> keys;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct),
+ RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
raw_key(_raw_key), entry_marker(_entry_marker),
sync_status(0),
marker_tracker(_marker_tracker),
- error_repo(_error_repo) {
+ error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
}
if (ret < 0) {
return set_cr_error(-EIO);
}
- marker_tracker->reset_need_retry(raw_key);
+ if (marker_tracker) {
+ marker_tracker->reset_need_retry(raw_key);
+ }
call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
}
- } while (marker_tracker->need_retry(raw_key));
+ } while (marker_tracker && marker_tracker->need_retry(raw_key));
sync_status = retcode;
if (sync_status < 0) {
yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
- yield error_repo->append(raw_key);
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+ }
+ if (!error_repo->append(raw_key)) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
+ }
+ } else if (remove_from_repo) {
+ keys = {raw_key};
+ yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_pool(), error_repo->get_oid(), keys));
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
+ << error_repo->get_pool() << ":" << error_repo->get_oid() << " retcode=" << retcode << dendl;
+ }
}
/* FIXME: what do do in case of error */
- if (!entry_marker.empty()) {
+ if (marker_tracker && !entry_marker.empty()) {
/* update marker */
yield call(marker_tracker->finish(entry_marker));
}
};
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
+#define DATA_SYNC_MAX_ERR_ENTRIES 10
class RGWDataSyncShardCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWContinuousLeaseCR *lease_cr;
string status_oid;
+
+ string error_oid;
RGWOmapAppend *error_repo;
+ map<string, bufferlist> error_entries;
+ string error_marker;
+ int max_error_entries;
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
- lease_cr(NULL) {
+ lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES) {
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
- error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry");
+ error_oid = status_oid + ".retry";
}
~RGWDataSyncShardCR() {
lease_cr->abort();
lease_cr->put();
}
- delete error_repo;
+ if (error_repo) {
+ error_repo->put();
+ }
}
void append_modified_shards(set<string>& keys) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
int incremental_sync() {
reenter(&incremental_cr) {
+ error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, error_oid, 1 /* no buffer */);
+ error_repo->get();
+ spawn(error_repo, false);
yield init_lease_cr();
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
yield {
ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
- spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
}
}
+ /* process bucket shards that previously failed */
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, error_oid, error_marker, &error_entries, max_error_entries));
+ ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
+ iter = error_entries.begin();
+ for (; iter != error_entries.end(); ++iter) {
+ ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
+ error_marker = iter->first;
+ }
+ if ((int)error_entries.size() != max_error_entries) {
+ error_marker.clear();
+ }
+
+
yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
- lease_cr->go_down();
+ stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
- lease_cr->go_down();
+ stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
*/
if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
if (retcode < 0) {
- lease_cr->go_down();
+ stop_spawned_services();
drain_all();
return set_cr_error(retcode);
}
}
return 0;
}
+ void stop_spawned_services() {
+ lease_cr->go_down();
+ if (error_repo) {
+ error_repo->put();
+ error_repo = NULL;
+ }
+ }
};
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {