From 6b935f18ba0f2f53e84d7db3fdb5103916bd3102 Mon Sep 17 00:00:00 2001 From: kchheda3 Date: Mon, 11 Dec 2023 15:47:30 -0500 Subject: [PATCH] rgw/multisite-notification: Add integration test for bucket notifications in multisite config. Signed-off-by: kchheda3 (cherry picked from commit 1ed8df24ae0f279a6e7d294231b5e6c1e45fb663) --- src/test/rgw/rgw_multi/conn.py | 23 +++++ src/test/rgw/rgw_multi/multisite.py | 7 +- src/test/rgw/rgw_multi/tests.py | 140 ++++++++++++++++++++++++++- src/test/rgw/rgw_multi/zone_cloud.py | 21 ++++ src/test/rgw/rgw_multi/zone_es.py | 21 ++++ src/test/rgw/rgw_multi/zone_rados.py | 28 ++++++ 6 files changed, 238 insertions(+), 2 deletions(-) diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py index 59bc2fdd3d2f7..0ef66b0c4ca94 100644 --- a/src/test/rgw/rgw_multi/conn.py +++ b/src/test/rgw/rgw_multi/conn.py @@ -1,6 +1,7 @@ import boto import boto.s3.connection import boto.iam.connection +import boto3 def get_gateway_connection(gateway, credentials): """ connect to the given gateway """ @@ -39,3 +40,25 @@ def get_gateway_iam_connection(gateway, credentials): port = gateway.port, is_secure = False) return gateway.iam_connection + + +def get_gateway_s3_client(gateway, credentials, region): + """ connect to boto3 s3 client api of the given gateway """ + if gateway.s3_client is None: + gateway.s3_client = boto3.client('s3', + endpoint_url='http://' + gateway.host + ':' + str(gateway.port), + aws_access_key_id=credentials.access_key, + aws_secret_access_key=credentials.secret, + region_name=region) + return gateway.s3_client + + +def get_gateway_sns_client(gateway, credentials, region): + """ connect to boto3 s3 client api of the given gateway """ + if gateway.sns_client is None: + gateway.sns_client = boto3.client('sns', + endpoint_url='http://' + gateway.host + ':' + str(gateway.port), + aws_access_key_id=credentials.access_key, + aws_secret_access_key=credentials.secret, + region_name=region) + return gateway.sns_client diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py index 5d4dcd1aa7ae1..8642ea3a57cee 100644 --- a/src/test/rgw/rgw_multi/multisite.py +++ b/src/test/rgw/rgw_multi/multisite.py @@ -3,7 +3,7 @@ from io import StringIO import json -from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection +from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_s3_client, get_gateway_sns_client class Cluster: """ interface to run commands against a distinct ceph cluster """ @@ -27,6 +27,8 @@ class Gateway: self.secure_connection = None self.ssl_port = ssl_port self.iam_connection = None + self.s3_client = None + self.sns_client = None @abstractmethod def start(self, args = []): @@ -190,6 +192,9 @@ class ZoneConn(object): self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials) self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials) + region = "" if self.zone.zonegroup is None else self.zone.zonegroup.name + self.s3_client = get_gateway_s3_client(self.zone.gateways[0], self.credentials, region) + self.sns_client = get_gateway_sns_client(self.zone.gateways[0], self.credentials,region) # create connections for the rest of the gateways (if exist) for gw in list(self.zone.gateways): diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py index 923bea5888860..6958d39920cfb 100644 --- a/src/test/rgw/rgw_multi/tests.py +++ b/src/test/rgw/rgw_multi/tests.py @@ -17,7 +17,7 @@ from boto.s3.website import WebsiteConfiguration from boto.s3.cors import CORSConfiguration from nose.tools import eq_ as eq -from nose.tools import assert_not_equal, assert_equal +from nose.tools import assert_not_equal, assert_equal, assert_true, assert_false from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest @@ -66,6 +66,7 @@ num_buckets = 0 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6)) num_roles = 0 +num_topic = 0 def get_zone_connection(zone, credentials): """ connect to the zone's first gateway """ @@ -455,6 +456,13 @@ def gen_role_name(): num_roles += 1 return "roles" + '-' + run_prefix + '-' + str(num_roles) + +def gen_topic_name(): + global num_topic + + num_topic += 1 + return "topic" + '-' + run_prefix + '-' + str(num_topic) + class ZonegroupConns: def __init__(self, zonegroup): self.zonegroup = zonegroup @@ -502,6 +510,34 @@ def check_all_buckets_dont_exist(zone_conn, buckets): return True + +def get_topics(zone): + """ + Get list of topics in cluster. + """ + cmd = ['topic', 'list'] + zone.zone_args() + topics_json, _ = zone.cluster.admin(cmd, read_only=True) + topics = json.loads(topics_json) + return topics['topics'] + + +def create_topic_per_zone(zonegroup_conns, topics_per_zone=1): + topics = [] + zone_topic = [] + for zone in zonegroup_conns.rw_zones: + for _ in range(topics_per_zone): + topic_name = gen_topic_name() + log.info('create topic zone=%s name=%s', zone.name, topic_name) + attributes = { + "push-endpoint": "http://kaboom:9999", + "persistent": "true", + } + topic_arn = zone.create_topic(topic_name, attributes) + topics.append(topic_arn) + zone_topic.append((zone, topic_arn)) + + return topics, zone_topic + def create_role_per_zone(zonegroup_conns, roles_per_zone = 1): roles = [] zone_role = [] @@ -3109,3 +3145,105 @@ def test_sync_flow_symmetrical_zonegroup_all_rgw_down(): test_sync_flow_symmetrical_zonegroup_all() finally: start_2nd_rgw(zonegroup) + +def test_topic_notification_sync(): + zonegroup = realm.master_zonegroup() + zonegroup_meta_checkpoint(zonegroup) + # let wait for users and other settings to sync across all zones. + time.sleep(config.checkpoint_delay) + # create topics in each zone. + zonegroup_conns = ZonegroupConns(zonegroup) + topic_arns, zone_topic = create_topic_per_zone(zonegroup_conns) + log.debug("topic_arns: %s", topic_arns) + + zonegroup_meta_checkpoint(zonegroup) + + # verify topics exists in all zones + for conn in zonegroup_conns.zones: + topic_list = conn.list_topics() + log.debug("topics for zone=%s = %s", conn.name, topic_list) + assert_equal(len(topic_list), len(topic_arns)) + for topic_arn_map in topic_list: + assert_true(topic_arn_map['TopicArn'] in topic_arns) + + # create a bucket + bucket = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name()) + log.debug('created bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # create bucket_notification in each zone. + notification_ids = [] + num = 1 + for zone_conn, topic_arn in zone_topic: + noti_id = "bn" + '-' + run_prefix + '-' + str(num) + notification_ids.append(noti_id) + topic_conf = {'Id': noti_id, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectCreated:*'] + } + num += 1 + log.info('creating bucket notification for zone=%s name=%s', zone_conn.name, noti_id) + zone_conn.create_notification(bucket.name, [topic_conf]) + zonegroup_meta_checkpoint(zonegroup) + + # verify notifications exists in all zones + for conn in zonegroup_conns.zones: + notification_list = conn.list_notifications(bucket.name) + log.debug("notifications for zone=%s = %s", conn.name, notification_list) + assert_equal(len(notification_list), len(topic_arns)) + for notification in notification_list: + assert_true(notification['Id'] in notification_ids) + + # verify bucket_topic mapping + # create a new bucket and subcribe it to first topic. + bucket_2 = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name()) + notif_id = "bn-2" + '-' + run_prefix + topic_conf = {'Id': notif_id, + 'TopicArn': topic_arns[0], + 'Events': ['s3:ObjectCreated:*'] + } + zonegroup_conns.rw_zones[0].create_notification(bucket_2.name, [topic_conf]) + zonegroup_meta_checkpoint(zonegroup) + for conn in zonegroup_conns.zones: + topics = get_topics(conn.zone) + for topic in topics: + if topic['arn'] == topic_arns[0]: + assert_equal(len(topic['subscribed_buckets']), 2) + assert_true(bucket_2.name in topic['subscribed_buckets']) + else: + assert_equal(len(topic['subscribed_buckets']), 1) + assert_true(bucket.name in topic['subscribed_buckets']) + + # delete the 2nd bucket and verify the mapping is removed. + zonegroup_conns.rw_zones[0].delete_bucket(bucket_2.name) + zonegroup_meta_checkpoint(zonegroup) + for conn in zonegroup_conns.zones: + topics = get_topics(conn.zone) + for topic in topics: + assert_equal(len(topic['subscribed_buckets']), 1) + '''TODO(Remove the break once the https://tracker.ceph.com/issues/20802 + is fixed, as the secondary site bucket instance info is currently not + getting deleted coz of the bug hence the bucket-topic mapping + deletion is not invoked on secondary sites.)''' + break + + # delete notifications + zonegroup_conns.rw_zones[0].delete_notifications(bucket.name) + log.debug('Deleting all notifications for bucket=%s', bucket.name) + zonegroup_meta_checkpoint(zonegroup) + + # verify notification deleted in all zones + for conn in zonegroup_conns.zones: + notification_list = conn.list_notifications(bucket.name) + assert_equal(len(notification_list), 0) + + # delete topics + for zone_conn, topic_arn in zone_topic: + log.debug('deleting topic zone=%s arn=%s', zone_conn.name, topic_arn) + zone_conn.delete_topic(topic_arn) + zonegroup_meta_checkpoint(zonegroup) + + # verify topics deleted in all zones + for conn in zonegroup_conns.zones: + topic_list = conn.list_topics() + assert_equal(len(topic_list), 0) diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py index 7c94aaa8a60f4..fdec751ff5222 100644 --- a/src/test/rgw/rgw_multi/zone_cloud.py +++ b/src/test/rgw/rgw_multi/zone_cloud.py @@ -310,6 +310,27 @@ class CloudZone(Zone): def has_role(self, role_name): assert False + def create_topic(self, topicname, attributes): + assert False + + def delete_topic(self, topic_arn): + assert False + + def get_topic(self, topic_arn): + assert False + + def list_topics(self): + assert False + + def create_notification(self, bucket_name, config): + assert False + + def delete_notifications(self, bucket_name): + assert False + + def list_notifications(self, bucket_name): + assert False + def get_conn(self, credentials): return self.Conn(self, credentials) diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py index 84628b775d1cb..2ccdcf0423323 100644 --- a/src/test/rgw/rgw_multi/zone_es.py +++ b/src/test/rgw/rgw_multi/zone_es.py @@ -252,6 +252,27 @@ class ESZone(Zone): def has_role(self, role_name): assert False + def create_topic(self, topicname, attributes): + assert False + + def delete_topic(self, topic_arn): + assert False + + def list_topics(self): + assert False + + def get_topic(self, topic_arn): + assert False + + def create_notification(self, bucket_name, config): + assert False + + def delete_notification(self, bucket_name): + assert False + + def list_notifications(self, bucket_name): + assert False + def get_conn(self, credentials): return self.Conn(self, credentials) diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py index 7b7fe5228cbfd..3761676a3d781 100644 --- a/src/test/rgw/rgw_multi/zone_rados.py +++ b/src/test/rgw/rgw_multi/zone_rados.py @@ -142,6 +142,34 @@ class RadosZone(Zone): return False return True + def create_topic(self, topicname, attributes): + result = self.sns_client.create_topic(Name=topicname, Attributes=attributes) + self.topic_arn = result['TopicArn'] + return self.topic_arn + + def delete_topic(self, topic_arn): + return self.sns_client.delete_topic(TopicArn=topic_arn) + + def get_topic(self, topic_arn): + return self.sns_client.get_topic_attributes(TopicArn=topic_arn) + + def list_topics(self): + return self.sns_client.list_topics()['Topics'] + + def create_notification(self, bucket_name, topic_conf_list): + return self.s3_client.put_bucket_notification_configuration( + Bucket=bucket_name, NotificationConfiguration={'TopicConfigurations': topic_conf_list}) + + def delete_notifications(self, bucket_name): + return self.s3_client.put_bucket_notification_configuration(Bucket=bucket_name, + NotificationConfiguration={}) + + def list_notifications(self, bucket_name): + out = self.s3_client.get_bucket_notification_configuration(Bucket=bucket_name) + if 'TopicConfigurations' in out: + return out['TopicConfigurations'] + return [] + def get_conn(self, credentials): return self.Conn(self, credentials) -- 2.39.5