secondary.zone.start()
zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+def test_list_bucket_key_marker_encoding():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # test for object names with '%' character.
+ for obj in range(1, 1100):
+ k = new_key(primary, bucket.name, f'obj%{obj * 11}')
+ k.set_contents_from_string('foo')
+
+ # wait for the secondary to catch up
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ cmd = ['bucket', 'sync', 'init'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name]
+ cmd += ['--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+
+ cmd = ['bucket', 'sync', 'run'] + secondary.zone.zone_args()
+ cmd += ['--bucket', bucket.name, '--source-zone', primary.name]
+ secondary.zone.cluster.admin(cmd)
+
+ # write an object during incremental sync.
+ objname = 'test_incremental'
+ k = new_key(primary, bucket, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # the object uploaded after bucket sync init should be replicated
+ check_object_exists(bucket, objname)
+
def test_role_sync():
zonegroup = realm.master_zonegroup()
Bucket=dest_bucket.name,
CopySource={'Bucket': source_bucket.name, 'Key': objname},
Key=objname)
- assert e.response['Error']['Code'] == 'AccessDenied'
-
-
-@attr('object_tagging')
-def test_create_delete_obj_tagging():
- zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
- primary = zonegroup_conns.rw_zones[0]
- secondary = zonegroup_conns.rw_zones[1]
-
- bucket = primary.create_bucket(gen_bucket_name())
- log.debug('created bucket=%s', bucket.name)
-
- objname = 'dummy'
-
- # upload a dummy object and wait for sync.
- k = new_key(primary, bucket, objname)
- k.set_contents_from_string('foo')
- zonegroup_meta_checkpoint(zonegroup)
-
- zonegroup_data_checkpoint(zonegroup_conns)
- log.debug('created object=%s', objname)
-
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- # put object tagging on primary zone
- primary.s3_client.put_object_tagging(Bucket=bucket.name,
- Key= objname,
- Tagging={
- 'TagSet': [
- {
- 'Key': 'key1',
- 'Value': 'value1'
- },
- ]})
-
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
- Key = objname)
- assert(response['TagSet'][0]['Key'] == 'key1')
- assert(response['TagSet'][0]['Value'] == 'value1')
-
- primary.s3_client.delete_object_tagging(Bucket=bucket.name,
- Key= objname)
-
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
- Key = objname)
-
- assert(response['TagSet'] == [])
-
-@attr('object_tagging')
-def test_create_delete_versioned_obj_tagging():
-
- zonegroup = realm.master_zonegroup()
- zonegroup_conns = ZonegroupConns(zonegroup)
-
- primary = zonegroup_conns.rw_zones[0]
- secondary = zonegroup_conns.rw_zones[1]
-
- bucket = primary.create_bucket(gen_bucket_name())
- log.debug('created bucket=%s', bucket.name)
- zonegroup_meta_checkpoint(zonegroup)
-
- # upload an initial object
- objname = 'dummy'
- key = new_key(primary, bucket, objname)
- key.set_contents_from_string('bar')
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- # enable versioning
- bucket.configure_versioning(True)
- zonegroup_meta_checkpoint(zonegroup)
-
- # upload object
- k = new_key(primary, bucket, objname)
- k.set_contents_from_string('foo')
- log.debug('created new version id=%s', k.version_id)
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- # put object tagging on primary zone
- primary.s3_client.put_object_tagging(Bucket=bucket.name,
- Key= objname,
- VersionId = k.version_id,
- Tagging={
- 'TagSet': [
- {
- 'Key': 'key1',
- 'Value': 'value1'
- },
- ]})
-
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
- Key = objname,
- VersionId = k.version_id)
- assert(response['TagSet'][0]['Key'] == 'key1')
- assert(response['TagSet'][0]['Value'] == 'value1')
-
- primary.s3_client.delete_object_tagging(Bucket=bucket.name,
- Key= objname,
- VersionId = k.version_id)
-
- zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
-
- response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
- Key = objname,
- VersionId = k.version_id)
-
- assert(response['TagSet'] == [])
+ assert e.response['Error']['Code'] == 'AccessDenied'
\ No newline at end of file