rgw/multisite-notification: Add integration test for bucket notifications in multisit...
author     kchheda3 <kchheda3@bloomberg.net>
           Mon, 11 Dec 2023 20:47:30 +0000 (15:47 -0500)
committer  Casey Bodley <cbodley@redhat.com>
           Wed, 10 Apr 2024 13:18:06 +0000 (09:18 -0400)
Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
(cherry picked from commit 1ed8df24ae0f279a6e7d294231b5e6c1e45fb663)
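
The new test case, test_topic_notification_sync, is added to src/test/rgw/rgw_multi/tests.py (diff below). As a hedged sketch, it can be run on its own through nose; the test_multi harness module and a prepared test_multi.conf under src/test/rgw are assumptions about the usual multisite test setup, not something this commit changes:

# assumed setup: run from src/test/rgw with a prepared test_multi.conf
import nose
nose.run(argv=['nosetests', '-v', 'test_multi:test_topic_notification_sync'])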

src/test/rgw/rgw_multi/conn.py
src/test/rgw/rgw_multi/multisite.py
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/zone_cloud.py
src/test/rgw/rgw_multi/zone_es.py
src/test/rgw/rgw_multi/zone_rados.py

diff --git a/src/test/rgw/rgw_multi/conn.py b/src/test/rgw/rgw_multi/conn.py
index 59bc2fdd3d2f7b1a3246ad4019d2bdfdcea2334b..0ef66b0c4ca9405119952dc3ecef73b1e6388a94 100644
@@ -1,6 +1,7 @@
 import boto
 import boto.s3.connection
 import boto.iam.connection
+import boto3
 
 def get_gateway_connection(gateway, credentials):
     """ connect to the given gateway """
@@ -39,3 +40,25 @@ def get_gateway_iam_connection(gateway, credentials):
                 port = gateway.port,
                 is_secure = False)
     return gateway.iam_connection
+
+
+def get_gateway_s3_client(gateway, credentials, region):
+    """ connect to boto3 s3 client api of the given gateway """
+    if gateway.s3_client is None:
+        gateway.s3_client = boto3.client('s3',
+                                         endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
+                                         aws_access_key_id=credentials.access_key,
+                                         aws_secret_access_key=credentials.secret,
+                                         region_name=region)
+    return gateway.s3_client
+
+
+def get_gateway_sns_client(gateway, credentials, region):
+    """ connect to boto3 sns client api of the given gateway """
+    if gateway.sns_client is None:
+        gateway.sns_client = boto3.client('sns',
+                                          endpoint_url='http://' + gateway.host + ':' + str(gateway.port),
+                                          aws_access_key_id=credentials.access_key,
+                                          aws_secret_access_key=credentials.secret,
+                                          region_name=region)
+    return gateway.sns_client
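
For context, a minimal sketch of how these cached clients are intended to be used; the gateway, credentials, and zonegroup objects are hypothetical stand-ins for the instances the suite provides (see the multisite.py hunk below, which passes the zonegroup name as the region):

# hypothetical stand-ins for the suite's Gateway/Credentials/ZoneGroup objects
s3 = get_gateway_s3_client(gateway, credentials, region=zonegroup.name)
sns = get_gateway_sns_client(gateway, credentials, region=zonegroup.name)

# both clients are cached on the gateway, so repeated calls reuse the same
# boto3 client instance
assert get_gateway_s3_client(gateway, credentials, zonegroup.name) is s3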
diff --git a/src/test/rgw/rgw_multi/multisite.py b/src/test/rgw/rgw_multi/multisite.py
index 5d4dcd1aa7ae1480c038f2d33088b027ede9aa93..8642ea3a57ceea47f83ecee135216457e67adb5e 100644
@@ -3,7 +3,7 @@ from io import StringIO
 
 import json
 
-from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection
+from .conn import get_gateway_connection, get_gateway_iam_connection, get_gateway_secure_connection, get_gateway_s3_client, get_gateway_sns_client
 
 class Cluster:
     """ interface to run commands against a distinct ceph cluster """
@@ -27,6 +27,8 @@ class Gateway:
         self.secure_connection = None
         self.ssl_port = ssl_port
         self.iam_connection = None
+        self.s3_client = None
+        self.sns_client = None
 
     @abstractmethod
     def start(self, args = []):
@@ -190,6 +192,9 @@ class ZoneConn(object):
             self.secure_conn = get_gateway_secure_connection(self.zone.gateways[0], self.credentials)
 
             self.iam_conn = get_gateway_iam_connection(self.zone.gateways[0], self.credentials)
+            region = "" if self.zone.zonegroup is None else self.zone.zonegroup.name
+            self.s3_client = get_gateway_s3_client(self.zone.gateways[0], self.credentials, region)
+            self.sns_client = get_gateway_sns_client(self.zone.gateways[0], self.credentials, region)
 
             # create connections for the rest of the gateways (if exist)
             for gw in list(self.zone.gateways):
diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
index 923bea58888604ff5add8d0cb9357d3c07361df6..6958d39920cfbc5be7f4a026a690a118d53820c2 100644
@@ -17,7 +17,7 @@ from boto.s3.website import WebsiteConfiguration
 from boto.s3.cors import CORSConfiguration
 
 from nose.tools import eq_ as eq
-from nose.tools import assert_not_equal, assert_equal
+from nose.tools import assert_not_equal, assert_equal, assert_true, assert_false
 from nose.plugins.attrib import attr
 from nose.plugins.skip import SkipTest
 
@@ -66,6 +66,7 @@ num_buckets = 0
 run_prefix=''.join(random.choice(string.ascii_lowercase) for _ in range(6))
 
 num_roles = 0
+num_topic = 0
 
 def get_zone_connection(zone, credentials):
     """ connect to the zone's first gateway """
@@ -455,6 +456,13 @@ def gen_role_name():
     num_roles += 1
     return "roles" + '-' + run_prefix + '-' + str(num_roles)
 
+
+def gen_topic_name():
+    global num_topic
+
+    num_topic += 1
+    return "topic" + '-' + run_prefix + '-' + str(num_topic)
+
 class ZonegroupConns:
     def __init__(self, zonegroup):
         self.zonegroup = zonegroup
@@ -502,6 +510,34 @@ def check_all_buckets_dont_exist(zone_conn, buckets):
 
     return True
 
+
+def get_topics(zone):
+    """
+    Get list of topics in cluster.
+    """
+    cmd = ['topic', 'list'] + zone.zone_args()
+    topics_json, _ = zone.cluster.admin(cmd, read_only=True)
+    topics = json.loads(topics_json)
+    return topics['topics']
+
+
+def create_topic_per_zone(zonegroup_conns, topics_per_zone=1):
+    topics = []
+    zone_topic = []
+    for zone in zonegroup_conns.rw_zones:
+        for _ in range(topics_per_zone):
+            topic_name = gen_topic_name()
+            log.info('create topic zone=%s name=%s', zone.name, topic_name)
+            attributes = {
+                "push-endpoint": "http://kaboom:9999",
+                "persistent": "true",
+            }
+            topic_arn = zone.create_topic(topic_name, attributes)
+            topics.append(topic_arn)
+            zone_topic.append((zone, topic_arn))
+
+    return topics, zone_topic
+
 def create_role_per_zone(zonegroup_conns, roles_per_zone = 1):
     roles = []
     zone_role = []
@@ -3109,3 +3145,105 @@ def test_sync_flow_symmetrical_zonegroup_all_rgw_down():
         test_sync_flow_symmetrical_zonegroup_all()
     finally:
         start_2nd_rgw(zonegroup)
+
+def test_topic_notification_sync():
+    zonegroup = realm.master_zonegroup()
+    zonegroup_meta_checkpoint(zonegroup)
+    # wait for users and other settings to sync across all zones.
+    time.sleep(config.checkpoint_delay)
+    # create topics in each zone.
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    topic_arns, zone_topic = create_topic_per_zone(zonegroup_conns)
+    log.debug("topic_arns: %s", topic_arns)
+
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # verify topics exist in all zones
+    for conn in zonegroup_conns.zones:
+        topic_list = conn.list_topics()
+        log.debug("topics for zone=%s = %s", conn.name, topic_list)
+        assert_equal(len(topic_list), len(topic_arns))
+        for topic_arn_map in topic_list:
+            assert_true(topic_arn_map['TopicArn'] in topic_arns)
+
+    # create a bucket
+    bucket = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
+    log.debug('created bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # create bucket_notification in each zone.
+    notification_ids = []
+    num = 1
+    for zone_conn, topic_arn in zone_topic:
+        noti_id = "bn" + '-' + run_prefix + '-' + str(num)
+        notification_ids.append(noti_id)
+        topic_conf = {'Id': noti_id,
+                      'TopicArn': topic_arn,
+                      'Events': ['s3:ObjectCreated:*']
+                     }
+        num += 1
+        log.info('creating bucket notification for zone=%s name=%s', zone_conn.name, noti_id)
+        zone_conn.create_notification(bucket.name, [topic_conf])
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # verify notifications exist in all zones
+    for conn in zonegroup_conns.zones:
+        notification_list = conn.list_notifications(bucket.name)
+        log.debug("notifications for zone=%s = %s", conn.name, notification_list)
+        assert_equal(len(notification_list), len(topic_arns))
+        for notification in notification_list:
+            assert_true(notification['Id'] in notification_ids)
+
+    # verify the bucket-topic mapping:
+    # create a new bucket and subscribe it to the first topic.
+    bucket_2 = zonegroup_conns.rw_zones[0].create_bucket(gen_bucket_name())
+    notif_id = "bn-2" + '-' + run_prefix
+    topic_conf = {'Id': notif_id,
+                  'TopicArn': topic_arns[0],
+                  'Events': ['s3:ObjectCreated:*']
+                  }
+    zonegroup_conns.rw_zones[0].create_notification(bucket_2.name, [topic_conf])
+    zonegroup_meta_checkpoint(zonegroup)
+    for conn in zonegroup_conns.zones:
+        topics = get_topics(conn.zone)
+        for topic in topics:
+            if topic['arn'] == topic_arns[0]:
+                assert_equal(len(topic['subscribed_buckets']), 2)
+                assert_true(bucket_2.name in topic['subscribed_buckets'])
+            else:
+                assert_equal(len(topic['subscribed_buckets']), 1)
+            assert_true(bucket.name in topic['subscribed_buckets'])
+
+    # delete the 2nd bucket and verify the mapping is removed.
+    zonegroup_conns.rw_zones[0].delete_bucket(bucket_2.name)
+    zonegroup_meta_checkpoint(zonegroup)
+    for conn in zonegroup_conns.zones:
+        topics = get_topics(conn.zone)
+        for topic in topics:
+            assert_equal(len(topic['subscribed_buckets']), 1)
+        '''TODO: remove the break once https://tracker.ceph.com/issues/20802
+           is fixed; because of that bug the bucket instance info on the
+           secondary site is currently not deleted, so the bucket-topic
+           mapping deletion is not invoked on secondary sites.'''
+        break
+
+    # delete notifications
+    zonegroup_conns.rw_zones[0].delete_notifications(bucket.name)
+    log.debug('deleting all notifications for bucket=%s', bucket.name)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # verify notification deleted in all zones
+    for conn in zonegroup_conns.zones:
+        notification_list = conn.list_notifications(bucket.name)
+        assert_equal(len(notification_list), 0)
+
+    # delete topics
+    for zone_conn, topic_arn in zone_topic:
+        log.debug('deleting topic zone=%s arn=%s', zone_conn.name, topic_arn)
+        zone_conn.delete_topic(topic_arn)
+    zonegroup_meta_checkpoint(zonegroup)
+
+    # verify topics deleted in all zones
+    for conn in zonegroup_conns.zones:
+        topic_list = conn.list_topics()
+        assert_equal(len(topic_list), 0)
diff --git a/src/test/rgw/rgw_multi/zone_cloud.py b/src/test/rgw/rgw_multi/zone_cloud.py
index 7c94aaa8a60f456c262616aa312b318fb3952328..fdec751ff5222e5db1c4a609169726bb0b35cf6e 100644
@@ -310,6 +310,27 @@ class CloudZone(Zone):
         def has_role(self, role_name):
             assert False
 
+        def create_topic(self, topicname, attributes):
+            assert False
+
+        def delete_topic(self, topic_arn):
+            assert False
+
+        def get_topic(self, topic_arn):
+            assert False
+
+        def list_topics(self):
+            assert False
+
+        def create_notification(self, bucket_name, config):
+            assert False
+
+        def delete_notifications(self, bucket_name):
+            assert False
+
+        def list_notifications(self, bucket_name):
+            assert False
+
     def get_conn(self, credentials):
         return self.Conn(self, credentials)
 
diff --git a/src/test/rgw/rgw_multi/zone_es.py b/src/test/rgw/rgw_multi/zone_es.py
index 84628b775d1cb6018c304215e11e6ed304dd4bc9..2ccdcf042332333d4c16303007085e7bb66f8a97 100644
@@ -252,6 +252,27 @@ class ESZone(Zone):
         def has_role(self, role_name):
             assert False
 
+        def create_topic(self, topicname, attributes):
+            assert False
+
+        def delete_topic(self, topic_arn):
+            assert False
+
+        def list_topics(self):
+            assert False
+
+        def get_topic(self, topic_arn):
+            assert False
+
+        def create_notification(self, bucket_name, config):
+            assert False
+
+        def delete_notifications(self, bucket_name):
+            assert False
+
+        def list_notifications(self, bucket_name):
+            assert False
+
     def get_conn(self, credentials):
         return self.Conn(self, credentials)
 
diff --git a/src/test/rgw/rgw_multi/zone_rados.py b/src/test/rgw/rgw_multi/zone_rados.py
index 7b7fe5228cbfdf083553b372d07ed6e72508649c..3761676a3d781368f2267b6495de9c80541d5047 100644
@@ -142,6 +142,34 @@ class RadosZone(Zone):
                 return False
             return True
 
+        def create_topic(self, topicname, attributes):
+            result = self.sns_client.create_topic(Name=topicname, Attributes=attributes)
+            self.topic_arn = result['TopicArn']
+            return self.topic_arn
+
+        def delete_topic(self, topic_arn):
+            return self.sns_client.delete_topic(TopicArn=topic_arn)
+
+        def get_topic(self, topic_arn):
+            return self.sns_client.get_topic_attributes(TopicArn=topic_arn)
+
+        def list_topics(self):
+            return self.sns_client.list_topics()['Topics']
+
+        def create_notification(self, bucket_name, topic_conf_list):
+            return self.s3_client.put_bucket_notification_configuration(
+                Bucket=bucket_name, NotificationConfiguration={'TopicConfigurations': topic_conf_list})
+
+        def delete_notifications(self, bucket_name):
+            return self.s3_client.put_bucket_notification_configuration(Bucket=bucket_name,
+                                                                        NotificationConfiguration={})
+
+        def list_notifications(self, bucket_name):
+            out = self.s3_client.get_bucket_notification_configuration(Bucket=bucket_name)
+            if 'TopicConfigurations' in out:
+                return out['TopicConfigurations']
+            return []
+
     def get_conn(self, credentials):
         return self.Conn(self, credentials)
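
Outside the harness, the same round trip the new RadosZone.Conn methods cover can be exercised directly with boto3 against a single RGW endpoint. A hedged sketch follows; the endpoint, credentials, bucket/topic names, and the 'default' region (normally the zonegroup name) are placeholders, not part of this commit:

import boto3

# placeholder endpoint and credentials for a local RGW instance
kwargs = dict(endpoint_url='http://localhost:8000',
              aws_access_key_id='ACCESS_KEY',
              aws_secret_access_key='SECRET_KEY',
              region_name='default')   # normally the zonegroup name
sns = boto3.client('sns', **kwargs)
s3 = boto3.client('s3', **kwargs)

# create a persistent push topic and subscribe a bucket to it
topic_arn = sns.create_topic(Name='demo-topic',
                             Attributes={'push-endpoint': 'http://kaboom:9999',
                                         'persistent': 'true'})['TopicArn']
s3.create_bucket(Bucket='demo-bucket')
s3.put_bucket_notification_configuration(
    Bucket='demo-bucket',
    NotificationConfiguration={'TopicConfigurations': [
        {'Id': 'demo-notif', 'TopicArn': topic_arn,
         'Events': ['s3:ObjectCreated:*']}]})

# read the configuration back, then clean up
print(s3.get_bucket_notification_configuration(Bucket='demo-bucket')
        .get('TopicConfigurations', []))
s3.put_bucket_notification_configuration(Bucket='demo-bucket',
                                         NotificationConfiguration={})
sns.delete_topic(TopicArn=topic_arn)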