string oid;
- RGWDataSyncShardMarkerTrack *marker_tracker;
+ std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
std::string next_marker;
list<rgw_data_change_log_entry> log_entries;
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
- marker_tracker(NULL), truncated(false),
+ truncated(false),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
}
~RGWDataSyncShardCR() override {
- delete marker_tracker;
if (lease_cr) {
lease_cr->abort();
}
modified_shards.insert(keys.begin(), keys.end());
}
- void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
- delete marker_tracker;
- marker_tracker = mt;
- }
-
int operate() override {
int r;
while (true) {
}
tn->log(10, "took lease");
oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn));
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn);
total_entries = sync_marker.pos;
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, &*marker_tracker, error_repo, false, tn), false);
}
sync_marker.marker = iter->first;
1 /* no buffer */);
error_repo->get();
spawn(error_repo, false);
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn));
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn);
do {
if (!lease_cr->is_locked()) {
stop_spawned_services();
continue;
}
tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
+ spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, &*marker_tracker, nullptr, false, tn), false);
}
if (error_retry_time <= ceph::coarse_real_clock::now()) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, log_iter->entry.key, log_iter->log_id,
- log_iter->log_timestamp, marker_tracker,
+ log_iter->log_timestamp, &*marker_tracker,
error_repo, false, tn), false);
}
while ((int)num_spawned() > spawn_window) {
rgw_data_sync_status sync_status;
- RGWDataSyncShardMarkerTrack *marker_tracker;
-
ceph::mutex shard_crs_lock =
ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
map<int, RGWDataSyncShardControlCR *> shard_crs;
RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
num_shards(_num_shards),
- marker_tracker(NULL),
reset_backoff(_reset_backoff), tn(_tn) {
}