RGWDataSyncShardMarkerTrack *marker_tracker;
+ RGWOmapAppend *error_repo;
+
public:
RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
- const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+ const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
+ RGWOmapAppend *_error_repo) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
raw_key(_raw_key), entry_marker(_entry_marker),
sync_status(0),
- marker_tracker(_marker_tracker) {
+ marker_tracker(_marker_tracker),
+ error_repo(_error_repo) {
set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
}
if (sync_status < 0) {
  /* log the failure so it is visible via 'radosgw-admin sync error list' */
  yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
                                                  -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
  /* record the key in the retry repo so the shard retries it later;
   * error_repo may be NULL when no retry repo was provided */
  if (error_repo) {
    yield error_repo->append(raw_key);
  }
}
/* FIXME: what to do in case of error */
if (!entry_marker.empty()) {
RGWContinuousLeaseCR *lease_cr;
string status_oid;
+
+ RGWOmapAppend *error_repo;
+
public:
RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
rgw_bucket& _pool,
lease_cr(NULL) {
set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+ error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store, pool, status_oid + ".retry");
}
~RGWDataSyncShardCR() {
  /* lease_cr is initialized to NULL and only allocated once the shard
   * acquires its sync lock, so guard before aborting/releasing it */
  if (lease_cr) {
    lease_cr->abort();
    lease_cr->put();
  }
  /* error_repo is allocated with new in the constructor and released here.
   * NOTE(review): if RGWOmapAppend is a refcounted coroutine like lease_cr,
   * put() may be the correct release instead of delete — confirm ownership */
  delete error_repo;
}
void append_modified_shards(set<string>& keys) {
ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
/* kick off a sync for every key reported via an async update notification */
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
  yield {
    ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
    /* entry_marker is empty: notification-driven syncs presumably carry no
     * log position to track — confirm against RGWDataSyncSingleEntryCR */
    spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo), false);
  }
}
*/
if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
yield {
set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
- string lock_name = "sync_lock";
+ string lock_name = "sync_lock.incremental"; /* allow concurrent full sync and incremental sync on the same bucket */
RGWRados *store = sync_env->store;
lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);