From: Shilpa Jagannath Date: Fri, 18 Apr 2025 18:26:20 +0000 (-0400) Subject: qa/multisite: add object tagging tests to multisite suite X-Git-Tag: testing/wip-vshankar-testing-20250714.111344-debug~4^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=6db5a934b9e508c95292e00937ea624cb0f0ee52;p=ceph-ci.git qa/multisite: add object tagging tests to multisite suite Signed-off-by: Shilpa Jagannath --- diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 616a41f2992..406196f24c4 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -5865,3 +5865,115 @@ def test_copy_obj_perm_check_between_zonegroups(zonegroup): CopySource={'Bucket': source_bucket.name, 'Key': objname}, Key=objname) assert e.response['Error']['Code'] == 'AccessDenied' + + +@attr('object_tagging') +def test_create_delete_obj_tagging(): + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + + objname = 'dummy' + + # upload a dummy object and wait for sync. + k = new_key(primary, bucket, objname) + k.set_contents_from_string('foo') + zonegroup_meta_checkpoint(zonegroup) + + zonegroup_data_checkpoint(zonegroup_conns) + log.debug('created object=%s', objname) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # put object tagging on primary zone + primary.s3_client.put_object_tagging(Bucket=bucket.name, + Key= objname, + Tagging={ + 'TagSet': [ + { + 'Key': 'key1', + 'Value': 'value1' + }, + ]}) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + response = secondary.s3_client.get_object_tagging(Bucket=bucket.name, + Key = objname) + assert(response['TagSet'][0]['Key'] == 'key1') + assert(response['TagSet'][0]['Value'] == 'value1') + + primary.s3_client.delete_object_tagging(Bucket=bucket.name, + Key= objname) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + response = secondary.s3_client.get_object_tagging(Bucket=bucket.name, + Key = objname) + + assert(response['TagSet'] == []) + +@attr('object_tagging') +def test_create_delete_versioned_obj_tagging(): + + zonegroup = realm.master_zonegroup() + zonegroup_conns = ZonegroupConns(zonegroup) + + primary = zonegroup_conns.rw_zones[0] + secondary = zonegroup_conns.rw_zones[1] + + bucket = primary.create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # upload an initial object + objname = 'dummy' + key = new_key(primary, bucket, objname) + key.set_contents_from_string('bar') + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # enable versioning + bucket.configure_versioning(True) + zonegroup_meta_checkpoint(zonegroup) + + # upload object + k = new_key(primary, bucket, objname) + k.set_contents_from_string('foo') + log.debug('created new version id=%s', k.version_id) + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + # put object tagging on primary zone + primary.s3_client.put_object_tagging(Bucket=bucket.name, + Key= objname, + VersionId = k.version_id, + Tagging={ + 'TagSet': [ + { + 'Key': 'key1', + 'Value': 'value1' + }, + ]}) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + response = secondary.s3_client.get_object_tagging(Bucket=bucket.name, + Key = objname, + VersionId = k.version_id) + assert(response['TagSet'][0]['Key'] == 'key1') + assert(response['TagSet'][0]['Value'] == 'value1') + + primary.s3_client.delete_object_tagging(Bucket=bucket.name, + Key= objname, + VersionId = k.version_id) + + zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name) + + response = secondary.s3_client.get_object_tagging(Bucket=bucket.name, + Key = objname, + VersionId = k.version_id) + + assert(response['TagSet'] == [])