From ab26135f5133d822e34ffcd0c571bb05fa0e9743 Mon Sep 17 00:00:00 2001
From: Yuval Lifshitz
Date: Sun, 2 Feb 2020 21:03:25 +0200
Subject: [PATCH] rgw/pubsub: fix tests to sync from master

init_env() now returns a single master-zone connection and a single
pubsub-zone connection instead of lists of zones, and the tests are
updated accordingly. zone_data_checkpoint() and zone_bucket_checkpoint()
now pass the source zone's name, rather than the zone object, to
syncs_from().

Signed-off-by: Yuval Lifshitz
---
 src/test/rgw/rgw_multi/tests.py    |   4 +-
 src/test/rgw/rgw_multi/tests_ps.py | 633 ++++++++++++++---------------
 2 files changed, 318 insertions(+), 319 deletions(-)

diff --git a/src/test/rgw/rgw_multi/tests.py b/src/test/rgw/rgw_multi/tests.py
index 911eaf0dcac..4ff642e68fe 100644
--- a/src/test/rgw/rgw_multi/tests.py
+++ b/src/test/rgw/rgw_multi/tests.py
@@ -354,7 +354,7 @@ def compare_bucket_status(target_zone, source_zone, bucket_name, log_status, syn
     return True
 
 def zone_data_checkpoint(target_zone, source_zone):
-    if not target_zone.syncs_from(source_zone):
+    if not target_zone.syncs_from(source_zone.name):
         return
 
     log_status = data_source_log_status(source_zone)
@@ -384,7 +384,7 @@ def zonegroup_data_checkpoint(zonegroup_conns):
             zone_data_checkpoint(target_conn.zone, source_conn.zone)
 
 def zone_bucket_checkpoint(target_zone, source_zone, bucket_name):
-    if not target_zone.syncs_from(source_zone):
+    if not target_zone.syncs_from(source_zone.name):
         return
 
     log_status = bucket_source_log_status(source_zone, bucket_name)
diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py
index 4eb5bb89140..05f1d797ac2 100644
--- a/src/test/rgw/rgw_multi/tests_ps.py
+++ b/src/test/rgw/rgw_multi/tests_ps.py
@@ -568,19 +568,19 @@ def init_env(require_ps=True):
 
     zonegroup_meta_checkpoint(zonegroup)
 
-    ps_zones = []
-    zones = []
+    ps_zone = None
+    master_zone = None
     for conn in zonegroup_conns.zones:
+        if conn.zone == zonegroup.master_zone:
+            master_zone = conn
         if is_ps_zone(conn):
             zone_meta_checkpoint(conn.zone)
-            ps_zones.append(conn)
-        elif not conn.zone.is_read_only():
-            zones.append(conn)
+            ps_zone = conn
 
-    assert_not_equal(len(zones), 0)
+    assert_not_equal(master_zone, None)
     if require_ps:
-        assert_not_equal(len(ps_zones), 0)
-    return zones, ps_zones
+        assert_not_equal(ps_zone, None)
+    return master_zone, ps_zone
 
 
 def get_ip():
@@ -608,12 +608,12 @@ NOTIFICATION_SUFFIX = "_notif"
 def test_ps_info():
     """ log information for manual testing """
     return SkipTest("only used in manual testing")
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # create objects in the bucket
     number_of_objects = 10
     for i in range(number_of_objects):
@@ -623,23 +623,23 @@ def test_ps_info():
     print('user: ' + get_user())
     print('tenant: ' + get_tenant())
     print('Master Zone')
-    print_connection_info(zones[0].conn)
+    print_connection_info(master_zone.conn)
 
     print('PubSub Zone')
-    print_connection_info(ps_zones[0].conn)
+    print_connection_info(ps_zone.conn)
 
     print('Bucket: ' + bucket_name)
 
 
 def test_ps_s3_notification_low_level():
     """ test low level implementation of s3 notifications """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -651,25 +651,25 @@ def test_ps_s3_notification_low_level():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # get auto-generated topic
-    generated_topic_conf = PSTopic(ps_zones[0].conn, generated_topic_name)
+    generated_topic_conf = PSTopic(ps_zone.conn, generated_topic_name)
     result, status = generated_topic_conf.get_config()
     parsed_result = json.loads(result)
     assert_equal(status/100, 2)
     assert_equal(parsed_result['topic']['name'], generated_topic_name)
     # get auto-generated notification
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        generated_topic_name)
     result, status = notification_conf.get_config()
     parsed_result = json.loads(result)
     assert_equal(status/100, 2)
     assert_equal(len(parsed_result['topics']), 1)
     # get auto-generated subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                               generated_topic_name)
     result, status = sub_conf.get_config()
     parsed_result = json.loads(result)
@@ -699,20 +699,20 @@ def test_ps_s3_notification_low_level():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification_records():
     """ test s3 records fetching """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -723,12 +723,12 @@ def test_ps_s3_notification_records():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # get auto-generated subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, notification_name,
+    sub_conf = PSSubscription(ps_zone.conn, notification_name,
                               topic_name)
     _, status = sub_conf.get_config()
     assert_equal(status/100, 2)
@@ -738,7 +738,7 @@ def test_ps_s3_notification_records():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the events from the subscription
     result, _ = sub_conf.get_events()
@@ -755,21 +755,21 @@ def test_ps_s3_notification_records():
     # delete the keys
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification():
     """ test s3 notification set/get/delete """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create topic
     topic_name = bucket_name + TOPIC_SUFFIX
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     response, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(response)
@@ -780,7 +780,7 @@ def test_ps_s3_notification():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*']
                        }]
-    s3_notification_conf1 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf1 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf1.set_config()
     assert_equal(status/100, 2)
     # create another s3 notification with the same topic
@@ -789,10 +789,10 @@ def test_ps_s3_notification():
                         'TopicArn': topic_arn,
                         'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*']
                        }]
-    s3_notification_conf2 = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf2 = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf2.set_config()
     assert_equal(status/100, 2)
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # get all notification on a bucket
     response, status = s3_notification_conf1.get_config()
@@ -820,37 +820,37 @@ def test_ps_s3_notification():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
     # clean all topics
-    delete_all_s3_topics(zones[0], zonegroup.name)
+    delete_all_s3_topics(master_zone, zonegroup.name)
 
     # create s3 topics
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf1.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_1')
 
     endpoint_address = 'http://127.0.0.1:9001'
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
    topic_arn = topic_conf2.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_2')
 
     endpoint_address = 'http://127.0.0.1:9002'
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf3.set_config()
     assert_equal(topic_arn,
                  'arn:aws:sns:' + zonegroup.name + ':' + get_tenant() + ':' + topic_name + '_3')
@@ -890,8 +890,8 @@ def test_ps_s3_topic_on_master():
 
 def test_ps_s3_topic_with_secret_on_master():
     """ test s3 topics with secret set/get/delete on master """
-    zones, _ = init_env(require_ps=False)
-    if zones[0].secure_conn is None:
+    master_zone, _ = init_env(require_ps=False)
+    if master_zone.secure_conn is None:
         return SkipTest('secure connection is needed to test topic with secrets')
 
     realm = get_realm()
@@ -900,12 +900,12 @@ def test_ps_s3_topic_with_secret_on_master():
     topic_name = bucket_name + TOPIC_SUFFIX
     # clean all topics
-    delete_all_s3_topics(zones[0], zonegroup.name)
+    delete_all_s3_topics(master_zone, zonegroup.name)
 
     # create s3 topics
     endpoint_address = 'amqp://user:password@127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    bad_topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    bad_topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     try:
         result = bad_topic_conf.set_config()
     except Exception as err:
@@ -913,7 +913,7 @@ def test_ps_s3_topic_with_secret_on_master():
     else:
         assert False, 'user password configuration set allowed only over HTTPS'
 
-    topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
 
     assert_equal(topic_arn,
@@ -940,17 +940,17 @@ def test_ps_s3_topic_with_secret_on_master():
 
 def test_ps_s3_notification_on_master():
     """ test s3 notification set/get/delete on master """
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create s3 topic
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -966,7 +966,7 @@ def test_ps_s3_notification_on_master():
                          'TopicArn': topic_arn,
                          'Events': []
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     _, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -996,7 +996,7 @@ def test_ps_s3_notification_on_master():
     # cleanup
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def ps_s3_notification_filter(on_master):
@@ -1008,18 +1008,18 @@ def ps_s3_notification_filter(on_master):
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
 
     if on_master:
-        zones, _ = init_env(require_ps=False)
-        ps_zone = zones[0]
+        master_zone, _ = init_env(require_ps=False)
+        ps_zone = master_zone
     else:
-        zones, ps_zones = init_env(require_ps=True)
-        ps_zone = ps_zones[0]
+        master_zone, ps_zone = init_env(require_ps=True)
+        ps_zone = ps_zone
 
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receivers
@@ -1153,7 +1153,7 @@ def ps_s3_notification_filter(on_master):
         print('wait for 5sec for the messages...')
         time.sleep(5)
     else:
-        zone_bucket_checkpoint(ps_zone.zone, zones[0].zone, bucket_name)
+        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     found_in1 = []
     found_in2 = []
@@ -1188,7 +1188,7 @@ def ps_s3_notification_filter(on_master):
     # delete the bucket
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_amqp_receiver(receiver, task)
     clean_rabbitmq(proc)
 
@@ -1203,17 +1203,17 @@ def test_ps_s3_notification_filter():
 
 def test_ps_s3_notification_errors_on_master():
     """ test s3 notification set/get/delete on master """
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     # create bucket
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # create s3 topic
     endpoint_address = 'amqp://127.0.0.1:7001'
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
 
     # create s3 notification with invalid event name
@@ -1222,7 +1222,7 @@ def test_ps_s3_notification_errors_on_master():
                          'TopicArn': topic_arn,
                          'Events': ['s3:ObjectCreated:Kaboom']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
         result, status = s3_notification_conf.set_config()
     except Exception as error:
@@ -1235,7 +1235,7 @@ def test_ps_s3_notification_errors_on_master():
                          'TopicArn': topic_arn,
                          'Events': ['s3:ObjectCreated:Put']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
         _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1249,7 +1249,7 @@ def test_ps_s3_notification_errors_on_master():
                          'TopicArn': invalid_topic_arn,
                          'Events': ['s3:ObjectCreated:Put']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
         _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1263,7 +1263,7 @@ def test_ps_s3_notification_errors_on_master():
                          'TopicArn': invalid_topic_arn ,
                          'Events': ['s3:ObjectCreated:Put']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     try:
         _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1276,7 +1276,7 @@ def test_ps_s3_notification_errors_on_master():
                          'TopicArn': topic_arn,
                          'Events': ['s3:ObjectCreated:Put']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, 'kaboom', topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, 'kaboom', topic_conf_list)
     try:
         _, _ = s3_notification_conf.set_config()
     except Exception as error:
@@ -1295,16 +1295,16 @@ def test_ps_s3_notification_errors_on_master():
 
     # cleanup
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_objcet_timing():
     return SkipTest("only used in manual testing")
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # create objects in the bucket (async)
     print('creating objects...')
     number_of_objects = 1000
@@ -1336,7 +1336,7 @@ def test_objcet_timing():
     print('average time for object deletion: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
 
     # cleanup
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_notification_push_amqp_on_master():
@@ -1347,13 +1347,13 @@ def test_ps_s3_notification_push_amqp_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name1 = bucket_name + TOPIC_SUFFIX + '_1'
     topic_name2 = bucket_name + TOPIC_SUFFIX + '_2'
 
@@ -1368,11 +1368,11 @@ def test_ps_s3_notification_push_amqp_on_master():
     endpoint_address = 'amqp://' + hostname
     # with acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name1, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     # without acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=none'
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name2, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1383,7 +1383,7 @@ def test_ps_s3_notification_push_amqp_on_master():
                          'Events': ['s3:ObjectCreated:*']
                         }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1444,7 +1444,7 @@ def test_ps_s3_notification_push_amqp_on_master():
     topic_conf1.del_config()
     topic_conf2.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -1456,15 +1456,15 @@ def test_ps_s3_notification_push_kafka():
     if kafka_proc is None or zk_proc is None:
         return SkipTest('end2end kafka tests require kafka/zookeeper installed')
 
-    zones, ps_zones = init_env(require_ps=True)
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1472,7 +1472,7 @@ def test_ps_s3_notification_push_kafka():
     task.start()
 
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint='kafka://' + kafka_server,
                          endpoint_args='kafka-ack-level=broker')
     result, status = topic_conf.set_config()
@@ -1485,7 +1485,7 @@ def test_ps_s3_notification_push_kafka():
                          'Events': []
                         }]
 
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1501,7 +1501,7 @@ def test_ps_s3_notification_push_kafka():
     [thr.join() for thr in client_threads]
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     keys = list(bucket.list())
     receiver.verify_s3_events(keys, exact_match=True)
 
@@ -1513,14 +1513,14 @@ def test_ps_s3_notification_push_kafka():
         client_threads.append(thr)
     [thr.join() for thr in client_threads]
 
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     receiver.verify_s3_events(keys, exact_match=True, deletions=True)
 
     # cleanup
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_kafka_receiver(receiver, task)
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1532,13 +1532,13 @@ def test_ps_s3_notification_push_kafka_on_master():
     kafka_proc, zk_proc, kafka_log = init_kafka()
     if kafka_proc is None or zk_proc is None:
         return SkipTest('end2end kafka tests require kafka/zookeeper installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1549,10 +1549,10 @@ def test_ps_s3_notification_push_kafka_on_master():
     endpoint_address = 'kafka://' + kafka_server
     # without acks from broker
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
     endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none'
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1563,7 +1563,7 @@ def test_ps_s3_notification_push_kafka_on_master():
                          'Events': []
                         }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1608,7 +1608,7 @@ def test_ps_s3_notification_push_kafka_on_master():
     topic_conf1.del_config()
     topic_conf2.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_kafka_receiver(receiver, task)
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1617,8 +1617,8 @@ def kafka_security(security_type):
     """ test pushing kafka s3 notification on master """
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
-    zones, _ = init_env(require_ps=False)
-    if security_type == 'SSL_SASL' and zones[0].secure_conn is None:
+    master_zone, _ = init_env(require_ps=False)
+    if security_type == 'SSL_SASL' and master_zone.secure_conn is None:
         return SkipTest("secure connection is needed to test SASL_SSL security")
     kafka_proc, zk_proc, kafka_log = init_kafka()
     if kafka_proc is None or zk_proc is None:
@@ -1628,7 +1628,7 @@ def kafka_security(security_type):
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # name is constant for manual testing
     topic_name = bucket_name+'_topic'
     # create consumer on the topic
@@ -1648,9 +1648,9 @@ def kafka_security(security_type):
         endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=none&use-ssl=true&ca-location='+KAFKA_DIR+'rootCA.crt'
 
     if security_type == 'SSL_SASL':
-        topic_conf = PSTopicS3(zones[0].secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+        topic_conf = PSTopicS3(master_zone.secure_conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     else:
-        topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+        topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
 
     topic_arn = topic_conf.set_config()
     # create s3 notification
@@ -1659,7 +1659,7 @@ def kafka_security(security_type):
                          'Events': []
                         }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     s3_notification_conf.set_config()
 
     # create objects in the bucket (async)
@@ -1707,7 +1707,7 @@ def kafka_security(security_type):
     # delete the bucket
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     stop_kafka_receiver(receiver, task)
     clean_kafka(kafka_proc, zk_proc, kafka_log)
 
@@ -1724,7 +1724,7 @@ def test_ps_s3_notification_push_http_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
@@ -1737,13 +1737,13 @@ def test_ps_s3_notification_push_http_on_master():
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1751,7 +1751,7 @@ def test_ps_s3_notification_push_http_on_master():
                          'TopicArn': topic_arn,
                          'Events': []
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1799,7 +1799,7 @@ def test_ps_s3_notification_push_http_on_master():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -1808,7 +1808,7 @@ def test_ps_s3_opaque_data():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, ps_zones = init_env(require_ps=True)
+    master_zone, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
@@ -1821,16 +1821,16 @@ def test_ps_s3_opaque_data():
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     opaque_data = 'http://1.2.3.4:8888'
     endpoint_args = 'push-endpoint='+endpoint_address+'&OpaqueData='+opaque_data
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
+    topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint=endpoint_address, endpoint_args=endpoint_args)
     result, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     parsed_result = json.loads(result)
@@ -1841,7 +1841,7 @@ def test_ps_s3_opaque_data():
                          'TopicArn': topic_arn,
                          'Events': []
                         }]
-    s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1856,7 +1856,7 @@ def test_ps_s3_opaque_data():
     [thr.join() for thr in client_threads]
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # check http receiver
     keys = list(bucket.list())
@@ -1872,7 +1872,7 @@ def test_ps_s3_opaque_data():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
@@ -1881,7 +1881,7 @@ def test_ps_s3_opaque_data_on_master():
     if skip_push_tests:
         return SkipTest("PubSub push tests don't run in teuthology")
     hostname = get_ip()
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
@@ -1894,14 +1894,14 @@ def test_ps_s3_opaque_data_on_master():
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # create s3 topic
     endpoint_address = 'http://'+host+':'+str(port)
     endpoint_args = 'push-endpoint='+endpoint_address
     opaque_data = 'http://1.2.3.4:8888'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args, opaque_data=opaque_data)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -1909,7 +1909,7 @@ def test_ps_s3_opaque_data_on_master():
                          'TopicArn': topic_arn,
                          'Events': []
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -1944,19 +1944,19 @@ def test_ps_s3_opaque_data_on_master():
     topic_conf.del_config()
     s3_notification_conf.del_config(notification=notification_name)
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     http_server.close()
 
 
 def test_ps_topic():
     """ test set/get/delete of topic """
-    _, ps_zones = init_env()
+    _, ps_zone = init_env()
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     _, status = topic_conf.set_config()
     assert_equal(status/100, 2)
     # get topic
@@ -1979,14 +1979,14 @@ def test_ps_topic():
 
 def test_ps_topic_with_endpoint():
     """ test set topic with endpoint"""
-    _, ps_zones = init_env()
+    _, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
     dest_endpoint = 'amqp://localhost:7001'
     dest_args = 'amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name,
+    topic_conf = PSTopic(ps_zone.conn, topic_name,
                          endpoint=dest_endpoint,
                          endpoint_args=dest_args)
     _, status = topic_conf.set_config()
@@ -2003,19 +2003,19 @@ def test_ps_notification():
     """ test set/get/delete of notification """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
@@ -2036,25 +2036,25 @@ def test_ps_notification():
 
     # cleanup
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_notification_events():
     """ test set/get/delete of notification on specific events"""
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    zones[0].create_bucket(bucket_name)
+    master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
     events = "OBJECT_CREATE,OBJECT_DELETE"
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name,
                                        events)
     _, status = notification_conf.set_config()
@@ -2071,29 +2071,29 @@ def test_ps_notification_events():
     # cleanup
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_subscription():
     """ test set/get/delete of subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2107,7 +2107,7 @@ def test_ps_subscription():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2121,13 +2121,12 @@ def test_ps_subscription():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the delete events from the subscriptions
-    result, _ = sub_conf.get_events()
-    for event in events['events']:
-        log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
+    #result, _ = sub_conf.get_events()
+    #for event in events['events']:
+    #    log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     # TODO: check deletions
     # TODO: use exact match
     # verify_events_by_elements(events, keys, exact_match=False, deletions=True)
@@ -2144,58 +2143,58 @@ def test_ps_subscription():
     # cleanup
     notification_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_type_subscription():
     """ test subscriptions for different events """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
 
     # create topic for objects creation
     topic_create_name = bucket_name+TOPIC_SUFFIX+'_create'
-    topic_create_conf = PSTopic(ps_zones[0].conn, topic_create_name)
+    topic_create_conf = PSTopic(ps_zone.conn, topic_create_name)
     topic_create_conf.set_config()
     # create topic for objects deletion
     topic_delete_name = bucket_name+TOPIC_SUFFIX+'_delete'
-    topic_delete_conf = PSTopic(ps_zones[0].conn, topic_delete_name)
+    topic_delete_conf = PSTopic(ps_zone.conn, topic_delete_name)
     topic_delete_conf.set_config()
     # create topic for all events
     topic_name = bucket_name+TOPIC_SUFFIX+'_all'
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_meta_checkpoint(ps_zone.zone)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     # create notifications for objects creation
-    notification_create_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_create_conf = PSNotification(ps_zone.conn, bucket_name,
                                               topic_create_name, "OBJECT_CREATE")
     _, status = notification_create_conf.set_config()
     assert_equal(status/100, 2)
     # create notifications for objects deletion
-    notification_delete_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_delete_conf = PSNotification(ps_zone.conn, bucket_name,
                                               topic_delete_name, "OBJECT_DELETE")
     _, status = notification_delete_conf.set_config()
     assert_equal(status/100, 2)
     # create notifications for all events
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name, "OBJECT_DELETE,OBJECT_CREATE")
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for objects creation
-    sub_create_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_create',
+    sub_create_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_create',
                                      topic_create_name)
     _, status = sub_create_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for objects deletion
-    sub_delete_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_delete',
+    sub_delete_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_delete',
                                      topic_delete_name)
     _, status = sub_delete_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription for all events
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_all',
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_all',
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2205,7 +2204,7 @@ def test_ps_event_type_subscription():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the events from the creation subscription
     result, _ = sub_create_conf.get_events()
@@ -2235,7 +2234,7 @@ def test_ps_event_type_subscription():
     for key in bucket.list():
         key.delete()
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     log.debug("Event (OBJECT_DELETE) synced")
 
     # get the events from the creations subscription
@@ -2283,29 +2282,29 @@ def test_ps_event_type_subscription():
     topic_create_conf.del_config()
     topic_delete_conf.del_config()
     topic_conf.del_config()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_fetching():
     """ test incremental fetching of events from a subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2315,7 +2314,7 @@ def test_ps_event_fetching():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
     max_events = 15
     total_events_count = 0
     next_marker = None
@@ -2341,29 +2340,29 @@ def test_ps_event_fetching():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_event_acking():
     """ test acking of some events in a subscription """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2373,7 +2372,7 @@ def test_ps_event_acking():
         key = bucket.new_key(str(i))
         key.set_contents_from_string('bar')
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2406,29 +2405,29 @@ def test_ps_event_acking():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_creation_triggers():
     """ test object creation notifications in using put/copy/post """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
     bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
     # create topic
-    topic_conf = PSTopic(ps_zones[0].conn, topic_name)
+    topic_conf = PSTopic(ps_zone.conn, topic_name)
     topic_conf.set_config()
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
     # create notifications
-    notification_conf = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf = PSNotification(ps_zone.conn, bucket_name,
                                        topic_name)
     _, status = notification_conf.set_config()
     assert_equal(status/100, 2)
     # create subscription
-    sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX,
+    sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX,
                               topic_name)
     _, status = sub_conf.set_config()
     assert_equal(status/100, 2)
@@ -2447,7 +2446,7 @@ def test_ps_creation_triggers():
     uploader.complete_upload()
     fp.close()
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the create events from the subscription
     result, _ = sub_conf.get_events()
@@ -2463,7 +2462,7 @@ def test_ps_creation_triggers():
     topic_conf.del_config()
     for key in bucket.list():
         key.delete()
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
 
 
 def test_ps_s3_creation_triggers_on_master():
@@ -2474,13 +2473,13 @@ def test_ps_s3_creation_triggers_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2491,7 +2490,7 @@ def test_ps_s3_creation_triggers_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2499,7 +2498,7 @@ def test_ps_s3_creation_triggers_on_master():
                          'Events': ['s3:ObjectCreated:Put', 's3:ObjectCreated:Copy']
                         }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2532,7 +2531,7 @@ def test_ps_s3_creation_triggers_on_master():
     for key in bucket.list():
         key.delete()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2544,13 +2543,13 @@ def test_ps_s3_multipart_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receivers
@@ -2565,11 +2564,11 @@ def test_ps_s3_multipart_on_master():
 
     # create s3 topics
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint=' + endpoint_address + '&amqp-exchange=' + exchange + '&amqp-ack-level=broker'
-    topic_conf1 = PSTopicS3(zones[0].conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf1 = PSTopicS3(master_zone.conn, topic_name+'_1', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn1 = topic_conf1.set_config()
-    topic_conf2 = PSTopicS3(zones[0].conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf2 = PSTopicS3(master_zone.conn, topic_name+'_2', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn2 = topic_conf2.set_config()
-    topic_conf3 = PSTopicS3(zones[0].conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf3 = PSTopicS3(master_zone.conn, topic_name+'_3', zonegroup.name, endpoint_args=endpoint_args)
     topic_arn3 = topic_conf3.set_config()
 
     # create s3 notifications
@@ -2583,7 +2582,7 @@ def test_ps_s3_multipart_on_master():
                         {'Id': notification_name+'_3', 'TopicArn': topic_arn3,
                          'Events': ['s3:ObjectCreated:CompleteMultipartUpload']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2626,51 +2625,51 @@ def test_ps_s3_multipart_on_master():
     for key in bucket.list():
         key.delete()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
 def test_ps_versioned_deletion():
     """ test notification of deletion markers """
-    zones, ps_zones = init_env()
+    master_zone, ps_zone = init_env()
    bucket_name = gen_bucket_name()
     topic_name = bucket_name+TOPIC_SUFFIX
 
     # create topics
-    topic_conf1 = PSTopic(ps_zones[0].conn, topic_name+'_1')
+    topic_conf1 = PSTopic(ps_zone.conn, topic_name+'_1')
     _, status = topic_conf1.set_config()
     assert_equal(status/100, 2)
-    topic_conf2 = PSTopic(ps_zones[0].conn, topic_name+'_2')
+    topic_conf2 = PSTopic(ps_zone.conn, topic_name+'_2')
     _, status = topic_conf2.set_config()
     assert_equal(status/100, 2)
 
     # create bucket on the first of the rados zones
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     bucket.configure_versioning(True)
 
     # wait for sync
-    zone_meta_checkpoint(ps_zones[0].zone)
+    zone_meta_checkpoint(ps_zone.zone)
 
     # create notifications
     event_type1 = 'OBJECT_DELETE'
-    notification_conf1 = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf1 = PSNotification(ps_zone.conn, bucket_name,
                                         topic_name+'_1',
                                         event_type1)
     _, status = notification_conf1.set_config()
     assert_equal(status/100, 2)
     event_type2 = 'DELETE_MARKER_CREATE'
-    notification_conf2 = PSNotification(ps_zones[0].conn, bucket_name,
+    notification_conf2 = PSNotification(ps_zone.conn, bucket_name,
                                         topic_name+'_2',
                                         event_type2)
     _, status = notification_conf2.set_config()
     assert_equal(status/100, 2)
 
     # create subscriptions
-    sub_conf1 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_1',
+    sub_conf1 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_1',
                                topic_name+'_1')
     _, status = sub_conf1.set_config()
     assert_equal(status/100, 2)
-    sub_conf2 = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX+'_2',
+    sub_conf2 = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX+'_2',
                                topic_name+'_2')
     _, status = sub_conf2.set_config()
     assert_equal(status/100, 2)
@@ -2685,7 +2684,7 @@ def test_ps_versioned_deletion():
     delete_marker_key = bucket.delete_key(key.name)
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # delete the deletion marker
     delete_marker_key.delete()
@@ -2694,7 +2693,7 @@ def test_ps_versioned_deletion():
     bucket.delete_key(key.name, version_id=v1)
 
     # wait for sync
-    zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name)
+    zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
 
     # get the delete events from the subscription
     result, _ = sub_conf1.get_events()
@@ -2717,7 +2716,7 @@ def test_ps_versioned_deletion():
     zonegroup_conns = ZonegroupConns(zonegroup)
     try:
         zonegroup_bucket_checkpoint(zonegroup_conns, bucket_name)
-        zones[0].delete_bucket(bucket_name)
+        master_zone.delete_bucket(bucket_name)
     except:
         log.debug('zonegroup_bucket_checkpoint failed, cannot delete bucket')
     sub_conf1.del_config()
@@ -2736,13 +2735,13 @@ def test_ps_s3_metadata_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2753,7 +2752,7 @@ def test_ps_s3_metadata_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2769,7 +2768,7 @@ def test_ps_s3_metadata_on_master():
         }
     }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2825,7 +2824,7 @@ def test_ps_s3_metadata_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2837,13 +2836,13 @@ def test_ps_s3_tags_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     topic_name = bucket_name + TOPIC_SUFFIX
 
     # start amqp receiver
@@ -2854,7 +2853,7 @@ def test_ps_s3_tags_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@
 -2867,17 +2866,17 @@ def test_ps_s3_tags_on_master():
         }
     }]
 
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
     # create objects in the bucket with tags
     tags = 'hello=world&ka=boom'
     key_name1 = 'key1'
-    put_object_tagging(zones[0].conn, bucket_name, key_name1, tags)
+    put_object_tagging(master_zone.conn, bucket_name, key_name1, tags)
     tags = 'foo=bar&ka=boom'
     key_name2 = 'key2'
-    put_object_tagging(zones[0].conn, bucket_name, key_name2, tags)
+    put_object_tagging(master_zone.conn, bucket_name, key_name2, tags)
     key_name3 = 'key3'
     key = bucket.new_key(key_name3)
     key.set_contents_from_string('bar')
@@ -2888,7 +2887,7 @@ def test_ps_s3_tags_on_master():
     expected_tags = [{'val': 'world', 'key': 'hello'}, {'val': 'boom', 'key': 'ka'}]
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags = event['s3']['object']['tags']
+        obj_tags = event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # delete the objects
@@ -2898,7 +2897,7 @@ def test_ps_s3_tags_on_master():
     time.sleep(5)
     # check amqp receiver
     for event in receiver.get_and_reset_events():
-        obj_tags = event['s3']['object']['tags']
+        obj_tags = event['Records'][0]['s3']['object']['tags']
         assert_equal(obj_tags[0], expected_tags[0])
 
     # cleanup
@@ -2906,7 +2905,7 @@ def test_ps_s3_tags_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -2918,13 +2917,13 @@ def test_ps_s3_versioned_deletion_on_master():
     proc = init_rabbitmq()
     if proc is None:
         return SkipTest('end2end amqp tests require rabbitmq-server installed')
-    zones, _ = init_env(require_ps=False)
+    master_zone, _ = init_env(require_ps=False)
     realm = get_realm()
     zonegroup = realm.master_zonegroup()
 
     # create bucket
     bucket_name = gen_bucket_name()
-    bucket = zones[0].create_bucket(bucket_name)
+    bucket = master_zone.create_bucket(bucket_name)
     bucket.configure_versioning(True)
     topic_name = bucket_name + TOPIC_SUFFIX
 
@@ -2936,7 +2935,7 @@ def test_ps_s3_versioned_deletion_on_master():
     # create s3 topic
     endpoint_address = 'amqp://' + hostname
     endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=' + exchange +'&amqp-ack-level=broker'
-    topic_conf = PSTopicS3(zones[0].conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
+    topic_conf = PSTopicS3(master_zone.conn, topic_name, zonegroup.name, endpoint_args=endpoint_args)
     topic_arn = topic_conf.set_config()
     # create s3 notification
     notification_name = bucket_name + NOTIFICATION_SUFFIX
@@ -2950,7 +2949,7 @@ def test_ps_s3_versioned_deletion_on_master():
                         {'Id': notification_name+'_3', 'TopicArn': topic_arn,
                          'Events': ['s3:ObjectRemoved:Delete']
                         }]
-    s3_notification_conf = PSNotificationS3(zones[0].conn, bucket_name, topic_conf_list)
+    s3_notification_conf = PSNotificationS3(master_zone.conn, bucket_name, topic_conf_list)
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
@@ -2998,7 +2997,7 @@ def test_ps_s3_versioned_deletion_on_master():
     s3_notification_conf.del_config()
     topic_conf.del_config()
     # delete the bucket
-    zones[0].delete_bucket(bucket_name)
+    master_zone.delete_bucket(bucket_name)
     clean_rabbitmq(proc)
 
 
@@ -3006,7 +3005,7 @@ def test_ps_push_http():
     """ test pushing to http endpoint """
     if skip_push_tests:
skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -3017,20 +3016,20 @@ def test_ps_push_http(): http_server = StreamingHTTPServer(host, port) # create topic - topic_conf = PSTopic(ps_zones[0].conn, topic_name) + topic_conf = PSTopic(ps_zone.conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create notifications - notification_conf = PSNotification(ps_zones[0].conn, bucket_name, + notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription - sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, + sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name, endpoint='http://'+host+':'+str(port)) _, status = sub_conf.set_config() assert_equal(status/100, 2) @@ -3040,7 +3039,7 @@ def test_ps_push_http(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server keys = list(bucket.list()) # TODO: use exact match @@ -3050,8 +3049,8 @@ def test_ps_push_http(): for key in bucket.list(): key.delete() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server # TODO: use exact match http_server.verify_events(keys, deletions=True, exact_match=False) @@ -3060,7 +3059,7 @@ def test_ps_push_http(): sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() @@ -3068,7 +3067,7 @@ def test_ps_s3_push_http(): """ test pushing to http endpoint s3 record format""" if skip_push_tests: return SkipTest("PubSub push tests don't run in teuthology") - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -3079,23 +3078,23 @@ def test_ps_s3_push_http(): http_server = StreamingHTTPServer(host, port) # create topic - topic_conf = PSTopic(ps_zones[0].conn, topic_name, + topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='http://'+host+':'+str(port)) result, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, 
topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -3104,7 +3103,7 @@ def test_ps_s3_push_http(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server keys = list(bucket.list()) # TODO: use exact match @@ -3114,8 +3113,8 @@ def test_ps_s3_push_http(): for key in bucket.list(): key.delete() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check http server # TODO: use exact match http_server.verify_s3_events(keys, deletions=True, exact_match=False) @@ -3123,7 +3122,7 @@ def test_ps_s3_push_http(): # cleanup s3_notification_conf.del_config() topic_conf.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() @@ -3135,7 +3134,7 @@ def test_ps_push_amqp(): proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -3143,20 +3142,20 @@ def test_ps_push_amqp(): exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - topic_conf = PSTopic(ps_zones[0].conn, topic_name) + topic_conf = PSTopic(ps_zone.conn, topic_name) _, status = topic_conf.set_config() assert_equal(status/100, 2) # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create notifications - notification_conf = PSNotification(ps_zones[0].conn, bucket_name, + notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) # create subscription - sub_conf = PSSubscription(ps_zones[0].conn, bucket_name+SUB_SUFFIX, + sub_conf = PSSubscription(ps_zone.conn, bucket_name+SUB_SUFFIX, topic_name, endpoint='amqp://'+hostname, endpoint_args='amqp-exchange='+exchange+'&amqp-ack-level=broker') _, status = sub_conf.set_config() @@ -3167,7 +3166,7 @@ def test_ps_push_amqp(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver keys = list(bucket.list()) # TODO: use exact match @@ -3177,8 +3176,8 @@ def test_ps_push_amqp(): for key in bucket.list(): key.delete() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver # TODO: use exact match receiver.verify_events(keys, deletions=True, exact_match=False) @@ -3188,7 +3187,7 @@ def test_ps_push_amqp(): sub_conf.del_config() notification_conf.del_config() topic_conf.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) @@ -3200,7 +3199,7 @@ def test_ps_s3_push_amqp(): 
proc = init_rabbitmq() if proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -3208,7 +3207,7 @@ def test_ps_s3_push_amqp(): exchange = 'ex1' task, receiver = create_amqp_receiver_thread(exchange, topic_name) task.start() - topic_conf = PSTopic(ps_zones[0].conn, topic_name, + topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() @@ -3216,16 +3215,16 @@ def test_ps_s3_push_amqp(): parsed_result = json.loads(result) topic_arn = parsed_result['arn'] # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -3234,7 +3233,7 @@ def test_ps_s3_push_amqp(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver keys = list(bucket.list()) # TODO: use exact match @@ -3244,8 +3243,8 @@ def test_ps_s3_push_amqp(): for key in bucket.list(): key.delete() # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # check amqp receiver # TODO: use exact match receiver.verify_s3_events(keys, deletions=True, exact_match=False) @@ -3254,22 +3253,22 @@ def test_ps_s3_push_amqp(): stop_amqp_receiver(receiver, task) s3_notification_conf.del_config() topic_conf.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) clean_rabbitmq(proc) def test_ps_delete_bucket(): """ test notification status upon bucket deletion """ - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) topic_name = bucket_name + TOPIC_SUFFIX # create topic topic_name = bucket_name + TOPIC_SUFFIX - topic_conf = PSTopic(ps_zones[0].conn, topic_name) + topic_conf = PSTopic(ps_zone.conn, topic_name) response, status = topic_conf.set_config() assert_equal(status/100, 2) parsed_result = json.loads(response) @@ -3280,12 +3279,12 @@ def test_ps_delete_bucket(): 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) response, status = 
s3_notification_conf.set_config() assert_equal(status/100, 2) # create non-s3 notification - notification_conf = PSNotification(ps_zones[0].conn, bucket_name, + notification_conf = PSNotification(ps_zone.conn, bucket_name, topic_name) _, status = notification_conf.set_config() assert_equal(status/100, 2) @@ -3296,20 +3295,20 @@ def test_ps_delete_bucket(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for bucket sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # delete objects from the bucket for key in bucket.list(): key.delete() # wait for bucket sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # delete the bucket - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) # wait for meta sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # get the events from the auto-generated subscription - sub_conf = PSSubscription(ps_zones[0].conn, notification_name, + sub_conf = PSSubscription(ps_zone.conn, notification_name, topic_name) result, _ = sub_conf.get_events() records = json.loads(result) @@ -3329,14 +3328,14 @@ def test_ps_delete_bucket(): def test_ps_missing_topic(): """ test creating a subscription when no topic info exists""" - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX # create bucket on the first of the rados zones - zones[0].create_bucket(bucket_name) + master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name = bucket_name + NOTIFICATION_SUFFIX topic_arn = 'arn:aws:sns:::' + topic_name @@ -3344,7 +3343,7 @@ def test_ps_missing_topic(): 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) try: s3_notification_conf.set_config() except: @@ -3353,7 +3352,7 @@ def test_ps_missing_topic(): assert 'missing topic is expected' # cleanup - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) def test_ps_s3_topic_update(): @@ -3363,7 +3362,7 @@ def test_ps_s3_topic_update(): rabbit_proc = init_rabbitmq() if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name = bucket_name+TOPIC_SUFFIX @@ -3372,7 +3371,7 @@ def test_ps_s3_topic_update(): exchange = 'ex1' amqp_task, receiver = create_amqp_receiver_thread(exchange, topic_name) amqp_task.start() - topic_conf = PSTopic(ps_zones[0].conn, topic_name, + topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf.set_config() @@ -3392,16 +3391,16 @@ def test_ps_s3_topic_update(): http_server = StreamingHTTPServer(hostname, port) # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification 
notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -3410,14 +3409,14 @@ def test_ps_s3_topic_update(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # TODO: use exact match receiver.verify_s3_events(keys, exact_match=False) # update the same topic with new endpoint - topic_conf = PSTopic(ps_zones[0].conn, topic_name, + topic_conf = PSTopic(ps_zone.conn, topic_name, endpoint='http://'+ hostname + ':' + str(port)) _, status = topic_conf.set_config() assert_equal(status/100, 2) @@ -3435,8 +3434,8 @@ def test_ps_s3_topic_update(): key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # verify that notifications are still sent to amqp @@ -3448,7 +3447,7 @@ def test_ps_s3_topic_update(): 'TopicArn': topic_arn, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -3459,8 +3458,8 @@ def test_ps_s3_topic_update(): key = bucket.new_key(str(i+200)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # check that updates switched to http @@ -3474,7 +3473,7 @@ def test_ps_s3_topic_update(): key.delete() s3_notification_conf.del_config() topic_conf.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc) @@ -3488,7 +3487,7 @@ def test_ps_s3_notification_update(): if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX topic_name2 = bucket_name+'http'+TOPIC_SUFFIX @@ -3503,14 +3502,14 @@ def test_ps_s3_notification_update(): # start an http server in a separate thread http_server = StreamingHTTPServer(hostname, http_port) - topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + topic_conf1 = PSTopic(ps_zone.conn, topic_name1, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) - topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + topic_conf2 = PSTopic(ps_zone.conn, topic_name2, endpoint='http://'+hostname+':'+str(http_port)) 
result, status = topic_conf2.set_config() parsed_result = json.loads(result) @@ -3518,16 +3517,16 @@ def test_ps_s3_notification_update(): assert_equal(status/100, 2) # create bucket on the first of the rados zones - bucket = zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification with topic1 notification_name = bucket_name + NOTIFICATION_SUFFIX topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn1, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) # create objects in the bucket @@ -3536,7 +3535,7 @@ def test_ps_s3_notification_update(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # TODO: use exact match @@ -3547,7 +3546,7 @@ def test_ps_s3_notification_update(): 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) @@ -3558,8 +3557,8 @@ def test_ps_s3_notification_update(): key = bucket.new_key(str(i+100)) key.set_contents_from_string('bar') # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_meta_checkpoint(ps_zone.zone) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) keys = list(bucket.list()) # check that updates switched to http @@ -3574,7 +3573,7 @@ def test_ps_s3_notification_update(): s3_notification_conf.del_config() topic_conf1.del_config() topic_conf2.del_config() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc) @@ -3588,7 +3587,7 @@ def test_ps_s3_multiple_topics_notification(): if rabbit_proc is None: return SkipTest('end2end amqp tests require rabbitmq-server installed') - zones, ps_zones = init_env() + master_zone, ps_zone = init_env() bucket_name = gen_bucket_name() topic_name1 = bucket_name+'amqp'+TOPIC_SUFFIX topic_name2 = bucket_name+'http'+TOPIC_SUFFIX @@ -3603,14 +3602,14 @@ def test_ps_s3_multiple_topics_notification(): # start an http server in a separate thread http_server = StreamingHTTPServer(hostname, http_port) - topic_conf1 = PSTopic(ps_zones[0].conn, topic_name1, + topic_conf1 = PSTopic(ps_zone.conn, topic_name1, endpoint='amqp://' + hostname, endpoint_args='amqp-exchange=' + exchange + '&amqp-ack-level=none') result, status = topic_conf1.set_config() parsed_result = json.loads(result) topic_arn1 = parsed_result['arn'] assert_equal(status/100, 2) - topic_conf2 = PSTopic(ps_zones[0].conn, topic_name2, + topic_conf2 = PSTopic(ps_zone.conn, topic_name2, endpoint='http://'+hostname+':'+str(http_port)) result, status = topic_conf2.set_config() parsed_result = json.loads(result) @@ -3618,9 +3617,9 @@ def test_ps_s3_multiple_topics_notification(): assert_equal(status/100, 2) # create bucket on the first of the rados zones - bucket = 
zones[0].create_bucket(bucket_name) + bucket = master_zone.create_bucket(bucket_name) # wait for sync - zone_meta_checkpoint(ps_zones[0].zone) + zone_meta_checkpoint(ps_zone.zone) # create s3 notification notification_name1 = bucket_name + NOTIFICATION_SUFFIX + '_1' notification_name2 = bucket_name + NOTIFICATION_SUFFIX + '_2' @@ -3635,7 +3634,7 @@ def test_ps_s3_multiple_topics_notification(): 'TopicArn': topic_arn2, 'Events': ['s3:ObjectCreated:*'] }] - s3_notification_conf = PSNotificationS3(ps_zones[0].conn, bucket_name, topic_conf_list) + s3_notification_conf = PSNotificationS3(ps_zone.conn, bucket_name, topic_conf_list) _, status = s3_notification_conf.set_config() assert_equal(status/100, 2) result, _ = s3_notification_conf.get_config() @@ -3644,11 +3643,11 @@ def test_ps_s3_multiple_topics_notification(): assert_equal(result['TopicConfigurations'][1]['Id'], notification_name2) # get auto-generated subscriptions - sub_conf1 = PSSubscription(ps_zones[0].conn, notification_name1, + sub_conf1 = PSSubscription(ps_zone.conn, notification_name1, topic_name1) _, status = sub_conf1.get_config() assert_equal(status/100, 2) - sub_conf2 = PSSubscription(ps_zones[0].conn, notification_name2, + sub_conf2 = PSSubscription(ps_zone.conn, notification_name2, topic_name2) _, status = sub_conf2.get_config() assert_equal(status/100, 2) @@ -3659,7 +3658,7 @@ def test_ps_s3_multiple_topics_notification(): key = bucket.new_key(str(i)) key.set_contents_from_string('bar') # wait for sync - zone_bucket_checkpoint(ps_zones[0].zone, zones[0].zone, bucket_name) + zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name) # get the events from both of the subscription result, _ = sub_conf1.get_events() @@ -3688,6 +3687,6 @@ def test_ps_s3_multiple_topics_notification(): # delete objects from the bucket for key in bucket.list(): key.delete() - zones[0].delete_bucket(bucket_name) + master_zone.delete_bucket(bucket_name) http_server.close() clean_rabbitmq(rabbit_proc) -- 2.39.5
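
For reference, a minimal sketch (illustrative only, not part of the patch) of the calling convention this change establishes: init_env() now returns a single (master_zone, ps_zone) pair instead of the old zones/ps_zones lists, so every zones[0]/ps_zones[0] index in the tests becomes a named connection. All helper names below (init_env, gen_bucket_name, TOPIC_SUFFIX, zone_meta_checkpoint, zone_bucket_checkpoint, PSTopic, assert_equal) are taken from the patched src/test/rgw/rgw_multi modules and the sketch assumes it runs inside tests_ps.py; example_notification_flow itself is a hypothetical name, not a test added by this patch.

    def example_notification_flow():
        # new contract: one master-zone connection, one pubsub-zone connection
        master_zone, ps_zone = init_env()
        bucket_name = gen_bucket_name()
        # buckets are created on the master zone, then metadata-synced to the pubsub zone
        bucket = master_zone.create_bucket(bucket_name)
        zone_meta_checkpoint(ps_zone.zone)
        # topics (and notifications/subscriptions) are configured against the pubsub zone
        topic_conf = PSTopic(ps_zone.conn, bucket_name + TOPIC_SUFFIX)
        _, status = topic_conf.set_config()
        assert_equal(status/100, 2)
        # write an object on the master zone, then wait for bucket sync:
        # the pubsub zone is the sync target, the master zone is the source
        key = bucket.new_key('obj')
        key.set_contents_from_string('bar')
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
        # cleanup mirrors the tests: pubsub config first, then objects, then the bucket
        topic_conf.del_config()
        key.delete()
        zone_bucket_checkpoint(ps_zone.zone, master_zone.zone, bucket_name)
        master_zone.delete_bucket(bucket_name)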