class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
-
string marker_oid;
rgw_data_sync_marker sync_marker;
-
- map<string, string> key_to_marker;
- map<string, string> marker_to_key;
-
- void handle_finish(const string& marker) override {
- map<string, string>::iterator iter = marker_to_key.find(marker);
- if (iter == marker_to_key.end()) {
- return;
- }
- key_to_marker.erase(iter->second);
- reset_need_retry(iter->second);
- marker_to_key.erase(iter);
- }
-
RGWSyncTraceNodeRef tn;
public:
sync_marker);
}
- /*
- * create index from key -> marker, and from marker -> key
- * this is useful so that we can insure that we only have one
- * entry for any key that is used. This is needed when doing
- * incremenatl sync of data, and we don't want to run multiple
- * concurrent sync operations for the same bucket shard
- */
- bool index_key_to_marker(const string& key, const string& marker) {
- if (key_to_marker.find(key) != key_to_marker.end()) {
- set_need_retry(key);
- return false;
- }
- key_to_marker[key] = marker;
- marker_to_key[marker] = key;
- return true;
- }
-
RGWOrderCallCR *allocate_order_control_cr() override {
return new RGWLastCallerWinsCR(sync_env->cct);
}
marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
continue;
}
- if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
- marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
- continue;
- }
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {