CopySource={'Bucket': source_bucket.name, 'Key': objname},
Key=objname)
assert e.response['Error']['Code'] == 'AccessDenied'
+
+
+@attr('object_tagging')
+def test_create_delete_obj_tagging():
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+
+ objname = 'dummy'
+
+ # upload a dummy object and wait for sync.
+ k = new_key(primary, bucket, objname)
+ k.set_contents_from_string('foo')
+ zonegroup_meta_checkpoint(zonegroup)
+
+ zonegroup_data_checkpoint(zonegroup_conns)
+ log.debug('created object=%s', objname)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # put object tagging on primary zone
+ primary.s3_client.put_object_tagging(Bucket=bucket.name,
+ Key= objname,
+ Tagging={
+ 'TagSet': [
+ {
+ 'Key': 'key1',
+ 'Value': 'value1'
+ },
+ ]})
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
+ Key = objname)
+ assert(response['TagSet'][0]['Key'] == 'key1')
+ assert(response['TagSet'][0]['Value'] == 'value1')
+
+ primary.s3_client.delete_object_tagging(Bucket=bucket.name,
+ Key= objname)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
+ Key = objname)
+
+ assert(response['TagSet'] == [])
+
+@attr('object_tagging')
+def test_create_delete_versioned_obj_tagging():
+
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload an initial object
+ objname = 'dummy'
+ key = new_key(primary, bucket, objname)
+ key.set_contents_from_string('bar')
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # enable versioning
+ bucket.configure_versioning(True)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload object
+ k = new_key(primary, bucket, objname)
+ k.set_contents_from_string('foo')
+ log.debug('created new version id=%s', k.version_id)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # put object tagging on primary zone
+ primary.s3_client.put_object_tagging(Bucket=bucket.name,
+ Key= objname,
+ VersionId = k.version_id,
+ Tagging={
+ 'TagSet': [
+ {
+ 'Key': 'key1',
+ 'Value': 'value1'
+ },
+ ]})
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
+ Key = objname,
+ VersionId = k.version_id)
+ assert(response['TagSet'][0]['Key'] == 'key1')
+ assert(response['TagSet'][0]['Value'] == 'value1')
+
+ primary.s3_client.delete_object_tagging(Bucket=bucket.name,
+ Key= objname,
+ VersionId = k.version_id)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ response = secondary.s3_client.get_object_tagging(Bucket=bucket.name,
+ Key = objname,
+ VersionId = k.version_id)
+
+ assert(response['TagSet'] == [])