uint64_t versioned_epoch;
utime_t timestamp;
RGWModifyOp op;
+ RGWPendingState op_state;
T entry_marker;
RGWSyncShardMarkerTrack<T> *marker_tracker;
int operate() {
reenter(this) {
+ /* skip entries that are not complete */
+ if (op_state != CLS_RGW_STATE_COMPLETE) {
+ goto done;
+ }
yield {
if (op == CLS_RGW_OP_ADD ||
op == CLS_RGW_OP_LINK_OLH) {
ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
sync_status = retcode;
}
+done:
/* update marker */
yield call(marker_tracker->finish(entry_marker));
if (sync_status == 0) {
RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
- entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+ entry.key, entry.versioned_epoch, entry.mtime, op, CLS_RGW_STATE_COMPLETE, entry.key, marker_tracker), false);
}
}
while ((int)num_spawned() > spawn_window) {
versioned_epoch = entry.ver.epoch;
}
spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
- key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
+ key, versioned_epoch, entry.timestamp, entry.op, entry.state, entry.id, marker_tracker), false);
}
}
while ((int)num_spawned() > spawn_window) {