http_server.close()
+@attr('data_path_v2_test')
+def test_ps_s3_list_topics_migration():
+ """ test list topics on migration"""
+ if get_config_cluster() == 'noname':
+ return SkipTest('realm is needed for migration test')
+
+ # Initialize connections and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topics = [f"{bucket_name}{TOPIC_SUFFIX}{i}" for i in range(1, 7)]
+ tenant_topics = [f"{tenant}_{topic}" for topic in topics]
+
+ # Define topic names with version
+ topic_versions = {
+ "topic1_v2": f"{topics[0]}_v2",
+ "topic2_v2": f"{topics[1]}_v2",
+ "topic3_v1": f"{topics[2]}_v1",
+ "topic4_v1": f"{topics[3]}_v1",
+ "topic5_v1": f"{topics[4]}_v1",
+ "topic6_v1": f"{topics[5]}_v1",
+ "tenant_topic1_v2": f"{tenant_topics[0]}_v2",
+ "tenant_topic2_v1": f"{tenant_topics[1]}_v1",
+ "tenant_topic3_v1": f"{tenant_topics[2]}_v1"
+ }
+
+ # Get necessary configurations
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+ conf_cluster = get_config_cluster()
+
+ # Make sure there are no leftover topics on v2
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Start v1 notification
+ # Make sure there are no leftover topics on v1
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Create s3 - v1 topics
+ topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn4 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic5_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn5 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic6_v1'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn6 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic2_v1'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic3_v1'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn3 = tenant_topic_conf.set_config()
+
+ # Start v2 notification
+ zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
+
+ # Create s3 - v2 topics
+ topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic1_v2'], zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 6)
+
+ # Verify tenant topics
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 3)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ topic_conf.del_config(topic_arn4)
+ topic_conf.del_config(topic_arn5)
+ topic_conf.del_config(topic_arn6)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+ tenant_topic_conf.del_config(tenant_topic_arn3)
+
+
+@attr('basic_test')
+def test_ps_s3_list_topics():
+ """ test list topics"""
+
+ # Initialize connections, topic names and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
+ tenant_topic_name1 = tenant + "_" + topic_name1
+ tenant_topic_name2 = tenant + "_" + topic_name2
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+
+ # Make sure there are no leftover topics
+ delete_all_topics(conn1, '', get_config_cluster())
+ delete_all_topics(conn2, tenant, get_config_cluster())
+
+ # Create s3 - v2 topics
+ topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else [] # version 2
+ assert_equal(len(member), 3)
+
+ # Verify topics for tenant
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 2)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+
+@attr('data_path_v2_test')
+def test_ps_s3_list_topics_v1():
+ """ test list topics on v1"""
+ if get_config_cluster() == 'noname':
+ return SkipTest('realm is needed')
+
+ # Initialize connections and configurations
+ conn1 = connection()
+ tenant = 'kaboom1'
+ conn2 = connect_random_user(tenant)
+ bucket_name = gen_bucket_name()
+ topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
+ topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
+ topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
+ tenant_topic_name1 = tenant + "_" + topic_name1
+ tenant_topic_name2 = tenant + "_" + topic_name2
+ host = get_ip()
+ http_port = random.randint(10000, 20000)
+ endpoint_address = 'http://' + host + ':' + str(http_port)
+ endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
+ zonegroup = get_config_zonegroup()
+ conf_cluster = get_config_cluster()
+
+ # Make sure there are no leftover topics
+ delete_all_topics(conn1, '', conf_cluster)
+ delete_all_topics(conn2, tenant, conf_cluster)
+
+ # Make sure that we disable v2
+ zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
+
+ # Create s3 - v1 topics
+ topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
+ topic_arn1 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
+ topic_arn2 = topic_conf.set_config()
+ topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
+ topic_arn3 = topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn1 = tenant_topic_conf.set_config()
+ tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
+ tenant_topic_arn2 = tenant_topic_conf.set_config()
+
+ # Verify topics list
+ try:
+ # Verify no tenant topics
+ res, status = topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 3)
+
+ # Verify tenant topics
+ res, status = tenant_topic_conf.get_list()
+ assert_equal(status // 100, 2)
+ listTopicsResponse = res.get('ListTopicsResponse', {})
+ listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
+ topics = listTopicsResult.get('Topics', {})
+ member = topics['member'] if topics else []
+ assert_equal(len(member), 2)
+ finally:
+ # Cleanup created topics
+ topic_conf.del_config(topic_arn1)
+ topic_conf.del_config(topic_arn2)
+ topic_conf.del_config(topic_arn3)
+ tenant_topic_conf.del_config(tenant_topic_arn1)
+ tenant_topic_conf.del_config(tenant_topic_arn2)
+
+
@attr('basic_test')
def test_ps_s3_topic_permissions():
""" test s3 topic set/get/delete permissions """