RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };
map<int, RGWDataChangesLogInfo> shards_info;
+ int ret = 0;
public:
}
int operate(const DoutPrefixProvider *dpp) override {
- int ret = 0;
reenter(this) {
if (!lease_cr->is_locked()) {
drain_all();
RGWSyncTraceNodeRef tn;
RGWBucketIncSyncShardMarkerTrack marker_tracker;
+ int ret = 0;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
{
- int ret = 0;
reenter(this) {
do {
if (lease_cr && !lease_cr->is_locked()) {
if (bucket_status.state == BucketSyncState::Full) {
yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
bucket_lease_cr, bucket_status,
- tn, objv, no_lease));
+ tn, objv));
if (retcode < 0) {
tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
RELEASE_LOCK(bucket_lease_cr);
secondary.zone.start()
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def get_bucket_sync_state(zone, source_zone, bucket_name):
+ cmd = ['bucket', 'sync', 'status'] + zone.zone_args()
+ cmd += ['--bucket', bucket_name]
+ cmd += ['--source-zone', source_zone.name]
+ cmd += ['--format', 'json']
+ status_json, retcode = zone.cluster.admin(cmd, check_retcode=False, read_only=True)
+ if retcode != 0:
+ return None
+ status = json.loads(status_json)
+ sources = status.get('sources', [])
+ if not sources:
+ return None
+ # 'status' field is set for non-incremental states. It is absent
+ # when the bucket is in incremental sync.
+ source_status = sources[0].get('status', '')
+ if not source_status:
+ return 'incremental-sync'
+ if source_status.startswith('full sync'):
+ return 'full-sync'
+ if source_status.startswith('init'):
+ return 'init'
+ if source_status.startswith('stopped'):
+ return 'stopped'
+ return source_status
+
+@attr('bucket_sync_disable')
+def test_bucket_sync_run_during_full_sync():
+ """
+ Test that 'bucket sync run' completes full sync and transitions the bucket
+ to incremental-sync state, even when the background sync is running
+ concurrently and may hold the bucket-wide lock.
+ """
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+ num_objects = 1000
+
+ # 1. Create bucket on primary.
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # 2. Disable bucket sync so objects do not replicate while we upload.
+ disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # 3. Upload 1000 objects to the primary while sync is disabled.
+ log.debug('uploading %d objects to bucket=%s', num_objects, bucket.name)
+ for i in range(num_objects):
+ primary.s3_client.put_object(Bucket=bucket.name, Key=f'obj{i}', Body=b'data')
+
+ # 4. Re-enable bucket sync. This resets the secondary to full sync state.
+ enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # 5. Immediately run 'bucket sync run' on the secondary.
+ log.debug('running bucket sync run on secondary zone=%s', secondary.name)
+ cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # 6. Validate all 1000 objects are present on the secondary.
+ bucket_keys_eq(primary.zone, secondary.zone, bucket.name)
+
+ # 7. Validate that sync state has moved to incremental-sync.
+ state = get_bucket_sync_state(secondary.zone, primary.zone, bucket.name)
+ log.debug('bucket sync state after bucket sync run: %s', state)
+ assert state == 'incremental-sync', \
+ f'Expected incremental-sync after bucket sync run, got: {state}'
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
def test_list_bucket_key_marker_encoding():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)