                 continue
             check_role_eq(source_conn, target_conn, role)
+
+@attr('data_sync_init')
+def test_bucket_full_sync_after_data_sync_init():
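+    """
+    Stop the secondary zone before the bucket starts full sync, write
+    objects to the primary, run 'data sync init' on the secondary, and
+    expect everything to replicate via bucket full sync on restart.
+    """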
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    try:
+        # stop secondary zone before it starts a bucket full sync
+        secondary.zone.stop()
+
+        # write some objects that don't sync yet
+        for obj in range(1, 6):
+            k = new_key(primary, bucket.name, f'obj{obj * 11}')
+            k.set_contents_from_string('foo')
+
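+        # reinitialize the secondary's data sync status for this source
+        # zone, so data sync restarts from the beginning when it comes back up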
+        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+        cmd += ['--source-zone', primary.name]
+        secondary.zone.cluster.admin(cmd)
+    finally:
+        # Do this as a finally so we bring the zone back up even on error.
+        secondary.zone.start()
+
+    # expect all objects to replicate via 'bucket full sync'
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_full_sync_after_data_sync_init():
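+    """
+    Like test_bucket_full_sync_after_data_sync_init, but the bucket is
+    resharded through several generations while the secondary is down, so
+    the full sync has to replicate objects from every generation.
+    """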
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    try:
+        # stop secondary zone before it starts a bucket full sync
+        secondary.zone.stop()
+
+        # Write zeroth generation
+        for obj in range(1, 6):
+            k = new_key(primary, bucket.name, f'obj{obj * 11}')
+            k.set_contents_from_string('foo')
+
+        # Write several more generations
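+        # each reshard starts a new generation of the bucket index log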
+        generations = [17, 19, 23, 29, 31, 37]
+        for num_shards in generations:
+            reshard_bucket(primary.zone, bucket.name, num_shards)
+            for obj in range(1, 6):
+                k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+                k.set_contents_from_string('foo')
+
+        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+        cmd += ['--source-zone', primary.name]
+        secondary.zone.cluster.admin(cmd)
+    finally:
+        # Do this as a finally so we bring the zone back up even on error.
+        secondary.zone.start()
+
+    # expect all objects to replicate via 'bucket full sync'
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+def test_bucket_incremental_sync_after_data_sync_init():
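+    """
+    Let the bucket finish full sync and switch to incremental first, then
+    stop the secondary, write more objects, and run 'data sync init'; the
+    remaining objects should replicate via bucket incremental sync.
+    """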
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # upload a dummy object and wait for sync. this forces each zone to finish
+    # a full sync and switch to incremental
+    k = new_key(primary, bucket.name, 'dummy')
+    k.set_contents_from_string('foo')
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    try:
+        # stop secondary zone before it syncs the rest
+        secondary.zone.stop()
+
+        # Write more objects to primary
+        for obj in range(1, 6):
+            k = new_key(primary, bucket.name, f'obj{obj * 11}')
+            k.set_contents_from_string('foo')
+
+        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+        cmd += ['--source-zone', primary.name]
+        secondary.zone.cluster.admin(cmd)
+    finally:
+        # Do this as a finally so we bring the zone back up even on error.
+        secondary.zone.start()
+
+    # expect remaining objects to replicate via 'bucket incremental sync'
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_incremental_sync_latest_after_data_sync_init():
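+    """
+    With the secondary already caught up to the latest generation of a
+    resharded bucket, objects written to that generation while the zone is
+    down should replicate via bucket incremental sync after 'data sync init'.
+    """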
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # Write zeroth generation to primary
+    for obj in range(1, 6):
+        k = new_key(primary, bucket.name, f'obj{obj * 11}')
+        k.set_contents_from_string('foo')
+
+    # Write several more generations
+    generations = [17, 19, 23, 29, 31, 37]
+    for num_shards in generations:
+        reshard_bucket(primary.zone, bucket.name, num_shards)
+        for obj in range(1, 6):
+            k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+            k.set_contents_from_string('foo')
+
+    # wait for the secondary to catch up to the latest gen
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    try:
+        # stop secondary zone before it syncs the rest
+        secondary.zone.stop()
+
+        # write some more objects to the last gen
+        for obj in range(1, 6):
+            k = new_key(primary, bucket.name, f'obj{obj * generations[-1]}')
+            k.set_contents_from_string('foo')
+
+        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+        cmd += ['--source-zone', primary.name]
+        secondary.zone.cluster.admin(cmd)
+    finally:
+        # Do this as a finally so we bring the zone back up even on error.
+        secondary.zone.start()
+
+    # expect remaining objects in last gen to replicate via 'bucket incremental sync'
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)
+
+@attr('data_sync_init')
+@attr('bucket_reshard')
+def test_resharded_bucket_incremental_sync_oldest_after_data_sync_init():
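+    """
+    With the secondary caught up only to the zeroth generation, generations
+    created while the zone is down should replicate via bucket incremental
+    sync after 'data sync init'.
+    """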
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+    secondary = zonegroup_conns.rw_zones[1]
+
+    bucket = primary.create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # Write zeroth generation to primary
+    for obj in range(1, 6):
+        k = new_key(primary, bucket.name, f'obj{obj * 11}')
+        k.set_contents_from_string('foo')
+
+    # wait for the secondary to catch up
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+    try:
+        # stop secondary zone before it syncs later generations
+        secondary.zone.stop()
+
+        # Write several more generations
+        generations = [17, 19, 23, 29, 31, 37]
+        for num_shards in generations:
+            reshard_bucket(primary.zone, bucket.name, num_shards)
+            for obj in range(1, 6):
+                k = new_key(primary, bucket.name, f'obj{obj * num_shards}')
+                k.set_contents_from_string('foo')
+
+        cmd = ['data', 'sync', 'init'] + secondary.zone.zone_args()
+        cmd += ['--source-zone', primary.name]
+        secondary.zone.cluster.admin(cmd)
+    finally:
+        # Do this as a finally so we bring the zone back up even on error.
+        secondary.zone.start()
+
+    # expect all generations to replicate via 'bucket incremental sync'
+    zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+    zonegroup_data_checkpoint(zonegroup_conns)